How to integrate Beam SQL windowing query with KafkaIO?


Question

First, we have a Kafka input source in JSON format:

{"event_time": "2020-08-23 18:36:10", "word": "apple", "cnt": 1}
{"event_time": "2020-08-23 18:36:20", "word": "banana", "cnt": 1}
{"event_time": "2020-08-23 18:37:30", "word": "apple", "cnt": 2}
{"event_time": "2020-08-23 18:37:40", "word": "apple", "cnt": 1}
... ...

What I'm trying to do is aggregate the sum of the count for each word in every one-minute window:

+---------+----------+---------------------+
| word    | SUM(cnt) |   window_start      |
+---------+----------+---------------------+
| apple   |    1     | 2020-08-23 18:36:00 |
+---------+----------+---------------------+
| banana  |    1     | 2020-08-23 18:36:00 |
+---------+----------+---------------------+
| apple   |    3     | 2020-08-23 18:37:00 |
+---------+----------+---------------------+

So this case would be a perfect fit for the following Beam SQL statement:

SELECT word, SUM(cnt), TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start
FROM t_count_stats
GROUP BY word, TUMBLE(event_time, INTERVAL '1' MINUTE)
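
If my understanding is correct, this TUMBLE clause should correspond to Beam's fixed one-minute event-time windows. Here is a rough sketch of my mental model (for intuition only, not part of my pipeline; it assumes Window and FixedWindows from org.apache.beam.sdk.transforms.windowing plus org.joda.time.Duration are imported):

// What I assume TUMBLE(event_time, INTERVAL '1' MINUTE) expands to:
// fixed one-minute windows, with the per-word SUM(cnt) emitted once the
// watermark passes the end of each window.
PCollection<Row> windowed =
        rows.apply(Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))));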

And below is my current working code using Beam's Java SDK to execute this streaming SQL query:

import avro.shaded.com.google.common.collect.ImmutableMap;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.util.ArrayList;
import java.util.List;

public class KafkaBeamSqlTest {

    private static DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) {
        // create pipeline
        PipelineOptions kafkaOption = PipelineOptionsFactory.fromArgs(args)
                .withoutStrictParsing()
                .as(PipelineOptions.class);
        Pipeline pipeline = Pipeline.create(kafkaOption);

        // create kafka IO
        KafkaIO.Read<String, String> kafkaRead =
                KafkaIO.<String, String>read()
                        .withBootstrapServers("127.0.0.1:9092")
                        .withTopic("beamKafkaTest")
                        .withConsumerConfigUpdates(ImmutableMap.of("group.id", "client-1"))
                        .withReadCommitted()
                        .withKeyDeserializer(StringDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .commitOffsetsInFinalize();

        // read from kafka
        PCollection<KV<String, String>> messages = pipeline.apply(kafkaRead.withoutMetadata());

        // build input schema
        Schema inputSchema = Schema.builder()
                .addStringField("word")
                .addDateTimeField("event_time")
                .addInt32Field("cnt")
                .build();

        // convert kafka message to Row
        PCollection<Row> rows = messages.apply(ParDo.of(new DoFn<KV<String, String>, Row>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String jsonData = c.element().getValue();

                // parse json
                JSONObject jsonObject = JSON.parseObject(jsonData);

                // build row
                List<Object> list = new ArrayList<>();
                list.add(jsonObject.get("word"));
                list.add(dtf.parseDateTime((String) jsonObject.get("event_time")));
                list.add(jsonObject.get("cnt"));
                Row row = Row.withSchema(inputSchema)
                        .addValues(list)
                        .build();
                System.out.println(row);

                // emit row
                c.output(row);
            }
        }))
                .setRowSchema(inputSchema);

        // sql query
        PCollection<Row> result = PCollectionTuple.of(new TupleTag<>("t_count_stats"), rows)
                .apply(
                        SqlTransform.query(
                                "SELECT word, SUM(cnt), TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start\n" +
                                        "FROM t_count_stats\n" +
                                        "GROUP BY word, TUMBLE(event_time, INTERVAL '1' MINUTE)"
                        )
                );

        // sink results back to another kafka topic
        result.apply(MapElements.via(new SimpleFunction<Row, KV<String, String>>() {
            @Override
            public KV<String, String> apply(Row input) {
                System.out.println("result: " + input.getValues());
                return KV.of(input.getValue("word"), "result=" + input.getValues());
            }
        }))
                .apply(KafkaIO.<String, String>write()
                        .withBootstrapServers("127.0.0.1:9092")
                        .withTopic("beamPrint")
                        .withKeySerializer(StringSerializer.class)
                        .withValueSerializer(StringSerializer.class));

        // run
        pipeline.run();
    }
}

My problem is that when I run this code and feed some messages into Kafka, no exception is thrown and it does receive the messages, but I cannot see it trigger the windowing aggregation, and no results come out as expected (like the table shown above).
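
One thing I could not verify is which timestamps the elements actually carry into the windowing step. A small sketch of how that might be inspected (a hypothetical debug helper, not part of the code above):

// Print each row's element timestamp, which is what drives event-time
// windowing and watermark-based triggering downstream.
rows.apply(ParDo.of(new DoFn<Row, Void>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        System.out.println("element timestamp=" + c.timestamp() + ", row=" + c.element());
    }
}));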

So does Beam SQL currently support the window syntax on an unbounded Kafka input source? If it does, what's wrong with my current code? How can I debug and fix it? And is there any code example that integrates Beam SQL with KafkaIO?

Please help me! Thanks a lot!!

Answer 1

Score: 1


Looks like this was answered at https://lists.apache.org/thread.html/rea75c0eb665f90b8483e64bee96740ebb01942c606f065066c2ecc56%40%3Cuser.beam.apache.org%3E
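
For reference, a likely cause of this symptom is that the watermark never lines up with the event_time field: by default KafkaIO assigns processing-time timestamps, while the TUMBLE windows are computed over event_time, so rows whose event_time is already in the past can be silently dropped as late data. Below is a minimal sketch of one commonly suggested fix, offered as an assumption rather than a quote of the linked thread. It reuses the dtf formatter from the question and assumes extra imports of org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay and org.joda.time.Duration; the one-minute delay is illustrative:

// Derive element timestamps and the watermark from the JSON "event_time"
// field instead of KafkaIO's default processing-time policy.
KafkaIO.Read<String, String> kafkaRead =
        KafkaIO.<String, String>read()
                .withBootstrapServers("127.0.0.1:9092")
                .withTopic("beamKafkaTest")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // ... other settings from the question (group.id, read_committed, etc.)
                .withTimestampPolicyFactory((tp, previousWatermark) ->
                        new CustomTimestampPolicyWithLimitedDelay<String, String>(
                                // timestamp of each record = its event_time field
                                rec -> dtf.parseDateTime(
                                                JSON.parseObject(rec.getKV().getValue())
                                                        .getString("event_time"))
                                        .toInstant(),
                                // allow up to 1 minute of out-of-order data
                                Duration.standardMinutes(1),
                                previousWatermark));

With the watermark tracking event_time, each one-minute TUMBLE window can close and emit once the watermark passes its end, which would produce the expected per-word sums.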
