
huangapple go评论84阅读模式

Is it possible to mock a kafka environment and run Flink junit locally for testing purposes?


Sure, here's the translated code:


public class RedisController {


    private String flinkConsumerTopic;

     * Method: run()
     * Description: 运行Flink作业的入口点,从Kafka消费消息,从Redis检索患者详情并将它们合并在一起。
     * @throws Exception 如果Flink作业执行失败

    public void run() throws Exception {

        // 设置执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置Kafka属性
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        // 创建Kafka消费者
        FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer<>(flinkConsumerTopic, new JSONKeyValueDeserializationSchema(false), properties);

        // 从Kafka消费者创建数据流
        DataStream<ObjectNode> stream = env.addSource(consumer);

        // 使用窗口大小为10秒和滑动间隔为5秒定义滑动窗口
        DataStream<String> windowedStream = stream
                .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(1)))
                .apply(new RichAllWindowFunction<ObjectNode, String, TimeWindow>() {
                    private Socket socket;
                    private OutputStream outputStream;
                    private InputStream inputStream;

                    public void open(Configuration parameters) throws Exception {
                        String serverAddress = "localhost";
                        int serverPort = 8888;

                        // 创建套接字并连接到服务器
                        socket = new Socket(serverAddress, serverPort);
                        outputStream = socket.getOutputStream();
                        inputStream = socket.getInputStream();

                    public void apply(TimeWindow timeWindow, Iterable<ObjectNode> input, Collector<String> out) throws Exception {
                        // 将输入数据序列化为JSON字符串
                        String inputJson = input.toString();
                        byte[] inputBytes = inputJson.getBytes();

                        // 将输入数据发送到服务器

                        // 测量开始时间
                        long startTime = System.currentTimeMillis();

                        // 从服务器读取响应
                        byte[] responseBytes = new byte[15034];
                        String response = "";

                        int bytesRead = inputStream.read(responseBytes);

                        if (bytesRead != -1) {
                            response = response + new String(responseBytes, 0, bytesRead);
                            System.out.println("服务器响应:" + response);

                        // 测量结束时间
                        long endTime = System.currentTimeMillis();

                        // 计算响应时间
                        long responseTime = endTime - startTime;
                        System.out.println("响应时间:" + responseTime + " 毫秒");

        // 创建Kafka生产者
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                "output_topic", new SimpleStringSchema(), properties);

        // 将窗口流发送到Kafka生产者

        // 执行作业
        env.execute("Tumbling Window Example");

Please note that the code contains placeholders like ${flink.consumer.topic.name} and specific configurations that you would need to set according to your environment. Additionally, the code seems to involve network communication with a server on localhost, so make sure the server is set up and running as needed.


Could we test the below flink job? If so how can we do it?

public class RedisController {
private String flinkConsumerTopic;
* Method: run()
* Description: Entry point for running Flink job that consumes messages from Kafka,
* enriches them with patient details retrieved from Redis, and merges them together.
* @throws Exception if Flink job fails to execute
public void run() throws Exception {
// Set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the Kafka properties
Properties properties = new Properties();
properties.setProperty(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
// Create a Kafka consumer
FlinkKafkaConsumer&lt;ObjectNode&gt; consumer = new FlinkKafkaConsumer&lt;&gt;(flinkConsumerTopic, new JSONKeyValueDeserializationSchema(false), properties);
// Create a data stream from the Kafka consumer
DataStream&lt;ObjectNode&gt; stream = env.addSource(consumer);
// Define the sliding window with a window size of 10 seconds and slide interval of 5 seconds
DataStream&lt;String&gt; windowedStream = stream
.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(1)))
.apply(new RichAllWindowFunction&lt;ObjectNode, String, TimeWindow&gt;() {
private Socket socket;
private OutputStream outputStream;
private InputStream inputStream;
public void open(Configuration parameters) throws Exception {
String serverAddress = &quot;localhost&quot;;
int serverPort = 8888;
// Create a socket and connect to the server
socket = new Socket(serverAddress, serverPort);
outputStream = socket.getOutputStream();
inputStream = socket.getInputStream();
public void apply(TimeWindow timeWindow, Iterable&lt;ObjectNode&gt; input, Collector&lt;String&gt; out) throws Exception {
// Serialize the input data to a JSON string
String inputJson = input.toString();
byte[] inputBytes = inputJson.getBytes();
// Send the input data to the server
// Measure the start time
long startTime = System.currentTimeMillis();
// Read the response from the server
byte[] responseBytes = new byte[15034];
String response = &quot;&quot;;
int bytesRead = inputStream.read(responseBytes);
if (bytesRead != -1) {
response = response + new String(responseBytes, 0, bytesRead);
System.out.println(&quot;Response from server: &quot; + response);
// Measure the end time
long endTime = System.currentTimeMillis();
// Calculate the time taken for the response
long responseTime = endTime - startTime;
System.out.println(&quot;Response time: &quot; + responseTime + &quot; ms&quot;);
// Create a Kafka producer
FlinkKafkaProducer&lt;String&gt; kafkaProducer = new FlinkKafkaProducer&lt;&gt;(
&quot;output_topic&quot;, new SimpleStringSchema(), properties);
// Send the windowed stream to the Kafka producer
// Execute the job
env.execute(&quot;Tumbling Window Example&quot;);

I need to run the junit for this flink. The dependency versions are not working properly.


得分: 1






You can use kafka-junit to spin up an embedded Kafka topic that's useful for testing. The dependency looks like:


You'll want to set the version to something appropriate given your version of Flink.

Note that you didn't mention your Flink version (which is always good info to include in a question), but FlinkKafkaProducer has been deprecated for a while. You should be using KafkaSink.

  • 本文由 发表于 2023年6月6日 16:15:37
  • 转载请务必保留本文链接:https://go.coder-hub.com/76412661.html



:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
