How to detect duplicate records based on specific field values at the row level using NiFi?

Let's say you are ingesting a file or maybe even multiple files over different periods of time. If the records contained in the incoming flowfile are a daily load from a flat file for example, you need a way to detect and remove duplicate values based on specific fields.

Given the dataset below, we want to remove duplicates so that each combination of company name and email address appears only once:

| CompanyName | Name | EmailAddress     | Rate |
|-------------|------|------------------|------|
| Big Org A   | John | john@example.com | 105  |
| Big Org B   | Mike | mike@example.com | 130  |
| Big Org A   | John | john@example.com | 140  |
| Big Org C   | Brad | brad@example.com | 110  |

This would leave us with the following unique dataset, based on UNIQUE(CompanyName, EmailAddress):

| CompanyName | Name | EmailAddress     | Rate |
|-------------|------|------------------|------|
| Big Org A   | John | john@example.com | 105  |
| Big Org B   | Mike | mike@example.com | 130  |
| Big Org C   | Brad | brad@example.com | 110  |

How could we go about achieving this when receiving multiple files over possibly different time periods like a daily flat file import?
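
To make the requirement concrete, here is a minimal sketch (plain Python, outside of NiFi) of the dedup rule being asked for: keep the first record seen for each (CompanyName, EmailAddress) pair. The field names and sample rows simply mirror the tables above.

```python
# Minimal sketch of the desired rule: keep the first record
# for each (CompanyName, EmailAddress) combination.
records = [
    {"CompanyName": "Big Org A", "Name": "John", "EmailAddress": "john@example.com", "Rate": 105},
    {"CompanyName": "Big Org B", "Name": "Mike", "EmailAddress": "mike@example.com", "Rate": 130},
    {"CompanyName": "Big Org A", "Name": "John", "EmailAddress": "john@example.com", "Rate": 140},
    {"CompanyName": "Big Org C", "Name": "Brad", "EmailAddress": "brad@example.com", "Rate": 110},
]

seen = set()
unique_records = []
for record in records:
    key = (record["CompanyName"], record["EmailAddress"])
    if key not in seen:  # first occurrence wins
        seen.add(key)
        unique_records.append(record)

# unique_records now holds the Big Org A (105), Big Org B, and Big Org C rows.
```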

Answer 1

Score: 0

DeduplicateRecord NiFi Processor Block

The DeduplicateRecord processor block can remove row-level duplicates from a flowfile containing multiple records using either a hash set or a bloom filter depending on the filter type you choose.

A bloom filter will provide constant (efficient) memory space at the expense of probabilistic duplicate detection. The processor allows you to set the level of precision, which determines how space-efficient the bloom filter will be.

You can also set the filter strategy to use a hash set for absolute guarantees about duplicate detection, at the expense of more memory usage. For very large datasets you may want to consider a bloom filter if a small number of records being incorrectly flagged as duplicates (false positives) is acceptable.

You would then define RecordPath values in the dynamic properties of the processor like this:

| RecordPath     | Value          |
|----------------|----------------|
| /CompanyName   | ${field.value} |
| /EmailAddress  | ${field.value} |

These two fields are then concatenated (the default join character is ~) and then hashed, or not, depending on how you configure the processor. The resulting value is compared against, and stored in, the bloom filter or hash set to determine whether the record is a duplicate.
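
As an illustration only (not the processor's actual source code), the hash-set strategy behaves roughly like the Python sketch below. The ~ join character and the two record fields mirror the configuration above; the choice of SHA-256 and the dictionary field names are assumptions made for the example.

```python
import hashlib

def is_duplicate(record, seen_hashes, join_char="~"):
    """Roughly mirrors the hash-set strategy: join the configured
    RecordPath values, hash the result, and check it against the
    values seen so far."""
    # Values resolved from /CompanyName and /EmailAddress (${field.value}).
    key = join_char.join([record["CompanyName"], record["EmailAddress"]])
    # Hashing is optional in the processor; SHA-256 is an assumption here.
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()
    if digest in seen_hashes:
        return True
    seen_hashes.add(digest)
    return False

seen = set()
rows = [
    {"CompanyName": "Big Org A", "EmailAddress": "john@example.com", "Rate": 105},
    {"CompanyName": "Big Org A", "EmailAddress": "john@example.com", "Rate": 140},
]
deduped = [r for r in rows if not is_duplicate(r, seen)]
# deduped keeps only the first Big Org A / john@example.com row.
```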

Answer 2

Score: 0

Using the MergeContent processor (with Attribute Strategy set to Keep All Unique Attributes and Correlation Attribute Name set to your merge keys, CompanyName and EmailAddress) is an alternative approach.
However, this approach is not suitable for the requirement below:
> How could we go about achieving this when receiving multiple files over possibly different time periods like a daily flat file import?

I'm unsure whether NiFi is meant for this use case. To achieve it, I would use a Wait processor (with currentDate set as the Distributed Cache Service key) and/or a RouteOnAttribute processor (that keeps checking whether today is currentDate+1). The success outcome should be connected to the MergeContent processor discussed earlier.
Note: the MergeContent processor will need its keys to be available as flowfile attributes (depending on whether the input body is JSON/XML/CSV, you will need a SplitJson processor plus an EvaluateJsonPath processor).
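
To make that note about attributes concrete, here is a hedged sketch (plain Python, not NiFi) of what the SplitJson + EvaluateJsonPath step accomplishes before MergeContent can correlate on the keys: each element of a JSON array becomes its own "flowfile" with CompanyName and EmailAddress promoted to attributes. The JSON structure and attribute names are assumptions for illustration only.

```python
import json

# Hypothetical input body: a JSON array of records (assumption for illustration).
body = '''[
  {"CompanyName": "Big Org A", "Name": "John", "EmailAddress": "john@example.com", "Rate": 105},
  {"CompanyName": "Big Org A", "Name": "John", "EmailAddress": "john@example.com", "Rate": 140}
]'''

# SplitJson: one "flowfile" per array element.
# EvaluateJsonPath: promote $.CompanyName and $.EmailAddress to attributes.
flowfiles = []
for record in json.loads(body):
    flowfiles.append({
        "content": json.dumps(record),
        "attributes": {
            "CompanyName": record["CompanyName"],
            "EmailAddress": record["EmailAddress"],
        },
    })

# MergeContent could then correlate on these attribute values,
# e.g. grouping flowfiles by the (CompanyName, EmailAddress) pair.
groups = {}
for ff in flowfiles:
    key = (ff["attributes"]["CompanyName"], ff["attributes"]["EmailAddress"])
    groups.setdefault(key, []).append(ff)
```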
