将每隔X条来自Pubsub的消息写入Cloud Storage。

huangapple go评论85阅读模式
英文:

Write to Cloud Storage every X messages from Pubsub

问题

以下是翻译好的内容:

public static void main(String[] args) {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))) // 将窗口大小设置为5秒钟
            .withAllowedLateness(Duration.standardSeconds(5)) // 允许5秒钟的迟到数据
            .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1))) // 每个窗口内第一个元素处理后1秒触发计算
            .discardingFiredPanes()
        )
        .apply("EnrichDataFromAPI", ParDo.of(
            new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String jsonString = c.element();
                    // 在这部分添加代码,将 JSON 字符串反序列化为 Java 对象,调用外部 API 进行数据丰富,然后再次序列化为 JSON 字符串
                    // ... 反序列化,调用 API,再次序列化 ...
                    String enrichedJSONString = ...; // 设置为丰富后的 JSON 字符串
                    c.output(enrichedJSONString);
                }
            }
        ))
        .apply("WriteToGCS", 
            TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()))
    ;
    
    
    PipelineResult result = pipeline.run();
}

关于窗口(Windowing)的配置,你已经设定了一个固定窗口(Fixed Window)的窗口大小为5秒,允许迟到数据5秒,且触发计算的条件是每个窗口内的第一个元素处理后1秒。这样的窗口配置对于你的数据处理需求是合理的。

至于代码中的其他部分,你需要在 "EnrichDataFromAPI" 步骤中添加适当的代码来完成 JSON 的反序列化、API 调用和再次序列化。对于 JSON 的序列化和反序列化,你可以使用类似 Jackson 这样的库来帮助处理。

希望这些补充的信息能够帮助你更好地理解和完成你的数据流处理任务。

英文:

I am new to Cloud Dataflow / Apache Beam, so the concept/programming is still hazy to me.

What I want to do is that Dataflow listens to Pubsub and gets messages of this format in JSON:

{
&quot;productId&quot;: &quot;...&quot;,
&quot;productName&quot;: &quot;...&quot;
}

And transform that to:

{
&quot;productId&quot;: &quot;...&quot;,
&quot;productName&quot;: &quot;...&quot;,
&quot;sku&quot;: &quot;...&quot;,
&quot;inventory&quot;: {
&quot;revenue&quot;: &lt;some Double&gt;,
&quot;stocks&quot;:  &lt;some Integer&gt;
}
}

So the steps needed are:

  1. (IngestFromPubsub) Get records from Pubsub by listening to a topic (1 Pubsub message = 1 record)

  2. (EnrichDataFromAPI)

a. Deserialize the payload's JSON string into Java object

b. By calling an external API, using the sku, I can enrich the data of each record by adding the inventory attribute.

c. Serialize the records again.

  1. (WriteToGCS) Then every x number (can be parameterized) records, I need to write these in Cloud Storage.
    Please consider also the trivial case that x=1.
    (Does x=1, a good idea? I am afraid there will be too many Cloud Storage writes)

Even though I am a Python guy, I am already having difficulty doing this in Python, more so that I need to do write in Java. I am getting headache reading Beam's example in Java, it's too verbose and difficult to follow. All I understand is that each step is an .apply to the PCollection.

So far, here is the result of my puny effort:

public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(&quot;IngestFromPubsub&quot;, PubsubIO.readStrings().fromTopic(options.getTopic()))
// I don&#39;t really understand the next part, I just copied from official documentation and filled in some values
.apply(Window.&lt;String&gt;into(FixedWindows.of(Duration.millis(5000)))
.withAllowedLateness(Duration.millis(5000))
.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1000)))
.discardingFiredPanes()
)
.apply(&quot;EnrichDataFromAPI&quot;, ParDo.of(
new DoFn&lt;String, String&gt;() {
@ProcessElement
public void processElement(ProcessContext c) {
c.element();
// help on this part, I heard I need to use Jackson but I don&#39;t know, for API HttpClient is sufficient
// ... deserialize, call API, serialize again ...
c.output(enrichedJSONString);
}
}
))
.apply(&quot;WriteToGCS&quot;, 
TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()))
;
PipelineResult result = pipeline.run();
}

Please fill in the missing parts, and also give me a tip on Windowing (e.g. what's the appropriate configuration etc.) and in which steps should I insert/apply it.

答案1

得分: 2

  • 我认为你在 IngestFromPubsubEnrichDataFromAPI 中都不需要使用窗口操作。窗口操作的目的是将在时间上相邻的记录分组到窗口中,以便可以对它们进行聚合计算。但由于你并没有进行任何聚合计算,并且只对每个记录进行独立处理,所以你不需要使用窗口。

  • 因为你始终是将一个输入记录转换为一个输出记录,所以你的 EnrichDataFromAPI 应该使用 MapElements。这应该会使代码更加简洁。

  • 有关在 Apache Beam Java 中处理 JSON 的资源可以参考这里:https://stackoverflow.com/questions/50334835/apache-beam-stream-processing-of-json-data

  • 你不一定需要使用 Jackson 将 JSON 映射到 Java 对象。你可能能够直接操作 JSON。你可以使用 Java 的原生 JSON API来解析/操作/序列化。

英文:
  • I don't think you need any of the windowing in your IngestFromPubsub and EnrichDataFromAPI. The purpose of windowing is to group your records that are nearby in time together into windows so you can compute aggregate computations over them. But since you are not doing any aggregate computations, and are interested in dealing with each record independently, you don't need windows.

  • Since you are always converting one input record to one output record, your EnrichDataFromAPI should be a MapElements. This should make the code easier.

  • There are resources out there for processing JSON in Apache Bean Java: https://stackoverflow.com/questions/50334835/apache-beam-stream-processing-of-json-data

  • You don't necessarily need to use Jackson to map the JSON to a Java object. You might be able to manipulate the JSON directly. You can use Java's native JSON API to parse/manipulate/serialize.

huangapple
  • 本文由 发表于 2020年4月5日 21:40:56
  • 转载请务必保留本文链接:https://go.coder-hub.com/61043520.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定