Simple C# Kafka Producer with Schema Registry Implementation throws Invalid Receive Size

huangapple go评论75阅读模式
英文:

Simple C# Kafka Producer with Schema Regitry Implementation throws Invalid Receive Size

问题

我正在尝试使用C#在本地机器上学习Kafka。我有一个使用字符串工作的生产者/消费者/流,现在正在尝试使用模式注册允许复杂类型。

我尝试使用以下控制台应用程序注册一个类:

static async Task Main(string[] args)
{
    var schemaRegistryConfig = new SchemaRegistryConfig
    {
        Url = "http://localhost:29092"
    };

    var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

    var schema = @"{
        ""type"": ""TotallyCoolCustomClass"",
        ""properties"": {
            ""FavouriteQuote"": {""type"": ""string""},
            ""FavouriteNumber"": {""type"": ""integer""}
        }
    }";

    var subject = "SimpleTest";
    var schemaId = await schemaRegistry.RegisterSchemaAsync(subject, schema);
}

每当我运行我的模式注册应用程序时,我会收到一个Http异常:

System.Net.Http.HttpRequestException: '[http://localhost:29092/] HttpRequestException: An error occurred while sending the request.'

并且Docker在我的Kafka实例下报告了以下错误:

WARN [SocketServer listenerType=ZK_BROKER, nodeId=1] Unexpected error from /172.22.0.1 (channelId=172.22.0.3:29092-172.22.0.1:34802-28); closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1347375956 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94)

我的消息显然很小,并且没有超过默认的100MB限制,所以我不确定为什么它会这样认为。有人可以告诉我为什么我无法注册模式吗?

我已经使用以下配置设置了Kafka、Zookeeper和Schema Registry:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    depends_on: 
      - kafka
    ports:
      - 28081:28081
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
      # SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:28081

并且使用以下方式进行生产(虽然我认为这在这个问题中主要是多余的,因为模式没有注册,所以发布永远不会起作用):

public class KafkaPProducerHostedService : IHostedService
{
    private readonly ILogger<KafkaPProducerHostedService> _logger;
    private readonly IProducer<string, TotallyCoolCustomClass> _producer;
    public KafkaPProducerHostedService(ILogger<KafkaPProducerHostedService> logger)
    {
        _logger = logger;
        var config = new ProducerConfig();
        config.BootstrapServers = "localhost:29092";

        var schemaRegistryConfig = new SchemaRegistryConfig
        {
            Url = "http://localhost:28081/"
        };

        _producer = new ProducerBuilder<string, TotallyCoolCustomClass>(config)
            .SetValueSerializer(new JsonSerializer<TotallyCoolCustomClass>(new CachedSchemaRegistryClient(schemaRegistryConfig)))
            .Build();
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        int i = 0;

        var rand = new Random();

        while (true) {
            Thread.Sleep(2000);
            var customClass = new TotallyCoolCustomClass(rand);
            var key = $"UpdatedKey-{i}";
            await _producer.ProduceAsync("SimpleTest", new Message<string, TotallyCoolCustomClass>()
            {
                Key = key,
                Value = customClass
            },                    
            cancellationToken);

            Console.WriteLine($"Published: Key: {key} with favourite quote {customClass.FavouriteQuote} and favourite number {customClass.FavouriteNumber}");
            i++;
        }
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _producer?.Dispose();
        return Task.CompletedTask;
    }
}
英文:

I'm trying to learn Kafka using C# on my local machine. I had a producer/consumer/stream that work with strings and am now trying to allow for complex types using schema registry.

I'm attempting to register a class using the following console app:

    static async Task Main(string[] args)
    {
        var schemaRegistryConfig = new SchemaRegistryConfig
        {
            Url = &quot;http://localhost:29092&quot;
        };

        var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

        var schema = @&quot;{
            &quot;&quot;type&quot;&quot;: &quot;&quot;TotallyCoolCustomClass&quot;&quot;,
            &quot;&quot;properties&quot;&quot;: {
                &quot;&quot;FavouriteQuote&quot;&quot;: {&quot;&quot;type&quot;&quot;: &quot;&quot;string&quot;&quot;},
                &quot;&quot;FavouriteNumber&quot;&quot;: {&quot;&quot;type&quot;&quot;: &quot;&quot;integer&quot;&quot;}
            }
        }&quot;;

        var subject = &quot;SimpleTest&quot;;
        var schemaId = await schemaRegistry.RegisterSchemaAsync(subject, schema);
    }

Whenever I run my schema registry app, I get an Http exception:

System.Net.Http.HttpRequestException: &#39;[http://localhost:29092/] HttpRequestException: An error occurred while sending the request.&#39;

and docker reports this under my kafka instance:

    WARN [SocketServer listenerType=ZK_BROKER, nodeId=1] Unexpected error from /172.22.0.1 (channelId=172.22.0.3:29092-172.22.0.1:34802-28); closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1347375956 larger than 104857600)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94)

My message is obviously small, and not breaking the default 100Mb - so I'm not sure why it thinks it is. Can anyone tell me why I'm unable to register a schema?

I've set up Kafka, Zookeeper and a Schema Registry using this:

   version: &#39;2&#39;
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    depends_on: 
      - kafka
    ports:
      - 28081:28081
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: &#39;zookeeper:2181&#39;
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: &#39;PLAINTEXT://kafka:9092&#39;
      # SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:28081

And produce to it with this (though I think this is largely redundant for the question, as the schema isn't registered so the publish will never work):

public class KafkaPProducerHostedService : IHostedService
        {
            private readonly ILogger&lt;KafkaPProducerHostedService&gt; _logger;
            private readonly IProducer&lt;string, TotallyCoolCustomClass&gt; _producer;
            public KafkaPProducerHostedService(ILogger&lt;KafkaPProducerHostedService&gt; logger)
            {
                _logger = logger;
                var config = new ProducerConfig();
                config.BootstrapServers = &quot;localhost:29092&quot;;

                var schemaRegistryConfig = new SchemaRegistryConfig
                {
                    Url = &quot;http://localhost:28081/&quot;
                };

                //video on serializers
                _producer = new ProducerBuilder&lt;string, TotallyCoolCustomClass&gt;(config)
                    .SetValueSerializer(new JsonSerializer&lt;TotallyCoolCustomClass&gt;(new CachedSchemaRegistryClient(schemaRegistryConfig)))
                    .Build();
            }

            public async Task StartAsync(CancellationToken cancellationToken)
            {
                int i = 0;

                var rand = new Random();

                while (true) {
                    Thread.Sleep(2000);
                    var customClass = new TotallyCoolCustomClass(rand);
                    var key = $&quot;UpdatedKey-{i}&quot;;
                    await _producer.ProduceAsync(&quot;SimpleTest&quot;, new Message&lt;string, TotallyCoolCustomClass&gt;()
                    {
                        Key = key,
                        Value = customClass
                    },                    
                    cancellationToken);

                    Console.WriteLine($&quot;Published: Key: {key} with favourite quote {customClass.FavouriteQuote} and favourite number {customClass.FavouriteNumber}&quot;);
                    i++;
                }
            }

            public Task StopAsync(CancellationToken cancellationToken)
            {
                _producer?.Dispose();
                return Task.CompletedTask;
            }
        }

答案1

得分: 1

你已配置了模式注册表客户端以将数据发送到Kafka代理,但Kafka代理不是HTTP服务器。

请使用端口28081。

然而,你已注释掉了 SCHEMA_REGISTRY_LISTENERS,因此端口映射是不正确的,应该使用默认的8081端口。

> 模式未注册,因此发布将永远无法工作。

生产者会自动注册模式,除非你真的需要在某处手动获取模式ID,否则不需要手动注册。

英文:

You've configured your schema registry client to send data at the Kafka broker, which is not an HTTP server

Use port 28081

However you've commented SCHEMA_REGISTRY_LISTENERS, therefore the port mapping is incorrect, and should be the default 8081

> schema isn't registered so the publish will never work

Producers automatically register schemas. There's no need to do that manually unless you really need the schema ID somewhere

huangapple
  • 本文由 发表于 2023年6月26日 20:33:19
  • 转载请务必保留本文链接:https://go.coder-hub.com/76556720.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定