

Can I use a Flink job to split data received via Kafka and send it to multiple jobs according to its content?







I'm new to Flink. This is a Flink job that consumes data from Kafka and splits it according to some keys and values.

This is the SplitJob class:

public class SplitJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ObjectMapper mapper = new ObjectMapper();
        long startTime = System.currentTimeMillis();

        // For receiving data from Kafka (currently disabled; hardcoded data is used instead)
        // KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("input-topic").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();
        // DataStream<String> sourceStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        DataStream<String> sourceStream = env.fromElements("{\"deviceId\":\"0\",\"sessionId\":\"0\",\"component\":[DATAS....}]}");
        DataStream<ObjectNode> jsonStream = sourceStream.map(value -> mapper.readTree(value).deepCopy());

        DataStream<String> data1 = processComponentTwo(jsonStream, "data1", "hr");
        DataStream<String> data2 = processComponentTwo(jsonStream, "data2", "hr");
        DataStream<String> data3 = processComponentTwo(jsonStream, "data3", "value");

        Job1.consumeDataStream(data1, data3);
        Job2.consumeDataStream(data2, data3);

        long endTime = System.currentTimeMillis();
        long elapsedTime = endTime - startTime;
        System.out.println("Elapsed time in splitjob:::::: " + elapsedTime + "ms");
    }

    private static void processComponent(ObjectNode jsonNodes, String code, String field, Collector<String> collector) {
        List<String> resultList = new ArrayList<>();
        for (JsonNode component : jsonNodes.get("component")) {
            String componentCode = component.get("code").asText();
            if (componentCode.equals(code)) {
                // ... (rest of the method not shown in the original post)
            }
        }
    }

    private static DataStream<String> processComponentTwo(DataStream<ObjectNode> jsonStream, String componentCode, String field) {
        return jsonStream.flatMap(new FlatMapFunction<ObjectNode, String>() {
            @Override
            public void flatMap(ObjectNode jsonNodes, Collector<String> collector) throws Exception {
                List<String> resultList = new ArrayList<>();
                for (JsonNode component : jsonNodes.get("component")) {
                    String code = component.get("code").asText();
                    if (code.equals(componentCode)) {
                        // ... (rest of the method not shown in the original post)
                    }
                }
            }
        });
    }
}

This is the Job1 class:

public class Job1 {
    public static DataStream<String> consumeDataStream(DataStream<String> data, DataStream<String> pdata) throws Exception {
        StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each DataStreamUtils.collect() call submits its pipeline as a separate job;
        // draining the iterator waits for that job to finish
        ArrayList<String> cdata = new ArrayList<>();
        Iterator<String> cOutput = DataStreamUtils.collect(data);
        while (cOutput.hasNext()) {
            cdata.add(cOutput.next());
        }

        ArrayList<String> pList = new ArrayList<>();
        Iterator<String> pOutput = DataStreamUtils.collect(pdata);
        while (pOutput.hasNext()) {
            pList.add(pOutput.next());
        }

        HashMap<String, ArrayList<String>> mergedData = new HashMap<>();
        mergedData.put("fhr1", cdata);
        mergedData.put("utrine", pList);

        DataStream<String> streams = env2.fromElements(mergedData.toString());
        DataStream<String> processedStream = streams.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // The request is configured here but never sent
                RequestSpecification request = RestAssured.given();
                request.header("Content-Type", "application/json");
                return value.concat(" job 1");
            }
        });

        env2.execute("Job 1");
        return processedStream;
    }
}

We have a similar Job2 class. I'm not familiar with Flink, so I don't know whether this is the correct way to use it. What I'm trying to do is: get data from Kafka, split it, and send each part to one of several jobs like Job1. From Job1 I want to send the data to a Flask API, and every other job created this way should also send its data to the Flask API. Right now this takes far too much time; the methods used here were scraped from the internet.
We also need to scale up jobs like Job1 according to how the data is split.

The Flink version in use is 1.17.0.

I have tried multithreading in the SplitJob class, but I don't think it worked.
I'm currently using hardcoded data rather than data from Kafka.
The overall structure works, but performance is very poor: it currently takes about 10 s, and I need to bring that below 10 ms.
Is that even possible, or should I try other methods?


Answer (score: 1)









There are multiple ways to achieve this, and the choice can be fairly opinionated.

When you run multiple jobs (execute() calls) in the same application, each call blocks: the next execute() waits for the previous one to complete. From the Flink documentation on Application Mode:

> Compared to the Per-Job (deprecated) mode, the Application Mode allows the submission of applications consisting of multiple jobs. The order of job execution is not affected by the deployment mode but by the call used to launch the job. Using execute(), which is blocking, establishes an order and it will lead to the execution of the “next” job being postponed until “this” job finishes. Using executeAsync(), which is non-blocking, will lead to the “next” job starting before “this” job finishes.

> The Application Mode allows for multi-execute() applications but High-Availability is not supported in these cases. High-Availability in Application Mode is only supported for single-execute() applications.

> Additionally, when any of multiple running jobs in Application Mode (submitted for example using executeAsync()) gets cancelled, all jobs will be stopped and the JobManager will shut down. Regular job completions (by the sources shutting down) are supported.
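A minimal sketch of the non-blocking variant described in the quoted documentation: both pipelines are submitted with executeAsync() before waiting on either, so they run concurrently instead of one after the other. The class name AsyncJobsSketch, its methods, and the placeholder pipelines are hypothetical stand-ins for Job1/Job2; the Flink calls used are StreamExecutionEnvironment.executeAsync(String) and JobClient.getJobExecutionResult(). The caveats quoted above still apply: no high availability for multi-execute() applications, and in Application Mode cancelling one job stops all of them.

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class AsyncJobsSketch {

    // Defines a tiny placeholder pipeline on env and submits it without blocking.
    // The records and the map step are hypothetical stand-ins for the real job logic.
    static JobClient submit(StreamExecutionEnvironment env, String jobName, String... records) throws Exception {
        env.fromElements(records)
           .map(value -> value + " " + jobName)
           .print();
        // executeAsync() returns once the job is submitted. Building the job graph also
        // clears the env's transformations, so the next pipeline can reuse the same env.
        return env.executeAsync(jobName);
    }

    // Submits both jobs first and only then waits for them, so they overlap in time.
    static List<JobExecutionResult> runJobsConcurrently() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<JobClient> clients = new ArrayList<>();
        clients.add(submit(env, "Job 1", "a", "b"));
        clients.add(submit(env, "Job 2", "c", "d"));

        List<JobExecutionResult> results = new ArrayList<>();
        for (JobClient client : clients) {
            // Blocks until this particular job has finished
            results.add(client.getJobExecutionResult().get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        for (JobExecutionResult result : runJobsConcurrently()) {
            System.out.println("Finished " + result.getJobID() + " in " + result.getNetRuntime() + " ms");
        }
    }
}
```

Submitting everything before waiting is what removes the serialization; calling getJobExecutionResult().get() immediately after each executeAsync() would behave just like execute().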

Apart from the above, I am not a fan of running multiple jobs inside one application, since that setup does not offer high availability. So I would normally do one of the following:

  1. Use side outputs. The rough idea is to route the split records into different side outputs, then do further processing on each independent side output.

  2. In your main app, just split the records and produce them to a different Kafka topic for each job type. Then create a separate job for each type that consumes from its topic and does the processing.
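A minimal sketch of option 1, assuming the record layout from the question's sample (a "component" array whose elements carry a "code" field) and plain Jackson (com.fasterxml.jackson.databind) on the classpath. The class name SideOutputSplitSketch, the tag names, and the sample record values are hypothetical. Each record is parsed once, every component is routed to the side output for its code with ctx.output(...), and the data1+data3 / data2+data3 pairings mirror what Job1 and Job2 consume in the question. Everything runs as a single job with one execute() call.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSplitSketch {

    // One tag per branch; the anonymous subclass ({}) preserves the generic type information
    static final OutputTag<String> DATA1 = new OutputTag<String>("data1") {};
    static final OutputTag<String> DATA2 = new OutputTag<String>("data2") {};
    static final OutputTag<String> DATA3 = new OutputTag<String>("data3") {};

    // Parses each JSON record once and sends every component to the side output matching its "code"
    static SingleOutputStreamOperator<String> split(DataStream<String> source) {
        return source.process(new ProcessFunction<String, String>() {
            private transient ObjectMapper mapper; // created lazily on the task, not serialized

            @Override
            public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                if (mapper == null) {
                    mapper = new ObjectMapper();
                }
                JsonNode root = mapper.readTree(value);
                for (JsonNode component : root.path("component")) {
                    String code = component.path("code").asText();
                    switch (code) {
                        case "data1": ctx.output(DATA1, component.toString()); break;
                        case "data2": ctx.output(DATA2, component.toString()); break;
                        case "data3": ctx.output(DATA3, component.toString()); break;
                        default: out.collect(component.toString()); // unmatched codes stay on the main output
                    }
                }
            }
        });
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Placeholder input; the real job would read from the KafkaSource instead
        DataStream<String> source = env.fromElements(
                "{\"deviceId\":\"0\",\"component\":[{\"code\":\"data1\",\"hr\":120},"
                        + "{\"code\":\"data2\",\"hr\":80},{\"code\":\"data3\",\"value\":7}]}");

        SingleOutputStreamOperator<String> routed = split(source);
        DataStream<String> data3 = routed.getSideOutput(DATA3);

        // Same pairing as Job1.consumeDataStream(data1, data3) and Job2.consumeDataStream(data2, data3)
        DataStream<String> job1Input = routed.getSideOutput(DATA1).union(data3);
        DataStream<String> job2Input = routed.getSideOutput(DATA2).union(data3);

        // Stand-ins for the per-branch processing (for example, calling the Flask API)
        job1Input.map(v -> "job 1: " + v).print();
        job2Input.map(v -> "job 2: " + v).print();

        env.execute("split-job"); // one job, one execute() call
    }
}
```

This emits whole component objects; extracting the "hr" or "value" field, as processComponentTwo does in the question, would happen where ctx.output(...) is called. For option 2, each branch could instead be written to its own topic with sinkTo(...) and a KafkaSink from flink-connector-kafka, with a separate job per type reading that topic.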

  • Published on June 12, 2023 19:10:17
  • When reposting, please keep a link to this article: https://go.coder-hub.com/76456079.html



