英文:
Apache Flink Recommend approach for handling external API calls with KeyedProcessFunction
问题
I am quite new to Flink and have a question regarding making external API calls from a KeyedProcessFunction's processElement
function.
In our current setup, we are making synchronous API calls to an upstream service, while updating the KeyedProcessFunction's
state accordingly based on the response received. This works for now, but I am wondering if it's the best way to go about it.
From what I understand, the recommended approach in Flink for handling external data access is to use the Asynchronous IO API. However, I am not too sure how I'd be able to fit this into our use case without some sort of refactoring, considering RichAsyncFunction
is stateless.
What would be the best way to go about this? Or is KeyedProcessFunction
not suited for this purpose in the first place? Thank you.
Example code to illustrate my point:
public class StoppingLight extends KeyedProcessFunction<String, String> {
private ValueState<String> previousState;
.....
@Override
public void processElement(String newState, Context ctx, Collector<String> out) {
// getNextState() is a synchronous HTTPS API call to some external endpoint
String nextState = getNextState(newState, previousState.value());
if(nextState.equals("Red")) {
this.previousState.update("Yellow");
} else if(nextState.equals("Green")) {
this.previousState.update("Red");
}
}
}
I thought about taking the HTTP request out of the KeyedProcessFunction, and chaining it against an async-io operator somehow, but that probably wouldn't work because the business logic needs to get the return value from the API call in order to work.
英文:
I am quite new to Flink and have a question regarding making external API calls from a KeyedProcessedFunction's processElement
function.
In our current setup, we are making synchronous API calls to an upstream service, while updating the KeyedProcessFunction
's state accordingly based on the response received. This works for now, but I am wondering if it's the best way to go about it.
From what I understand, the recommended approach in Flink for handling external data access is to use the Asynchronous IO API. However, I am not too sure how I'd be able to fit this into our use case without some sort of refactoring, considering RichAsyncFunction
is stateless.
What would be the best way to go about this? Or is KeyedProcessFunction
not suited for this purpose in the first place? Thank you.
Example code to illustrate my point:
public class StoppingLight extends KeyProcessedFunction<String, String> {
private ValueState<String> previousState;
.....
@Override
public void processElement(String newState, Context ctx, Collector<String> out) {
// getNextState() is a synchrnous HTTPS API call to some external endpoint
String nextState = getNextState(newState, previousState.value());
if(nextState == "Red") {
this.previousState = "Yellow";
} else if(nextState == "Green") {
this.previousState= "Red";
}
}
}
I thought about taking the http request out of the KeyedProcessFunction, and chaining it against an async-io operator somehow, but that probably wouldn't work because the business logic needs to get the return value from the API call in order to work.
答案1
得分: 1
从任何 Flink 用户函数(如 KeyedProcessFunction)执行同步 API 调用并不推荐,因为这会干扰检查点。涉及的运算符在用户函数被阻塞等待同步 I/O 时无法执行任何操作。即使大多数情况下正常工作,这会增加检查点失败的风险。更不用说吞吐量和资源利用率差的问题。
然而,我不确定有什么替代建议。(不确定为什么异步 I/O 不适合您的用例。)
英文:
Doing synchronous API calls from any Flink user function (such as a KeyedProcessFunction) is not recommended, because this interferes with checkpointing. The operator involved can't do anything while the user function is blocked waiting on the synchronous I/O. Even if it works fine most of the time, this risks causing checkpoint failures. Not to mention poor throughput and resource utilization.
However, I'm not sure what to recommend as an alternative. (Not sure why asynchronous I/O isn't a good option for your use case.)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论