Apache Beam Find Top N elements Python SDK

Question

My requirement is to read from a BigQuery (BQ) table, do some processing, select the top N rows by the "score" column, write those rows to a BQ table, and also publish them as Pub/Sub messages.
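Pub/Sub messages carry raw bytes, so each row dict has to be serialized before it can be published. In Beam that is usually a `beam.Map` step placed ahead of `beam.io.WriteToPubSub`, which expects `bytes` elements when `with_attributes` is left at `False`; `beam.io.WriteToBigQuery` can take the dicts directly. A minimal stdlib sketch of such a serializer, assuming JSON is an acceptable message format (the helper name `row_to_pubsub_payload` is hypothetical):

```python
import json
from typing import Any, Dict


def row_to_pubsub_payload(row: Dict[str, Any]) -> bytes:
    """Serialize one row dict into a UTF-8 JSON payload for Pub/Sub.

    Hypothetical helper: JSON is an assumed message format, not something
    the original pipeline specifies.
    """
    # sort_keys makes the payload deterministic, which simplifies testing
    return json.dumps(row, sort_keys=True).encode("utf-8")


if __name__ == "__main__":
    payload = row_to_pubsub_payload({"name": "Bob", "score": 5})
    print(payload)  # b'{"name": "Bob", "score": 5}'
```

In a pipeline this would sit after the rows are individual elements, roughly `| beam.Map(row_to_pubsub_payload) | beam.io.WriteToPubSub(topic=...)`, with the topic taken from your own configuration.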

Below is a sample that creates a PCollection and selects the top 5 rows from it based on the value of "score".

import apache_beam as beam

with beam.Pipeline() as p:
    elements = (
        p
        | beam.Create([{"name": "Bob", "score": 5},
                       {"name": "Adam", "score": 5},
                       {"name": "John", "score": 2},
                       {"name": "Jose", "score": 1},
                       {"name": "Tim", "score": 1},
                       {"name": "Bryan", "score": 1},
                       {"name": "Kim", "score": 1}])
        | "Filter Top 5 scores" >> beam.combiners.Top.Of(5, key=lambda t: t['score'])
        | "Print" >> beam.Map(print)
    )
    # Write to BQ
    # Publish to PubSub

This produces a single list holding all five rows rather than a PCollection of individual rows, so in this format I can't write the rows back to a BQ table or publish them to Pub/Sub.

What is the best way to select the top N elements but keep them as a PCollection?

In my real use case I might have around 2 million rows, and I need to select 800k records from them based on a column.
Also, in one of the Apache Beam Summit videos I remember hearing that the Top function keeps its results on a single worker node instead of distributing them across workers, which is why I assume it returns a list. What is the maximum number of rows it can handle before it breaks?
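A rough local model of why a global Top ends up as one list on one worker, assuming it behaves like a global combine (each worker keeps a partial top N, then all partials are merged into a single accumulator). This is a plain-Python illustration, not Beam's implementation, and the function names are hypothetical. It does not give a row limit: the practical ceiling depends on row size and worker memory.

```python
import heapq
from operator import itemgetter
from typing import Any, Callable, Dict, Iterable, List

Row = Dict[str, Any]


def partial_top(rows: Iterable[Row], n: int, key: Callable[[Row], Any]) -> List[Row]:
    """Per-worker step: keep only the n best rows seen in one shard."""
    return heapq.nlargest(n, rows, key=key)


def merge_tops(partials: Iterable[List[Row]], n: int,
               key: Callable[[Row], Any]) -> List[Row]:
    """Final step: merge every partial list into ONE list of at most n rows.

    In this model the merged list is the single element the global Top
    emits, so all n rows sit in the memory of whichever worker merges.
    """
    merged = (row for partial in partials for row in partial)
    return heapq.nlargest(n, merged, key=key)


if __name__ == "__main__":
    # Three hypothetical shards of the sample rows, as if on three workers
    shards = [
        [{"name": "Bob", "score": 5}, {"name": "Jose", "score": 1}],
        [{"name": "Adam", "score": 5}, {"name": "John", "score": 2}],
        [{"name": "Tim", "score": 1}, {"name": "Kim", "score": 1}],
    ]
    by_score = itemgetter("score")
    partials = [partial_top(shard, 3, by_score) for shard in shards]
    top3 = merge_tops(partials, 3, by_score)
    print([row["score"] for row in top3])  # [5, 5, 2]
```

The partial step spreads out well, but with N = 800k the merge step still has to hold 800k full rows in one place.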

Answer 1

Score: 2

To solve your issue, you can add a FlatMap transform after the Top operator:

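import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline


def test_pipeline(self):
    with TestPipeline() as p:
        elements = (
            p
            | beam.Create([{"name": "Bob", "score": 5},
                           {"name": "Adam", "score": 5},
                           {"name": "John", "score": 2},
                           {"name": "Jose", "score": 1},
                           {"name": "Tim", "score": 1},
                           {"name": "Bryan", "score": 1},
                           {"name": "Kim", "score": 1}])
            # Top.Of emits a single element: a list of the 5 highest-scoring dicts
            | "Filter Top 5 scores" >> beam.combiners.Top.Of(5, key=lambda t: t['score'])
            # FlatMap unpacks that list so each dict becomes its own element
            | "Flatten" >> beam.FlatMap(lambda e: e)
            | "Print" >> beam.Map(print)
        )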

In this case the result is a PCollection of Dict instead of a PCollection of List[Dict].
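The change of shape can be modeled with plain Python lists standing in for PCollections. This is an illustration only (the name `flatten_top` is hypothetical); it mirrors what `beam.FlatMap(lambda e: e)` does to the single list that `Top.Of` emits.

```python
from typing import Any, Dict, Iterable, Iterator, List

Row = Dict[str, Any]


def flatten_top(top_lists: Iterable[List[Row]]) -> Iterator[Row]:
    """Mimic beam.FlatMap(lambda e: e): emit each row of each list."""
    for top_list in top_lists:
        # A global Top.Of yields a single list, so this loop runs once
        yield from top_list


if __name__ == "__main__":
    # Shape after Top.Of: a collection holding one List[Dict]
    after_top = [[{"name": "Bob", "score": 5}, {"name": "Adam", "score": 5}]]
    # Shape after FlatMap: a collection of individual Dicts
    after_flatmap = list(flatten_top(after_top))
    print(len(after_top), len(after_flatmap))  # 1 2
```

Once the elements are individual dicts, each one can be written to BigQuery or serialized and published to Pub/Sub on its own.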

Be careful with high volumes, because the final result of beam.combiners.Top.Of is assembled on a single worker: all N elements end up in one list, which must fit in that worker's memory.
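One way to shrink what must be held on that single worker, offered as an alternative rather than something this answer proposes, is to take the top N *scores* only (small values instead of whole rows), use the smallest of them as a cutoff, and filter the full rows against it. In Beam the steps would roughly map to `beam.Map` (extract the score), `beam.combiners.Top.Of(n)`, and `beam.Filter` with the cutoff passed as a side input via `beam.pvalue.AsSingleton`. The plain-Python model below assumes scores are comparable numbers; the function names are hypothetical, and rows tied at the cutoff are all kept, so the output can exceed N.

```python
import heapq
from typing import Any, Dict, Iterable, List, Optional

Row = Dict[str, Any]


def score_cutoff(scores: Iterable[float], n: int) -> Optional[float]:
    """Return the n-th highest score, or None when there is nothing to rank.

    Models Top.Of(n) applied to bare score values (cheap to hold in memory)
    followed by taking the smallest element of the resulting list.
    """
    top = heapq.nlargest(n, scores)
    return top[-1] if top else None


def filter_by_cutoff(rows: Iterable[Row], cutoff: Optional[float]) -> List[Row]:
    """Models the distributed Filter step: keep rows at or above the cutoff.

    Rows tied with the cutoff are all kept, so the result may exceed n.
    """
    if cutoff is None:
        return []
    return [row for row in rows if row["score"] >= cutoff]


if __name__ == "__main__":
    rows = [
        {"name": "Bob", "score": 5}, {"name": "Adam", "score": 5},
        {"name": "John", "score": 2}, {"name": "Jose", "score": 1},
        {"name": "Tim", "score": 1}, {"name": "Bryan", "score": 1},
        {"name": "Kim", "score": 1},
    ]
    cutoff = score_cutoff((row["score"] for row in rows), 3)
    print(cutoff, [row["name"] for row in filter_by_cutoff(rows, cutoff)])
    # 2 ['Bob', 'Adam', 'John']
```

With n = 5 on the same rows the cutoff drops to 1 and all seven rows pass, which shows the tie caveat; if exactly N rows are required, ties at the cutoff need extra handling that this sketch does not cover.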

If needed, you can also optimize your Dataflow job and make it more powerful with Dataflow Prime (right fitting, vertical autoscaling within a worker, and so on).

  • Posted by huangapple on 16 February 2023 at 13:52:05
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75468333.html