Is there a way to use a subnetwork before the pipeline starts?

Question

I'm writing a Dataflow pipeline. It deserializes Confluent Avro messages from a Pub/Sub subscription and writes them to Google BigQuery. Confluent Avro has a schema registry that we connect to in order to get the schema definition. We use Private Service Connect to reach it through an IP of the form 192.168.x.x.

I have this code:

         | "Write records to BigQuery" >> beam.io.Write(
             beam.io.WriteToBigQuery(
                 table=output_table,
                 dataset=output_dataset,
                 project=output_project,
                 schema=out_schema,
                 create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                 write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))

out_schema should be fetched from the SchemaRegistry using a function fetchSchema(). I want to create the table with out_schema when the pipeline starts if the table does not exist. I need to be able to fetch the schema before the pipeline starts in order to do that.

I can connect to the Schema Registry inside the pipeline, but I can't connect to it outside of the pipeline's functions and ParDo classes. When I try, I get 'connection refused'. I believe this is because the subnetwork I'm using is specified in the pipeline options.

Is there a way to use the subnetwork outside of the pipeline so I can connect to the schema registry before starting the pipeline?

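import logging

import apache_beam as beam

def helper(record):
   logging.info(fetchSchema())  # Works (called from inside the pipeline)
   return record

fetchSchema()  # Does not work (called before the pipeline starts): connection refused
with beam.Pipeline(options=options) as pipeline:
   (pipeline | ... | beam.Map(helper))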

Answer 1

Score: 1

If I understand your problem correctly, I think a good approach to handle this would be to use Flex Templates.

When you create a job from a Flex Template, a GCE VM is created to run your program that creates the actual pipeline (which has to be packaged as a Docker container).

That launcher VM will be inside your VPC/network and should be able to talk to the schema registry.
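As a rough illustration of what the launcher program could do before it builds the pipeline, here is a minimal sketch. It assumes the registry exposes the standard Confluent Schema Registry REST endpoint `/subjects/<subject>/versions/latest` and that the Avro record only uses primitive field types. The names `fetch_avro_schema`, `avro_to_bq_schema` and `AVRO_TO_BQ` are made up for this example and stand in for the question's `fetchSchema()`, whose implementation is not shown.

```python
import json
import urllib.request

# Avro primitive type -> BigQuery type. Logical types, arrays, maps and
# nested records are deliberately left out of this sketch.
AVRO_TO_BQ = {
    "string": "STRING",
    "bytes": "BYTES",
    "int": "INTEGER",
    "long": "INTEGER",
    "float": "FLOAT",
    "double": "FLOAT",
    "boolean": "BOOLEAN",
}


def fetch_avro_schema(registry_url, subject):
    """Fetch the latest Avro schema registered for a subject (hypothetical helper)."""
    url = f"{registry_url}/subjects/{subject}/versions/latest"
    with urllib.request.urlopen(url, timeout=10) as response:
        payload = json.load(response)
    # The registry returns the Avro schema as a JSON-encoded string.
    return json.loads(payload["schema"])


def avro_to_bq_schema(avro_schema):
    """Convert a flat Avro record schema into a BigQuery schema dictionary."""
    fields = []
    for field in avro_schema["fields"]:
        avro_type = field["type"]
        mode = "REQUIRED"
        if isinstance(avro_type, list):
            # A union such as ["null", "string"] becomes a NULLABLE column.
            non_null = [t for t in avro_type if t != "null"]
            if len(non_null) != 1:
                raise ValueError(f"Unsupported union for field {field['name']}")
            if "null" in avro_type:
                mode = "NULLABLE"
            avro_type = non_null[0]
        if not isinstance(avro_type, str) or avro_type not in AVRO_TO_BQ:
            raise ValueError(f"Unsupported Avro type for field {field['name']}")
        fields.append(
            {"name": field["name"], "type": AVRO_TO_BQ[avro_type], "mode": mode}
        )
    return {"fields": fields}
```

In the launcher, something like `out_schema = avro_to_bq_schema(fetch_avro_schema(registry_url, subject))` would run before `beam.Pipeline(options=options)` is created, and the resulting dictionary would be passed as `schema=out_schema` to `WriteToBigQuery`. Whether the call succeeds still depends on the launcher actually having a network route to the registry.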

huangapple
  • Published on February 16, 2023, 04:32:26
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75465178.html