How to detect duplicate records based on specific field values at the row level using NiFi?
Question
Let's say you are ingesting a file or maybe even multiple files over different periods of time. If the records contained in the incoming flowfile are a daily load from a flat file for example, you need a way to detect and remove duplicate values based on specific fields.
Given the dataset below, we want to remove duplicates so that each combination of company name and email address appears only once:
CompanyName | Name | EmailAddress | Rate |
---|---|---|---|
Big Org A | John | john@example.com | 105 |
Big Org B | Mike | mike@example.com | 130 |
Big Org A | John | john@example.com | 140 |
Big Org C | Brad | brad@example.com | 110 |
So that we would be left with this unique dataset based on UNIQUE(CompanyName,EmailAddress):
CompanyName | Name | EmailAddress | Rate |
---|---|---|---|
Big Org A | John | john@example.com | 105 |
Big Org B | Mike | mike@example.com | 130 |
Big Org C | Brad | brad@example.com | 110 |
How could we go about achieving this when receiving multiple files over possibly different time periods like a daily flat file import?
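To make the intent concrete, the result I'm after is the same as a first-occurrence de-duplication over the two key columns, e.g. in plain Python/pandas (illustration only, not a NiFi solution):

```python
import pandas as pd

# Illustration only: the desired UNIQUE(CompanyName, EmailAddress) result,
# keeping the first row seen for each key combination.
df = pd.DataFrame([
    {"CompanyName": "Big Org A", "Name": "John", "EmailAddress": "john@example.com", "Rate": 105},
    {"CompanyName": "Big Org B", "Name": "Mike", "EmailAddress": "mike@example.com", "Rate": 130},
    {"CompanyName": "Big Org A", "Name": "John", "EmailAddress": "john@example.com", "Rate": 140},
    {"CompanyName": "Big Org C", "Name": "Brad", "EmailAddress": "brad@example.com", "Rate": 110},
])

deduped = df.drop_duplicates(subset=["CompanyName", "EmailAddress"], keep="first")
print(deduped)
```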
Answer 1
Score: 0
DeduplicateRecord NiFi Processor Block
The DeduplicateRecord processor block can remove row-level duplicates from a flowfile containing multiple records using either a hash set or a bloom filter depending on the filter type you choose.
A bloom filter will provide constant (efficient) memory space at the expense of probabilistic duplicate detection. The processor allows you to set the level of precision, which determines how space-efficient the bloom filter will be.
You can also set the filter strategy to use a hash set for absolute guarantees about duplicate detection, but at the expense of more memory usage. For very large datasets you may want to consider a bloom filter if a small number of false positives (unique records occasionally treated as duplicates) is acceptable.
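To make that tradeoff concrete, here is a toy bloom filter sketch in Python (not NiFi's actual implementation; the bit-array size and hash count are arbitrary assumptions). Membership checks can return a false positive, but never a false negative:

```python
import hashlib

class SimpleBloomFilter:
    # Tiny illustrative bloom filter: a fixed-size bit array and k hash
    # positions derived from salted SHA-256 digests. Real implementations
    # size the array from the expected element count and target error rate.
    def __init__(self, size_bits=1024, num_hashes=3):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits // 8 + 1)

    def _positions(self, value: str):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{value}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.size

    def add(self, value: str) -> None:
        for pos in self._positions(value):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, value: str) -> bool:
        # True means "possibly seen before" (could be a false positive);
        # False means "definitely not seen before".
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(value))

bf = SimpleBloomFilter()
bf.add("Big Org A~john@example.com")
print(bf.might_contain("Big Org A~john@example.com"))   # True
print(bf.might_contain("Big Org B~mike@example.com"))   # almost certainly False at this sizing
```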
So then you would define record path values in the dynamic properties of the processor block like this:
RecordPath | Value |
---|---|
/CompanyName | ${field.value} |
/EmailAddress | ${field.value} |
These two fields are then concatenated together (the default join character is `~`) and then hashed (or not, depending on how you configure the processor). That value is then compared against, and stored in, the BloomFilter or HashSet to determine whether the record is a duplicate.
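Conceptually, the hash-set strategy behaves like the Python sketch below (an illustration of the description above, not NiFi code; the `~` join character matches the default, while SHA-256 is just one possible hashing choice):

```python
import hashlib

def record_key(record, fields, join_char="~"):
    # Concatenate the selected field values with the join character,
    # then hash the result (hashing is optional in the real processor).
    joined = join_char.join(str(record[field]) for field in fields)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()

def deduplicate(records, fields):
    seen = set()  # hash-set strategy: exact answers, memory grows with the data
    for record in records:
        key = record_key(record, fields)
        if key in seen:
            continue              # duplicate: drop (or route away) this record
        seen.add(key)
        yield record              # unique: keep this record

rows = [
    {"CompanyName": "Big Org A", "EmailAddress": "john@example.com", "Rate": 105},
    {"CompanyName": "Big Org B", "EmailAddress": "mike@example.com", "Rate": 130},
    {"CompanyName": "Big Org A", "EmailAddress": "john@example.com", "Rate": 140},
]
print(list(deduplicate(rows, ["CompanyName", "EmailAddress"])))
```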
Answer 2
Score: 0
Using the MergeContent processor (with `Attribute Strategy` set to `Keep all unique attributes` and `Correlation Attribute Name` set to your merge keys, CompanyName and EmailAddress) is an alternate approach.
However, this approach is not suitable for the requirement below:
>How could we go about achieving this when receiving multiple files over possibly different time periods like a daily flat file import?
I'm unsure whether NiFi is meant for this use case. To achieve it, I would use a `Wait` processor (with currentDate set as the `Distributed Cache Service` key) and/or a `RouteOnAttribute` processor (that keeps checking whether today is currentDate+1). The `success` outcome should be connected to the `MergeContent` processor discussed earlier.
Note: the MergeContent processor will need its keys to be part of the flowfile attributes (depending on whether the input body is JSON/XML/CSV, you will need a `Split JSON` processor + an `EvaluateJsonPath` processor).
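As a rough sketch of that flow's logic in Python (purely conceptual, not NiFi code; the function names are made up and the Wait/RouteOnAttribute date gate is reduced to a small helper):

```python
import json
from datetime import date, timedelta

def release_batch(batch_date: date) -> bool:
    # Stand-in for the Wait / RouteOnAttribute gate: only release the batch
    # once today is batch_date + 1, i.e. the daily file is complete.
    return date.today() >= batch_date + timedelta(days=1)

def split_and_tag(body: str):
    # Split JSON step: one record per "flowfile".
    # EvaluateJsonPath step: promote the merge keys to attributes.
    for record in json.loads(body):
        attributes = {"CompanyName": record["CompanyName"],
                      "EmailAddress": record["EmailAddress"]}
        yield attributes, record

def merge_unique(tagged_records):
    # Models the intent of the merge step in the flow above:
    # end up with one record per unique (CompanyName, EmailAddress) key.
    merged, seen = [], set()
    for attributes, record in tagged_records:
        key = (attributes["CompanyName"], attributes["EmailAddress"])
        if key not in seen:
            seen.add(key)
            merged.append(record)
    return merged

daily_body = json.dumps([
    {"CompanyName": "Big Org A", "Name": "John", "EmailAddress": "john@example.com", "Rate": 105},
    {"CompanyName": "Big Org A", "Name": "John", "EmailAddress": "john@example.com", "Rate": 140},
])

if release_batch(date.today() - timedelta(days=1)):
    print(merge_unique(split_and_tag(daily_body)))
```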