C# Kafka IConsumer的Subscribe方法不会消费任何消息,但Assign方法可以正常工作。

huangapple go评论75阅读模式
英文:

C# Kafka IConsumer's Subscribe method doesn't consume any messages but Assign methods works fine

问题

尝试使用以下代码创建一个简单的C# Kafka消费者:

private static CancellationTokenSource StartConsumer(IAdminClient client, string topicName)
{
    ConsumerConfig config = new()
    {
        BootstrapServers = BootstrapServers,
        GroupId = "testConsumerGroup",
        AutoOffsetReset = AutoOffsetReset.Earliest,
    };
    
    IConsumer<Null, string> consumer = new ConsumerBuilder<Null, string>(config).Build();
    // consumer.Assign(new TopicPartitionOffset(topicName, 0, Offset.Beginning)); // 这个方法可行
    consumer.Subscribe(topicName); // 这个方法不起作用

    CancellationTokenSource cancellationTokenSource = new();
    CancellationToken cancellationToken = cancellationTokenSource.Token;
    Task.Run(() =>
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            ConsumeResult<Null, string> consumeResult = consumer.Consume();
            Console.WriteLine($"{consumeResult.Offset}: {consumeResult.Message.Value}");
        }
        consumer.Close();
        consumer.Dispose();
    });

    return cancellationTokenSource;
}

当使用Assign方法时,消费者能够正常消费消息。但是当尝试使用Subscribe方法时,消费者无法消费任何消息。方法consumer.Consume()永远不会返回。

在调试时发现,在调用consumer.Subscribe(topicName)后,consumer.Assignment列表为空。根据https://github.com/confluentinc/confluent-kafka-dotnet/issues/278,我的猜测是由于某种原因,协调器没有将任何分区分配给我的消费者。

创建主题的方法如下:

private static async Task<string> CreateTopic(IAdminClient client, string topicName)
{
    await client.CreateTopicsAsync(new TopicSpecification[]
    {
        new TopicSpecification()
        {
            Name = topicName,
            ReplicationFactor = 1,
            NumPartitions = 1
        }
    });
    return topicName;
}

系统详情:

  • 操作系统 = Windows 10
  • Kafka版本 = 3.4.0
  • Java = jdk 1.8.0_202 (32位)
  • Confluent.Kafka NuGet版本 = 2.0.2
英文:

I'm trying to create a simple Kafka consumer in c# using the below code

private static CancellationTokenSource StartConsumer(IAdminClient client, string topicName)
{
    ConsumerConfig config = new()
    {
        BootstrapServers = BootstrapServers,
        GroupId = &quot;testConsumerGroup&quot;,
        AutoOffsetReset = AutoOffsetReset.Earliest,

    };
    IConsumer&lt;Null, string&gt; consumer = new ConsumerBuilder&lt;Null, string&gt;(config).Build();
    //consumer.Assign(new TopicPartitionOffset(topicName, 0, Offset.Beginning));//this works
    consumer.Subscribe(topicName);//this doesn&#39;t work

    CancellationTokenSource cancellationTokenSource = new();
    CancellationToken cancellationToken = cancellationTokenSource.Token;
    Task.Run(() =&gt;
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            ConsumeResult&lt;Null, string&gt; consumeResult = consumer.Consume();
            Console.WriteLine($&quot;{consumeResult.Offset}: {consumeResult.Message.Value}&quot;);
        }
        consumer.Close();
        consumer.Dispose();
    });

    return cancellationTokenSource;
}

When I use the Assign method, my consumer is able to consume the messages just fine. But when I try to use the Subscribe method, my consumer is not able to consume any messages. The method consumer.Consume() never returns.

When I tried to debugging I found that after calling the consumer.Subscribe(topicName) the consumer.Assignment list is empty. Based on this https://github.com/confluentinc/confluent-kafka-dotnet/issues/278 my guess is that for some reason the coordinator is not assigning any partition to my consumer.

I'm creating the topic like this

private static async Task&lt;string&gt; CreateTopic(IAdminClient client, string topicName)
{
    await client.CreateTopicsAsync(new TopicSpecification[]
    {
        new TopicSpecification()
        {
            Name = topicName,
            ReplicationFactor = 1,
            NumPartitions = 1
        }
    });
    return topicName;
}

System Details

  • OS = Windows 10
  • Kafka Version = 3.4.0
  • Java = jdk 1.8.0_202 (32 bit)
  • Confluent.Kafka NuGet = 2.0.2

答案1

得分: 0

经过数小时的反复尝试,最终我成功解决了这个问题。问题出在Java上。我之前使用的是32位版本,当我将它更换为来自OpenJDK的64位版本的Java 8时,问题得以解决。确切的版本是:8u362-b09。

英文:

After hours of trial and error finally I was able to resolve the issue. The problem was Java. I previously had 32bit version, when I changed it to 64bit version of Java 8 from OpenJDK it resolved the issue. Exact version: 8u362-b09.

huangapple
  • 本文由 发表于 2023年2月23日 19:29:21
  • 转载请务必保留本文链接:https://go.coder-hub.com/75544207.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定