Apache NiFi - Implementing While True and Wait

Question

In NiFi, I would like to implement the following logic:

while (true) {
  prj_result = invoke_prj_Rest_EndPoint();
  prjs_list = prj_result["prjs"];
  for prj in prjs_list:
      emp_response = invoke_emp_rest_Endpoint(prj["id"]);
      next_cursor = emp_response["meta"]["next_cursor"];
      while next_cursor:
          sleep(10 secs);
          emps = emp_response["emps"];
          for emp in emps:
              emp["mod_time"] = now();
          loadEmpDataToS3()
  sleep(5Mins)
}

My approach so far:

For while(true): I initialize a parameter i = 1, check ${i:equals(1)}, and never increment it. Is there a better approach?
For the sleep: I am not sure what the best approach is.
I am also not sure how to implement the for loop over prjs_list and the inner while loop.

Answer 1

Score: 1

GenerateFlowFile already behaves like "while(true)": it generates a flowfile each time its schedule fires, so setting its run schedule to 5 min would also cover the outer sleep.
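To make the analogy concrete, here is a minimal Python sketch of a timer-driven trigger. It is not NiFi code: run_on_schedule, max_runs and the injected sleep function are hypothetical names used only for illustration, and max_runs exists only so the example terminates.

```python
import time
from typing import Callable, List, Optional


def run_on_schedule(
    task: Callable[[], None],
    interval_seconds: float,
    max_runs: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Run `task`, wait `interval_seconds`, repeat. Returns the number of runs.

    With max_runs=None this loops forever, like while(true).
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        task()
        runs += 1
        # Only wait if another run will follow.
        if max_runs is None or runs < max_runs:
            sleep(interval_seconds)
    return runs


# Example: record the waits instead of really sleeping for 5 minutes.
triggered: List[str] = []
waits: List[float] = []
total_runs = run_on_schedule(
    task=lambda: triggered.append("flowfile"),
    interval_seconds=300,
    max_runs=3,
    sleep=waits.append,
)
```

In NiFi the loop and the wait both live in the processor's scheduling settings, so no counter attribute such as i = 1 is needed.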

After the first InvokeHTTP, split the response by "prjs" into multiple flowfiles. If the response is JSON, you can use SplitJson.
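As a rough illustration of what that split does, the Python sketch below turns one JSON document into one JSON string per array element. The response body and the helper name split_json_array are made up for the example; the real response shape is whatever the project endpoint returns.

```python
import json
from typing import List


def split_json_array(document: str, key: str) -> List[str]:
    """Return one JSON string per element of the array stored under `key`."""
    parsed = json.loads(document)
    return [json.dumps(element) for element in parsed[key]]


# Hypothetical response body from the first endpoint.
response_body = '{"prjs": [{"id": 1}, {"id": 2}, {"id": 3}]}'

# Each entry plays the role of one flowfile after the split.
flowfiles = split_json_array(response_body, "prjs")
ids = [json.loads(flowfile)["id"] for flowfile in flowfiles]
```

Each resulting piece is then processed independently, which is how the flow replaces the "for prj in prjs_list" loop.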

After splitting, apply the required transformation to each "prj" flowfile and then call InvokeHTTP again. In its scheduling settings you can set it to run every 10 sec.

So, the flow could look like this:

      1 GenerateFlowFile
1 --> 2 InvokeHTTP        first call
2 --> 3 SplitJson         split by $.prjs
3 --> 4 EvaluateJsonPath  extract $.id
4 --> 5 InvokeHTTP        second call, 10 sec run schedule
5 --> 6 transform and put to S3
5 --> 7 EvaluateJsonPath  extract $.meta.next_cursor
7 --> 8 RouteOnAttribute  next_cursor is not empty
8 --> 5
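The loop formed by steps 5, 7 and 8 can be sketched in plain Python as follows. This is only a simulation under assumptions: fetch_page, pause and the fake pages are hypothetical, and how the real employee endpoint expects the cursor to be passed back is not stated in the question.

```python
from typing import Any, Callable, Dict, List, Optional

Page = Dict[str, Any]


def fetch_all_emps(
    fetch_page: Callable[[int, Optional[str]], Page],
    prj_id: int,
    pause: Callable[[], None] = lambda: None,
) -> List[dict]:
    """Follow next_cursor until it is empty, collecting every page's emps."""
    emps: List[dict] = []
    cursor: Optional[str] = None
    while True:
        page = fetch_page(prj_id, cursor)          # step 5: second InvokeHTTP
        emps.extend(page["emps"])                  # step 6: hand records on
        cursor = page["meta"].get("next_cursor")   # step 7: extract the cursor
        if not cursor:                             # step 8: route on attribute
            return emps
        pause()  # stands in for the 10 sec run schedule


# Hypothetical paged responses, keyed by the cursor that requests them.
FAKE_PAGES: Dict[Optional[str], Page] = {
    None: {"emps": [{"name": "a"}, {"name": "b"}], "meta": {"next_cursor": "c1"}},
    "c1": {"emps": [{"name": "c"}], "meta": {"next_cursor": "c2"}},
    "c2": {"emps": [{"name": "d"}], "meta": {"next_cursor": None}},
}


def fake_fetch(prj_id: int, cursor: Optional[str]) -> Page:
    return FAKE_PAGES[cursor]


all_emps = fetch_all_emps(fake_fetch, prj_id=7)
names = [emp["name"] for emp in all_emps]
```

The loop ends as soon as a page comes back without a next_cursor, which corresponds to the flowfile not being routed from step 8 back to step 5.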


huangapple
  • Posted on April 4, 2023, 12:49:56
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75925607.html