Is there any way to ensure all CheckpointListeners are notified about checkpoint completion in Flink when cancelling a job with a savepoint?
Question
I'm using Flink 1.9 and the REST API /jobs/:jobid/savepoints to trigger a savepoint and cancel the job (stopping the job gracefully so it can be run later from the savepoint).
I use a two-phase commit in my source function, so the source implements both the CheckpointedFunction and CheckpointListener interfaces. On the snapshotState() call I snapshot the internal state, and on notifyCheckpointComplete() I checkpoint the state to a 3rd-party system.
From what I can see in the source code, only the snapshotState() part is synchronous in CheckpointCoordinator:
    // send the messages to the tasks that trigger their checkpoint
    for (Execution execution: executions) {
        if (props.isSynchronous()) {
            execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
        } else {
            execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
        }
    }
The checkpoint acknowledgement and the completion notification are asynchronous, in AsyncCheckpointRunnable.
That being said, when a savepoint with cancel-job set to true is triggered, after the snapshot is taken some of the Task Managers manage to receive the completion notification and execute notifyCheckpointComplete() before the job is cancelled, and some do not.
The question is whether there is a way to cancel a job with a savepoint such that notifyCheckpointComplete() is guaranteed to be invoked on all Task Managers before the job is cancelled, or whether there is no way to achieve this at the moment.
Answer 1
Score: 2
It's been a while since I looked at Flink 1.9, so please take my answer with some caution.
My guess is that your sources cancel too early. So notifyCheckpointComplete is actually sent to all tasks, but some SourceFunctions have already quit run() and the respective task has been cleaned up.
AFAIK, what you described should be possible if you ignore cancellation and interruptions until you have received the last notifyCheckpointComplete.
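    // Package locations as of Flink 1.9
    import org.apache.flink.runtime.state.CheckpointListener;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    class YourSource implements SourceFunction<Object>, CheckpointListener, CheckpointedFunction {
        // volatile: written by the checkpoint/cancel threads, read by the run() loop
        private volatile boolean canceled = false;
        private volatile boolean pendingCheckpoint = false;

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            pendingCheckpoint = true;
            // start two-phase commit
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            // finish two-phase commit
            pendingCheckpoint = false;
        }

        @Override
        public void run(SourceContext<Object> ctx) throws Exception {
            while (!canceled) {
                // do normal source stuff
            }
            // keep the task running after cancellation
            // (note: this waits forever if the notification never arrives)
            while (pendingCheckpoint) {
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    // ignore interruptions until two-phase commit is done
                }
            }
        }

        @Override
        public void cancel() {
            canceled = true;
        }
    }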
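A single boolean flag cannot tell several in-flight checkpoints apart, and the wait loop above never ends if a notification does not arrive. The following is a minimal, Flink-free sketch of one way to tighten that up. It tracks pending checkpoint IDs and waits with a timeout. `PendingCommitTracker` and all of its methods are hypothetical names, not part of Flink. The assumption is that `onSnapshot` would be called from `snapshotState()` with `context.getCheckpointId()`, and `onCheckpointComplete` from `notifyCheckpointComplete()`.

```java
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not a Flink class: tracks checkpoints whose
// second commit phase has not been confirmed yet.
class PendingCommitTracker {
    private final ConcurrentSkipListSet<Long> pending = new ConcurrentSkipListSet<>();

    // Call when a snapshot is taken (first phase started).
    void onSnapshot(long checkpointId) {
        pending.add(checkpointId);
    }

    // Call when a completion notification arrives. Everything up to and
    // including this ID is treated as committed, on the assumption that a
    // later completed checkpoint subsumes earlier ones.
    void onCheckpointComplete(long checkpointId) {
        pending.headSet(checkpointId, true).clear();
    }

    boolean hasPending() {
        return !pending.isEmpty();
    }

    // Waits until nothing is pending or the timeout expires.
    // Interrupts are deferred, not lost: the flag is restored on exit.
    boolean awaitAllCommitted(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        boolean interrupted = false;
        try {
            while (!pending.isEmpty()) {
                if (System.nanoTime() - deadline >= 0) {
                    return false; // gave up: some commit was never confirmed
                }
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    interrupted = true; // remember it, keep waiting
                }
            }
            return true;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

In the source above, the second loop in run() could then become a single awaitAllCommitted(...) call, with the timeout chosen to be shorter than the cluster's task cancellation timeout. Whether that is appropriate depends on how the 3rd-party system handles an unconfirmed commit.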
Answer 2
Score: 1
Wouldn't using stop-with-savepoint[1][2] solve the problem?
[1]https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs-jobid-stop
[2]https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html
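To make the difference between the two REST calls concrete, here is a small sketch that only builds the request paths and JSON bodies and sends nothing. `SavepointRequests` and its methods are hypothetical names. The field names (`target-directory` and `cancel-job` for /jobs/:jobid/savepoints, `targetDirectory` and `drain` for /jobs/:jobid/stop) are as I recall them from the Flink REST API documentation and should be checked against the docs for your version.

```java
// Hypothetical helper: builds paths and bodies for the two endpoints.
// Both requests would be sent as HTTP POST with any HTTP client.
class SavepointRequests {

    // Savepoint followed by cancel (the approach used in the question).
    static String cancelWithSavepointPath(String jobId) {
        return "/jobs/" + jobId + "/savepoints";
    }

    static String cancelWithSavepointBody(String targetDirectory) {
        return "{\"target-directory\":\"" + escape(targetDirectory)
                + "\",\"cancel-job\":true}";
    }

    // Stop-with-savepoint (the approach suggested in this answer).
    static String stopPath(String jobId) {
        return "/jobs/" + jobId + "/stop";
    }

    static String stopBody(String targetDirectory, boolean drain) {
        return "{\"targetDirectory\":\"" + escape(targetDirectory)
                + "\",\"drain\":" + drain + "}";
    }

    // Minimal JSON string escaping for backslashes and double quotes.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        String jobId = args.length > 0 ? args[0] : "<jobid>";
        System.out.println("POST " + stopPath(jobId));
        System.out.println(stopBody("/tmp/savepoints", false));
    }
}
```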