Get the max value from an array of integers in NiFi

Question

I have an array of values like [9308023, 48243, 429402, 589348934, 4943, 4298040240, 424820482] coming from the upstream processor (EvaluateJsonPath). I want to get its max value using ExecuteScript, which is the downstream processor, and send the result on to the next downstream processor, whatever that is.

I tried using Python, and my code is below.

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

# Define a subclass of StreamCallback to handle the incoming flow file
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the flow file content as a string
        flowFileText = IOUtils.toString(inputStream, StandardCharsets.UTF_8)

        # Split the input text into a list of IDs, trim white spaces, and convert to integers
        ids = [int(id.strip()) for id in flowFileText.strip().split(',')]

        if len(ids) > 0:
            # Sort the IDs in ascending order
            sorted_ids = sorted(ids)

            # Get the first ID from the sorted list
            first_id = sorted_ids[0]

            # Convert the first ID back to a string
            first_id_str = str(first_id)

            # Write the first ID to the output stream
            outputStream.write(first_id_str)
        else:
            # If no IDs are present, write an empty string to the output stream
            outputStream.write("")

# Create an instance of the callback class
streamCallback = PyStreamCallback()

# Process incoming flow files
flowFile = session.get()
if flowFile is not None:
    try:
        # Execute the callback on the flow file
        session.read(flowFile, streamCallback)
        session.write(flowFile, streamCallback)

        # Transfer the flow file to success relationship
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
    except Exception as e:
        # Log the exception
        log.error("Failed to process flow file: " + str(e))
        session.transfer(flowFile, REL_FAILURE)
        session.commit()

Output:
The processor fails with the following error message:

"ExecuteScript[id=018810db-110d-17c5-17da-0fdf1ca49296] Failed to process flow file: read(): 2nd arg can't be coerced to org.apache.nifi.processor.io.InputStreamCallback"
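
A note on the error above: session.read(flowFile, callback) expects an InputStreamCallback, whereas a StreamCallback (input and output stream together) is what session.write(flowFile, callback) accepts, so dropping the session.read call and keeping only session.write should avoid the coercion failure. Separately, the script sorts in ascending order and takes element 0, which yields the minimum rather than the maximum. The sketch below covers only the parsing and max logic in plain Python so it can be tried outside NiFi. It assumes the flow file content is a JSON array string, and the name max_from_array_text is hypothetical.

```python
import json


def max_from_array_text(text):
    """Return the largest integer in a JSON array string, or None if the array is empty.

    Assumption: the flow file content looks like "[1, 2, 3]" (a JSON array).
    """
    values = json.loads(text.strip())
    if not values:
        return None
    # max() avoids sorting the whole list just to pick one element
    return max(int(v) for v in values)


# Hypothetical flow file content, using the array from the question
content = "[9308023, 48243, 429402, 589348934, 4943, 4298040240, 424820482]"
print(max_from_array_text(content))  # prints 4298040240
```

Inside process(), the result would be converted with str() and written to outputStream. ExecuteScript examples commonly write bytes, e.g. bytearray(text.encode('utf-8')), rather than a bare string.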

Answer 1

Score: 0

You can use a Jolt specification such as the example below:

[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "rating": "rating"
      }
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      // Min value
      "minRating": "=min(@(1,rating))",
      // Max value
      "maxRating": "=max(@(1,rating))"
    }
  }
]

Then you can extract the max and min values using EvaluateJsonPath: add a property max_value with the value $.maxRating, and do the same for the minimum with $.minRating. Finally, you can connect it to a downstream processor such as UpdateAttribute.
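
To make the intent of the two Jolt operations easier to follow, here is a rough Python equivalent: the shift step gathers every "rating" into one list, and the modify step adds the min and max of that list. This is only an illustration, not how Jolt is implemented. The input records and the function name min_max_ratings are hypothetical, and it assumes at least one record carries a "rating".

```python
def min_max_ratings(records):
    """Roughly mimic the Jolt spec: collect each "rating", then add its min and max."""
    # "shift" step: pull the rating out of every record into a single list
    ratings = [record["rating"] for record in records if "rating" in record]
    # "modify-overwrite-beta" step: add the aggregate fields next to the list
    return {
        "rating": ratings,
        "minRating": min(ratings),
        "maxRating": max(ratings),
    }


# Hypothetical input: an array of objects, each with a "rating" field
records = [{"rating": 3}, {"rating": 9}, {"rating": 5}]
result = min_max_ratings(records)
print(result["maxRating"])  # the value that $.maxRating would then select
```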

source: https://github.com/bazaarvoice/jolt/issues/700

I hope it's helpful.

Posted by huangapple on June 12, 2023, 22:03:24
Original link: https://go.coder-hub.com/76457437.html