Can I use a Flink job to split and send data to multiple jobs according to the data coming in via Kafka?


Question

I'm new to Flink. This is a Flink job that consumes data from Kafka and splits it according to some keys and values.

This is the SplitJob class:

public class SplitJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ObjectMapper mapper = new ObjectMapper();
        long startTime = System.currentTimeMillis();

        DataStream<String> sourceStream = env.fromElements("{\"deviceId\":\"0\",\"sessionId\":\"0\",\"component\":[DATAS....}]}");
        DataStream<ObjectNode> jsonStream = sourceStream.map(value -> mapper.readTree(value).deepCopy());

        DataStream<String> data1 = processComponentTwo(jsonStream, "data1", "hr");
        DataStream<String> data2 = processComponentTwo(jsonStream, "data2", "hr");
        DataStream<String> data3 = processComponentTwo(jsonStream, "data3", "value");

        Job1.consumeDataStream(data1, data3);
        Job2.consumeDataStream(data2, data3);

        long endTime = System.currentTimeMillis();
        long elapsedTime = endTime - startTime;
        System.out.println("Elapsed time in splitjob:::::: " + elapsedTime + "ms");

        env.execute("SplitJob");
    }

    // ... other methods
}

This is the Job1 class:

public class Job1 {
    public static DataStream<String> consumeDataStream(DataStream<String> data, DataStream<String> pdata) throws Exception {
        StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
        env2.setParallelism(2);

        ArrayList<String> cdata = new ArrayList<>();
        Iterator<String> cOutput = DataStreamUtils.collect(data);
        while (cOutput.hasNext())
            cdata.add(cOutput.next());

        ArrayList<String> pList = new ArrayList<>();
        Iterator<String> pOutput = DataStreamUtils.collect(pdata);
        while (pOutput.hasNext())
            pList.add(pOutput.next());

        HashMap<String, ArrayList<String>> mergedData = new HashMap<>();
        mergedData.put("fhr1", cdata);
        mergedData.put("utrine", pList);

        DataStream<String> streams = env2.fromElements(mergedData.toString());
        DataStream<String> processedStream = streams.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                RequestSpecification request = RestAssured.given();
                request.header("Content-Type", "application/json");
                request.body(mergedData.toString());
                request.post("http://localhost:5002/fhr1");
                return value.concat(" job 1");
            }
        });

        env2.execute("Job 1");

        return processedStream;
    }

    // ... other methods
}

Similar to this we have a Job2 class. I'm not familiar with Flink, so I'm not sure this is the right way to use it. In any case, what I'm trying to do is get data from Kafka, split it, and send it to multiple jobs like Job1, and from Job1 I want to send the data to a Flask API. Every other job that gets created should also send its data to the Flask API. Right now this takes far too long; the approaches used here were pieced together from the internet.

I have tried multithreading in the SplitJob class, but I don't think it helped. At the moment I'm using hardcoded data rather than data from Kafka. The overall structure works, but performance is very poor and it takes far too long to complete. I need to bring the time down to under 10 ms; currently it takes 10 s. Is that even possible, or should I try a different approach?

Original English:

I'm new to Flink. So, this is a Flink job which consumes data from Kafka and splits the data according to some keys and values.

This is the SplitJob class

public class SplitJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ObjectMapper mapper = new ObjectMapper();
        long startTime = System.currentTimeMillis();

        // For receiving data from Kafka:
        // KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("localhost:9092").setTopics("input-topic").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();
        // DataStream<String> sourceStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Hardcoded sample record used instead of the Kafka source for now
        DataStream<String> sourceStream = env.fromElements("{\"deviceId\":\"0\",\"sessionId\":\"0\",\"component\":[DATAS....}]}");
        DataStream<ObjectNode> jsonStream = sourceStream.map(value -> mapper.readTree(value).deepCopy());

        DataStream<String> data1 = processComponentTwo(jsonStream, "data1", "hr");
        DataStream<String> data2 = processComponentTwo(jsonStream, "data2", "hr");
        DataStream<String> data3 = processComponentTwo(jsonStream, "data3", "value");

        Job1.consumeDataStream(data1, data3);
        Job2.consumeDataStream(data2, data3);

        long endTime = System.currentTimeMillis();
        long elapsedTime = endTime - startTime;
        System.out.println("Elapsed time in splitjob:::::: " + elapsedTime + "ms");

        env.execute("SplitJob");
    }

    private static void processComponent(ObjectNode jsonNodes, String code, String field, Collector<String> collector) {
        List<String> resultList = new ArrayList<>();
        for (JsonNode component : jsonNodes.get("component")) {
            String componentCode = component.get("code").asText();
            if (componentCode.equals(code)) {
                resultList.add(component.get(field).toString());
            }
        }
        collector.collect(resultList.toString());
    }

    private static DataStream<String> processComponentTwo(DataStream<ObjectNode> jsonStream, String componentCode, String field) {
        return jsonStream.flatMap(new FlatMapFunction<ObjectNode, String>() {
            @Override
            public void flatMap(ObjectNode jsonNodes, Collector<String> collector) throws Exception {
                List<String> resultList = new ArrayList<>();
                for (JsonNode component : jsonNodes.get("component")) {
                    String code = component.get("code").asText();
                    if (code.equals(componentCode)) {
                        resultList.add(component.get(field).toString());
                    }
                }
                collector.collect(resultList.toString());
            }
        });
    }
}

This is the Job1 class

public class Job1 {
    public static DataStream<String> consumeDataStream(DataStream<String> data, DataStream<String> pdata) throws Exception {
        StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
        env2.setParallelism(2);

        ArrayList<String> cdata = new ArrayList<>();
        Iterator<String> cOutput = DataStreamUtils.collect(data);
        while (cOutput.hasNext())
            cdata.add(cOutput.next());

        ArrayList<String> pList = new ArrayList<>();
        Iterator<String> pOutput = DataStreamUtils.collect(pdata);
        while (pOutput.hasNext())
            pList.add(pOutput.next());

        HashMap<String, ArrayList<String>> mergedData = new HashMap<>();
        mergedData.put("fhr1", cdata);
        mergedData.put("utrine", pList);

        DataStream<String> streams = env2.fromElements(mergedData.toString());
        DataStream<String> processedStream = streams.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // Posts the merged data to the Flask API
                RequestSpecification request = RestAssured.given();
                request.header("Content-Type", "application/json");
                request.body(mergedData.toString());
                request.post("http://localhost:5002/fhr1");
                return value.concat(" job 1");
            }
        });

        env2.execute("Job 1");

        return processedStream;
    }
}

Similar to this we have a Job2 class. I'm not familiar with Flink, so I don't know if this is the correct way to use it. Anyway, what I'm trying to do is get data from Kafka, split it, and send it to multiple jobs like Job1, and I want to send the data from Job1 to a Flask API. Sending data to the Flask API should happen in every other job created. This is taking way too much time; the methods used here were scraped together from the internet.
We also need to scale up jobs like Job1 according to how the data is split.

The Flink version in use is 1.17.0.

I have tried multithreading in the SplitJob class, but I don't think it worked.
Currently I'm using hardcoded data rather than data from Kafka.
The overall structure works, but performance is very poor and it takes way too much time to complete. I need to reduce the time to below 10 ms; currently it takes 10 s.
Is that even possible, or should I try other methods?

Answer 1

Score: 1


There are multiple ways to achieve this, and opinions may differ.

When running multiple jobs (execute() calls) under the same application, execute() actually blocks. So the call to the next execute() will wait for the previous execute() call to complete. See here.

Compared to Per-Job mode (deprecated), Application Mode allows submitting applications consisting of multiple jobs. The order of job execution is not affected by the deployment mode but by the call used to launch the job. Using execute(), which is blocking, establishes an order and leads to the execution of the "next" job being postponed until "this" job finishes. Using executeAsync(), which is non-blocking, lets the "next" job start before "this" job finishes.

Application Mode allows multi-execute() applications, but High Availability is not supported in those cases. High Availability in Application Mode is only supported for single-execute() applications.

Additionally, when any one of multiple jobs running in Application Mode (for example, submitted with executeAsync()) gets cancelled, all jobs will be stopped and the JobManager will shut down. Regular job completion (the sources shutting down) is supported.

Apart from the above, I'm not a fan of running multiple jobs inside one application, since it doesn't offer high availability. So I would normally do one of the following:

  1. I'm thinking you can use side outputs here. The rough idea is to put the split records into different side outputs. Then, from each independent side output, you can do your further processing.

  2. In your main application, just split the records and produce them to a different Kafka topic for each job type. Then you can create another job that consumes from that particular topic and does the processing.

Original English:

Well, there are multiple ways to achieve this which might be really opinionated.

When running multiple jobs (execute() calls) under the same application, execute() actually blocks. So your call to another execute() will wait for the previous execute() call to complete. See here.

> Compared to the Per-Job (deprecated) mode, the Application Mode allows the submission of applications consisting of multiple jobs. The order of job execution is not affected by the deployment mode but by the call used to launch the job. Using execute(), which is blocking, establishes an order and it will lead to the execution of the “next” job being postponed until “this” job finishes. Using executeAsync(), which is non-blocking, will lead to the “next” job starting before “this” job finishes.

> The Application Mode allows for multi-execute() applications but High-Availability is not supported in these cases. High-Availability in Application Mode is only supported for single-execute() applications.

> Additionally, when any of multiple running jobs in Application Mode (submitted for example using executeAsync()) gets cancelled, all jobs will be stopped and the JobManager will shut down. Regular job completions (by the sources shutting down) are supported.
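
To make the difference concrete, here is a minimal sketch (not from the original answer) of submitting two pipelines from one main() with executeAsync(); the pipeline contents and job names are placeholders:

    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // First pipeline (placeholder): executeAsync() submits it and returns a JobClient
    // immediately instead of blocking until the job finishes.
    env.fromElements("a", "b").print();
    JobClient firstJob = env.executeAsync("first pipeline");

    // Second pipeline (placeholder): with execute() this submission would only happen
    // after the first job had finished; with executeAsync() it is submitted right away.
    env.fromElements("c", "d").print();
    JobClient secondJob = env.executeAsync("second pipeline");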

Apart from the above, I am not a fan of running multiple jobs inside one application since it does not offer high availability. So I would normally do one of the following:

  1. I am thinking you can use side outputs here. The rough idea would be to put the split records into different side outputs. Then, from each independent side output, you can do your further processing (see the first sketch after this list).

  2. In your main app, just split the records and produce them back to a different Kafka topic for each job type. Then you can create another job that consumes from that particular topic and does the processing (see the second sketch below).
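
As a rough sketch of option 1 (illustrative only; the tag names, the jsonStrings input stream, and the substring-based routing are made up, not taken from the question):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // Hypothetical tags, one per split of the incoming records.
    final OutputTag<String> data1Tag = new OutputTag<String>("data1") {};
    final OutputTag<String> data2Tag = new OutputTag<String>("data2") {};

    SingleOutputStreamOperator<String> routed = jsonStrings
            .process(new ProcessFunction<String, String>() {
                @Override
                public void processElement(String value, Context ctx, Collector<String> out) {
                    // Real code would inspect the parsed JSON (e.g. the component code);
                    // a plain substring check keeps the sketch short.
                    if (value.contains("\"code\":\"data1\"")) {
                        ctx.output(data1Tag, value);
                    } else if (value.contains("\"code\":\"data2\"")) {
                        ctx.output(data2Tag, value);
                    } else {
                        out.collect(value); // everything else stays on the main output
                    }
                }
            });

    // Each side output is an independent stream; chain the per-job processing onto it.
    DataStream<String> data1Stream = routed.getSideOutput(data1Tag);
    DataStream<String> data2Stream = routed.getSideOutput(data2Tag);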

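And a minimal sketch of option 2 using the Flink 1.17 KafkaSink; the topic name, bootstrap servers, and the data1Stream variable are placeholders:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    // One sink (and topic) per job type; each downstream job consumes only its own topic.
    KafkaSink<String> job1Sink = KafkaSink.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("job1-input")   // hypothetical topic for the Job1-type consumer
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();

    // The splitter job only splits and produces; the per-job processing lives in separate jobs.
    data1Stream.sinkTo(job1Sink);

The consuming job would then use a KafkaSource pointed at its own topic, much like the commented-out source in the question.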