Kafka is giving: "The group member needs to have a valid member id before actually entering a consumer group"

# Question

I am using Kafka to consume messages in Java. I want to test by starting the same app multiple times on my local machine. The first instance starts consuming messages from the topic without problems. When I start a second instance, I get:

Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group

and it doesn't get any messages from the topic. If I start more instances, I get the same issue.

The configuration I am using for Kafka is:

```yaml
spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.use.type.headers: false
    listener:
      missing-topics-fatal: false
```

I have two topics:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopics {

    @Bean("alertsTopic")
    public NewTopic alertsTopic() {
        return TopicBuilder.name("XXX.alerts")
                .compact()
                .build();
    }

    @Bean("serversTopic")
    public NewTopic serversTopic() {
        return TopicBuilder.name("XXX.servers")
                .compact()
                .build();
    }
}
```

And two listeners in different class files:

```java
@KafkaListener(topics = SERVERS_KAFKA_TOPIC,
        id = "#{T(java.util.UUID).randomUUID().toString()}",
        properties = {
            "spring.json.key.default.type=java.lang.String",
            "spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.ServerInfo"
        })
public void registerServer(
        @Payload(required = false) ServerInfo serverInfo
)

@KafkaListener(topics = ALERTS_KAFKA_TOPIC,
        id = "#{T(java.util.UUID).randomUUID().toString()}",
        properties = {
            "spring.json.key.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafkaKey",
            "spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafka"
        })
public void processAlert(
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) AlertOnKafkaKey key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
        @Header(KafkaHeaders.OFFSET) long offset,
        @Payload(required = false) AlertOnKafka alert)
```
# Answer 1
**Score**: 18

From my analysis, this is normal behaviour, and you can change the log levels to exclude it.

The reason is that if the broker detects that the client supports `member.id`, it returns this error to the client on its initial join request. This is described in [KIP-394][1].

The client then reconnects to the broker with a generated member ID.

[1]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request
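The two-step join described in this answer can be pictured with a small simulation. This is only a toy model under stated assumptions, not Kafka's actual code: `Coordinator`, `JoinResponse`, and `join` are hypothetical names, and the generated id is simply the client id plus a random UUID, which matches the shape of the member ids that appear in the log of the last answer.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

/** Toy model of the KIP-394 join handshake; not Kafka's real implementation. */
class JoinHandshakeSketch {

    /** Result of one simulated JoinGroup request. */
    static final class JoinResponse {
        final boolean memberIdRequired; // true models the MEMBER_ID_REQUIRED error
        final String memberId;          // id generated or confirmed by the coordinator

        JoinResponse(boolean memberIdRequired, String memberId) {
            this.memberIdRequired = memberIdRequired;
            this.memberId = memberId;
        }
    }

    /** Hypothetical stand-in for the broker-side group coordinator. */
    static final class Coordinator {
        private final Set<String> pendingIds = new HashSet<>();
        private final Set<String> members = new HashSet<>();

        JoinResponse joinGroup(String clientId, String memberId) {
            if (memberId.isEmpty()) {
                // Initial request without an id: generate one and ask the client to retry.
                String generated = clientId + "-" + UUID.randomUUID();
                pendingIds.add(generated);
                return new JoinResponse(true, generated);
            }
            if (pendingIds.remove(memberId) || members.contains(memberId)) {
                // Retry with a known id: the member is admitted to the group.
                members.add(memberId);
                return new JoinResponse(false, memberId);
            }
            throw new IllegalStateException("unknown member id: " + memberId);
        }

        int memberCount() {
            return members.size();
        }
    }

    /** Client side: send the initial join, then retry with the id it was given. */
    static String join(Coordinator coordinator, String clientId) {
        JoinResponse first = coordinator.joinGroup(clientId, "");
        if (first.memberIdRequired) {
            System.out.println(clientId + ": join failed, member id required; rejoining");
            return coordinator.joinGroup(clientId, first.memberId).memberId;
        }
        return first.memberId;
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        String memberId = join(coordinator, "consumer-listener1-1");
        System.out.println("joined as " + memberId);
    }
}
```

In this model the "failure" on the first request is part of every successful join, which is why the answer treats the log message as informational rather than as the cause of missing messages.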
# Answer 2
**Score**: 2

In my experience this is caused by a missing group instance id configuration.

Below is what I use when configuring the Kafka consumer for a group:

```java
consumerProperties.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "SchedulerCoordinator" + UUID.randomUUID());
```

Bear in mind that I personally prefer class-based configuration over property-based configuration; these `consumerProperties` are passed to the consumer when it is initialised.
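For context, `ConsumerConfig.GROUP_INSTANCE_ID_CONFIG` is the constant for the `group.instance.id` setting, which opts a consumer into static group membership. Static membership is generally meant to be used with an id that stays the same across restarts of an instance, so the sketch below assumes each local instance is started with its own fixed id instead of a random UUID. The class name, the `consumerProperties` helper, and the `scheduler-1` default are hypothetical; plain string keys are used so the sketch runs without the Kafka client on the classpath.

```java
import java.util.Properties;

/** Sketch: building consumer properties with a stable group.instance.id. */
class StaticMembershipSketch {

    /** Returns consumer properties for one instance of the application. */
    static Properties consumerProperties(String groupId, String instanceId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // broker address from the question
        props.setProperty("group.id", groupId);
        // Same key as ConsumerConfig.GROUP_INSTANCE_ID_CONFIG; must be unique per instance.
        props.setProperty("group.instance.id", instanceId);
        return props;
    }

    public static void main(String[] args) {
        // Assumption: each local instance is started with a different first argument.
        String instanceId = args.length > 0 ? args[0] : "scheduler-1";
        Properties props = consumerProperties("listener1", instanceId);
        System.out.println(props.getProperty("group.instance.id"));
    }
}
```

Starting a second local copy with a different argument, for example `scheduler-2`, keeps the two instances distinguishable within the same group.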

# Answer 3
**Score**: 1

If the answer from @Archimedes Trajano doesn't work for you (as in my case), then this happens when Kafka can't pick up the consumer group id.

If you have a single consumer group, you can specify it in the properties file like this:

```yaml
spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      group-id: insert-your-consumer-group-id
      # ... rest of your properties ...
```

Or, if you have multiple consumers, you can specify the `groupId` for each one:

```java
@KafkaListener(topics = "topic-1", groupId = "group-1")
public void registerServer(@Payload(required = false) ServerInfo serverInfo)

@KafkaListener(topics = "topic-2", groupId = "group-2")
public void processAlert(@Payload(required = false) AlertOnKafka alert)
```

Docs: https://docs.spring.io/spring-kafka/reference/html/#annotation-properties

# Answer 4
**Score**: 0

I encountered the same issue, and it also prevented my consumer from subscribing to certain topics.

I figured that the `member.id` might be similar to the client id (I am using Camel here), which might in some way be related to the consumer group.

What fixed it for me was giving my consuming service a fixed client id that no longer changes between runs, while leaving the consumer group as it was.

# Answer 5
**Score**: 0

In my case, the consumers rejoin the group with a generated ID. Here are my test and the resulting log, FYI.

```java
@Test
void testSyncSend() throws ExecutionException, InterruptedException {
    int id = (int) (System.currentTimeMillis() / 1000);
    SendResult result = producer.syncSend(id);
    logger.info("[testSyncSend] id:{}, result:{}", id, result);
    // Block forever so the listener containers keep running and logging.
    new CountDownLatch(1).await();
}
```
```
2021-04-20 14:32:20.980 INFO 6672 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.1
2021-04-20 14:32:20.981 INFO 6672 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 0efa8fb0f4c73d92
2021-04-20 14:32:20.981 INFO 6672 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1618900340980
2021-04-20 14:32:21.125 INFO 6672 --- [listener1-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-listener1-1, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125 INFO 6672 --- org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125 INFO 6672 --- [listener1-3-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-listener1-4, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125 INFO 6672 --- [listener1-2-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-listener1-3, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125 INFO 6672 --- [listener1-1-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-listener1-2, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.127 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null)
2021-04-20 14:32:21.127 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null)
2021-04-20 14:32:21.127 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null)
2021-04-20 14:32:21.127 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null)
2021-04-20 14:32:21.130 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.130 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.130 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.130 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.147 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2021-04-20 14:32:21.147 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2021-04-20 14:32:21.147 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2021-04-20 14:32:21.147 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2021-04-20 14:32:21.148 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.148 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.148 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.148 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] (Re-)joining group
2021-04-20 14:32:21.317 INFO 6672 --- [ main] c.z.s.cbaConnector.KafkaProducerTest : [testSyncSend] id:1618900340, result:SendResult [producerRecord=ProducerRecord(topic=test1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 122, 104, 105, 108, 105, 46, 115, 109, 115, 109, 111, 100, 117, 108, 101, 46, 101, 110, 116, 105, 116, 121, 46, 77, 101, 115, 115, 97, 103, 101])], isReadOnly = true), key=null, value=Message(id=1618900340), timestamp=null), recordMetadata=test1-1@0]
2021-04-20 14:32:23.770 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] Finished assignment for group at generation 2: {consumer-listener1-4-9a383250-e84d-413a-b012-85405abdcf7f=Assignment(partitions=[test1-8, test1-9]), consumer-listener1-2-3d26d9ef-b973-4d19-a930-5ba77d938680=Assignment(partitions=[test1-3, test1-4, test1-5]), consumer-listener1-1-10b6895e-264e-45bd-ba90-c71ea12b21e5=Assignment(partitions=[test1-0, test1-1, test1-2]), consumer-listener1-3-54ce965a-87cd-4e28-b0e9-b0f2c9f69423=Assignment(partitions=[test1-6, test1-7])}
2021-04-20 14:32:23.775 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] Successfully joined group with generation 2
2021-04-20 14:32:23.775 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] Successfully joined group with generation 2
2021-04-20 14:32:23.775 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] Successfully joined group with generation 2
2021-04-20 14:32:23.776 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] Successfully joined group with generation 2
2021-04-20 14:32:23.779 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] Adding newly assigned partitions: test1-5, test1-4, test1-3
2021-04-20 14:32:23.779 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] Adding newly assigned partitions: test1-6, test1-7
2021-04-20 14:32:23.779 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] Adding newly assigned partitions: test1-0, test1-2, test1-1
2021-04-20 14:32:23.779 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] Adding newly assigned partitions: test1-9, test1-8
2021-04-20 14:32:23.789 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] Found no committed offset for partition test1-8
2021-04-20 14:32:23.789 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] Found no committed offset for partition test1-6
2021-04-20 14:32:23.789 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] Found no committed offset for partition test1-2
2021-04-20 14:32:23.789 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] Found no committed offset for partition test1-1
2021-04-20 14:32:23.791 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] Setting offset for partition test1-0 to the committed offset FetchPosition{offset=14, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.791 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] Setting offset for partition test1-5 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.791 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] Setting offset for partition test1-9 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.791 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] Setting offset for partition test1-7 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.791 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] Setting offset for partition test1-4 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.791 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] Setting offset for partition test1-3 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}}
2021-04-20 14:32:23.792 INFO 6672 --- [listener1-1-C-1] o.s.k.l.KafkaMessageListenerContainer : listener1: partitions assigned: [test1-5, test1-4, test1-3]
2021-04-20 14:32:23.804 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-listener1-4, groupId=listener1] Resetting offset for partition test1-8 to offset 0.
2021-04-20 14:32:23.804 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-listener1-3, groupId=listener1] Resetting offset for partition test1-6 to offset 0.
2021-04-20 14:32:23.804 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-listener1-1, groupId=listener1] Resetting offset for partition test1-2 to offset 0.
2021-04-20 14:32:23.804 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-listener1-1, groupId=listener1] Resetting offset for partition test1-1 to offset 0.
2021-04-20 14:32:23.804 INFO 6672 --- [listener1-3-C-1] o.s.k.l.KafkaMessageListenerContainer : listener1: partitions assigned: [test1-9, test1-8]
2021-04-20 14:32:23.804 INFO 6672 --- [listener1-2-C-1] o.s.k.l.KafkaMessageListenerContainer : listener1: partitions assigned: [test1-6, test1-7]
2021-04-20 14:32:23.804 INFO 6672 --- [listener1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : listener1: partitions assigned: [test1-0, test1-2, test1-1]
2021-04-20 14:32:23.817 INFO 6672 --- [listener1-0-C-1] c.z.s.cbaConnector.KafkaConsumer : [KafakaConsumer][consume] thread:21 received message:{"id":1618900340}
```
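One way to read a log like this is to check, per consumer, whether every `MemberIdRequiredException` line is later followed by a `Successfully joined group` line. The sketch below does that for a list of log lines; the class and method names are hypothetical, the sample lines in `main` are abbreviated from the log in this answer, and the string matching assumes the message format shown there.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch: checks whether consumers that hit MemberIdRequiredException later joined. */
class JoinLogCheck {

    // Captures the client id from a prefix such as "[Consumer clientId=consumer-listener1-1, groupId=...]".
    private static final Pattern CLIENT = Pattern.compile("clientId=([^,\\]]+)");

    /** Maps each client id that logged the exception to whether it joined afterwards. */
    static Map<String, Boolean> recovered(List<String> lines) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = CLIENT.matcher(line);
            if (!m.find()) {
                continue; // not a Kafka client log line
            }
            String client = m.group(1);
            if (line.contains("MemberIdRequiredException")) {
                result.put(client, false);
            } else if (line.contains("Successfully joined group") && result.containsKey(client)) {
                result.put(client, true);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "[Consumer clientId=consumer-listener1-1, groupId=listener1] (Re-)joining group",
            "[Consumer clientId=consumer-listener1-1, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: ...",
            "[Consumer clientId=consumer-listener1-2, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: ...",
            "[Producer clientId=producer-1] Cluster ID: RctzTn4XR4WNNVuqh25izw",
            "[Consumer clientId=consumer-listener1-1, groupId=listener1] Successfully joined group with generation 2",
            "[Consumer clientId=consumer-listener1-2, groupId=listener1] Successfully joined group with generation 2");
        System.out.println(recovered(lines));
    }
}
```

A client that maps to `false` never logged a successful join after the exception, which would point to a problem other than the initial member id handshake.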

huangapple
  • Posted on 2020-09-18 06:00:59
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63946696.html