How to dynamically name and create tables by filtering events in a pipeline using Apache Beam?
Question
I have a use case where I listen to Pub/Sub in an event-driven architecture and want to store and insert data into tables dynamically. I want a new table to be created whenever a new eventName appears in the stream. For example, if I have streaming data for user creation with a name in the attributes (a Python dict), such as CREATED, then a table should automatically be created in the dataset used for this use case, and data should start being written into that newly created table. I've tried the following and it hasn't worked for me, please help:
class FilterEvents(beam.DoFn):
    # Keep only elements whose event_name is one of the known events.
    KNOWN_EVENTS = {'event1', 'event2', 'event3'}

    def process(self, element):
        if element['event_name'] in self.KNOWN_EVENTS:
            yield element

filtered_events = events | beam.ParDo(FilterEvents())

# Step 4: Use the Partition transform to split the filtered events into
# multiple PCollections based on the event name.
def partition_fn(element, num_partitions):
    # beam.Partition calls this with (element, num_partitions).
    mapping = {'event1': 0, 'event2': 1, 'event3': 2}
    return mapping[element['event_name']]

partitioned_events = filtered_events | beam.Partition(partition_fn, 3)

# Step 5: Write the events in each partition to a separate table.
def write_to_table(events_pcoll, table_name):
    return events_pcoll | f"Write {table_name}" >> beam.io.WriteToBigQuery(
        table=table_name,
        # schema=<table_schema>,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

write_to_table(partitioned_events[0] | 'Window1' >> beam.WindowInto(window.FixedWindows(60)), 'event1_table')
write_to_table(partitioned_events[1] | 'Window2' >> beam.WindowInto(window.FixedWindows(60)), 'event2_table')
write_to_table(partitioned_events[2] | 'Window3' >> beam.WindowInto(window.FixedWindows(60)), 'event3_table')

pipeline.run()
Answer 1
Score: 2
This can be done with Apache Beam's standard WriteToBigQuery sink (see docs). In particular, you can pass a custom function as the table argument; it receives the element being written to BigQuery and returns a string with the table name. So you can skip the whole partitioning step and just do something like this:
filtered_events | beam.io.gcp.bigquery.WriteToBigQuery(
    table=lambda element: f"{element['event_name']}_table",  # <- replace here with custom logic
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
where you may replace the simple lambda with a more sophisticated custom method that suits your case. It is even possible to provide a side input, which you can also access in this method (see here).
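To illustrate, here is a minimal, hypothetical sketch of such a table-name callable driven by a side-input mapping. The dataset name `my_dataset` and the mapping contents are illustrative assumptions, not from the original post; in a real pipeline the mapping would reach the callable via WriteToBigQuery's `table_side_inputs` parameter, but the function is called directly here to show just the routing logic:

```python
# Hypothetical table-name callable for WriteToBigQuery.
# A mapping like {'CREATED': 'created_events'} would normally be supplied
# as a side input via WriteToBigQuery(table_side_inputs=...); here we call
# the function directly to demonstrate how elements are routed.
def table_fn(element, table_map):
    # Route each element to a table derived from its event name,
    # falling back to a catch-all table for unknown events.
    suffix = table_map.get(element['event_name'], 'unknown_events')
    return f"my_dataset.{suffix}"

table_map = {'CREATED': 'created_events', 'DELETED': 'deleted_events'}

print(table_fn({'event_name': 'CREATED', 'user': 'alice'}, table_map))
# -> my_dataset.created_events
print(table_fn({'event_name': 'UPDATED'}, table_map))
# -> my_dataset.unknown_events
```

Because the destination is computed per element, tables for new event names are created on the fly as long as create_disposition is CREATE_IF_NEEDED and a schema can be resolved.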