Failed integration between C# and Python using the same schema on Schema Registry


Problem Description

We have C# code that produces records to Kafka using an Avro schema. We can consume the same topic from C# as well, but when we try to consume it from Python we get the following error:
> Message deserialization failed: Message deserialization failed for message at [topic name] [4] offset 17: message does not start with magic byte

Producing and consuming entirely in Python works fine.

P.S. IAsyncSerializer is used in the C# code and AvroConsumer in the Python code.

We have tried to serialize records in C# and deserialize them in Python based on the same Avro schema from the schema registry.

Question:

Could you please help if you have had similar experience before?

Answer 1

Score: 1

This is the C# code that was in question in this topic.
The schema can be retrieved with the following line of code and then converted to an Avro schema:

GetSchemaRegistry().GetLatestSchemaAsync("SchemaRegistrySubject").Result.Schema

-----------------------------------------------------------------

using System.IO;
using System.Threading.Tasks;
using Avro.IO;
using Avro.Specific;
using Confluent.Kafka;

public class KafkaAvroSerializer<T> : IAsyncSerializer<T>
where T : class, ISpecificRecord
{
    public Task<byte[]> SerializeAsync(T data, SerializationContext context)
    {
        return Task.Run(() =>
        {
            using (var ms = new MemoryStream())
            {
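                // Note: this writes only the raw Avro body; there is no Confluent
                // wire-format header (0x00 magic byte + 4-byte schema id) in front of it,
                // which is what the Python AvroConsumer looks for.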
                var enc = new BinaryEncoder(ms);
                var writer = new SpecificDefaultWriter(data.Schema);
                writer.Write(data, enc);
                return ms.ToArray();
            }
        });
    }
}
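
This serializer is the likely source of the Python error: it writes only the raw Avro payload, while confluent-kafka-python's AvroConsumer expects the Confluent wire format, i.e. a zero "magic" byte, a 4-byte big-endian schema id, and then the Avro-encoded body. Below is a minimal sketch of a serializer that prepends that framing; the class name ConfluentFramedAvroSerializer and the subject argument are placeholders, not part of the original code.

using System;
using System.IO;
using System.Threading.Tasks;
using Avro.IO;
using Avro.Specific;
using Confluent.Kafka;
using Confluent.SchemaRegistry;

// Sketch only: same idea as KafkaAvroSerializer above, but emitting the Confluent
// wire format (0x00 magic byte + 4-byte big-endian schema id + Avro body).
public class ConfluentFramedAvroSerializer<T> : IAsyncSerializer<T>
    where T : class, ISpecificRecord
{
    private readonly ISchemaRegistryClient _registry;
    private readonly string _subject;   // placeholder: must match the subject used on the registry
    private int? _schemaId;

    public ConfluentFramedAvroSerializer(ISchemaRegistryClient registry, string subject)
    {
        _registry = registry;
        _subject = subject;
    }

    public async Task<byte[]> SerializeAsync(T data, SerializationContext context)
    {
        // Register (or look up) the schema once and cache its id.
        _schemaId ??= await _registry.RegisterSchemaAsync(_subject, data.Schema.ToString());

        using (var ms = new MemoryStream())
        {
            ms.WriteByte(0);                                      // magic byte
            var idBytes = BitConverter.GetBytes(_schemaId.Value);
            if (BitConverter.IsLittleEndian) Array.Reverse(idBytes);
            ms.Write(idBytes, 0, idBytes.Length);                 // schema id, big-endian

            var writer = new SpecificDefaultWriter(data.Schema);  // writer schema taken from the record
            writer.Write(data, new BinaryEncoder(ms));            // Avro-encoded body
            return ms.ToArray();
        }
    }
}

Alternatively, the hand-rolled serializer can be dropped entirely in favour of Confluent.SchemaRegistry.Serdes.AvroSerializer<MyRecord>, which already produces this framing (see the producer sketch further below).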

-------------------------------------------------------------------------------

using Avro;
using Avro.Specific;

public class MyRecord : ISpecificRecord
{
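    // Hard-coded writer schema; it should be kept in line with the schema
    // registered on the schema registry for this subject.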
    public Schema Schema =>
    Schema.Parse(@"
    {
        ""name"": ""name"",
        ""namespace"": ""namespace"",
        ""type"": ""record"",
        ""fields"": [
            {
                ""name"": ""field1"",
                ""type"": [
                    ""null"",
                    ""int""
                ],
                ""default"": null
            },
            {
                ""name"": ""field2"",
                ""type"": [
                    ""null"",
                    ""string""
                ],
                ""default"": null
            },
            {
                ""name"": ""field3"",
                ""type"": [
                    ""null"",
                    ""string""
                ],
                ""default"": null
            }
        ]
    }");

    public int field1 { get; set; }
    public string field2 { get; set; }
    public string field3 { get; set; }

    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return field1;
            case 1: return field2;
            case 2: return field3;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
        };
    }

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: field1 = (int)fieldValue; break;
            case 1: field2 = (string)fieldValue; break;
            case 2: field3 = (string)fieldValue; break;
            default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
        };
    }
}

------------------------------------------------------------------------------

using Confluent.Kafka;
using Confluent.SchemaRegistry;
public async Task Publish<T>(T @event) where T : class
{
    using (var producer = GetProducer())
    {
        try
        {
            if (@event == null)
            {
                return;
            }

            var kafkaEvent = KafkaEventFactory.Create(@event);

            var avroModel = AvroModelFactory.Create(kafkaEvent);
            var message = new Message<string, MyRecord>() { Value = avroModel, Key = kafkaEvent.Key };

            var deliveryResult = await producer.ProduceAsync(_kafkaOption.Topic, message);
            _logger.LogInformation($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
        }
        catch (ProduceException<string, MyRecord> e)
        {
            _logger.LogError($"Delivery failed: {e.Error.Reason}");
            throw;
        }
    }
}
private IProducer<string, MyRecord> GetProducer()
{
    var config = GetProducerConfig();
    var schemaRegistry = GetSchemaRegistry();

    var producer = new ProducerBuilder<string, MyRecord>(config)
        .SetErrorHandler((_, error) =>
        {
            _logger.LogError(error.ToString());
        })
        .SetValueSerializer(new KafkaAvroSerializer<MyRecord>())
        .Build();

    return producer;
}

private ProducerConfig GetProducerConfig()
{
    return new ProducerConfig
    {
        BootstrapServers = "BootstrapServers"
    };
}

private CachedSchemaRegistryClient GetSchemaRegistry()
{
    return new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = "SchemaRegistryUrl" });
}
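
For comparison, here is a minimal sketch of the same producer wired with Confluent's built-in Avro serde (Confluent.SchemaRegistry.Serdes.AvroSerializer<T>) instead of the custom serializer above. It emits the wire-format header and, by default, registers the schema under the "<topic>-value" subject. This is a suggested alternative, not code from the original post.

using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// Sketch: producer using Confluent's AvroSerializer, which writes the
// magic byte + schema id framing that the Python AvroConsumer expects.
private IProducer<string, MyRecord> GetProducer()
{
    var config = GetProducerConfig();
    var schemaRegistry = GetSchemaRegistry();

    return new ProducerBuilder<string, MyRecord>(config)
        .SetErrorHandler((_, error) => _logger.LogError(error.ToString()))
        .SetValueSerializer(new AvroSerializer<MyRecord>(schemaRegistry))
        .Build();
}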

