Apache Beam count of unique elements
Question
I have an Apache Beam job that ingests data from PubSub and then loads it into BigQuery.
I transform each PubSub message into a POJO with the fields:
> id, name, count
`count` is the number of non-unique (duplicate) elements within a single ingest.
If I load 3 elements from PubSub and two of them are the same, then I need to load 2 elements into BigQuery, one of which has a count of 2.
I wonder how to do this easily in Apache Beam.
I tried to do it with a DoFn or MapElements, but there I can only process a single element at a time.
I also tried converting each element to a KV and then counting, but I end up with a non-deterministic coder.
In a usual Java app I could simply use equals or a Map, but in Apache Beam everything is different.
Answer 1
Score: 0
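The simplest correct approach is to use `Count.<T>perElement()`, like this:
```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Pipeline p = ...;
PCollection<T> elements = p.apply(...); // read elements
PCollection<KV<T, Long>> elementsCounts =
    elements.apply(Count.<T>perElement()); // one KV per distinct element: (element, occurrences)
PCollection<TableRow> results = elementsCounts.apply(ParDo.of(
    new FormatOutputFn())); // your own DoFn<KV<T, Long>, TableRow>
```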
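For intuition, here is a plain-Java model (no Beam dependency) of what `Count.perElement()` computes for the elements of one window: one output per distinct element, paired with the number of times it occurred. `Event` and `PerElementCount` are hypothetical names; `Event` stands in for the question's POJO, and record equality on (id, name) stands in for Beam's comparison of encoded elements. In the real pipeline, the `FormatOutputFn` step would then turn each `KV` into a `TableRow` with the count filled in.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the question's POJO (the key fields only).
record Event(String id, String name) {}

final class PerElementCount {
    // Models Count.perElement(): distinct element -> number of occurrences.
    static Map<Event, Long> countPerElement(List<Event> input) {
        Map<Event, Long> counts = new LinkedHashMap<>();
        for (Event e : input) {
            // Record equality on (id, name) stands in for Beam comparing encoded elements.
            counts.merge(e, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // The question's example: 3 elements, two of them identical.
        List<Event> batch = List.of(
                new Event("1", "a"), new Event("1", "a"), new Event("2", "b"));
        // Prints one line per distinct event: (1, a) with 2 and (2, b) with 1.
        countPerElement(batch).forEach((e, n) -> System.out.println(e + " -> " + n));
    }
}
```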
That said, you do need a deterministic coder for the element type. If that is not the case (as I understand from what you said above), you need to add a step before `Count` that transforms each element into a different representation for which a deterministic coder is available (`AvroCoder`, for example).
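To show what "deterministic" means here: Beam compares keys by their encoded bytes when grouping, so equal elements must always encode to identical bytes. The plain-Java sketch below (a hypothetical `CanonicalEncoding` helper, not a Beam coder) writes the POJO's key fields in a fixed, length-prefixed order, which has that property. In a pipeline, a comparable effect could be had by mapping each POJO to a `String` built from its fields, since Beam's `StringUtf8Coder` is deterministic.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical canonical representation of the question's POJO key fields.
final class CanonicalEncoding {
    // Fields are always written in the same order, each with a length prefix,
    // so equal (id, name) pairs always produce identical bytes.
    // Note: writeUTF limits each string to 65535 encoded bytes.
    static byte[] encode(String id, String name) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(id);
            out.writeUTF(name);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        boolean same = Arrays.equals(encode("1", "a"), encode("1", "a"));
        // Length prefixes keep ("ab", "c") distinct from ("a", "bc").
        boolean differs = !Arrays.equals(encode("ab", "c"), encode("a", "bc"));
        System.out.println(same + " " + differs); // true true
    }
}
```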
If that is not possible for some reason, another workaround is to compute a unique hash for every element (the hash value must be deterministic as well), create a `KV` for every element with the hash as the key and the element as the value, and then apply `GroupByKey` downstream so that identical elements end up grouped together.
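A plain-Java model of this hash-key workaround, assuming SHA-256 as the deterministic hash and hypothetical names (`Row`, `HashKeyGrouping`): each element is keyed by the hash of its fields, the map grouping plays the role of `GroupByKey`, and the size of each group is the count to write out. This sketches the idea only; in Beam the keying would be a `MapElements`/`ParDo` step producing `KV<String, T>`.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the question's POJO.
record Row(String id, String name) {}

final class HashKeyGrouping {
    // SHA-256 over a length-prefixed "id|name" string: same fields, same key.
    static String deterministicKey(Row r) {
        String canonical = r.id().length() + ":" + r.id() + "|" + r.name();
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(canonical.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 must be available on every JVM", e);
        }
    }

    // Models KV<hash, element> followed by GroupByKey.
    static Map<String, List<Row>> groupByHash(List<Row> input) {
        Map<String, List<Row>> groups = new LinkedHashMap<>();
        for (Row r : input) {
            groups.computeIfAbsent(deterministicKey(r), k -> new ArrayList<>()).add(r);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Row> batch = List.of(new Row("1", "a"), new Row("1", "a"), new Row("2", "b"));
        for (List<Row> group : groupByHash(batch).values()) {
            // Any member represents the group; the group size is its count.
            System.out.println(group.get(0) + " count=" + group.size());
        }
    }
}
```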
Also note that, since PubSub is an unbounded source, you need to window your input with any windowing strategy other than the global one, because all your grouping and combining operations should be done inside a window. Take a look at `WindowedWordCount` for an example solution to a similar problem.