is there a way to use a subnetwork before pipeline starts?
Question
I'm writing a Dataflow Pipeline. It deserializes a confluent Avro PubSub subscription, and writes it to Google BigQuery. Confluent Avro has a schema registry that we connect to in order to get the schema definition. We use Private Service Connect to have an IP of the form 192.168.x.x to connect to it.
I have this code:
| "Write records to BigQuery" >> beam.io.Write(
    beam.io.WriteToBigQuery(
        table=output_table,
        dataset=output_dataset,
        project=output_project,
        schema=out_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
out_schema should be fetched from the Schema Registry using a function fetchSchema(). I want to create the table with out_schema when the pipeline starts, if the table does not exist. To do that, I need to be able to fetch the schema before the pipeline starts.
I can connect to the Schema Registry inside the pipeline, but I can't connect to it outside of the pipeline's functions and ParDo classes. When I try, I get 'connection refused'. I believe this is because the subnetwork I'm using is specified in the pipeline options.
Is there a way to use the subnetwork outside of the pipeline so I can connect to the schema registry before starting the pipeline?
def helper(record):
    logging.info(fetchSchema())  # Works
    return record

fetchSchema()  # Does not work: connection refused

with beam.Pipeline(options=options) as pipeline:
    (pipeline | ... | beam.Map(lambda r: helper(r)))
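For context, once fetchSchema() is reachable, the Avro schema still has to be turned into the comma-separated "name:TYPE" string that WriteToBigQuery accepts. A minimal sketch of that conversion (avro_to_bq_schema and the type map are illustrative names, assuming fetchSchema() returns the registry's Avro record schema as a JSON string; real schemas with nested records or logical types need more handling):

```python
import json

# Sketch of a mapping from Avro primitive types to BigQuery column types.
AVRO_TO_BQ = {
    "string": "STRING",
    "int": "INTEGER",
    "long": "INTEGER",
    "float": "FLOAT",
    "double": "FLOAT",
    "boolean": "BOOLEAN",
    "bytes": "BYTES",
}

def avro_to_bq_schema(avro_schema_json):
    """Convert an Avro record schema (JSON string) into the
    'name:TYPE,name:TYPE' string accepted by WriteToBigQuery's
    schema parameter."""
    schema = json.loads(avro_schema_json)
    columns = []
    for field in schema["fields"]:
        avro_type = field["type"]
        # ["null", "string"] unions become columns of the non-null type.
        if isinstance(avro_type, list):
            avro_type = next(t for t in avro_type if t != "null")
        columns.append(f"{field['name']}:{AVRO_TO_BQ[avro_type]}")
    return ",".join(columns)
```

With that, `out_schema = avro_to_bq_schema(fetchSchema())` would be computed once at launch time, before the pipeline graph is built.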
Answer 1
Score: 1
If I understand your problem correctly, I think a good approach to handle this would be to use Flex Templates.
When you create a job from a Flex Template, a GCE VM is created to run your program that creates the actual pipeline (which has to be packaged as a Docker container).
That launcher VM will be inside your VPC/network and should be able to talk to the schema registry.
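As a rough sketch of what that looks like in practice (bucket, project, region, image, and subnet names below are all placeholders, not values from the question), the template is built once and then run with the subnetwork passed at launch time, so the launcher VM that executes your main() and calls fetchSchema() sits inside the VPC:

```shell
# Build the Flex Template: packages the pipeline launcher as a container image
# and writes a template spec file to GCS.
gcloud dataflow flex-template build gs://MY_BUCKET/templates/my-template.json \
  --image-gcr-path "REGION-docker.pkg.dev/MY_PROJECT/MY_REPO/my-pipeline:latest" \
  --sdk-language "PYTHON" \
  --py-path "." \
  --env "FLEX_TEMPLATE_PYTHON_PY_FILE=pipeline.py"

# Run it on the private subnetwork: the launcher VM is created inside the VPC,
# so code that runs before the pipeline starts can reach the schema registry.
gcloud dataflow flex-template run "my-job" \
  --template-file-gcs-location gs://MY_BUCKET/templates/my-template.json \
  --region REGION \
  --subnetwork "https://www.googleapis.com/compute/v1/projects/MY_PROJECT/regions/REGION/subnetworks/MY_SUBNET" \
  --disable-public-ips
```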