Kafka Scala消费者未从主题中读取消息,控制台没有错误。

huangapple go评论100阅读模式
英文:

kafka scala consumer not reading messages from topic no error in console

问题

以下是您要翻译的代码部分:

  1. 我有以下示例代码用于从Kafka主题中读取消息
  2. package com.krushna
  3. package kafkademo
  4. import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
  5. import java.time.Duration
  6. import java.util.{Collections, Properties}
  7. object TestConsumer extends App {
  8. val consumer = new KafkaConsumer[String, String](getProperties())
  9. consumer.subscribe(Collections.singletonList("wm-cth-salesstreams"))
  10. while (true) {
  11. val data = consumer.poll(Duration.ofSeconds(3))
  12. data.forEach(println(_))
  13. }
  14. def getProperties(): Properties = {
  15. val properties: Properties = new Properties
  16. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  17. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  18. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  19. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "scala-c1-1234")
  20. properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  21. properties
  22. }
  23. }

希望这有所帮助。如果您需要更多的帮助,请告诉我。

英文:

I have the below sample code to read messages from a Kafka topic

  1. package com.krushna
  2. package kafkademo
  3. import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
  4. import java.time.Duration
  5. import java.util.{Collections, Properties}
  6. object TestConsumer extends App {
  7. val consumer= new KafkaConsumer[String,String](getProperties())
  8. consumer.subscribe(Collections.singletonList("wm-cth-salesstreams"))
  9. while(true){
  10. val data = consumer.poll(Duration.ofSeconds(3))
  11. data.forEach(println(_))
  12. }
  13. def getProperties(): Properties = {
  14. val properties: Properties = new Properties
  15. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  16. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  17. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  18. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "scala-c1-1234")
  19. properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  20. properties
  21. }
  22. }

Which is not reading any messages from the topics, where the kafka console consumer with below configuration is reading messages.

  1. kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wm-cth-salesstream --from-beginning --group c2

In the Scala/java console I can see the below messages print continuously.

  1. 13:12:25.621 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Node 1001 sent an incremental fetch response for session 386731835 with 0 response partition(s), 1 implied partition(s)
  2. 13:12:25.621 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Added READ_UNCOMMITTED fetch request for partition wm-cth-salesstreams-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1001 rack: null), epoch=0}} to node localhost:9092 (id: 1001 rack: null)
  3. 13:12:25.621 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Built incremental fetch (sessionId=386731835, epoch=794) for node 1001. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
  4. 13:12:25.621 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(wm-cth-salesstreams-0)) to broker localhost:9092 (id: 1001 rack: null)
  5. 13:12:26.123 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Node 1001 sent an incremental fetch response for session 386731835 with 0 response partition(s), 1 implied partition(s)
  6. 13:12:26.123 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Added READ_UNCOMMITTED fetch request for partition wm-cth-salesstreams-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1001 rack: null), epoch=0}} to node localhost:9092 (id: 1001 rack: null)
  7. 13:12:26.123 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Built incremental fetch (sessionId=386731835, epoch=795) for node 1001. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)
  8. 13:12:26.123 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(wm-cth-salesstreams-0)) to broker localhost:9092 (id: 1001 rack: null)
  9. 13:12:26.374 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Sending asynchronous auto-commit of offsets {wm-cth-salesstreams-0=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}}
  10. 13:12:26.378 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Committed offset 0 for partition wm-cth-salesstreams-0
  11. 13:12:26.378 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Completed asynchronous auto-commit of offsets {wm-cth-salesstreams-0=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}}
  12. 13:12:26.625 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Node 1001 sent an incremental fetch response for session 386731835 with 0 response partition(s), 1 implied partition(s)
  13. 13:12:26.626 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Added READ_UNCOMMITTED fetch request for partition wm-cth-salesstreams-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1001 rack: null), epoch=0}} to node localhost:9092 (id: 1001 rack: null)
  14. 13:12:26.626 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-scala-c1-1, groupId=scala-c1] Built incremental fetch (sessionId=386731835, epoch=796) for node 1001. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 1 partition(s)

What is the possible error ?

EDIT

output of the /kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups

  1. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  2. c1 wm-cth-salesstream 0 360 360 0 console-consumer-3cba8e14-0835-48e6-9620-296aa32aa551 /127.0.0.1 console-consumer
  3. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  4. c10 wm-cth-salesstream 0 360 360 0 console-consumer-614744ee-33ac-4dc9-87c7-66c6a1cdaa3a /127.0.0.1 console-consumer
  5. Consumer group 'c2' has no active members.
  6. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  7. c2 wm-cth-salesstream 0 360 360 0 - - -
  8. Consumer group 'kafka-java-consumer' has no active members.
  9. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  10. kafka-java-consumer wm-cth-salesstreams 0 0 0 0 - - -
  11. Consumer group 'kafka-java-consumer-v1' has no active members.
  12. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  13. kafka-java-consumer-v1 wm-cth-salesstreams 0 0 0 0 - - -
  14. Consumer group 'new-c1' has no active members.
  15. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  16. new-c1 wm-cth-salesstreams 0 0 0 0 - - -
  17. Consumer group 'scala-c1' has no active members.
  18. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  19. scala-c1 wm-cth-salesstreams 0 0 0 0 - - -
  20. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  21. scala-c1-1234 wm-cth-salesstreams 0 0 0 0 consumer-scala-c1-1234-1-bcc35a35-eb8b-495a-acd9-b98c0d528f93 /172.18.0.1 consumer-scala-c1-1234-1

答案1

得分: 0

如果您以前在组scala-c1中的主题wm-cth-salesstream上运行了任何消费者,并且从未生成更多数据,那么没有更多可读取的内容,您将不会获得任何输出。

auto.offset.reset=earliest 仅在组中没有已提交的偏移量时才适用。

英文:

If you've previously ran any consumer on topic wm-cth-salesstream within group scala-c1, and never produced more data, then there's nothing more to read, and you'll get no output.

auto.offset.reset=earliest only applies if there's no existing offsets committed to the group

huangapple
  • 本文由 发表于 2023年3月8日 15:42:58
  • 转载请务必保留本文链接:https://go.coder-hub.com/75670419.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定