Apache Beam count of unique elements

Question

I have an Apache Beam job that ingests data from PubSub and then loads it into BigQuery.
I transform each PubSub message into a POJO with the fields id, name and count.

count is the number of identical (non-unique) elements within a single ingest.

If I load 3 elements from PubSub and two of them are the same, I need to load 2 elements into BigQuery, one of which has a count of 2.

I wonder how to do this easily in Apache Beam.
I tried DoFn and MapElements, but there I can only process one element at a time.
I also tried converting the elements to KV and then counting, but I get a non-deterministic coder error.

In a regular Java application I could simply use equals or a Map, but in Apache Beam everything works differently.
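For comparison, the plain-Java approach mentioned in the question can be sketched with a Map. This is a minimal illustration of the expected result, not Beam code; the Event class and countDuplicates method are hypothetical stand-ins for the question's POJO, with equals/hashCode defined over id and name.

```java
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

public class LocalCount {
    // Hypothetical POJO: two events are "the same" when id and name match.
    public static final class Event {
        final String id;
        final String name;

        public Event(String id, String name) { this.id = id; this.name = name; }

        @Override public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Event)) return false;
            Event e = (Event) o;
            return id.equals(e.id) && name.equals(e.name);
        }

        @Override public int hashCode() { return Objects.hash(id, name); }

        @Override public String toString() { return id + "/" + name; }
    }

    // Groups equal events (via equals/hashCode) and counts each group,
    // keeping first-seen order thanks to LinkedHashMap.
    public static Map<Event, Long> countDuplicates(List<Event> events) {
        return events.stream().collect(Collectors.groupingBy(
            Function.identity(), LinkedHashMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Event> batch = Arrays.asList(
            new Event("1", "a"), new Event("2", "b"), new Event("1", "a"));
        System.out.println(countDuplicates(batch)); // prints {1/a=2, 2/b=1}
    }
}
```

In Beam this in-memory Map is not an option, because the elements are processed in parallel on different workers: equal elements have to be brought together by a grouping step such as Count.perElement(), and runners generally compare grouping keys by their encoded bytes rather than by equals, which is why the coder has to be deterministic.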

Answer 1

Score: 0

The simple and right approach would be to use Count.<T>perElement(), like this:

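```java
Pipeline p = ...;
PCollection<T> elements = p.apply(...); // read elements
// One output KV per distinct element (per window), holding how often it occurred
PCollection<KV<T, Long>> elementsCounts =
    elements.apply(Count.<T>perElement());
// FormatOutputFn: your own DoFn<KV<T, Long>, TableRow> that builds the BigQuery rows
PCollection<TableRow> results = elementsCounts.apply(ParDo.of(
    new FormatOutputFn()));
```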

That said, you do need a deterministic coder for the elements. If that is not the case (as I understand from your description), add a step before Count that transforms each element into a different representation for which a deterministic coder is available (AvroCoder, for example).
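A minimal sketch of such a transformation step, under stated assumptions: instead of counting the POJO itself, each element is mapped to a KV<String, String> of its id and name (KvCoder over StringUtf8Coder is deterministic), those pairs are counted, and the BigQuery rows are built from the result. This swaps AvroCoder for a plain KV of strings; Event, toKey and countEvents are hypothetical names, and TableRow/TableRowJsonCoder assume the Beam Google Cloud Platform IO module is on the classpath.

```java
import com.google.api.services.bigquery.model.TableRow;
import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

public class DeterministicCount {
    // Hypothetical POJO from the question; id and name define "the same element".
    public static class Event implements Serializable {
        public final String id;
        public final String name;

        public Event(String id, String name) { this.id = id; this.name = name; }
    }

    // Deterministic representation of an Event: a KV of two strings.
    public static KV<String, String> toKey(Event e) {
        return KV.of(e.id, e.name);
    }

    public static PCollection<TableRow> countEvents(PCollection<Event> events) {
        // 1. Replace each POJO with its (id, name) pair, then count identical pairs.
        PCollection<KV<KV<String, String>, Long>> counts = events
            .apply("ToDeterministicKey",
                MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                    .via((Event e) -> toKey(e)))
            .apply("CountPerElement", Count.<KV<String, String>>perElement());

        // 2. Build one BigQuery row per distinct pair, carrying its count.
        return counts
            .apply("ToTableRow",
                MapElements.into(TypeDescriptor.of(TableRow.class))
                    .via((KV<KV<String, String>, Long> kv) -> new TableRow()
                        .set("id", kv.getKey().getKey())
                        .set("name", kv.getKey().getValue())
                        .set("count", kv.getValue())))
            .setCoder(TableRowJsonCoder.of());
    }
}
```

If the POJO has more fields that define equality, they would all need to go into the key (for example as a longer KV nesting or a single delimited string); the choice of KV<String, String> here only mirrors the id and name fields from the question.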

If that is not possible for some reason, another workaround is to calculate a unique hash for every element (the hash value must be deterministic as well), create a KV for every element with the hash as the key and the element as the value, and use GroupByKey downstream so that all identical elements end up grouped under the same key.
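The hash has to be identical on every worker and JVM, which Object.hashCode() does not guarantee for arbitrary objects. A minimal stdlib sketch, assuming an element's identity is given by hypothetical non-null id and name string fields, computes a SHA-256 digest over those fields (collisions are possible in principle but practically negligible):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class DeterministicHash {
    // Returns the hex SHA-256 digest of the fields that define "the same element".
    // Assumes id and name are non-null.
    public static String hashKey(String id, String name) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            // Length-prefix each field so that ("ab", "c") and ("a", "bc") differ.
            for (String field : new String[] {id, name}) {
                byte[] bytes = field.getBytes(StandardCharsets.UTF_8);
                digest.update(Integer.toString(bytes.length).getBytes(StandardCharsets.UTF_8));
                digest.update((byte) ':');
                digest.update(bytes);
            }
            // Render the 32-byte digest as 64 lowercase hex characters.
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

In the pipeline, such a key could be attached with WithKeys.of(...) (adding .withKeyType(TypeDescriptors.strings()) so Beam can infer a String key coder) and followed by GroupByKey.create(); the number of elements in each grouped Iterable is then the count for that key.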

Also note that since PubSub is an unbounded source, you need to window your input with a windowing strategy other than the global one, because all group/combine operations are performed per window. Take a look at WindowedWordCount as an example solution to a similar problem.
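A minimal sketch of windowing before counting, in the spirit of WindowedWordCount. Fixed windows are an assumption here (sliding or session windows are other options), and countPerWindow and the window size are hypothetical; the input elements are assumed to already have a deterministic coder.

```java
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class WindowedCount {
    // Assigns each element to a fixed-size window, then counts identical
    // elements separately within each window.
    public static <T> PCollection<KV<T, Long>> countPerWindow(PCollection<T> input, Duration windowSize) {
        return input
            .apply("FixedWindows", Window.<T>into(FixedWindows.of(windowSize)))
            .apply("CountPerElement", Count.<T>perElement());
    }
}
```

With, say, Duration.standardMinutes(1), each one-minute window emits its own counts, so the same element arriving in two different windows produces two separate BigQuery rows; the window size effectively defines what "a single ingest" means.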

huangapple
  • Published on May 29, 2020 17:31:07
  • Please keep the link to this article when reposting: https://go.coder-hub.com/62082803.html