Apache Beam 在 Python SDK 上根据条件退出流水线

huangapple go评论52阅读模式
英文:

Apache Beam exit pipeline on condition Python SDK

问题

我有一个使用 Apache Beam 构建的数据流水线,该管道从 BigQuery 表中读取数据并进行一些处理。Cloud Function 将触发数据流作业。
我的要求是在 BigQuery 中检查一个日期列,如果该日期与今天相同,则停止管道继续进行下一步处理。

data = (
    pipeline
    | beam.io.ReadFromBigQuery(query='''
    SELECT date, unique_key, case_number FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
    ''', use_standard_sql=True)
    # 进一步的数据处理
)

count = (data 
    | beam.Filter(lambda line : line['date'] == datetime.now())
    | beam.combiners.Count.Globally()
)

# 进一步的数据处理

'data' PCollection 是我的实际处理数据的部分。
我考虑的解决方案是创建一个 PCollection 'count',检查日期是否与今天相同。但如何添加逻辑来检查计数是否大于 0,然后记录必要的信息并退出管道呢?

或者是否有更好的方法来实现这个目标呢?

英文:

I have an Apache Beam pipeline which reads from a BigQuery table and does a few processing. The dataflow job would be triggered by a Cloud Function.
My requirement is to check a date column in the BigQuery which it reads at the first step and stop the pipeline from proceeding to the next stages if the date is same as today.

data = (
    pipeline
    | beam.io.ReadFromBigQuery(query='''
    SELECT date, unique_key, case_number FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
    ''', use_standard_sql=True)
    # Further data processing
)

count = (data 
    | beam.Filter(lambda line : line['date'] == datetime.now())
    | beam.combiners.Count.Globally()
)

# Further data processing

The 'data' PCollection is my actual processing.
The solution which I had in mind was creating a PCollection 'count' which checks if the date is same as today. But how can I add a logic to check if the count is greater than 0 and then step exit the pipeline by logging necessary information?

Or is there a better way to do this instead?

答案1

得分: 1

你可以使用以下代码来解决你的问题:

class BadLineException(Exception):
    pass

def line_can_be_processed(self, line) -> bool:
    if line['date'] == datetime.now():
        logging.info("My info about processing if date is now, the line can't be processed")
        raise BadLineException("The line can't be processed !!!", line)
    else:
        return True
        
def test_pipeline(self):
    with TestPipeline() as p:
        data = (
                p
                | beam.io.ReadFromBigQuery(query='''
                        SELECT date, unique_key, case_number FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
                        ''', use_standard_sql=True)
            # 进一步的数据处理
        )
    
        (data
         | beam.Filter(self.line_can_be_processed)
         | beam.Map(self.fake_transform)
         )

一些解释:

  • 对于 Filter,我将逻辑放在一个独立的方法中,如果当前日期是现在,我记录所需的信息并引发自定义异常,它将停止管道并标记为失败。否则,我返回 True

如果你想对无法处理的行有更多控制,你也可以应用多个 sinks 并将它们存储在文件或 BigQuery 表中。

使用多个 sinks 你会有:

Sink 1 => 好的行
Sink 2 => 无法处理的行
英文:

You can solve your issue with the following code :

class BadLineException(Exception):
    pass

def line_can_be_processed(self, line) -> bool:
    if line['date'] == datetime.now():
        logging.info("My info about processing if date is now, the line can't be processed")
        raise BadLineException("The line can't be processed !!!", line)
    else:
        return True
        
def test_pipeline(self):
    with TestPipeline() as p:
        data = (
                p
                | beam.io.ReadFromBigQuery(query='''
                        SELECT date, unique_key, case_number FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
                        ''', use_standard_sql=True)
            # Further data processing
        )
    
        (data
         | beam.Filter(self.line_can_be_processed)
         | beam.Map(self.fake_transform)
         )

Some explanations :

  • For the Filter, I put the logic in a separated method, if the current date is now, I log the needed information and raise a custom Exception, it will stop the pipeline and mark it as failed. Otherwise I return True

If you want to have more control on the lines that could not be processed, you can also apply a multi sinks and store them in a file or a BigQuery table.

With multi sinks you will have :

Sink 1 => Good lines 
Sink 2 => Lines that could not be processed

huangapple
  • 本文由 发表于 2023年2月10日 14:04:11
  • 转载请务必保留本文链接:https://go.coder-hub.com/75407467.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定