KafkaMessageListenerContainer Receiving 0 Records


Question


I have a simple Kafka application that is supposed to consume a JSON file. The Kafka server is running locally on port 9092, and Zookeeper is running as well. I have a data.json file with user data, and a User.json file containing the user schema with object types and a javaType pointing to a User.java class that matches the User.json schema.

KafkaConfig:


import com.microservices.kafka.model.User;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Component
@EnableKafka
public class KafkaConfig {

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static ProducerFactory<String, User> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    public static ConsumerFactory<String, User> consumerFactory(String groupId) {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
                new JsonDeserializer<>(User.class));
    }

    @Bean
    public KafkaTemplate<String, User> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, User> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("employeeGroup"));
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }
}
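
For illustration, a component that publishes a User to employeeTopic using the KafkaTemplate configured above could look roughly like the following. This is a hypothetical sketch for reference only; the UserProducer class name is invented and is not part of the posted code.

import com.microservices.kafka.model.User;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class UserProducer {

    private final KafkaTemplate<String, User> kafkaTemplate;

    public UserProducer(KafkaTemplate<String, User> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(User user) {
        // Key is the user id as a String; the configured JsonSerializer handles the value.
        kafkaTemplate.send("employeeTopic", String.valueOf(user.getId()), user);
    }
}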

User:


package com.microservices.kafka.model;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;

import javax.persistence.Entity;
import javax.persistence.Id;

@Entity
@Getter
@Setter
@AllArgsConstructor
@RequiredArgsConstructor
public class User {

    @Id
    private int id;

    private String firstName;

    private String lastName;

    private String email;

    private String status;

}
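
One note on this class: with only @Getter and @Setter, the listener's System.out.println("Received user: " + user) prints the default Object.toString() output (class name and hash code) rather than the field values. If readable output is wanted, Lombok's @ToString could be added. A minimal sketch of that change, not part of the original code:

import lombok.ToString;

@Entity
@Getter
@Setter
@ToString            // generates toString() over the fields so the println shows their values
@AllArgsConstructor
@RequiredArgsConstructor
public class User {
    // fields unchanged from the class above
}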

KafkaApplication:


package com.microservices.kafka;

import com.microservices.kafka.config.KafkaConfig;
import com.microservices.kafka.model.User;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.converter.JsonMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})

public class KafkaApplication {

    private static final String TOPIC_NAME = "employeeTopic";
    private static final String GROUP_ID = "employeeGroup";

    private static final ConsumerRecord<String, User> record = null;

    private static final Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }

    @Bean
    public RecordMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    @Bean
    public JsonMessageConverter jsonConverter() {
        return new JsonMessageConverter();
    }

    @Bean
    public JsonDeserializer<User> deserializer() {
        return new JsonDeserializer<>(User.class);
    }

    @Bean
    public String loadData() throws IOException {
        Resource resource = new ClassPathResource("/data/data.json");
        byte[] bytes = Files.readAllBytes(Paths.get(resource.getURI()));
        logger.info("Loading data from file " + resource.getFilename());
        return new String(bytes);
    }
}

KafkaListeners:


import com.microservices.kafka.model.User;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * The @Autowired annotation is used to inject an instance of ObjectMapper into the class.
 * This is an instance of Jackson's ObjectMapper class, which is used to deserialize JSON data into Java objects.
 *
 * The @KafkaListener annotation is used to define a Kafka listener that will listen for messages
 * on the employeeTopic topic. The groupId parameter specifies the ID of the consumer group that this
 * listener is part of. The containerFactory parameter specifies the name of the Kafka listener container factory to use.
 *
 * The listen() method is the actual message listener method that will be called when a message is
 * received. It takes a ConsumerRecord<String, String> parameter that contains the Kafka message.
 * In this case, we assume that the message contains a JSON string representing a User object.
 *
 * The readValue() method of the ObjectMapper class is used to deserialize the JSON string into a User object.
 * We assume that the JSON string is stored in the value() property of the ConsumerRecord.
 *
 * Finally, we print out the received User object to the console.
 */
@Component
public class KafkaListeners {

    @Autowired
    private ObjectMapper objectMapper;

    @KafkaListener(
            topics = "employeeTopic",
            groupId = "group_id",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void listen(ConsumerRecord<String, String> record) throws IOException {
        String fileContent = record.value();
        User user = objectMapper.readValue(fileContent, User.class);
        System.out.println("Received user: " + user);
    }
}

User.json:

  "type": "object",
  "javaType": "com.microservices.kafka.model.User",
  "properties": {
    "id": {
      "type": "integer"
    },
    "firstName": {
      "type": "string"
    },
    "lastName": {
      "type": "string"
    },
    "email": {
      "type": "string"
    },
    "status": {
      "type": "string"
    }
  }
}

The data.json file contains mock data corresponding to the User.json schema. As I said, the application runs, but the listener never receives any records. Any suggestions appreciated.
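
For context, a single user record matching the User.json schema might look like the following. This is a hypothetical sample; the actual contents of data.json were not posted.

{
  "id": 1,
  "firstName": "Jane",
  "lastName": "Doe",
  "email": "jane.doe@example.com",
  "status": "ACTIVE"
}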

Answer 1

Score: 1


Try setting ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest; by default it is latest, which means the consumer only sees records published after it starts consuming.
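
In the posted consumerFactory, that is one extra entry in the config map. A minimal sketch of the change (the constant corresponds to the auto.offset.reset consumer property):

// Read from the earliest available offset when the group has no committed offset,
// so records published before this consumer group first started are still delivered.
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");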

Also, since you are configuring a JsonDeserializer, you should consume the object directly rather than consuming a String and doing the deserialization yourself.
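
Applied to the posted listener, that could look roughly like this. This is a sketch of the suggestion, assuming the kafkaListenerContainerFactory bean from KafkaConfig is used and its JsonDeserializer produces the User values (the StringJsonMessageConverter set on that factory may need to be removed for this to work; that is an assumption, not stated in the answer).

@KafkaListener(
        topics = "employeeTopic",
        groupId = "group_id",
        containerFactory = "kafkaListenerContainerFactory"
)
public void listen(ConsumerRecord<String, User> record) {
    // The configured JsonDeserializer has already produced a User,
    // so no manual ObjectMapper call is needed here.
    User user = record.value();
    System.out.println("Received user: " + user);
}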
