Apache Beam exit pipeline on condition Python SDK

Question
I have an Apache Beam pipeline which reads from a BigQuery table and does some processing. The Dataflow job is triggered by a Cloud Function.
My requirement is to check a date column in the BigQuery table it reads at the first step, and stop the pipeline from proceeding to the next stages if the date is the same as today.
```python
data = (
    pipeline
    | beam.io.ReadFromBigQuery(query='''
        SELECT date, unique_key, case_number
        FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
        ''', use_standard_sql=True)
    # Further data processing
)

count = (
    data
    | beam.Filter(lambda line: line['date'] == datetime.now())
    | beam.combiners.Count.Globally()
)
# Further data processing
```
The 'data' PCollection is my actual processing. The solution I had in mind was creating a PCollection 'count' which checks whether the date is the same as today. But how can I add logic to check if the count is greater than 0 and then exit the pipeline, logging the necessary information?
Or is there a better way to do this instead?
Answer 1

Score: 1
You can solve your issue with the following code:
```python
import logging
import unittest
from datetime import datetime

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline


class BadLineException(Exception):
    pass


class MyPipelineTest(unittest.TestCase):  # enclosing test class (name assumed)

    def line_can_be_processed(self, line) -> bool:
        if line['date'] == datetime.now():
            logging.info("The line's date is today, it can't be processed")
            raise BadLineException("The line can't be processed !!!", line)
        else:
            return True

    def test_pipeline(self):
        with TestPipeline() as p:
            data = (
                p
                | beam.io.ReadFromBigQuery(query='''
                    SELECT date, unique_key, case_number
                    FROM `bigquery-public-data.chicago_crime.crime` LIMIT 100
                    ''', use_standard_sql=True)
                # Further data processing
            )
            (data
             | beam.Filter(self.line_can_be_processed)
             | beam.Map(self.fake_transform))  # fake_transform: the real processing step
```
Some explanations:
- For the `Filter`, I put the logic in a separate method: if the line's date is today, I log the needed information and raise a custom exception, which stops the pipeline and marks it as failed. Otherwise I return `True`.
If you want more control over the lines that could not be processed, you can also apply multiple sinks and store them in a file or a BigQuery table.
With multiple sinks you would have:
Sink 1 => Good lines
Sink 2 => Lines that could not be processed