Is it possible to mock a Kafka environment and run Flink JUnit tests locally?

Question

Can we test the Flink job below? If so, how?

@Component
public class RedisController {

    @Value("${flink.consumer.topic.name}")
    private String flinkConsumerTopic;

    /**
     * Method: run()
     * Description: Entry point for running Flink job that consumes messages from Kafka,
     * enriches them with patient details retrieved from Redis, and merges them together.
     *
     * @throws Exception if Flink job fails to execute
     */
    public void run() throws Exception {
        // Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the Kafka properties
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        // Create a Kafka consumer
        FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer<>(flinkConsumerTopic, new JSONKeyValueDeserializationSchema(false), properties);

        // Create a data stream from the Kafka consumer
        DataStream<ObjectNode> stream = env.addSource(consumer);

        // Define the sliding window with a window size of 10 seconds and slide interval of 1 second
        DataStream<String> windowedStream = stream
                .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(1)))
                .apply(new RichAllWindowFunction<ObjectNode, String, TimeWindow>() {
                    private Socket socket;
                    private OutputStream outputStream;
                    private InputStream inputStream;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        String serverAddress = "localhost";
                        int serverPort = 8888;

                        // Create a socket and connect to the server
                        socket = new Socket(serverAddress, serverPort);
                        outputStream = socket.getOutputStream();
                        inputStream = socket.getInputStream();
                    }

                    @Override
                    public void apply(TimeWindow timeWindow, Iterable<ObjectNode> input, Collector<String> out) throws Exception {
                        // Serialize the input data to a JSON string
                        String inputJson = input.toString();
                        byte[] inputBytes = inputJson.getBytes();

                        // Send the input data to the server
                        outputStream.write(inputBytes);
                        outputStream.flush();

                        // Measure the start time
                        long startTime = System.currentTimeMillis();

                        // Read the response from the server
                        byte[] responseBytes = new byte[15034];
                        String response = "";
                        int bytesRead = inputStream.read(responseBytes);
                        if (bytesRead != -1) {
                            response = response + new String(responseBytes, 0, bytesRead);
                            System.out.println("Response from server: " + response);
                        }

                        // Measure the end time
                        long endTime = System.currentTimeMillis();

                        // Calculate the time taken for the response
                        long responseTime = endTime - startTime;
                        System.out.println("Response time: " + responseTime + " ms");
                        out.collect(response);
                    }
                });

        // Create a Kafka producer
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                "output_topic", new SimpleStringSchema(), properties);

        // Send the windowed stream to the Kafka producer
        windowedStream.addSink(kafkaProducer);

        // Execute the job
        env.execute("Tumbling Window Example");
    }
}

I need to run JUnit tests for this Flink job, but the dependency versions are not working properly.

Answer 1

Score: 1

You can use kafka-junit to spin up an embedded Kafka cluster, which is useful for testing. The dependency looks like:

<dependency>
    <groupId>net.mguenther.kafka</groupId>
    <artifactId>kafka-junit</artifactId>
    <version>2.8.0</version>
    <scope>test</scope>
</dependency>

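You'll want to set the version to something appropriate given your version of Flink: kafka-junit release numbers follow the Kafka version they embed, so pick the one that matches the Kafka client your Flink Kafka connector depends on.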

Note that you didn't mention your Flink version (which is always good info to include in a question), but FlinkKafkaProducer has been deprecated for a while. You should be using KafkaSink instead (and, likewise, KafkaSource in place of FlinkKafkaConsumer).
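
One more thing to plan for: the job in the question opens a socket to localhost:8888 in open(), so an embedded Kafka on its own is probably not enough for a local run; something has to be listening on that port too. Below is a minimal sketch of a stand-in server using only the JDK. The class name StubSocketServer and its handler parameter are hypothetical (they are not part of Flink or kafka-junit), and it assumes, like the job does, that each read corresponds to one request.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Hypothetical test helper: stands in for the server the job expects on localhost:8888. */
final class StubSocketServer implements AutoCloseable {
    private final ServerSocket serverSocket;

    private StubSocketServer(ServerSocket serverSocket, Function<String, String> handler) {
        this.serverSocket = serverSocket;
        Thread acceptThread = new Thread(() -> serve(handler), "stub-socket-server");
        acceptThread.setDaemon(true); // never keeps the test JVM alive
        acceptThread.start();
    }

    /** Pass 8888 to match the hard-coded port in the job, or 0 for an ephemeral port. */
    static StubSocketServer start(int port, Function<String, String> handler) throws IOException {
        return new StubSocketServer(new ServerSocket(port), handler);
    }

    /** The port actually bound (useful when started with port 0). */
    int port() {
        return serverSocket.getLocalPort();
    }

    private void serve(Function<String, String> handler) {
        byte[] buffer = new byte[16 * 1024];
        while (!serverSocket.isClosed()) {
            // Handles one client at a time, which is enough for a single window function instance.
            try (Socket client = serverSocket.accept()) {
                InputStream in = client.getInputStream();
                OutputStream out = client.getOutputStream();
                int n;
                // Assumption: each read() is treated as one complete request.
                while ((n = in.read(buffer)) != -1) {
                    String request = new String(buffer, 0, n, StandardCharsets.UTF_8);
                    out.write(handler.apply(request).getBytes(StandardCharsets.UTF_8));
                    out.flush();
                }
            } catch (IOException e) {
                // accept() fails once close() is called; the loop condition then ends the thread.
            }
        }
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
    }
}
```

In a JUnit test you would start it before env.execute(...) and close it afterwards, for example StubSocketServer.start(8888, request -> "ok"). Making the port a constructor argument of the window function, rather than a constant inside open(), would let tests use port 0 and avoid clashes on shared build machines.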

huangapple
  • Posted on June 6, 2023, 16:15:37
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76412661.html