Kafka显示:「在实际加入消费者组之前,组成员需要具有有效的成员ID」。

huangapple go评论78阅读模式
英文:

Kafka is giving: "The group member needs to have a valid member id before actually entering a consumer group"

问题

我正在使用Java中的Kafka来消费消息。我想通过在本地多次启动同一个应用程序来进行测试。当我启动应用程序时,第一次能够从主题中开始消费消息。但是当我启动第二个应用程序时,我会得到以下错误:

Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group


并且无法从主题中获取任何消息。如果我尝试启动更多应用程序,我会遇到相同的问题。

我使用的Kafka配置如下:

```yaml
spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.use.type.headers: false
    listener:
      missing-topics-fatal: false

我有两个主题:

@Configuration
public class KafkaTopics {
    @Bean("alertsTopic")
    public NewTopic alertsTopic() {
        return TopicBuilder.name("XXX.alerts")
            .compact()
            .build();
    }

    @Bean("serversTopic")
    public NewTopic serversTopic() {
        return TopicBuilder.name("XXX.servers")
            .compact()
            .build();
    }
}

并且有两个监听器在不同的类文件中。

@KafkaListener(topics = SERVERS_KAFKA_TOPIC, id = "#{T(java.util.UUID).randomUUID().toString()}",
properties = {
"spring.json.key.default.type=java.lang.String",
"spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.ServerInfo"
})
public void registerServer(
@Payload(required = false) ServerInfo serverInfo
)

@KafkaListener(topics = ALERTS_KAFKA_TOPIC,
id = "#{T(java.util.UUID).randomUUID().toString()}",
properties = {
"spring.json.key.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafkaKey",
"spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafka"
})
public void processAlert(
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) AlertOnKafkaKey key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
@Header(KafkaHeaders.OFFSET) long offset,
@Payload(required = false) AlertOnKafka alert)


<details>
<summary>英文:</summary>

I am using Kafka to consume messages in Java.  I want to test by starting the same app multiple times on my local box. When I start up, the first time I am able to start consuming messages from the topic.  When I start up a second one I get:

Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group

 and dont get any messages from the topic.  If I try to start more of them I get the same issues.

The configuration I am using for Kafka is

```yaml
spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
      properties:
        spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.use.type.headers: false
    listener:
      missing-topics-fatal: false

I have two topics

@Configuration
public class KafkaTopics {
    @Bean(&quot;alertsTopic&quot;)
    public NewTopic alertsTopic() {

        return TopicBuilder.name(&quot;XXX.alerts&quot;)
            .compact()
            .build();
    }

    @Bean(&quot;serversTopic&quot;)
    public NewTopic serversTopic() {

        return TopicBuilder.name(&quot;XXX.servers&quot;)
            .compact()
            .build();
    }

}

And two listeners in different class files.

@KafkaListener(topics = SERVERS_KAFKA_TOPIC, id = &quot;#{T(java.util.UUID).randomUUID().toString()}&quot;,
    properties = {
        &quot;spring.json.key.default.type=java.lang.String&quot;,
        &quot;spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.ServerInfo&quot;
    })
public void registerServer(
    @Payload(required = false) ServerInfo serverInfo
) 

@KafkaListener(topics = ALERTS_KAFKA_TOPIC,
    id = &quot;#{T(java.util.UUID).randomUUID().toString()}&quot;,
    properties = {
        &quot;spring.json.key.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafkaKey&quot;,
        &quot;spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafka&quot;
    })
public void processAlert(
    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) AlertOnKafkaKey key,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
    @Header(KafkaHeaders.OFFSET) long offset,
    @Payload(required = false) AlertOnKafka alert)

</details>


# 答案1
**得分**: 18

根据我的分析。这是正常行为,您可以更改日志级别以排除它。

原因是,如果服务器检测到客户端可以支持 `member.id`,它会将该错误返回给客户端。这在 [KIP-394][1] 中有所说明。

然后客户端会使用生成的成员 ID 重新连接到服务器。

[1]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request

<details>
<summary>英文:</summary>

From my analysis.  This is normal behaviour, you can change the log levels to exclude it.

The reason for this is if the server detects that the client can support `member.id` it will give that error back to the client.  This is noted in [KIP-394][1].

The client will then reconnect back to the server with a generated member ID.

[1]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request

</details>



# 答案2
**得分**: 2

根据我的经验,这是由于缺少群组实例ID配置引起的。

以下是我在配置Kafka消费者组时使用的内容。

```java
consumerProperties.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, ("SchedulerCoordinator" + UUID.randomUUID()));

请注意,我个人更喜欢基于类的配置,而这些consumerProperties将用于消费者初始化。

英文:

In my experience this is caused by a missing group instance id configuration.

Below is what I use while configuring the kafka consumer for a group.

consumerProperties.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,(&quot;SchedulerCoordinator&quot;+UUID.randomUUID()));

Bear in mind that I personally prefer class configurations over property based ones and these consumerProperties go into the Consumer initialisation.

答案3

得分: 1

如果 @Archimedes Trajano 给出的答案对您不起作用(就像在我的情况下一样),那么这是因为 Kafka 无法获取消费者组 ID。

如果您只有一个消费者组,您可以在属性文件中指定它,如下所示:

spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      group-id: 插入您的消费者组ID
      ... 其余的属性 ...

或者,如果您有多个消费者,您可以为每个消费者指定 groupId

@KafkaListener(topics="topic-1", groupId="group-1")
public void registerServer(@Payload(required = false) ServerInfo serverInfo) 

@KafkaListener(topics="topic-2", groupId="group-2")
public void processAlert(@Payload(required = false) AlertOnKafka alert)

文档链接:https://docs.spring.io/spring-kafka/reference/html/#annotation-properties

英文:

If the answer from @Archimedes Trajano doesn't work for you (like in my case), then this happens when kafka can't pick up the consumer group id.

if you have a single consumer group, you can specify it in the properties file like this:

spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      group-id: insert-your-consumer-group-id
      ... rest of your properties ...

or if you have multiple consumers then you can specify the groupId for each one:

@KafkaListener(topics=&quot;topic-1&quot;, groupId=&quot;group-1&quot;)
public void registerServer(@Payload(required = false) ServerInfo serverInfo) 

@KafkaListener(topics=&quot;topic-2&quot;,groupId=&quot;group-2&quot;)
public void processAlert(@Payload(required = false) AlertOnKafka alert)

docs: https://docs.spring.io/spring-kafka/reference/html/#annotation-properties

答案4

得分: 0

我遇到了相同的问题,也导致我的消费者无法订阅特定的主题。

我想可能会员的 .id 可能 类似于客户端 ID(在这里使用驼峰命名法),这可能在某种程度上与消费者群组相关。

对我有帮助的是为我的消费服务更改了客户端 ID,这次客户端 ID 是固定的,保持了相同的消费者群组。

英文:

I encountered the same issue, also preventing my consumer from subscribing to certain topics.

I figured that the member.id might be similar to the client-id (using Camel here), which might be in some way related to the consumer group.

What fixed it for me is a changed, this time non-changing client-id for my consuming service, leaving the same consumer group as is.

答案5

得分: 0

在我的情况下,消费者将使用生成的ID重新加入组。以下是我的测试和结果。供参考。

@Test
void testSyncSend() throws ExecutionException, InterruptedException {
    int id = (int)(System.currentTimeMillis()/1000);
    SendResult result = producer.syncSend(id);
    logger.info("[testSyncSend] id:{}, result:{}", id, result);
    new CountDownLatch(1).await();
}

2021-04-20 14:32:20.980 INFO 6672 --- [main] o.a.kafka.common.utils.AppInfoParser : Kafka版本:2.5.1
2021-04-20 14:32:20.981 INFO 6672 --- [main] o.a.kafka.common.utils.AppInfoParser : Kafka提交ID:0efa8fb0f4c73d92
2021-04-20 14:32:20.981 INFO 6672 --- [main] o.a.kafka.common.utils.AppInfoParser : Kafka启动时间:1618900340980
2021-04-20 14:32:21.125 INFO 6672 --- [listener1-0-C-1] org.apache.kafka.clients.Metadata : [消费者客户端ID=consumer-listener1-1,组ID=listener1] 集群ID:RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125 INFO 6672 ---

Kafka显示:「在实际加入消费者组之前,组成员需要具有有效的成员ID」。
org.apache.kafka.clients.Metadata : [生产者客户端ID=producer-1] 集群ID:RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125 INFO 6672 --- [listener1-3-C-1] org.apache.kafka.clients.Metadata : [消费者客户端ID=consumer-listener1-4,组ID=listener1] 集群ID:RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125 INFO 6672 --- [listener1-2-C-1] org.apache.kafka.clients.Metadata : [消费者客户端ID=consumer-listener1-3,组ID=listener1] 集群ID:RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125 INFO 6672 --- [listener1-1-C-1] org.apache.kafka.clients.Metadata : [消费者客户端ID=consumer-listener1-2,组ID=listener1] 集群ID:RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.127 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [消费者客户端ID=consumer-listener1-2,组ID=listener1] 发现组协调器 localhost:29092(ID:2147483646,机架:null)
2021-04-20 14:32:21.127 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [消费者客户端ID=consumer-listener1-3,组ID=listener1] 发现组协调器 localhost:29092(ID:2147483646,机架:null)
2021-04-20 14:32:21.127 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [消费者客户端ID=consumer-listener1-1,组ID=listener1] 发现组协调器 localhost:29092(ID:2147483646,机架:null)
2021-04-20 14:32:21.127 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [消费者客户端ID=consumer-listener1-4,组ID=listener1] 发现组协调器 localhost:29092(ID:2147483646,机架:null)
2021-04-20 14:32:21.130 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [消费者客户端ID=consumer-listener1-2,组ID=listener1](重新)加入组
2021-04-20 14:32:21.130 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [消费者客户端ID=consumer-listener1-4,组ID=listener1](重新)加入组
2021-04-20 14:32:21.130 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [消费者客户端ID=consumer-listener1-1,组ID=listener1](重新)加入组
2021-04-20 14:32:21.130 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [消费者客户端ID=consumer-listener1-3,组ID=listener1](重新)加入组
2021-04-20 14:32:21.147 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [消费者客户端ID=consumer-listener1-1,组ID=listener1] 使用 org.apache.kafka.common.errors.MemberIdRequiredException 加入组失败:组成员需要在实际加入消费者组之前具有有效的成员ID
2021-04-20 14:32:21.147 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [消费者客户端ID=consumer-listener1-3,组ID=listener1] 使用 org.apache.kafka.common.errors.MemberIdRequiredException 加入组失败:组成员需要在实际加入消费者组之前具有有效的成员ID
2021-04-20 14:32:21.147 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [消费者客户端ID=consumer-listener1-4,组ID=listener1] 使用 org.apache.kafka.common.errors.MemberIdRequiredException 加入组失败:组成员需要在实际加入消费者组之前具有有效的成员ID
2021-04-20 14:32:21.147 INFO 6672

英文:

hummm....in my case, consumers will rejoin the group with a generated ID. Here is my test and result. FYI

@Test
    void testSyncSend() throws ExecutionException, InterruptedException {
        int id = (int)(System.currentTimeMillis()/1000);
        SendResult result = producer.syncSend(id);
        logger.info(&quot;[testSyncSend] id:{}, result:{}&quot;, id, result);
        new CountDownLatch(1).await();
    }
2021-04-20 14:32:20.980  INFO 6672 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.1
2021-04-20 14:32:20.981  INFO 6672 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 0efa8fb0f4c73d92
2021-04-20 14:32:20.981  INFO 6672 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1618900340980
2021-04-20 14:32:21.125  INFO 6672 --- [listener1-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-listener1-1, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw
2021-04-20 14:32:21.125  INFO 6672 --- 
Kafka显示:「在实际加入消费者组之前,组成员需要具有有效的成员ID」。
org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: RctzTn4XR4WNNVuqh25izw 2021-04-20 14:32:21.125 INFO 6672 --- [listener1-3-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-listener1-4, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw 2021-04-20 14:32:21.125 INFO 6672 --- [listener1-2-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-listener1-3, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw 2021-04-20 14:32:21.125 INFO 6672 --- [listener1-1-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-listener1-2, groupId=listener1] Cluster ID: RctzTn4XR4WNNVuqh25izw 2021-04-20 14:32:21.127 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null) 2021-04-20 14:32:21.127 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null) 2021-04-20 14:32:21.127 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null) 2021-04-20 14:32:21.127 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] Discovered group coordinator localhost:29092 (id: 2147483646 rack: null) 2021-04-20 14:32:21.130 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] (Re-)joining group 2021-04-20 14:32:21.130 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] (Re-)joining group 2021-04-20 14:32:21.130 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] (Re-)joining group 2021-04-20 14:32:21.130 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] (Re-)joining group 2021-04-20 14:32:21.147 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group 2021-04-20 14:32:21.147 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group 2021-04-20 14:32:21.147 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group 2021-04-20 14:32:21.147 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group 2021-04-20 14:32:21.148 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] (Re-)joining group 2021-04-20 14:32:21.148 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] (Re-)joining group 2021-04-20 14:32:21.148 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] (Re-)joining group 2021-04-20 14:32:21.148 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] (Re-)joining group 2021-04-20 14:32:21.317 INFO 6672 --- [ main] c.z.s.cbaConnector.KafkaProducerTest : [testSyncSend] id:1618900340, result:SendResult [producerRecord=ProducerRecord(topic=test1, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [99, 111, 109, 46, 122, 104, 105, 108, 105, 46, 115, 109, 115, 109, 111, 100, 117, 108, 101, 46, 101, 110, 116, 105, 116, 121, 46, 77, 101, 115, 115, 97, 103, 101])], isReadOnly = true), key=null, value=Message(id=1618900340), timestamp=null), recordMetadata=test1-1@0] 2021-04-20 14:32:23.770 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] Finished assignment for group at generation 2: {consumer-listener1-4-9a383250-e84d-413a-b012-85405abdcf7f=Assignment(partitions=[test1-8, test1-9]), consumer-listener1-2-3d26d9ef-b973-4d19-a930-5ba77d938680=Assignment(partitions=[test1-3, test1-4, test1-5]), consumer-listener1-1-10b6895e-264e-45bd-ba90-c71ea12b21e5=Assignment(partitions=[test1-0, test1-1, test1-2]), consumer-listener1-3-54ce965a-87cd-4e28-b0e9-b0f2c9f69423=Assignment(partitions=[test1-6, test1-7])} 2021-04-20 14:32:23.775 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] Successfully joined group with generation 2 2021-04-20 14:32:23.775 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] Successfully joined group with generation 2 2021-04-20 14:32:23.775 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] Successfully joined group with generation 2 2021-04-20 14:32:23.776 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] Successfully joined group with generation 2 2021-04-20 14:32:23.779 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] Adding newly assigned partitions: test1-5, test1-4, test1-3 2021-04-20 14:32:23.779 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] Adding newly assigned partitions: test1-6, test1-7 2021-04-20 14:32:23.779 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] Adding newly assigned partitions: test1-0, test1-2, test1-1 2021-04-20 14:32:23.779 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] Adding newly assigned partitions: test1-9, test1-8 2021-04-20 14:32:23.789 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] Found no committed offset for partition test1-8 2021-04-20 14:32:23.789 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] Found no committed offset for partition test1-6 2021-04-20 14:32:23.789 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] Found no committed offset for partition test1-2 2021-04-20 14:32:23.789 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] Found no committed offset for partition test1-1 2021-04-20 14:32:23.791 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-1, groupId=listener1] Setting offset for partition test1-0 to the committed offset FetchPosition{offset=14, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}} 2021-04-20 14:32:23.791 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] Setting offset for partition test1-5 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}} 2021-04-20 14:32:23.791 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-4, groupId=listener1] Setting offset for partition test1-9 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}} 2021-04-20 14:32:23.791 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-3, groupId=listener1] Setting offset for partition test1-7 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}} 2021-04-20 14:32:23.791 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] Setting offset for partition test1-4 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}} 2021-04-20 14:32:23.791 INFO 6672 --- [listener1-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-listener1-2, groupId=listener1] Setting offset for partition test1-3 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:29092 (id: 1 rack: null)], epoch=0}} 2021-04-20 14:32:23.792 INFO 6672 --- [listener1-1-C-1] o.s.k.l.KafkaMessageListenerContainer : listener1: partitions assigned: [test1-5, test1-4, test1-3] 2021-04-20 14:32:23.804 INFO 6672 --- [listener1-3-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-listener1-4, groupId=listener1] Resetting offset for partition test1-8 to offset 0. 2021-04-20 14:32:23.804 INFO 6672 --- [listener1-2-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-listener1-3, groupId=listener1] Resetting offset for partition test1-6 to offset 0. 2021-04-20 14:32:23.804 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-listener1-1, groupId=listener1] Resetting offset for partition test1-2 to offset 0. 2021-04-20 14:32:23.804 INFO 6672 --- [listener1-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-listener1-1, groupId=listener1] Resetting offset for partition test1-1 to offset 0. 2021-04-20 14:32:23.804 INFO 6672 --- [listener1-3-C-1] o.s.k.l.KafkaMessageListenerContainer : listener1: partitions assigned: [test1-9, test1-8] 2021-04-20 14:32:23.804 INFO 6672 --- [listener1-2-C-1] o.s.k.l.KafkaMessageListenerContainer : listener1: partitions assigned: [test1-6, test1-7] 2021-04-20 14:32:23.804 INFO 6672 --- [listener1-0-C-1] o.s.k.l.KafkaMessageListenerContainer : listener1: partitions assigned: [test1-0, test1-2, test1-1] 2021-04-20 14:32:23.817 INFO 6672 --- [listener1-0-C-1] c.z.s.cbaConnector.KafkaConsumer : [KafakaConsumer][consume] thread:21 received message:{&quot;id&quot;:1618900340}

huangapple
  • 本文由 发表于 2020年9月18日 06:00:59
  • 转载请务必保留本文链接:https://go.coder-hub.com/63946696.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定