Get the max value from array of integers in nifi

Question

I have an array of values like [9308023, 48243, 429402, 589348934, 4943, 4298040240, 424820482] coming from the upstream processor (EvaluateJsonPath). I want to get its maximum value using ExecuteScript, the downstream processor, and send the result to the next downstream processor, whatever it is.

I tried using Python; my code is below.

    from org.apache.commons.io import IOUtils
    from java.nio.charset import StandardCharsets
    from org.apache.nifi.processor.io import StreamCallback

    # Define a subclass of StreamCallback to handle the incoming flow file
    class PyStreamCallback(StreamCallback):
        def __init__(self):
            pass

        def process(self, inputStream, outputStream):
            # Read the flow file content as a string
            flowFileText = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            # Split the input text into a list of IDs, trim white spaces, and convert to integers
            ids = [int(id.strip()) for id in flowFileText.strip().split(',')]
            if len(ids) > 0:
                # Sort the IDs in ascending order
                sorted_ids = sorted(ids)
                # Get the first ID from the sorted list
                first_id = sorted_ids[0]
                # Convert the first ID back to a string
                first_id_str = str(first_id)
                # Write the first ID to the output stream
                outputStream.write(first_id_str)
            else:
                # If no IDs are present, write an empty string to the output stream
                outputStream.write("")

    # Create an instance of the callback class
    streamCallback = PyStreamCallback()

    # Process incoming flow files
    flowFile = session.get()
    if flowFile is not None:
        try:
            # Execute the callback on the flow file
            session.read(flowFile, streamCallback)
            session.write(flowFile, streamCallback)
            # Transfer the flow file to success relationship
            session.transfer(flowFile, REL_SUCCESS)
            session.commit()
        except Exception as e:
            # Log the exception
            log.error("Failed to process flow file: " + str(e))
            session.transfer(flowFile, REL_FAILURE)
            session.commit()

Output: the script fails with the following error message:

"ExecuteScript[id=018810db-110d-17c5-17da-0fdf1ca49296] Failed to process flow file: read(): 2nd arg can't be coerced to org.apache.nifi.processor.io.InputStreamCallback"

Answer 1

Score: 0

You can use a Jolt spec like the one below (for example in a JoltTransformJSON processor):

    [
      {
        "operation": "shift",
        "spec": {
          "*": {
            "rating": "rating"
          }
        }
      },
      {
        "operation": "modify-overwrite-beta",
        "spec": {
          // Min value
          "minRating": "=min(@(1,rating))",
          // Max value
          "maxRating": "=max(@(1,rating))"
        }
      }
    ]
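To make the effect of the two operations concrete, here is a rough plain-Python sketch of what the spec computes. It is only an approximation of Jolt's behavior, and it assumes the input is a non-empty JSON array of objects that each carry a `rating` key (that shape is implied by the shift spec, not stated in the question). The function name `apply_rating_spec` and the sample records are made up for illustration.

```python
def apply_rating_spec(records):
    """Approximate the two Jolt operations for a list of dicts with a 'rating' key."""
    # "shift": gather the "rating" value of every array element into one list
    ratings = [record["rating"] for record in records if "rating" in record]
    # "modify-overwrite-beta": add the min and max of that list as new fields
    return {
        "rating": ratings,
        "minRating": min(ratings),
        "maxRating": max(ratings),
    }


# Hypothetical input, reusing a few values from the question
records = [{"rating": 48243}, {"rating": 4298040240}, {"rating": 4943}]
result = apply_rating_spec(records)
print(result["maxRating"])  # 4298040240
print(result["minRating"])  # 4943
```

The `maxRating` and `minRating` keys of the resulting document are what the JSON paths `$.maxRating` and `$.minRating` would then select.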

Then you can extract the max and min values using EvaluateJsonPath: add a property max_value with the value $.maxRating, and a corresponding property for the min with the value $.minRating. Finally, you can connect it to a downstream processor such as UpdateAttribute.

Source: https://github.com/bazaarvoice/jolt/issues/700
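If you would rather stay with ExecuteScript, note what the error message itself says: `session.read` expects an `InputStreamCallback`, while the script passes a `StreamCallback`. As far as I know, a single `session.write(flowFile, streamCallback)` call with a `StreamCallback` both reads the incoming content and writes the new content, so the separate `session.read` call should not be needed. Also, `sorted(ids)[0]` picks the smallest value, not the largest. The sketch below covers only the parsing and max step in plain Python, so it can be tried outside NiFi. It assumes the array arrives as flow file content (not as an attribute) in a form like `[1, 2, 3]` or `1, 2, 3`; the helper name `max_from_array_text` is hypothetical.

```python
import json


def max_from_array_text(text):
    """Return the largest integer in text such as '[3, 1, 2]' or '3, 1, 2'.

    Returns None when the text contains no values.
    """
    text = text.strip()
    if text.startswith("["):
        # Content looks like a JSON array, so let the JSON parser handle it
        values = json.loads(text)
    else:
        # Otherwise treat it as a bare comma-separated list of integers
        values = [int(part) for part in text.split(",") if part.strip()]
    return max(values) if values else None


sample = "[9308023, 48243, 429402, 589348934, 4943, 4298040240, 424820482]"
print(max_from_array_text(sample))  # 4298040240
```

Inside the callback's `process` method, this helper would be applied to the text read from `inputStream`, and the result would be written to `outputStream` as UTF-8 bytes rather than as a plain string. That wiring is left out here because it needs the NiFi runtime to run.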

I hope it's helpful.

huangapple
  • Posted on June 12, 2023, 22:03:24
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76457437.html