Is there any way to ensure all CheckpointListeners are notified about checkpoint completion in Flink when a job is cancelled with a savepoint?

I'm using Flink 1.9 and the REST API /jobs/:jobid/savepoints to trigger a savepoint and cancel the job (that is, to stop the job gracefully so it can be resumed later from the savepoint).
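
For reference, the request described above can be sketched roughly as follows. This only builds the HTTP request with the JDK's HttpClient API (Java 11+) and does not send it. The class name, base URL, job id, and target directory are placeholders I made up for illustration; the body fields "target-directory" and "cancel-job" are the ones documented for this endpoint.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not part of Flink: builds the "savepoint + cancel" REST call.
class SavepointRequestSketch {

    // JSON body for POST /jobs/:jobid/savepoints.
    // Assumes targetDir contains no characters that need JSON escaping.
    static String body(String targetDir) {
        return "{\"target-directory\": \"" + targetDir + "\", \"cancel-job\": true}";
    }

    // Builds (but does not send) the request against the JobManager REST endpoint.
    static HttpRequest cancelWithSavepoint(String restBase, String jobId, String targetDir) {
        return HttpRequest.newBuilder(URI.create(restBase + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(targetDir)))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder values; replace with a real REST address and job id.
        HttpRequest request = cancelWithSavepoint("http://localhost:8081", "<jobid>", "/tmp/savepoints");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body("/tmp/savepoints"));
    }
}
```

Sending the request with HttpClient.send(...) should start an asynchronous savepoint operation on the cluster; I have left that part out so the sketch runs without one.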

I use a two-phase commit in the source function, so my source implements both the CheckpointedFunction and CheckpointListener interfaces. When snapshotState() is called I snapshot the internal state, and in notifyCheckpointComplete() I commit that state to a 3rd-party system.

From what I can see in the source code, only the snapshotState() part is synchronous in CheckpointCoordinator:

// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
	if (props.isSynchronous()) {
		execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
	} else {
		execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
	}
}

The checkpoint acknowledgement and the completion notification are asynchronous; they happen in AsyncCheckpointRunnable.

That being said, when a savepoint with cancel-job set to true is triggered, after the snapshot is taken some Task Managers manage to receive the completion notification and execute notifyCheckpointComplete() before the job is cancelled, and some do not.

The question is whether there is a way to cancel a job with a savepoint so that notifyCheckpointComplete() is guaranteed to be invoked on all Task Managers before the job is cancelled, or whether there is no way to achieve this at the moment.


Score: 2

It's been a while since I looked at Flink 1.9 so please take my answer with some caution.

My guess is that your sources cancel too early. So notifyCheckpointComplete is actually sent to all tasks, but some SourceFunctions have already returned from run() and the respective task has been cleaned up.

AFAIK, what you described should be possible if you ignore cancellation and interruptions until you have received the last notifyCheckpointComplete.

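// Package names as of Flink 1.9
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

class YourSource implements SourceFunction<Object>, CheckpointListener, CheckpointedFunction {
	private volatile boolean canceled = false;
	private volatile boolean pendingCheckpoint = false;

	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		pendingCheckpoint = true;
		// start two-phase commit
	}

	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
	}

	@Override
	public void notifyCheckpointComplete(long checkpointId) throws Exception {
		// finish two-phase commit
		pendingCheckpoint = false;
	}

	@Override
	public void run(SourceContext<Object> ctx) throws Exception {
		while (!canceled) {
			// do normal source stuff
		}
		// keep the task running after cancellation
		while (pendingCheckpoint) {
			try {
				// short pause between checks; the exact duration is arbitrary
				Thread.sleep(1);
			} catch (InterruptedException e) {
				// ignore interruptions until two-phase commit is done
			}
		}
	}

	@Override
	public void cancel() {
		canceled = true;
	}
}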
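
To see the control flow in isolation, here is a small simulation that uses only JDK threads and no Flink classes. Everything in it (the class name, the demo() method, the timeouts) is made up for illustration; it just mimics the flags from the source above and checks that the worker survives cancel() plus an interrupt, and exits only once the completion notification arrives.

```java
// Hypothetical stand-in for the source above; plain JDK, no Flink dependency.
class DrainingSourceSketch implements Runnable {
    private volatile boolean canceled = false;
    private volatile boolean pendingCheckpoint = false;

    void snapshotState() { pendingCheckpoint = true; }             // phase 1 started
    void notifyCheckpointComplete() { pendingCheckpoint = false; } // phase 2 done
    void cancel() { canceled = true; }

    @Override
    public void run() {
        while (!canceled) {
            Thread.yield(); // stand-in for emitting records
        }
        // Keep running after cancellation until the pending commit is finished.
        while (pendingCheckpoint) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                // Swallow the interrupt on purpose; the loop condition decides when to exit.
            }
        }
    }

    // Returns {aliveBeforeNotify, aliveAfterNotify}.
    static boolean[] demo() {
        try {
            DrainingSourceSketch source = new DrainingSourceSketch();
            Thread task = new Thread(source);
            task.start();
            source.snapshotState();   // a snapshot is pending
            source.cancel();          // the job gets cancelled
            task.interrupt();         // the runtime interrupts the task thread
            task.join(200);           // give the worker a chance to (wrongly) exit
            boolean aliveBeforeNotify = task.isAlive();
            source.notifyCheckpointComplete();
            task.join(2000);
            boolean aliveAfterNotify = task.isAlive();
            return new boolean[] {aliveBeforeNotify, aliveAfterNotify};
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("alive before notify: " + result[0]);
        System.out.println("alive after notify:  " + result[1]);
    }
}
```

Keep in mind this only shows the waiting logic. In a real job, Flink also watches how long cancellation takes (see task.cancellation.timeout), so I would bound the wait instead of looping forever if the notification never comes.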


Score: 1




Wouldn't using stop-with-savepoint[1][2] solve the problem?
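
If it helps, this is roughly what the stop-with-savepoint call looks like over REST, again only built and not sent. The class name and all values are placeholders. The endpoint /jobs/:jobid/stop and the body fields "targetDirectory" and "drain" are written from my memory of the 1.9 REST docs, so please check them against the documentation for your version before relying on this.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not part of Flink: builds the "stop with savepoint" REST call.
class StopWithSavepointSketch {

    // JSON body for POST /jobs/:jobid/stop (field names are an assumption, see above).
    // Assumes targetDir contains no characters that need JSON escaping.
    static String body(String targetDir, boolean drain) {
        return "{\"targetDirectory\": \"" + targetDir + "\", \"drain\": " + drain + "}";
    }

    // Builds (but does not send) the request against the JobManager REST endpoint.
    static HttpRequest stopWithSavepoint(String restBase, String jobId, String targetDir, boolean drain) {
        return HttpRequest.newBuilder(URI.create(restBase + "/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(targetDir, drain)))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder values; replace with a real REST address and job id.
        HttpRequest request = stopWithSavepoint("http://localhost:8081", "<jobid>", "/tmp/savepoints", false);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body("/tmp/savepoints", false));
    }
}
```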


  • Posted on August 7, 2020, 17:20:56
