英文:
KafkaMessageListenerContainer Receiving 0 Records
问题
以下是您要翻译的内容:
I have a simple Kafka application that is supposed to consume a json file. The Kafka server is running locally on port 9092. Zookeeper is running as well. I have a data.json file with user data. I have a User.json file containing the user schema with object types and the javatype pointing to a User.java class the matches the User.json schema.
KafkaConfig:
import com.microservices.kafka.model.User;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util Map;
@Component
@EnableKafka
public class KafkaConfig {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static ProducerFactory<String, User> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
public static ConsumerFactory<String, User> consumerFactory(String groupId) {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(User.class));
}
@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("employeeGroup"));
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
}
User:
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import javax.persistence.Entity;
import javax.persistence.Id;
@Entity
@Getter
@Setter
@AllArgsConstructor
@RequiredArgsConstructor
public class User {
@Id
private int id;
private String firstName;
private String lastName;
private String email;
private String status;
}
KafkaApplication
import com.microservices.kafka.config.KafkaConfig;
import com.microservices.kafka.model.User;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc DataSourceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.core.io ClassPathResource;
import org.springframework.core.io Resource;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.converter.JsonMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.serializer JsonDeserializer;
import java.io IOException;
import java.io InputStreamReader;
import java.io Reader;
import java.nio.file Files;
import java.nio.file Path;
import java.nio.file Paths;
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class KafkaApplication {
private static final String TOPIC_NAME = "employeeTopic";
private static final String GROUP_ID = "employeeGroup";
private static final ConsumerRecord<String, User> record = null;
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaApplication.class);
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
@Bean
public RecordMessageConverter converter() {
return new StringJsonMessageConverter();
}
@Bean
public JsonMessageConverter jsonConverter() {
return new JsonMessageConverter();
}
@Bean
public JsonDeserializer<User> deserializer() {
return new JsonDeserializer<>(User.class);
}
@Bean
public String loadData() throws IOException {
Resource resource = new ClassPathResource("/data/data.json");
byte[] bytes = Files.readAllBytes(Paths.get(resource.getURI()));
logger.info("Loading data from file " + resource.getFilename());
return new String(bytes);
}
}
KafkaListeners:
import com.microservices.kafka.model.User;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype Component;
import java io.IOException;
import com.fasterxml.jackson.databind ObjectMapper;
@Component
public class KafkaListeners {
@Autowired
private ObjectMapper objectMapper;
@KafkaListener(
topics = "employeeTopic",
groupId = "group_id",
containerFactory = "kafkaListenerContainerFactory"
)
public void listen(ConsumerRecord<String, String> record) throws IOException {
String fileContent = record.value();
User user = objectMapper.readValue(fileContent, User.class);
System.out.println("Received user: " + user);
}
}
User.json:
"type": "object",
"javaType": "com.microservices.kafka.model.User",
"properties": {
"id": {
"type": "integer"
},
"firstName": {
"type": "string"
},
"lastName": {
"type": "string"
},
"email": {
"type": "string"
},
"status": {
"type": "string"
}
}
}
The data.json
file contains mock data corresponding to the User.json schema. As I said, it runs but it doesn't return anything. Any suggestions appreciated.
英文:
I have a simple Kafka application that is supposed to consume a json file. The Kafka server is running locally on port 9092. Zookeeper is running as well. I have a data.json file with user data. I have a User.json file containing the user schema with object types and the javatype pointing to a User.java class the matches the User.json schema.
KafkaConfig:
import com.microservices.kafka.model.User;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
@EnableKafka
public class KafkaConfig {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static ProducerFactory<String, User> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
public static ConsumerFactory<String, User> consumerFactory(String groupId) {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(User.class));
}
@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory("employeeGroup"));
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
}
User:
package com.microservices.kafka.model;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import javax.persistence.Entity;
import javax.persistence.Id;
@Entity
@Getter
@Setter
@AllArgsConstructor
@RequiredArgsConstructor
public class User {
@Id
private int id;
private String firstName;
private String lastName;
private String email;
private String status;
}
KafkaApplication
package com.microservices.kafka;
import com.microservices.kafka.config.KafkaConfig;
import com.microservices.kafka.model.User;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.converter.JsonMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class KafkaApplication {
private static final String TOPIC_NAME = "employeeTopic";
private static final String GROUP_ID = "employeeGroup";
private static final ConsumerRecord<String, User> record = null;
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaApplication.class);
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
@Bean
public RecordMessageConverter converter() {
return new StringJsonMessageConverter();
}
@Bean
public JsonMessageConverter jsonConverter() {
return new JsonMessageConverter();
}
@Bean
public JsonDeserializer<User> deserializer() {
return new JsonDeserializer<>(User.class);
}
@Bean
public String loadData() throws IOException {
Resource resource = new ClassPathResource("/data/data.json");
byte[] bytes = Files.readAllBytes(Paths.get(resource.getURI()));
logger.info("Loading data from file " + resource.getFilename());
return new String(bytes);
}
}
KafkaListeners:
import com.microservices.kafka.model.User;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* The @Autowired annotation is used to inject an instance of ObjectMapper into the class.
* This is an instance of Jackson's ObjectMapper class, which is used to deserialize JSON data into Java objects.
*
* The @KafkaListener annotation is used to define a Kafka listener that will listen for messages
* on the employeeTopic topic. The groupId parameter specifies the ID of the consumer group that this
* listener is part of. The containerFactory parameter specifies the name of the Kafka listener container factory to use.
*
* The listen() method is the actual message listener method that will be called when a message is
* received. It takes a ConsumerRecord<String, String> parameter that contains the Kafka message.
* In this case, we assume that the message contains a JSON string representing a User object.
*
* The readValue() method of the ObjectMapper class is used to deserialize the JSON string into a User object.
* We assume that the JSON string is stored in the value() property of the ConsumerRecord.
*
* Finally, we print out the received User object to the console.
*/
@Component
public class KafkaListeners {
@Autowired
private ObjectMapper objectMapper;
@KafkaListener(
topics = "employeeTopic",
groupId = "group_id",
containerFactory = "kafkaListenerContainerFactory"
)
public void listen(ConsumerRecord<String, String> record) throws IOException {
String fileContent = record.value();
User user = objectMapper.readValue(fileContent, User.class);
System.out.println("Received user: " + user);
}
}
User.json:
"type": "object",
"javaType": "com.microservices.kafka.model.User",
"properties": {
"id": {
"type": "integer"
},
"firstName": {
"type": "string"
},
"lastName": {
"type": "string"
},
"email": {
"type": "string"
},
"status": {
"type": "string"
}
}
}
The data.json
file contains mock data corresponding to the User.json schema. As I said, it runs but it doesn't return anything. Any suggestions appreciated.
答案1
得分: 1
尝试设置 ConsumerConfig.AUTO_OFFSET_RESET=earliest
- 默认情况下是 latest
,这意味着只消费消费者消费之后发布的新记录。
另外,由于您正在配置 JsonDeserializer
,您应该只消费对象,而不是 String
,并自行进行反序列化。
英文:
Try setting ConsumerConfig.AUTO_OFFSET_RESET=earliest
- by default, it is latest
which means consume only new records published after the consumer consumes.
Also, since you are configuring a JsonDeserializer
you should just consume the object, not a String
and do the deserialization yourself.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论