Apache Flink: recommended approach for handling external API calls with KeyedProcessFunction

Question

I am quite new to Flink and have a question regarding making external API calls from a KeyedProcessFunction's processElement function.

In our current setup, we are making synchronous API calls to an upstream service, while updating the KeyedProcessFunction's state accordingly based on the response received. This works for now, but I am wondering if it's the best way to go about it.

From what I understand, the recommended approach in Flink for handling external data access is to use the Asynchronous IO API. However, I am not too sure how I'd be able to fit this into our use case without some sort of refactoring, considering RichAsyncFunction is stateless.

What would be the best way to go about this? Or is KeyedProcessFunction not suited for this purpose in the first place? Thank you.

Example code to illustrate my point:

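import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// KeyedProcessFunction takes three type parameters: key, input, output
public class StoppingLight extends KeyedProcessFunction<String, String, String> {

    private ValueState<String> previousState;

    .....

    @Override
    public void processElement(String newState, Context ctx, Collector<String> out) throws Exception {
        // getNextState() is a synchronous HTTPS API call to some external endpoint
        String nextState = getNextState(newState, previousState.value());

        // Update the keyed state based on the API response
        if (nextState.equals("Red")) {
            this.previousState.update("Yellow");
        } else if (nextState.equals("Green")) {
            this.previousState.update("Red");
        }
    }
}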

I thought about taking the HTTP request out of the KeyedProcessFunction, and chaining it against an async-io operator somehow, but that probably wouldn't work because the business logic needs to get the return value from the API call in order to work.

Answer 1

Score: 1

Doing synchronous API calls from any Flink user function (such as a KeyedProcessFunction) is not recommended, because this interferes with checkpointing. The operator involved can't do anything while the user function is blocked waiting on the synchronous I/O. Even if it works fine most of the time, this risks causing checkpoint failures. Not to mention poor throughput and resource utilization.

However, I'm not sure what to recommend as an alternative. (Not sure why asynchronous I/O isn't a good option for your use case.)
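
To make the throughput point in this answer concrete, here is a minimal plain-Java sketch. It does not use any Flink classes and is not how Flink's async I/O operator is implemented; it only contrasts issuing calls one at a time (as a blocking call inside processElement would) with keeping several calls in flight at once. The names BlockingVsAsync, fakeApiCall, runBlocking and runAsync, as well as the call count and latency, are assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java illustration only; no Flink classes are used.
class BlockingVsAsync {

    // Hypothetical stand-in for a remote HTTPS call: sleeps, then returns a value.
    static String fakeApiCall(String input, long latencyMs) {
        try {
            Thread.sleep(latencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "response-for-" + input;
    }

    // One call at a time: the caller is stuck for the full latency of every call.
    static long runBlocking(int calls, long latencyMs) {
        long start = System.nanoTime();
        for (int i = 0; i < calls; i++) {
            fakeApiCall("event-" + i, latencyMs);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // All calls in flight at once: total time is close to a single round trip.
    static long runAsync(int calls, long latencyMs) {
        ExecutorService pool = Executors.newFixedThreadPool(calls);
        try {
            long start = System.nanoTime();
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (int i = 0; i < calls; i++) {
                final String input = "event-" + i;
                pending.add(CompletableFuture.supplyAsync(
                        () -> fakeApiCall(input, latencyMs), pool));
            }
            // Wait until every simulated request has completed.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking ms: " + runBlocking(8, 100));
        System.out.println("async ms:    " + runAsync(8, 100));
    }
}
```

With the blocking variant the elapsed time grows with the number of calls, which is roughly what happens to an operator thread that waits on each request inside processElement. The sketch says nothing about keyed state; whether the state-dependent logic from the question can be split around an async stage is a separate design question.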

huangapple
  • Posted on March 31, 2023, 03:13:12
  • When reposting, please keep this link: https://go.coder-hub.com/75892122.html