flink 两阶段的提交

作者 chauncy 日期 2020-05-16
flink 两阶段的提交

流程

flink 1.4 以前都是一直在宣传自己是能够保证 Exactly-Once,但是这里的exactly once 的语义是指的它自己的内部的一个保证,即是输入数据(source)然后计算(operator),最后输出(sink) 这个内部是不会有重复的计算或者漏掉数据没有计算的,直到1.4过后有了end-to-end 的一致性保证

我们通过例子来看这怎么保证end-to-end 的exactly once

一个 从kafka consumer 数据的 data source
通过一个窗口,进行一个聚合
最后的数据sink到 kafka

要想完成准确的一次处理,需要外部系统支持 事务的提交 回滚操作


当数据在开始的时候job manager 会注入一个 barrier ,然后跟着数据流进行传递到

当每一个节点收到数据barrier 过后,他会存储当前算子的state,当我们的sink 算子收到barrier 的时候它会把数据进行send(producer flush) 到kafka 里面去,这也是就是 2PC量阶段提交的pre commit 阶段

当所有的 算子都已经处理完inject barrier 的时候,job manager 会调用 每个算在的 CheckpointLister 的
notifyCheckpointComplete 方法,在sink算子里面在这个方法会进行 commit 提交( kafka producer commitTransactin)

简单的进行一个总结,然后后面对这几个情况进行源代码进行分析

  1. beginTransaction -我们可以创建一个临时目录,用户存放我们的数据
  2. preCommit -我们在这个阶段吧数据进行flush 到我们创建的目录里面,然后也不会在去更改他
  3. commit-我们原子性的将pre-commit file 到我们的目的目录下面
  4. abort-如果失败了,我们将会进行delte 我们之前创建的目录

源码分析

public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
extends RichSinkFunction<IN>
implements CheckpointedFunction, CheckpointListener {
protected final LinkedHashMap<Long, TransactionHolder<TXN>> pendingCommitTransactions = new LinkedHashMap<>();
protected transient Optional<CONTEXT> userContext;
protected transient ListState<State<TXN, CONTEXT>> state;
private final Clock clock;
private TransactionHolder<TXN> currentTransactionHolder;
private long transactionTimeout = Long.MAX_VALUE;
private boolean ignoreFailuresAfterTransactionTimeout;
private double transactionTimeoutWarningRatio = -1;
protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;
/**
* Method that starts a new transaction.
*
* @return newly created transaction.
*/
protected abstract TXN beginTransaction() throws Exception;
/**
* Pre commit previously created transaction. Pre commit must make all of the necessary steps to prepare the
* transaction for a commit that might happen in the future. After this point the transaction might still be
* aborted, but underlying implementation must ensure that commit calls on already pre committed transactions
* will always succeed.
*
* <p>Usually implementation involves flushing the data.
*/
protected abstract void preCommit(TXN transaction) throws Exception;
/**
* Commit a pre-committed transaction. If this method fail, Flink application will be
* restarted and {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Object)} will be called again for the
* same transaction.
*/
protected abstract void commit(TXN transaction);
/**
* Invoked on recovered transactions after a failure. User implementation must ensure that this call will eventually
* succeed. If it fails, Flink application will be restarted and it will be invoked again. If it does not succeed
* eventually, a data loss will occur. Transactions will be recovered in an order in which they were created.
*/
protected void recoverAndCommit(TXN transaction) {
commit(transaction);
}
/**
* Abort a transaction.
*/
protected abstract void abort(TXN transaction);
/**
* Abort a transaction that was rejected by a coordinator after a failure.
*/
protected void recoverAndAbort(TXN transaction) {
abort(transaction);
}
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
Throwable firstError = null;
while (pendingTransactionIterator.hasNext()) {
Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
Long pendingTransactionCheckpointId = entry.getKey();
TransactionHolder<TXN> pendingTransaction = entry.getValue();
if (pendingTransactionCheckpointId > checkpointId) {
continue;
}
LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);
logWarningIfTimeoutAlmostReached(pendingTransaction);
try {
commit(pendingTransaction.handle);
} catch (Throwable t) {
if (firstError == null) {
firstError = t;
}
}
pendingTransactionIterator.remove();
}
if (firstError != null) {
throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
firstError);
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// this is like the pre-commit of a 2-phase-commit transaction
// we are ready to commit and remember the transaction
checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");
long checkpointId = context.getCheckpointId();
LOG.debug("{} - checkpoint {} triggered, flushing transaction '{}'", name(), context.getCheckpointId(), currentTransactionHolder);
preCommit(currentTransactionHolder.handle);
pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
LOG.debug("{} - stored pending transactions {}", name(), pendingCommitTransactions);
currentTransactionHolder = beginTransactionInternal();
LOG.debug("{} - started new transaction '{}'", name(), currentTransactionHolder);
state.clear();
state.add(new State<>(
this.currentTransactionHolder,
new ArrayList<>(pendingCommitTransactions.values()),
userContext));
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
state = context.getOperatorStateStore().getListState(stateDescriptor);
boolean recoveredUserContext = false;
if (context.isRestored()) {
LOG.info("{} - restoring state", name());
for (State<TXN, CONTEXT> operatorState : state.get()) {
userContext = operatorState.getContext();
List<TransactionHolder<TXN>> recoveredTransactions = operatorState.getPendingCommitTransactions();
for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
// If this fails to succeed eventually, there is actually data loss
recoverAndCommitInternal(recoveredTransaction);
LOG.info("{} committed recovered transaction {}", name(), recoveredTransaction);
}
recoverAndAbort(operatorState.getPendingTransaction().handle);
LOG.info("{} aborted recovered transaction {}", name(), operatorState.getPendingTransaction());
if (userContext.isPresent()) {
finishRecoveringContext();
recoveredUserContext = true;
}
}
}
// if in restore we didn't get any userContext or we are initializing from scratch
if (!recoveredUserContext) {
LOG.info("{} - no state to restore", name());
userContext = initializeUserContext();
}
this.pendingCommitTransactions.clear();
currentTransactionHolder = beginTransactionInternal();
}
}

上面抽取了TwoPhaseCommitSinkFunction 核心的结构,也基本就是我们上面设计的模型

  • beginTransaction
  • preCommit
  • commit
  • abort
    这些方法需啊哟在子类自行实现,在下面kafka 里面会看一下他的具体实现
    当然为了能够进行容错,管理恢复我们需要实现一些中间状态的管理
    在任务进行恢复的时候,进行一个状态的初始化,以及状态的恢复
public void initializeState(FunctionInitializationContext context) throws Exception {
//获取operator 的状态
state = context.getOperatorStateStore().getListState(stateDescriptor);
boolean recoveredUserContext = false;
if (context.isRestored()) {
for (State<TXN, CONTEXT> operatorState : state.get()) {
userContext = operatorState.getContext();
//从operator 状态中获取,需要commit 的transaction
List<TransactionHolder<TXN>> recoveredTransactions = operatorState.getPendingCommitTransactions();
for (TransactionHolder<TXN> recoveredTransaction : recoveredTransactions) {
// 这里会调用 子类的commit 实现方法,进行tnx 进行commit,里面还有一个检查,如果超时会打印日志
recoverAndCommitInternal(recoveredTransaction);
}
//在operator 里面会 pending tnx 这一部分tnx 是不能进行提交的,因为他们进入per commit 阶段,其实简单的理解,只要进入了完成了precommit 阶段,就会进入commit 最后
recoverAndAbort(operatorState.getPendingTransaction().handle);
if (userContext.isPresent()) {
finishRecoveringContext();
recoveredUserContext = true;
}
}
}
userContext = initializeUserContext();
}
//最后就是将pending commit 数据进行一个清空
this.pendingCommitTransactions.clear();
//生产一个新的holder 来保存下一个tnx
currentTransactionHolder = beginTransactionInternal();
}

当收到一个barrier 的时候,会进行状态的持久化

public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkState(currentTransactionHolder != null, "bug: no transaction object when performing state snapshot");
long checkpointId = context.getCheckpointId();
//这里进行一个pre commit 的提交,具体在kafka 里面是直接flush
preCommit(currentTransactionHolder.handle);
//这里将当前的tnx 放在 pend comit 列表里面,这个需要持久化的,如果后续恢复了,需要执行commit
pendingCommitTransactions.put(checkpointId, currentTransactionHolder);
//开启一个新的 holder 用于存储下一个tnx
currentTransactionHolder = beginTransactionInternal();
//clean 一下
state.clear();
//加入到operator 的state ,这里的this.currentTransactionHolder 是没有进行precommit 的所以后续恢复的时候需要abort
//pendingCommitTransactions.values() 是已pre commit 了,在恢复的时候进行commit 的一个提交
state.add(new State<>(
this.currentTransactionHolder,
new ArrayList<>(pendingCommitTransactions.values()),
userContext));
}

当所有的barrire 已经完成,即使是pre commit 已经完成了,job manager (flink) 会notify 各个 operator

public final void notifyCheckpointComplete(long checkpointId) throws Exception {
//1: 正常的完成一个事物 ---> commit
// 2: 因为checkpoint 的失败(有可能会这样的情况),所以会有多个需要提交的事务
// checkpoint 失败的原因如下:
// master 不能持久化 checkpoint 的数据(可能存储系统临时失败),但是还是会调用checklister 的方法
// 其他的task mananger 不能持久化上一个ck 的数据,同时也没有上报失败,因为它们可以在后续检查点中成功地保持其态
// 但是我们需要提交之前没有达到commit 状态的事务
//3: 有多个pending 的事物,但是他不是当前最新的一个,因为notify 会有延迟
// 4: 当触发这个方法的时候,也就表明了这里没有个tnx不是已经pre commit 的,其实就是所有的tnx 都可以commiit
Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
Throwable firstError = null;
while (pendingTransactionIterator.hasNext()) {
Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
Long pendingTransactionCheckpointId = entry.getKey();
TransactionHolder<TXN> pendingTransaction = entry.getValue();
//这里就是有可能 notify 会有延迟,导致需要ckpid > 当前的checkpid
if (pendingTransactionCheckpointId > checkpointId) {
continue;
}
//这里就是一个延迟log
logWarningIfTimeoutAlmostReached(pendingTransaction);
try {
//这里直接进行commit
commit(pendingTransaction.handle);
} catch (Throwable t) {
if (firstError == null) {
firstError = t;
}
}
//移除掉 已经comm
pendingTransactionIterator.remove();
}
if (firstError != null) {
throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
firstError);
}
}

我们可以看一下FlinkKafkaProducer011 是怎么实现刚才那个2阶段提交的模型的
KafkaTransactionState 里面保存了 transaction Id producer 的信息 这里不做详细的说明

protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
switch (semantic) {
//如果数据是准确一次,那我么就启动producer的 beginTransaction
case EXACTLY_ONCE:
FlinkKafkaProducer<byte[], byte[]> producer = createTransactionalProducer();
producer.beginTransaction();
return new KafkaTransactionState(producer.getTransactionalId(), producer);
…...
default:
throw new UnsupportedOperationException("Not implemented semantic");
}
}
protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
switch (semantic) {
case EXACTLY_ONCE:
case AT_LEAST_ONCE:
//这里直接call flush,最后调用 producer 里面,他会吧recorder 发送到kafka 的broker 上面,这里可能需要结合kafka 的
//leo watermark 来管理 consumer 的消费
flush(transaction);
break;
case NONE:
break;
default:
throw new UnsupportedOperationException("Not implemented semantic");
}
checkErroneous();
}

这个比较简单,直接使用kafka 的producer 的commit transaction 就可以了

protected void commit(KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
try {
transaction.producer.commitTransaction();
} finally {
recycleTransactionalProducer(transaction.producer);
}
}
}
//同上
protected void abort(KafkaTransactionState transaction) {
if (transaction.isTransactional()) {
transaction.producer.abortTransaction();
recycleTransactionalProducer(transaction.producer);
}
}

总结,flink 框架已经帮我们实现 量阶段提交 的主要模型抽象了,我么只需要简单的override 几个方法就可以了
但是这里还有一个问题,就是我们本来是使用kafka 作为source的是,那么在source 端是不是应该手动的管理kafka 的offset
这样作为一个整体才是一个end-to-end 的语言保证