如何在BigQuery Apache Beam Java Dataflow中替换现有行

huangapple go评论109阅读模式
英文:

How to replace existing rows in BigQuery Apache Beam Java Dataflow

问题

目前,我有一个名为cdn_daily_user_playback_requests_1MONTH的BigQuery表。其中包含大量基于每日记录的数据。因此,会有来自整个2023年7月、2023年8月等整月数据。现在,例如,如果我想从2023年7月创建新数据并将其写入该BigQuery表,而该表已经包含来自2023年7月的记录,我应该如何在我的Java Apache Beam代码中执行此操作(替换表中的当前数据为新数据)?

我的管道代码如下:

pipeline
            .apply("Read from cdn_requests BigQuery", BigQueryIO
                    .read(new CdnMediaRequestLogEntity.FromSchemaAndRecord())
                    .fromQuery(cdnRequestsQueryString)
                    .usingStandardSql())
            .apply("Validate and Filter Cdn Media Request Log Objects", Filter.by(new CdnMediaRequestValidator()))
            .apply("Convert Cdn Logs To Key Value Pairs", ParDo.of(new CdnMediaRequestResponseSizeKeyValuePairConverter()))
            .apply("Sum the Response Sizes By Key", Sum.longsPerKey())
            .apply("Convert To New Daily Requests Objects", ParDo.of(new CdnDailyRequestConverter(projectId, kind)))
            .apply("Convert Cdn Media Request Entities to Big Query Objects", ParDo.of(new BigQueryCdnDailyRequestRowConverter()))
            .apply("Write Data To BigQuery", BigQueryIO.writeTableRows()
                .to(writeCdnMediaRequestTable)
                .withSchema(cdnDailyRequestSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

我尝试过并测试了BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE函数,但据我理解,这会删除表中的所有数据,然后写入新创建的数据。但我只想删除仅针对2023年7月的数据,而不是全部数据。

英文:

Currently, I have BigQuery table called cdn_daily_user_playback_requests_1MONTH. This contains large amounts of data based from a daily basis of records. So there would be like data from the whole month of 2023-07, 2023-08, etc. Now, say for example that I want to create new data from 2023-07 and write it into that BigQuery table and that table already has records from the 2023-07 month, how do I do this (replacing the current data in the table to the new one I have) in my Apache Beam code in Java?

My pipeline code is here:

pipeline
            .apply("Read from cdn_requests BigQuery", BigQueryIO
                    .read(new CdnMediaRequestLogEntity.FromSchemaAndRecord())
                    .fromQuery(cdnRequestsQueryString)
                    .usingStandardSql())
            .apply("Validate and Filter Cdn Media Request Log Objects", Filter.by(new CdnMediaRequestValidator()))
            .apply("Convert Cdn Logs To Key Value Pairs", ParDo.of(new CdnMediaRequestResponseSizeKeyValuePairConverter()))
            .apply("Sum the Response Sizes By Key", Sum.longsPerKey())
            .apply("Convert To New Daily Requests Objects", ParDo.of(new CdnDailyRequestConverter(projectId, kind)))
            .apply("Convert Cdn Media Request Entities to Big Query Objects", ParDo.of(new BigQueryCdnDailyRequestRowConverter()))
            .apply("Write Data To BigQuery", BigQueryIO.writeTableRows()
                .to(writeCdnMediaRequestTable)
                .withSchema(cdnDailyRequestSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

I did tried and tested the BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE function but in my understanding, this removes all the data inside that table and then writes the newly created ones in it. But I only want to remove the data only for 2023-07's month and not everything.

答案1

得分: 2

这是解决方案的代码部分,已经被翻译好了,没有其他内容:

public class BigQueryDayPartitionDestinations implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {

    private final String projectId;
    private final String datasetId;
    private final String pattern;
    private final String table;

    public static BigQueryDayPartitionDestinations writePartitionsPerDay(String projectId, String datasetId, String tablePrefix) {
        return new BigQueryDayPartitionDestinations(projectId, datasetId, "yyyyMMdd", tablePrefix + "$");
    }

    private BigQueryDayPartitionDestinations(String projectId, String datasetId, String pattern, String table) {
        this.projectId = projectId;
        this.datasetId = datasetId;
        this.pattern = pattern;
        this.table = table;
    }

    @Override
    public TableDestination apply(ValueInSingleWindow<TableRow> input) {
        DateTimeFormatter partition = DateTimeFormat.forPattern(pattern).withZone(DateTimeZone.forID("Asia/Tokyo"));
        DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.forID("Asia/Tokyo"));

        TableReference reference = new TableReference();
        reference.setProjectId(this.projectId);
        reference.setDatasetId(this.datasetId);

        var date = input.getValue().get("Date").toString();
        DateTime dateTime = formatter.parseDateTime(date);

        var tableId = table + dateTime.toInstant().toString(partition);

        reference.setTableId(tableId);
        return new TableDestination(reference, null, new TimePartitioning().setType("DAY").setField("Date"));
   }
}
英文:

Solution:
So I found a solution and it worked by creating a SerializableFunction which takes the partition key as an identifier (my table was partitioned on the Date column which has a Datatype of Date) upon writing it in BigQuery. So what happens is that it only takes out parts of the table by Partitioned column.

This is my sample code for the solution:

public class BigQueryDayPartitionDestinations implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {
private final String projectId;
private final String datasetId;
private final String pattern;
private final String table;
public static BigQueryDayPartitionDestinations writePartitionsPerDay(String projectId, String datasetId, String tablePrefix) {
return new BigQueryDayPartitionDestinations(projectId, datasetId, "yyyyMMdd", tablePrefix + "$");
}
private BigQueryDayPartitionDestinations(String projectId, String datasetId, String pattern, String table) {
this.projectId = projectId;
this.datasetId = datasetId;
this.pattern = pattern;
this.table = table;
}
@Override
public TableDestination apply(ValueInSingleWindow<TableRow> input) {
DateTimeFormatter partition = DateTimeFormat.forPattern(pattern).withZone(DateTimeZone.forID("Asia/Tokyo"));
DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.forID("Asia/Tokyo"));
TableReference reference = new TableReference();
reference.setProjectId(this.projectId);
reference.setDatasetId(this.datasetId);
var date = input.getValue().get("Date").toString();
DateTime dateTime = formatter.parseDateTime(date);
var tableId = table + dateTime.toInstant().toString(partition);
reference.setTableId(tableId);
return new TableDestination(reference, null, new TimePartitioning().setType("DAY").setField("Date"));
}
}

huangapple
  • 本文由 发表于 2023年8月10日 15:46:20
  • 转载请务必保留本文链接:https://go.coder-hub.com/76873613.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定