Get the max value from an array of integers in NiFi
Question
I have an array of values like [9308023, 48243, 429402, 589348934, 4943, 4298040240, 424820482] from the upstream processor (EvaluateJsonPath), and I want to get its max value using ExecuteScript (the downstream processor) and send it on to the next downstream processor, whatever it is.
I tried using Python; my code is below.
```python
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

# Define a subclass of StreamCallback to handle the incoming flow file
class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # Read the flow file content as a string
        flowFileText = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        # Split the input text into a list of IDs, trim white spaces, and convert to integers
        ids = [int(id.strip()) for id in flowFileText.strip().split(',')]
        if len(ids) > 0:
            # Sort the IDs in ascending order
            sorted_ids = sorted(ids)
            # Get the first ID from the sorted list
            first_id = sorted_ids[0]
            # Convert the first ID back to a string
            first_id_str = str(first_id)
            # Write the first ID to the output stream
            outputStream.write(first_id_str)
        else:
            # If no IDs are present, write an empty string to the output stream
            outputStream.write("")

# Create an instance of the callback class
streamCallback = PyStreamCallback()

# Process incoming flow files
flowFile = session.get()
if flowFile is not None:
    try:
        # Execute the callback on the flow file
        session.read(flowFile, streamCallback)
        session.write(flowFile, streamCallback)
        # Transfer the flow file to success relationship
        session.transfer(flowFile, REL_SUCCESS)
        session.commit()
    except Exception as e:
        # Log the exception
        log.error("Failed to process flow file: " + str(e))
        session.transfer(flowFile, REL_FAILURE)
        session.commit()
```
Output:
It fails with the following error message:
"ExecuteScript[id=018810db-110d-17c5-17da-0fdf1ca49296] Failed to process flow file: read(): 2nd arg can't be coerced to org.apache.nifi.processor.io.InputStreamCallback"
Answer 1
Score: 0
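You can use a Jolt spec like the one below, for example in a JoltTransformJSON processor. It expects an array of objects that each have a rating field, such as [{"rating": 3}, {"rating": 9}]: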
```json
[
  {
    "operation": "shift",
    "spec": {
      "*": {
        "rating": "rating"
      }
    }
  },
  {
    "operation": "modify-overwrite-beta",
    "spec": {
      // Min value
      "minRating": "=min(@(1,rating))",
      // Max value
      "maxRating": "=max(@(1,rating))"
    }
  }
]
```
Then you can extract the max and min values with EvaluateJsonPath: add a property max_value with the value $.maxRating, and do the same for the min (for example min_value with $.minRating). Finally, connect it to a downstream processor such as UpdateAttribute.
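To make the data flow concrete, here is a plain-Python emulation (not Jolt itself) of what the two spec operations produce. It assumes an input array of at least two objects with a rating field (as far as I know, shift emits a plain value rather than a list when only one element matches), and the function name `emulate_rating_spec` is hypothetical. The shift gathers every matched rating into one list under "rating", modify-overwrite-beta then adds minRating and maxRating beside it, and $.maxRating in EvaluateJsonPath reads that last field.

```python
import json


def emulate_rating_spec(items):
    """Hypothetical plain-Python stand-in for the two Jolt operations (not Jolt itself)."""
    # "shift": copy each element's "rating" into a single "rating" list
    out = {"rating": [item["rating"] for item in items if "rating" in item]}
    # "modify-overwrite-beta": add min/max computed from @(1,rating), i.e. out["rating"]
    out["minRating"] = min(out["rating"])
    out["maxRating"] = max(out["rating"])
    return out


doc = json.loads('[{"rating": 3}, {"rating": 9}, {"rating": 5}]')
result = emulate_rating_spec(doc)
print(json.dumps(result))  # {"rating": [3, 9, 5], "minRating": 3, "maxRating": 9}
# EvaluateJsonPath with $.maxRating would read this field:
print(result["maxRating"])  # 9
```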
source: https://github.com/bazaarvoice/jolt/issues/700
I hope it's helpful.