Kafka consumer not receiving any messages from topic even though group ID and client ID are set correctly

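Question

Using @InboundChannelAdapter, I am polling messages from a topic, but I receive nothing when I post JSON from the command line. If I send plain text instead, it throws an exception. I am trying to consume a JSON object ({"name": "foo"}) and convert it to the CreateResponse class.

Command to send a message to the topic:

    C:\kafka_2.13-3.5.1\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic myTopic
    >{"name" : "foo"}
    >sdwadaw

Code: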

    package com.cae.egca.connector.config;

    @Configuration
    @EnableKafka
    @Slf4j
    public class KafkaConfig {

        @Bean
        RecordMessageConverter messageConverter() {
            return new StringJsonMessageConverter();
        }

        @Bean
        @InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
        public KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
                RecordMessageConverter messageConverter) {
            KafkaMessageSource<String, String> kafkaMessageSource = new KafkaMessageSource<>(consumerFactory,
                    new ConsumerProperties("myTopic"));
            kafkaMessageSource.getConsumerProperties().setGroupId("myGroupId");
            kafkaMessageSource.getConsumerProperties().setClientId("myClientId");
            kafkaMessageSource.setMessageConverter(messageConverter);
            kafkaMessageSource.setPayloadType(CreateResponse.class);
            return kafkaMessageSource;
        }

        @Bean
        public ConsumerFactory consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
            props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "10000");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupID");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, CreateResponse.class);
            return new DefaultKafkaConsumerFactory(props);
        }

        @ServiceActivator(inputChannel = "inputChannel")
        void consumeIt(@Payload SftpOutboundFilesDetails cr, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) throws JSchException, URISyntaxException, IOException {
            log.info("In SERVICE ACTIVATOR ");
            MDC.put("transaction.id", String.valueOf(UUID.randomUUID()));
            fileServices.processFile(cr);
            acknowledgment.acknowledge();
            log.info("ACKNOWLEDGED: ");
        }

        @Bean
        QueueChannel inputChannel() {
            return new QueueChannel();
        }
    }

Exception:

    Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON
        at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java:214)
        ... 14 more
    Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sdwadaw': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
     at [Source: (String)"sdwadaw"; line: 1, column: 8]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:760)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:3041)
        ... 21 more
# Answer 1

**Score**: 0

This works fine for me...
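```java
import org.apache.kafka.clients.admin.NewTopic;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;

@SpringBootApplication
public class So76876015Application {

    public static void main(String[] args) {
        SpringApplication.run(So76876015Application.class, args);
    }

    // Converts the String record value from JSON into the payload type set on the source.
    @Bean
    RecordMessageConverter messageConverter() {
        return new StringJsonMessageConverter();
    }

    // Polls "myTopic" every 5 seconds and sends each converted record to "inputChannel".
    @Bean
    @InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
    KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
            RecordMessageConverter messageConverter) {
        KafkaMessageSource<String, String> kafkaMessageSource = new KafkaMessageSource<>(consumerFactory,
                new ConsumerProperties("myTopic"));
        kafkaMessageSource.getConsumerProperties().setGroupId("myGroupId");
        kafkaMessageSource.getConsumerProperties().setClientId("myClientId");
        kafkaMessageSource.setMessageConverter(messageConverter);
        kafkaMessageSource.setPayloadType(CreateResponse.class);
        return kafkaMessageSource;
    }

    // The parameter type matches the payload type configured on the message source.
    @ServiceActivator(inputChannel = "inputChannel")
    void consumeIt(CreateResponse cr) {
        System.out.println(cr);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("myTopic").partitions(1).replicas(1).build();
    }

    // Publishes one JSON test record at startup.
    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("myTopic", "{\"name\":\"foo\"}");
        };
    }

    public static class CreateResponse {

        private String name;

        protected String getName() {
            return this.name;
        }

        protected void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "CreateResponse [name=" + this.name + "]";
        }
    }
}
```

    spring.kafka.consumer.auto-offset-reset=earliest

    CreateResponse [name=foo]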
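
One visible difference between the two setups is the offset reset policy: the question's consumer factory uses "latest", while the answer sets `spring.kafka.consumer.auto-offset-reset=earliest`. The sketch below is a simplified model of how that setting chooses a starting position when a consumer group has no committed offset. It is not Kafka client code, and `OffsetResetModel`, `startingOffset`, and the parameter names are hypothetical. Whether this setting explains the asker's missing JSON message is an assumption the answer does not confirm; the plain-text record did reach the converter, so other factors may be involved.

```java
public class OffsetResetModel {

    /**
     * Simplified model of where a consumer starts reading a partition.
     *
     * @param committed   last committed offset for the group, or null if none exists
     * @param logStart    offset of the oldest record still in the partition
     * @param logEnd      offset the next produced record will receive
     * @param resetPolicy value of auto.offset.reset: "earliest", "latest", or "none"
     */
    public static long startingOffset(Long committed, long logStart, long logEnd, String resetPolicy) {
        if (committed != null) {
            // A committed offset always wins; the reset policy is not consulted.
            return committed;
        }
        switch (resetPolicy) {
            case "earliest":
                return logStart; // records produced before the consumer joined are read
            case "latest":
                return logEnd;   // only records produced after this point are read
            default:
                throw new IllegalStateException("No committed offset and reset policy is " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // One record (offset 0) was produced before the consumer's first poll.
        System.out.println(startingOffset(null, 0L, 1L, "earliest"));
        System.out.println(startingOffset(null, 0L, 1L, "latest"));
    }
}
```

In this model, a new group with "earliest" starts at offset 0 and sees the record sent by the `ApplicationRunner` in the answer, whereas "latest" starts at offset 1 and would skip it.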

huangapple
  • Posted on August 10, 2023, 21:07:02
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76876015.html