C# Kafka IConsumer's Subscribe method doesn't consume any messages, but the Assign method works fine
Question
I'm trying to create a simple Kafka consumer in C# using the code below:
private static CancellationTokenSource StartConsumer(IAdminClient client, string topicName)
{
    ConsumerConfig config = new()
    {
        BootstrapServers = BootstrapServers,
        GroupId = "testConsumerGroup",
        AutoOffsetReset = AutoOffsetReset.Earliest,
    };
    IConsumer<Null, string> consumer = new ConsumerBuilder<Null, string>(config).Build();
    // consumer.Assign(new TopicPartitionOffset(topicName, 0, Offset.Beginning)); // this works
    consumer.Subscribe(topicName); // this doesn't work
    CancellationTokenSource cancellationTokenSource = new();
    CancellationToken cancellationToken = cancellationTokenSource.Token;
    Task.Run(() =>
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            ConsumeResult<Null, string> consumeResult = consumer.Consume();
            Console.WriteLine($"{consumeResult.Offset}: {consumeResult.Message.Value}");
        }
        consumer.Close();
        consumer.Dispose();
    });
    return cancellationTokenSource;
}
When I use the Assign method, the consumer consumes the messages just fine. But when I use the Subscribe method, it does not consume any messages: consumer.Consume() never returns.
While debugging, I found that after calling consumer.Subscribe(topicName), the consumer.Assignment list is empty. Based on https://github.com/confluentinc/confluent-kafka-dotnet/issues/278, my guess is that for some reason the coordinator is not assigning any partitions to my consumer.
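With Subscribe, partitions are handed out by the group coordinator while Consume is being called, so an empty Assignment immediately after Subscribe is normal; what matters is whether it stays empty. One way to watch this is sketched below, assuming the Confluent.Kafka 2.x builder handlers. The class and method names (SubscribeDiagnostics, BuildDiagnosticConsumer, PollOnce) are hypothetical and not part of the original code.

```csharp
using System;
using Confluent.Kafka;

public static class SubscribeDiagnostics
{
    // Hypothetical helper: builds a consumer that prints client errors,
    // client logs, and partition assignments to the console.
    public static IConsumer<Null, string> BuildDiagnosticConsumer(string bootstrapServers, string topicName)
    {
        ConsumerConfig config = new()
        {
            BootstrapServers = bootstrapServers,
            GroupId = "testConsumerGroup",
            AutoOffsetReset = AutoOffsetReset.Earliest,
        };

        IConsumer<Null, string> consumer = new ConsumerBuilder<Null, string>(config)
            .SetErrorHandler((_, error) => Console.WriteLine($"Error: {error.Reason}"))
            .SetLogHandler((_, log) => Console.WriteLine($"Log [{log.Level}] {log.Facility}: {log.Message}"))
            .SetPartitionsAssignedHandler((_, partitions) =>
            {
                // Invoked from within Consume once the group rebalance completes.
                string assigned = string.Join(", ", partitions);
                Console.WriteLine($"Assigned: [{assigned}]");
            })
            .Build();

        consumer.Subscribe(topicName);
        return consumer;
    }

    // Polls with a timeout so that a missing assignment shows up as repeated
    // null results rather than as a call that never returns.
    public static void PollOnce(IConsumer<Null, string> consumer)
    {
        ConsumeResult<Null, string> result = consumer.Consume(TimeSpan.FromSeconds(5));
        if (result is null)
        {
            Console.WriteLine($"No message yet; assigned partitions: {consumer.Assignment.Count}");
            return;
        }
        Console.WriteLine($"{result.Offset}: {result.Message.Value}");
    }
}
```

If PollOnce keeps reporting zero assigned partitions and the "Assigned" line never appears, the error and log output is the place to look for coordinator or broker problems.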
I'm creating the topic like this:
private static async Task<string> CreateTopic(IAdminClient client, string topicName)
{
    await client.CreateTopicsAsync(new TopicSpecification[]
    {
        new TopicSpecification()
        {
            Name = topicName,
            ReplicationFactor = 1,
            NumPartitions = 1
        }
    });
    return topicName;
}
System Details
- OS = Windows 10
- Kafka Version = 3.4.0
- Java = jdk 1.8.0_202 (32 bit)
- Confluent.Kafka NuGet = 2.0.2
Answer 1
Score: 0
After hours of trial and error, I was finally able to resolve the issue. The problem was Java. I previously had the 32-bit version installed; switching to the 64-bit version of Java 8 from OpenJDK resolved the issue. Exact version: 8u362-b09.
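To check which kind of JVM is installed, the version banner can be inspected. The sketch below assumes a HotSpot-based JVM, whose `java -version` banner contains "64-Bit" for 64-bit builds, and it inspects whichever `java` is first on PATH, which may differ from the one the Kafka broker is started with (for example via JAVA_HOME). The JvmCheck class and its methods are hypothetical names used for illustration.

```csharp
using System;
using System.Diagnostics;

public static class JvmCheck
{
    // Returns true when the version banner names a 64-bit VM
    // (assumption: HotSpot-style banner such as "OpenJDK 64-Bit Server VM").
    public static bool Is64BitJvm(string versionOutput) =>
        versionOutput.Contains("64-Bit", StringComparison.OrdinalIgnoreCase);

    // Runs `java -version` for the java executable found on PATH and returns its output.
    public static string ReadJavaVersion()
    {
        ProcessStartInfo startInfo = new("java", "-version")
        {
            RedirectStandardError = true,
            RedirectStandardOutput = true,
            UseShellExecute = false,
        };
        using Process process = Process.Start(startInfo)
            ?? throw new InvalidOperationException("Could not start java.");
        // `java -version` writes its banner to standard error.
        string output = process.StandardError.ReadToEnd() + process.StandardOutput.ReadToEnd();
        process.WaitForExit();
        return output;
    }
}
```

A typical call would be `JvmCheck.Is64BitJvm(JvmCheck.ReadJavaVersion())`; a false result on the machine running the broker matches the situation described in this answer.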