英文:
Simple C# Kafka Producer with Schema Regitry Implementation throws Invalid Receive Size
问题
我正在尝试使用C#在本地机器上学习Kafka。我有一个使用字符串工作的生产者/消费者/流,现在正在尝试使用模式注册允许复杂类型。
我尝试使用以下控制台应用程序注册一个类:
static async Task Main(string[] args)
{
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = "http://localhost:29092"
};
var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
var schema = @"{
""type"": ""TotallyCoolCustomClass"",
""properties"": {
""FavouriteQuote"": {""type"": ""string""},
""FavouriteNumber"": {""type"": ""integer""}
}
}";
var subject = "SimpleTest";
var schemaId = await schemaRegistry.RegisterSchemaAsync(subject, schema);
}
每当我运行我的模式注册应用程序时,我会收到一个Http异常:
System.Net.Http.HttpRequestException: '[http://localhost:29092/] HttpRequestException: An error occurred while sending the request.'
并且Docker在我的Kafka实例下报告了以下错误:
WARN [SocketServer listenerType=ZK_BROKER, nodeId=1] Unexpected error from /172.22.0.1 (channelId=172.22.0.3:29092-172.22.0.1:34802-28); closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1347375956 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94)
我的消息显然很小,并且没有超过默认的100MB限制,所以我不确定为什么它会这样认为。有人可以告诉我为什么我无法注册模式吗?
我已经使用以下配置设置了Kafka、Zookeeper和Schema Registry:
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
schema-registry:
image: confluentinc/cp-schema-registry:latest
depends_on:
- kafka
ports:
- 28081:28081
environment:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
# SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:28081
并且使用以下方式进行生产(虽然我认为这在这个问题中主要是多余的,因为模式没有注册,所以发布永远不会起作用):
public class KafkaPProducerHostedService : IHostedService
{
private readonly ILogger<KafkaPProducerHostedService> _logger;
private readonly IProducer<string, TotallyCoolCustomClass> _producer;
public KafkaPProducerHostedService(ILogger<KafkaPProducerHostedService> logger)
{
_logger = logger;
var config = new ProducerConfig();
config.BootstrapServers = "localhost:29092";
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = "http://localhost:28081/"
};
_producer = new ProducerBuilder<string, TotallyCoolCustomClass>(config)
.SetValueSerializer(new JsonSerializer<TotallyCoolCustomClass>(new CachedSchemaRegistryClient(schemaRegistryConfig)))
.Build();
}
public async Task StartAsync(CancellationToken cancellationToken)
{
int i = 0;
var rand = new Random();
while (true) {
Thread.Sleep(2000);
var customClass = new TotallyCoolCustomClass(rand);
var key = $"UpdatedKey-{i}";
await _producer.ProduceAsync("SimpleTest", new Message<string, TotallyCoolCustomClass>()
{
Key = key,
Value = customClass
},
cancellationToken);
Console.WriteLine($"Published: Key: {key} with favourite quote {customClass.FavouriteQuote} and favourite number {customClass.FavouriteNumber}");
i++;
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_producer?.Dispose();
return Task.CompletedTask;
}
}
英文:
I'm trying to learn Kafka using C# on my local machine. I had a producer/consumer/stream that work with strings and am now trying to allow for complex types using schema registry.
I'm attempting to register a class using the following console app:
static async Task Main(string[] args)
{
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = "http://localhost:29092"
};
var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
var schema = @"{
""type"": ""TotallyCoolCustomClass"",
""properties"": {
""FavouriteQuote"": {""type"": ""string""},
""FavouriteNumber"": {""type"": ""integer""}
}
}";
var subject = "SimpleTest";
var schemaId = await schemaRegistry.RegisterSchemaAsync(subject, schema);
}
Whenever I run my schema registry app, I get an Http exception:
System.Net.Http.HttpRequestException: '[http://localhost:29092/] HttpRequestException: An error occurred while sending the request.'
and docker reports this under my kafka instance:
WARN [SocketServer listenerType=ZK_BROKER, nodeId=1] Unexpected error from /172.22.0.1 (channelId=172.22.0.3:29092-172.22.0.1:34802-28); closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1347375956 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94)
My message is obviously small, and not breaking the default 100Mb - so I'm not sure why it thinks it is. Can anyone tell me why I'm unable to register a schema?
I've set up Kafka, Zookeeper and a Schema Registry using this:
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
schema-registry:
image: confluentinc/cp-schema-registry:latest
depends_on:
- kafka
ports:
- 28081:28081
environment:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
# SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:28081
And produce to it with this (though I think this is largely redundant for the question, as the schema isn't registered so the publish will never work):
public class KafkaPProducerHostedService : IHostedService
{
private readonly ILogger<KafkaPProducerHostedService> _logger;
private readonly IProducer<string, TotallyCoolCustomClass> _producer;
public KafkaPProducerHostedService(ILogger<KafkaPProducerHostedService> logger)
{
_logger = logger;
var config = new ProducerConfig();
config.BootstrapServers = "localhost:29092";
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = "http://localhost:28081/"
};
//video on serializers
_producer = new ProducerBuilder<string, TotallyCoolCustomClass>(config)
.SetValueSerializer(new JsonSerializer<TotallyCoolCustomClass>(new CachedSchemaRegistryClient(schemaRegistryConfig)))
.Build();
}
public async Task StartAsync(CancellationToken cancellationToken)
{
int i = 0;
var rand = new Random();
while (true) {
Thread.Sleep(2000);
var customClass = new TotallyCoolCustomClass(rand);
var key = $"UpdatedKey-{i}";
await _producer.ProduceAsync("SimpleTest", new Message<string, TotallyCoolCustomClass>()
{
Key = key,
Value = customClass
},
cancellationToken);
Console.WriteLine($"Published: Key: {key} with favourite quote {customClass.FavouriteQuote} and favourite number {customClass.FavouriteNumber}");
i++;
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_producer?.Dispose();
return Task.CompletedTask;
}
}
答案1
得分: 1
你已配置了模式注册表客户端以将数据发送到Kafka代理,但Kafka代理不是HTTP服务器。
请使用端口28081。
然而,你已注释掉了 SCHEMA_REGISTRY_LISTENERS
,因此端口映射是不正确的,应该使用默认的8081端口。
> 模式未注册,因此发布将永远无法工作。
生产者会自动注册模式,除非你真的需要在某处手动获取模式ID,否则不需要手动注册。
英文:
You've configured your schema registry client to send data at the Kafka broker, which is not an HTTP server
Use port 28081
However you've commented SCHEMA_REGISTRY_LISTENERS
, therefore the port mapping is incorrect, and should be the default 8081
> schema isn't registered so the publish will never work
Producers automatically register schemas. There's no need to do that manually unless you really need the schema ID somewhere
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论