Is there any way to ensure all CheckpointListeners notified about checkpoint completion on Flink on job cancel with savepoint?

huangapple go评论57阅读模式
英文:

Is there any way to ensure all CheckpointListeners notified about checkpoint completion on Flink on job cancel with savepoint?

问题

我正在使用flink 1.9版本和REST API /jobs/:jobid/savepoints 来触发保存点并取消作业(优雅地停止作业以稍后从保存点运行)。

我在源函数中使用了两阶段提交,因此我的源代码同时实现了CheckpointedFunctionCheckpointListener接口。在snapshotState()方法调用时,我会对内部状态进行快照,而在notifyCheckpointComplete()中,我会将状态保存到第三方系统。

从源代码中我能看到,只有CheckpointCoordinator中的snapshotState()部分是同步的 -

// 将消息发送给触发其检查点的任务
for (Execution execution: executions) {
    if (props.isSynchronous()) {
        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
    } else {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }
}

检查点的确认和完成通知在AsyncCheckpointRunnable中是异步的。

也就是说,当以cancel-job设置为true触发保存点时,快照被获取后,某些任务管理器会在取消作业之前一直等待接收完成通知并执行notifyCheckpointComplete(),而某些则不会。

问题是,是否有一种方法可以取消作业并确保所有任务管理器在作业取消之前都会调用notifyCheckpointComplete(),或者目前没有办法实现这一点?

英文:

I'm using flink 1.9 and the REST API /jobs/:jobid/savepoints to trigger the savepoint and cancel job (stop the job gracefully to run later on from savepoint).

I use a two-phase commit in source function so my source implements both CheckpointedFunction and CheckpointListener interfaces. On snapshotState() method call I snapshot the internal state and on notifyCheckpointComplete() I checkpoint state to 3rd party system.

From what I can see from source code, only the snapshotState() part is synchronous in CheckpointCoordinator -

// send the messages to the tasks that trigger their checkpoint
				for (Execution execution: executions) {
					if (props.isSynchronous()) {
						execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
					} else {
						execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
					}
				}

The checkpoint acknowledge and completion notification is asynchronous in AsyncCheckpointRunnable.

That being said, when the savepoint with cancel-job set to true is triggered, after the snapshot is taken, some of the Task Managers keep up to receive completion notification before the job cancelling and execute notifyCheckpointComplete(), and some not.

The question is whether there is a way to cancel job with savepoint so that the notifyCheckpointComplete() is guaranteed to be invoked by all Task Managers before job cancelled or there is no way to achieve this at the moment ?

答案1

得分: 2

已经有一段时间没有看过Flink 1.9了,所以请对我的答案保持一些谨慎。

我猜测你的数据源可能会取消得太早了。因此,“notifyCheckpointComplete”实际上会发送给所有任务,但一些“SourceFunction”可能已经退出了“run”方法,并且相应的任务已被清理。

据我所知,如果你在收到最后一个“notifyCheckpointComplete”之前忽略取消和中断,你描述的情况应该是可能的。

class YourSource implements SourceFunction<Object>, CheckpointListener, CheckpointedFunction {
    private volatile boolean canceled = false;
    private volatile boolean pendingCheckpoint = false;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        pendingCheckpoint = true;
        // 启动两阶段提交
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {

    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // 完成两阶段提交
        pendingCheckpoint = false;
    }

    @Override
    public void run(SourceContext<Object> ctx) throws Exception {
        while (!canceled) {
            // 进行正常的数据源操作
        }
        // 在取消后继续保持任务运行
        while (pendingCheckpoint) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                // 忽略中断直到两阶段提交完成
            }
        }
    }

    @Override
    public void cancel() {
        canceled = true;
    }
}

希望这对你有所帮助。

英文:

It's been a while since I looked at Flink 1.9 so please take my answer with some caution.

My guess is that your sources cancel too early. So notifyCheckpointComplete is actually sent to all tasks, but some SourceFunctions already quit the run and the respective task is cleaned up.

Afaik, what you described should be possible if you ignore cancellation and interruptions until you have received the last notifyCheckpointComplete.

class YourSource implements SourceFunction&lt;Object&gt;, CheckpointListener, CheckpointedFunction {
	private volatile boolean canceled = false;
	private volatile boolean pendingCheckpoint = false;

	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		pendingCheckpoint = true;
		// start two-phase commit
	}

	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {

	}

	@Override
	public void notifyCheckpointComplete(long checkpointId) throws Exception {
		// finish two-phase commit
		pendingCheckpoint = false;
	}

	@Override
	public void run(SourceContext&lt;Object&gt; ctx) throws Exception {
		while (!canceled) {
			// do normal source stuff
		}
		// keep the task running after cancellation
		while (pendingCheckpoint) {
			try {
				Thread.sleep(1);
			} catch (InterruptedException e) {
				// ignore interruptions until two-phase commit is done
			}
		}
	}

	@Override
	public void cancel() {
		canceled = true;
	}
}

答案2

得分: 1

使用stop-with-savepoint[1][2]是否能解决问题?

[1]https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-stop
[2]https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html

英文:

Wouldn't using stop-with-savepoint[1][2] solve the problem?

[1]https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-stop
[2]https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html

huangapple
  • 本文由 发表于 2020年8月7日 17:20:56
  • 转载请务必保留本文链接:https://go.coder-hub.com/63298860.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定