Simple C# Kafka Producer with Schema Registry Implementation throws Invalid Receive Size
Question
I'm trying to learn Kafka using C# on my local machine. I have a producer/consumer/stream that works with strings, and I'm now trying to support complex types using Schema Registry.
I'm attempting to register a class using the following console app:
    static async Task Main(string[] args)
    {
        var schemaRegistryConfig = new SchemaRegistryConfig
        {
            Url = "http://localhost:29092"
        };
        var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
        var schema = @"{
            ""type"": ""TotallyCoolCustomClass"",
            ""properties"": {
                ""FavouriteQuote"": {""type"": ""string""},
                ""FavouriteNumber"": {""type"": ""integer""}
            }
        }";
        var subject = "SimpleTest";
        var schemaId = await schemaRegistry.RegisterSchemaAsync(subject, schema);
    }
Whenever I run my schema registry app, I get an HTTP exception:
    System.Net.Http.HttpRequestException: '[http://localhost:29092/] HttpRequestException: An error occurred while sending the request.'
and Docker reports this under my Kafka instance:
    WARN [SocketServer listenerType=ZK_BROKER, nodeId=1] Unexpected error from /172.22.0.1 (channelId=172.22.0.3:29092-172.22.0.1:34802-28); closing connection (org.apache.kafka.common.network.Selector)
    org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1347375956 larger than 104857600)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94)
My message is obviously small and nowhere near the default 100 MB limit, so I'm not sure why Kafka thinks it is that large. Can anyone tell me why I'm unable to register a schema?
I've set up Kafka, Zookeeper and a Schema Registry using this:
    version: '2'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:latest
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
        ports:
          - 22181:2181
      kafka:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
          - 29092:29092
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      schema-registry:
        image: confluentinc/cp-schema-registry:latest
        depends_on:
          - kafka
        ports:
          - 28081:28081
        environment:
          SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
          SCHEMA_REGISTRY_HOST_NAME: schema-registry
          SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
          # SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:28081
And I produce to it with this (though I think this is largely redundant for the question, as the schema isn't registered, so the publish will never work):
    public class KafkaPProducerHostedService : IHostedService
    {
        private readonly ILogger<KafkaPProducerHostedService> _logger;
        private readonly IProducer<string, TotallyCoolCustomClass> _producer;
        public KafkaPProducerHostedService(ILogger<KafkaPProducerHostedService> logger)
        {
            _logger = logger;
            var config = new ProducerConfig();
            config.BootstrapServers = "localhost:29092";
            var schemaRegistryConfig = new SchemaRegistryConfig
            {
                Url = "http://localhost:28081/"
            };
            //video on serializers
            _producer = new ProducerBuilder<string, TotallyCoolCustomClass>(config)
                .SetValueSerializer(new JsonSerializer<TotallyCoolCustomClass>(new CachedSchemaRegistryClient(schemaRegistryConfig)))
                .Build();
        }
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            int i = 0;
            var rand = new Random();
            while (true)
            {
                Thread.Sleep(2000);
                var customClass = new TotallyCoolCustomClass(rand);
                var key = $"UpdatedKey-{i}";
                await _producer.ProduceAsync("SimpleTest", new Message<string, TotallyCoolCustomClass>()
                {
                    Key = key,
                    Value = customClass
                },
                cancellationToken);
                Console.WriteLine($"Published: Key: {key} with favourite quote {customClass.FavouriteQuote} and favourite number {customClass.FavouriteNumber}");
                i++;
            }
        }
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _producer?.Dispose();
            return Task.CompletedTask;
        }
    }
Answer 1
Score: 1
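You've configured your Schema Registry client to send its requests to the Kafka broker (http://localhost:29092), which is not an HTTP server.
Use the Schema Registry port, 28081, instead.
However, you've commented out SCHEMA_REGISTRY_LISTENERS, so the registry listens on its default port, 8081, and the 28081:28081 port mapping is incorrect. Either uncomment that line or map the default port 8081.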
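The strange size in the broker log is consistent with this diagnosis. A Kafka broker treats the first four bytes of every request as a big-endian length prefix, and an HTTP request sent by the registry client would start with the ASCII bytes of its method name. Below is a minimal sketch of that decoding; the class and method names are made up for illustration and are not part of any Kafka library.

```csharp
using System;
using System.Buffers.Binary;
using System.Text;

public static class ReceiveSizeDecoder
{
    // Writes the reported size back out as the 4 big-endian bytes the broker
    // originally read, then interprets those bytes as ASCII text.
    public static string DecodeAsAscii(int size)
    {
        var bytes = new byte[4];
        BinaryPrimitives.WriteInt32BigEndian(bytes, size);
        return Encoding.ASCII.GetString(bytes);
    }

    public static void Main()
    {
        // 1347375956 is the "size" from the InvalidReceiveException in the question.
        Console.WriteLine(DecodeAsAscii(1347375956)); // prints "POST"
    }
}
```

In other words, 1347375956 is 0x504F5354, the bytes 'P', 'O', 'S', 'T'. The broker most likely received the start of an HTTP POST and read it as a message length, which is why a tiny request appears to exceed the 100 MB limit.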
> schema isn't registered so the publish will never work
Producers automatically register schemas. There's no need to do that manually unless you really need the schema ID somewhere.
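As a rough sketch of what that could look like, the producer below points the registry client at the default port and leaves schema registration to the serializer. Several things here are assumptions rather than details from the question: the compose file is assumed to have been changed to map 8081:8081, the shape of TotallyCoolCustomClass is guessed from the properties used in the question, and ProducerFactory is a hypothetical helper. To my knowledge AutoRegisterSchemas already defaults to true in JsonSerializerConfig, so it is set here only to make the behaviour visible.

```csharp
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// Assumed shape of the class from the question (not shown in the original post).
public class TotallyCoolCustomClass
{
    public string FavouriteQuote { get; set; } = "";
    public int FavouriteNumber { get; set; }
}

// Hypothetical helper, for illustration only.
public static class ProducerFactory
{
    public static IProducer<string, TotallyCoolCustomClass> Create()
    {
        var schemaRegistryConfig = new SchemaRegistryConfig
        {
            // Assumption: the compose port mapping is "8081:8081",
            // matching the registry's default listener port.
            Url = "http://localhost:8081"
        };
        var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

        var serializerConfig = new JsonSerializerConfig
        {
            // The serializer registers the schema on first use,
            // so no separate registration app is required.
            AutoRegisterSchemas = true
        };

        var producerConfig = new ProducerConfig
        {
            // The broker port stays 29092; only the registry URL differs.
            BootstrapServers = "localhost:29092"
        };

        return new ProducerBuilder<string, TotallyCoolCustomClass>(producerConfig)
            .SetValueSerializer(
                new JsonSerializer<TotallyCoolCustomClass>(schemaRegistry, serializerConfig))
            .Build();
    }
}
```

With a setup along these lines, the first ProduceAsync call should register a JSON schema derived from the class under the subject for the topic, and the separate console app from the question would only be needed if the schema ID is required elsewhere.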