英文:
Is it possible to mock a kafka environment and run Flink junit locally for testing purposes?
问题
Sure, here's the translated code:
我们可以测试下面的flink作业吗?如果可以的话,我们该如何做?
@Component
public class RedisController {
@Value("${flink.consumer.topic.name}")
private String flinkConsumerTopic;
/**
* Method: run()
* Description: 运行Flink作业的入口点,从Kafka消费消息,从Redis检索患者详情并将它们合并在一起。
* @throws Exception 如果Flink作业执行失败
*/
public void run() throws Exception {
// 设置执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置Kafka属性
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// 创建Kafka消费者
FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer<>(flinkConsumerTopic, new JSONKeyValueDeserializationSchema(false), properties);
// 从Kafka消费者创建数据流
DataStream<ObjectNode> stream = env.addSource(consumer);
// 使用窗口大小为10秒和滑动间隔为5秒定义滑动窗口
DataStream<String> windowedStream = stream
.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(1)))
.apply(new RichAllWindowFunction<ObjectNode, String, TimeWindow>() {
private Socket socket;
private OutputStream outputStream;
private InputStream inputStream;
@Override
public void open(Configuration parameters) throws Exception {
String serverAddress = "localhost";
int serverPort = 8888;
// 创建套接字并连接到服务器
socket = new Socket(serverAddress, serverPort);
outputStream = socket.getOutputStream();
inputStream = socket.getInputStream();
}
@Override
public void apply(TimeWindow timeWindow, Iterable<ObjectNode> input, Collector<String> out) throws Exception {
// 将输入数据序列化为JSON字符串
String inputJson = input.toString();
byte[] inputBytes = inputJson.getBytes();
// 将输入数据发送到服务器
outputStream.write(inputBytes);
outputStream.flush();
// 测量开始时间
long startTime = System.currentTimeMillis();
// 从服务器读取响应
byte[] responseBytes = new byte[15034];
String response = "";
int bytesRead = inputStream.read(responseBytes);
if (bytesRead != -1) {
response = response + new String(responseBytes, 0, bytesRead);
System.out.println("服务器响应:" + response);
}
// 测量结束时间
long endTime = System.currentTimeMillis();
// 计算响应时间
long responseTime = endTime - startTime;
System.out.println("响应时间:" + responseTime + " 毫秒");
out.collect(response);
}
});
// 创建Kafka生产者
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
"output_topic", new SimpleStringSchema(), properties);
// 将窗口流发送到Kafka生产者
windowedStream.addSink(kafkaProducer);
// 执行作业
env.execute("Tumbling Window Example");
}
}
Please note that the code contains placeholders like ${flink.consumer.topic.name}
and specific configurations that you would need to set according to your environment. Additionally, the code seems to involve network communication with a server on localhost, so make sure the server is set up and running as needed.
英文:
Could we test the below flink job? If so how can we do it?
@Component
public class RedisController {
@Value("${flink.consumer.topic.name}")
private String flinkConsumerTopic;
/**
* Method: run()
* Description: Entry point for running Flink job that consumes messages from Kafka,
* enriches them with patient details retrieved from Redis, and merges them together.
*
* @throws Exception if Flink job fails to execute
*/
public void run() throws Exception {
// Set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set the Kafka properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// Create a Kafka consumer
FlinkKafkaConsumer<ObjectNode> consumer = new FlinkKafkaConsumer<>(flinkConsumerTopic, new JSONKeyValueDeserializationSchema(false), properties);
// Create a data stream from the Kafka consumer
DataStream<ObjectNode> stream = env.addSource(consumer);
// Define the sliding window with a window size of 10 seconds and slide interval of 5 seconds
DataStream<String> windowedStream = stream
.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(1)))
.apply(new RichAllWindowFunction<ObjectNode, String, TimeWindow>() {
private Socket socket;
private OutputStream outputStream;
private InputStream inputStream;
@Override
public void open(Configuration parameters) throws Exception {
String serverAddress = "localhost";
int serverPort = 8888;
// Create a socket and connect to the server
socket = new Socket(serverAddress, serverPort);
outputStream = socket.getOutputStream();
inputStream = socket.getInputStream();
}
@Override
public void apply(TimeWindow timeWindow, Iterable<ObjectNode> input, Collector<String> out) throws Exception {
// Serialize the input data to a JSON string
String inputJson = input.toString();
byte[] inputBytes = inputJson.getBytes();
// Send the input data to the server
outputStream.write(inputBytes);
outputStream.flush();
// Measure the start time
long startTime = System.currentTimeMillis();
// Read the response from the server
byte[] responseBytes = new byte[15034];
String response = "";
int bytesRead = inputStream.read(responseBytes);
if (bytesRead != -1) {
response = response + new String(responseBytes, 0, bytesRead);
System.out.println("Response from server: " + response);
}
// Measure the end time
long endTime = System.currentTimeMillis();
// Calculate the time taken for the response
long responseTime = endTime - startTime;
System.out.println("Response time: " + responseTime + " ms");
out.collect(response);
}
});
// Create a Kafka producer
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
"output_topic", new SimpleStringSchema(), properties);
// Send the windowed stream to the Kafka producer
windowedStream.addSink(kafkaProducer);
// Execute the job
env.execute("Tumbling Window Example");
}
}
I need to run the junit for this flink. The dependency versions are not working properly.
答案1
得分: 1
你可以使用kafka-junit来启动一个嵌入式Kafka主题,这对于测试非常有用。依赖项如下:
<dependency>
<groupId>net.mguenther.kafka</groupId>
<artifactId>kafka-junit</artifactId>
<version>2.8.0</version>
<scope>test</scope>
</dependency>
你需要根据你的Flink版本设置合适的版本。
注意,你没有提到你的Flink版本(在问题中包含这个信息通常很有用),但是FlinkKafkaProducer
已经被弃用了一段时间。你应该使用KafkaSink。
英文:
You can use kafka-junit to spin up an embedded Kafka topic that's useful for testing. The dependency looks like:
<dependency>
<groupId>net.mguenther.kafka</groupId>
<artifactId>kafka-junit</artifactId>
<version>2.8.0</version>
<scope>test</scope>
</dependency>
You'll want to set the version to something appropriate given your version of Flink.
Note that you didn't mention your Flink version (which is always good info to include in a question), but FlinkKafkaProducer
has been deprecated for a while. You should be using KafkaSink.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论