将Apache Beam的行转换为Java DataFlow管道中的BigQuery TableFlows该怎么做?

huangapple go评论43阅读模式
英文:

How can I convert Apache Beam Rows to BigQuery TableFlows in a Java DataFlow pipeline?

问题

我需要使用Java在DataFlow管道中将数据写入BigQuery,因此需要获取一个TableRow的PCollection。

TableSchema tableSchema = // 省略
BigQueryIO.Write<TableRow> bigQuerySink =  BigQueryIO.<TableRow>write()
   .to(new TableReference()
     .setProjectId(options.getProject())
     .setDatasetId(options.getBigQueryDataset())
     .setTableId(options.getBigQueryTable()))
    .withSchema(tableSchema)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

到目前为止,在我的管道中,我有来自Apache Beam org.apache.beam.sdk.values.Row 的通用Row对象的PCollection。我还有Beam模式Schema

因此,我需要实现一个DoFn<Row, TableRow>,我可能会想出一些东西,但是支持多值类型如ARRAY(INT64)INT64以及REPLICATED等并不容易。

我觉得我正在重新发明轮子,这个类已经存在了吗?我是否需要这样的映射?

我查看了Dataflow模板和Google云库中的这个映射,但没有找到它。

英文:

I need to write data into a BigQuery within a DataFlow pipeline using Java. As such i need to get a PCollection of TableRow

TableSchema tableSchema = // omitted
BigQueryIO.Write&lt;TableRow&gt; bigQuerySink =  BigQueryIO.&lt;TableRow&gt;write()
   .to(new TableReference()
     .setProjectId(options.getProject())
     .setDatasetId(options.getBigQueryDataset())
     .setTableId(options.getBigQueryTable()))
    .withSchema(tableSchema)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

As of now, in my pipeline, I have a PCollection of generic Row objects from Apache Beam org.apache.beam.sdk.values.Row. I also have the beam schema Schema.

So i need to implement a DoFn&lt;Row, TableRow&gt; and I will probably come up with something but it is not easy supporting multi valued types ARRAY(INT64) to INT64 with REPLICATED etc.

My feeling is I am reinventing the wheel, does this class already exist ? Do I need such a mapping ?

I look at this mapper in the Dataflow templates and the google cloud libraries without finding it.

答案1

得分: 0

"I found the Mapper I was looking in BigQueryUtils class here
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java#L508-L516

Required dependency is:

&lt;dependency&gt;
 &lt;groupId&gt;org.apache.beam&lt;/groupId&gt;
 &lt;artifactId&gt;beam-sdks-java-io-google-cloud-platform&lt;/artifactId&gt;
&lt;/dependency&gt;
```"

<details>
<summary>英文:</summary>

Answering my question (seems like posting out your issue makes you twice shaper)

I found the Mapper I was looking in `BigQueryUtils` class here
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java#L508-L516

Required dependency is:

```xml
&lt;dependency&gt;
 &lt;groupId&gt;org.apache.beam&lt;/groupId&gt;
 &lt;artifactId&gt;beam-sdks-java-io-google-cloud-platform&lt;/artifactId&gt;
&lt;/dependency&gt;

huangapple
  • 本文由 发表于 2023年5月26日 00:29:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/76334447.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定