Simple C# Kafka Producer with Schema Registry Implementation throws Invalid Receive Size

huangapple go评论121阅读模式
英文:

Simple C# Kafka Producer with Schema Regitry Implementation throws Invalid Receive Size

问题

我正在尝试使用C#在本地机器上学习Kafka。我有一个使用字符串工作的生产者/消费者/流,现在正在尝试使用模式注册允许复杂类型。

我尝试使用以下控制台应用程序注册一个类:

  1. static async Task Main(string[] args)
  2. {
  3. var schemaRegistryConfig = new SchemaRegistryConfig
  4. {
  5. Url = "http://localhost:29092"
  6. };
  7. var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
  8. var schema = @"{
  9. ""type"": ""TotallyCoolCustomClass"",
  10. ""properties"": {
  11. ""FavouriteQuote"": {""type"": ""string""},
  12. ""FavouriteNumber"": {""type"": ""integer""}
  13. }
  14. }";
  15. var subject = "SimpleTest";
  16. var schemaId = await schemaRegistry.RegisterSchemaAsync(subject, schema);
  17. }

每当我运行我的模式注册应用程序时,我会收到一个Http异常:

  1. System.Net.Http.HttpRequestException: '[http://localhost:29092/] HttpRequestException: An error occurred while sending the request.'

并且Docker在我的Kafka实例下报告了以下错误:

  1. WARN [SocketServer listenerType=ZK_BROKER, nodeId=1] Unexpected error from /172.22.0.1 (channelId=172.22.0.3:29092-172.22.0.1:34802-28); closing connection (org.apache.kafka.common.network.Selector)
  2. org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1347375956 larger than 104857600)
  3. at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94)

我的消息显然很小,并且没有超过默认的100MB限制,所以我不确定为什么它会这样认为。有人可以告诉我为什么我无法注册模式吗?

我已经使用以下配置设置了Kafka、Zookeeper和Schema Registry:

  1. version: '2'
  2. services:
  3. zookeeper:
  4. image: confluentinc/cp-zookeeper:latest
  5. environment:
  6. ZOOKEEPER_CLIENT_PORT: 2181
  7. ZOOKEEPER_TICK_TIME: 2000
  8. ports:
  9. - 22181:2181
  10. kafka:
  11. image: confluentinc/cp-kafka:latest
  12. depends_on:
  13. - zookeeper
  14. ports:
  15. - 29092:29092
  16. environment:
  17. KAFKA_BROKER_ID: 1
  18. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  19. KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
  20. KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
  21. KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
  22. KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  23. schema-registry:
  24. image: confluentinc/cp-schema-registry:latest
  25. depends_on:
  26. - kafka
  27. ports:
  28. - 28081:28081
  29. environment:
  30. SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
  31. SCHEMA_REGISTRY_HOST_NAME: schema-registry
  32. SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
  33. # SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:28081

并且使用以下方式进行生产(虽然我认为这在这个问题中主要是多余的,因为模式没有注册,所以发布永远不会起作用):

  1. public class KafkaPProducerHostedService : IHostedService
  2. {
  3. private readonly ILogger<KafkaPProducerHostedService> _logger;
  4. private readonly IProducer<string, TotallyCoolCustomClass> _producer;
  5. public KafkaPProducerHostedService(ILogger<KafkaPProducerHostedService> logger)
  6. {
  7. _logger = logger;
  8. var config = new ProducerConfig();
  9. config.BootstrapServers = "localhost:29092";
  10. var schemaRegistryConfig = new SchemaRegistryConfig
  11. {
  12. Url = "http://localhost:28081/"
  13. };
  14. _producer = new ProducerBuilder<string, TotallyCoolCustomClass>(config)
  15. .SetValueSerializer(new JsonSerializer<TotallyCoolCustomClass>(new CachedSchemaRegistryClient(schemaRegistryConfig)))
  16. .Build();
  17. }
  18. public async Task StartAsync(CancellationToken cancellationToken)
  19. {
  20. int i = 0;
  21. var rand = new Random();
  22. while (true) {
  23. Thread.Sleep(2000);
  24. var customClass = new TotallyCoolCustomClass(rand);
  25. var key = $"UpdatedKey-{i}";
  26. await _producer.ProduceAsync("SimpleTest", new Message<string, TotallyCoolCustomClass>()
  27. {
  28. Key = key,
  29. Value = customClass
  30. },
  31. cancellationToken);
  32. Console.WriteLine($"Published: Key: {key} with favourite quote {customClass.FavouriteQuote} and favourite number {customClass.FavouriteNumber}");
  33. i++;
  34. }
  35. }
  36. public Task StopAsync(CancellationToken cancellationToken)
  37. {
  38. _producer?.Dispose();
  39. return Task.CompletedTask;
  40. }
  41. }
英文:

I'm trying to learn Kafka using C# on my local machine. I had a producer/consumer/stream that work with strings and am now trying to allow for complex types using schema registry.

I'm attempting to register a class using the following console app:

  1. static async Task Main(string[] args)
  2. {
  3. var schemaRegistryConfig = new SchemaRegistryConfig
  4. {
  5. Url = &quot;http://localhost:29092&quot;
  6. };
  7. var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
  8. var schema = @&quot;{
  9. &quot;&quot;type&quot;&quot;: &quot;&quot;TotallyCoolCustomClass&quot;&quot;,
  10. &quot;&quot;properties&quot;&quot;: {
  11. &quot;&quot;FavouriteQuote&quot;&quot;: {&quot;&quot;type&quot;&quot;: &quot;&quot;string&quot;&quot;},
  12. &quot;&quot;FavouriteNumber&quot;&quot;: {&quot;&quot;type&quot;&quot;: &quot;&quot;integer&quot;&quot;}
  13. }
  14. }&quot;;
  15. var subject = &quot;SimpleTest&quot;;
  16. var schemaId = await schemaRegistry.RegisterSchemaAsync(subject, schema);
  17. }

Whenever I run my schema registry app, I get an Http exception:

  1. System.Net.Http.HttpRequestException: &#39;[http://localhost:29092/] HttpRequestException: An error occurred while sending the request.&#39;

and docker reports this under my kafka instance:

  1. WARN [SocketServer listenerType=ZK_BROKER, nodeId=1] Unexpected error from /172.22.0.1 (channelId=172.22.0.3:29092-172.22.0.1:34802-28); closing connection (org.apache.kafka.common.network.Selector)
  2. org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1347375956 larger than 104857600)
  3. at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94)

My message is obviously small, and not breaking the default 100Mb - so I'm not sure why it thinks it is. Can anyone tell me why I'm unable to register a schema?

I've set up Kafka, Zookeeper and a Schema Registry using this:

  1. version: &#39;2&#39;
  2. services:
  3. zookeeper:
  4. image: confluentinc/cp-zookeeper:latest
  5. environment:
  6. ZOOKEEPER_CLIENT_PORT: 2181
  7. ZOOKEEPER_TICK_TIME: 2000
  8. ports:
  9. - 22181:2181
  10. kafka:
  11. image: confluentinc/cp-kafka:latest
  12. depends_on:
  13. - zookeeper
  14. ports:
  15. - 29092:29092
  16. environment:
  17. KAFKA_BROKER_ID: 1
  18. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  19. KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
  20. KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
  21. KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
  22. KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  23. schema-registry:
  24. image: confluentinc/cp-schema-registry:latest
  25. depends_on:
  26. - kafka
  27. ports:
  28. - 28081:28081
  29. environment:
  30. SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: &#39;zookeeper:2181&#39;
  31. SCHEMA_REGISTRY_HOST_NAME: schema-registry
  32. SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: &#39;PLAINTEXT://kafka:9092&#39;
  33. # SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:28081

And produce to it with this (though I think this is largely redundant for the question, as the schema isn't registered so the publish will never work):

  1. public class KafkaPProducerHostedService : IHostedService
  2. {
  3. private readonly ILogger&lt;KafkaPProducerHostedService&gt; _logger;
  4. private readonly IProducer&lt;string, TotallyCoolCustomClass&gt; _producer;
  5. public KafkaPProducerHostedService(ILogger&lt;KafkaPProducerHostedService&gt; logger)
  6. {
  7. _logger = logger;
  8. var config = new ProducerConfig();
  9. config.BootstrapServers = &quot;localhost:29092&quot;;
  10. var schemaRegistryConfig = new SchemaRegistryConfig
  11. {
  12. Url = &quot;http://localhost:28081/&quot;
  13. };
  14. //video on serializers
  15. _producer = new ProducerBuilder&lt;string, TotallyCoolCustomClass&gt;(config)
  16. .SetValueSerializer(new JsonSerializer&lt;TotallyCoolCustomClass&gt;(new CachedSchemaRegistryClient(schemaRegistryConfig)))
  17. .Build();
  18. }
  19. public async Task StartAsync(CancellationToken cancellationToken)
  20. {
  21. int i = 0;
  22. var rand = new Random();
  23. while (true) {
  24. Thread.Sleep(2000);
  25. var customClass = new TotallyCoolCustomClass(rand);
  26. var key = $&quot;UpdatedKey-{i}&quot;;
  27. await _producer.ProduceAsync(&quot;SimpleTest&quot;, new Message&lt;string, TotallyCoolCustomClass&gt;()
  28. {
  29. Key = key,
  30. Value = customClass
  31. },
  32. cancellationToken);
  33. Console.WriteLine($&quot;Published: Key: {key} with favourite quote {customClass.FavouriteQuote} and favourite number {customClass.FavouriteNumber}&quot;);
  34. i++;
  35. }
  36. }
  37. public Task StopAsync(CancellationToken cancellationToken)
  38. {
  39. _producer?.Dispose();
  40. return Task.CompletedTask;
  41. }
  42. }

答案1

得分: 1

你已配置了模式注册表客户端以将数据发送到Kafka代理,但Kafka代理不是HTTP服务器。

请使用端口28081。

然而,你已注释掉了 SCHEMA_REGISTRY_LISTENERS,因此端口映射是不正确的,应该使用默认的8081端口。

> 模式未注册,因此发布将永远无法工作。

生产者会自动注册模式,除非你真的需要在某处手动获取模式ID,否则不需要手动注册。

英文:

You've configured your schema registry client to send data at the Kafka broker, which is not an HTTP server

Use port 28081

However you've commented SCHEMA_REGISTRY_LISTENERS, therefore the port mapping is incorrect, and should be the default 8081

> schema isn't registered so the publish will never work

Producers automatically register schemas. There's no need to do that manually unless you really need the schema ID somewhere

huangapple
  • 本文由 发表于 2023年6月26日 20:33:19
  • 转载请务必保留本文链接:https://go.coder-hub.com/76556720.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定