
Failed integration between C# and Python using same schema on schema registry


Problem Description

We have C# code that produces records to Kafka using an Avro schema. We can consume the same topic from C# without issue, but when we try to consume it in Python we get the following error:
> Message deserialization failed: Message deserialization failed for message at [topic name] [4] offset 17: message does not start with magic byte

Producing and consuming with Python also works fine.

P.S. IAsyncSerializer is used in the C# code and AvroConsumer in the Python code.

We have tried to serialize records in C# and deserialize them in Python based on the same Avro schema from the schema registry.

Question:

Could you please help if you have run into this before?

Answer 1

Score: 1



This is the C# code the question is about.
The schema can be retrieved with the following line of code and then converted to an Avro schema:

    GetSchemaRegistry().GetLatestSchemaAsync("SchemaRegistrySubject").Result.Schema
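As an illustration, retrieving the registered schema and turning it into an Avro schema object might look like the sketch below. This is only a sketch: "SchemaRegistrySubject" is a placeholder subject name, it relies on the GetSchemaRegistry() helper defined further down, and the property that exposes the schema JSON varies between Confluent.SchemaRegistry client versions (SchemaString here; the answer's one-liner uses .Schema).

    using Confluent.SchemaRegistry;

    // Fetch the latest schema registered under the subject (placeholder name).
    var latest = GetSchemaRegistry()
        .GetLatestSchemaAsync("SchemaRegistrySubject")
        .Result;

    // Parse the registry's JSON schema definition into an Apache Avro schema.
    Avro.Schema avroSchema = Avro.Schema.Parse(latest.SchemaString);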


    using System.IO;
    using System.Threading.Tasks;
    using Avro.IO;
    using Avro.Specific;
    using Confluent.Kafka;

    // Custom value serializer: writes only the raw Avro binary payload.
    public class KafkaAvroSerializer<T> : IAsyncSerializer<T>
        where T : class, ISpecificRecord
    {
        public Task<byte[]> SerializeAsync(T data, SerializationContext context)
        {
            return Task.Run(() =>
            {
                using (var ms = new MemoryStream())
                {
                    var enc = new BinaryEncoder(ms);
                    var writer = new SpecificDefaultWriter(data.Schema);
                    writer.Write(data, enc);
                    return ms.ToArray();
                }
            });
        }
    }
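
For context, Confluent Avro consumers (including Python's AvroConsumer) expect every message to start with a one-byte magic byte (0) followed by the 4-byte big-endian schema ID, with the Avro-encoded payload after that header. The serializer above writes only the raw Avro bytes, which matches the "does not start with magic byte" error in the question. Purely as a sketch of that framing (the schema ID is assumed to have been obtained from the schema registry beforehand, and the class name is illustrative):

    using System;
    using System.IO;
    using System.Threading.Tasks;
    using Avro.IO;
    using Avro.Specific;
    using Confluent.Kafka;

    // Sketch: same Avro encoding as above, but prefixed with the Confluent
    // wire-format header (magic byte 0 + 4-byte big-endian schema id).
    public class FramedAvroSerializer<T> : IAsyncSerializer<T>
        where T : class, ISpecificRecord
    {
        private readonly int _schemaId; // assumed to come from the schema registry

        public FramedAvroSerializer(int schemaId) => _schemaId = schemaId;

        public Task<byte[]> SerializeAsync(T data, SerializationContext context)
        {
            return Task.Run(() =>
            {
                using (var ms = new MemoryStream())
                {
                    ms.WriteByte(0); // magic byte

                    var idBytes = BitConverter.GetBytes(_schemaId);
                    if (BitConverter.IsLittleEndian)
                    {
                        Array.Reverse(idBytes); // schema id must be big-endian
                    }
                    ms.Write(idBytes, 0, idBytes.Length);

                    var enc = new BinaryEncoder(ms);
                    new SpecificDefaultWriter(data.Schema).Write(data, enc);
                    return ms.ToArray();
                }
            });
        }
    }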

    using Avro;
    using Avro.Specific;

    // Hand-written specific record matching the registered Avro schema.
    public class MyRecord : ISpecificRecord
    {
        public Schema Schema =>
            Schema.Parse(@"
            {
                ""name"": ""name"",
                ""namespace"": ""namespace"",
                ""type"": ""record"",
                ""fields"": [
                    {
                        ""name"": ""field1"",
                        ""type"": [ ""null"", ""int"" ],
                        ""default"": null
                    },
                    {
                        ""name"": ""field2"",
                        ""type"": [ ""null"", ""string"" ],
                        ""default"": null
                    },
                    {
                        ""name"": ""field3"",
                        ""type"": [ ""null"", ""string"" ],
                        ""default"": null
                    }
                ]
            }");

        public int field1 { get; set; }
        public string field2 { get; set; }
        public string field3 { get; set; }

        public object Get(int fieldPos)
        {
            switch (fieldPos)
            {
                case 0: return field1;
                case 1: return field2;
                case 2: return field3;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
            }
        }

        public void Put(int fieldPos, object fieldValue)
        {
            switch (fieldPos)
            {
                case 0: field1 = (int)fieldValue; break;
                case 1: field2 = (string)fieldValue; break;
                case 2: field3 = (string)fieldValue; break;
                default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
            }
        }
    }
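
A quick usage sketch of the record and the serializer above, run inside an async method (field values and the topic name are made up):

    var record = new MyRecord { field1 = 1, field2 = "hello", field3 = "world" };
    var serializer = new KafkaAvroSerializer<MyRecord>();

    // The SerializationContext tells the serializer which topic the bytes are
    // for and whether they are a message key or value.
    byte[] payload = await serializer.SerializeAsync(
        record,
        new SerializationContext(MessageComponentType.Value, "some-topic"));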

    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Confluent.SchemaRegistry;

    public async Task Publish<T>(T @event) where T : class // "class" constraint assumed; the posted "where T" was incomplete
    {
        using (var producer = GetProducer())
        {
            try
            {
                if (@event == null)
                {
                    return;
                }

                var kafkaEvent = KafkaEventFactory.Create(@event);
                var avroModel = AvroModelFactory.Create(kafkaEvent);
                var message = new Message<string, MyRecord>() { Value = avroModel, Key = kafkaEvent.Key };
                var deliveryResult = await producer.ProduceAsync(_kafkaOption.Topic, message);
                _logger.LogInformation($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
            }
            catch (ProduceException<string, MyRecord> e)
            {
                _logger.LogError($"Delivery failed: {e.Error.Reason}");
                throw;
            }
        }
    }

    private IProducer<string, MyRecord> GetProducer()
    {
        var config = GetProducerConfig();
        var schemaRegistry = GetSchemaRegistry();
        var producer = new ProducerBuilder<string, MyRecord>(config)
            .SetErrorHandler((_, error) =>
            {
                _logger.LogError(error.ToString());
            })
            .SetValueSerializer(new KafkaAvroSerializer<MyRecord>())
            .Build();
        return producer;
    }

    private ProducerConfig GetProducerConfig()
    {
        return new ProducerConfig
        {
            BootstrapServers = "BootstrapServers" // placeholder
        };
    }

    private CachedSchemaRegistryClient GetSchemaRegistry()
    {
        return new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = "SchemaRegistryUrl" }); // placeholder
    }
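
For comparison, the Confluent .NET client also ships a schema-registry-aware Avro serializer (in the Confluent.SchemaRegistry.Serdes.Avro package) that registers the schema and writes the magic-byte header itself. Wiring it into a producer might look like the sketch below; the method name is illustrative, and package and class availability depend on the client version in use.

    using Confluent.Kafka;
    using Confluent.SchemaRegistry;
    using Confluent.SchemaRegistry.Serdes;

    private IProducer<string, MyRecord> GetSerdesProducer()
    {
        var config = GetProducerConfig();
        var schemaRegistry = GetSchemaRegistry();

        // AvroSerializer<T> registers the schema (by default under "<topic>-value")
        // and prefixes each payload with the Confluent wire-format header, so a
        // Python AvroConsumer can read the messages.
        return new ProducerBuilder<string, MyRecord>(config)
            .SetValueSerializer(new AvroSerializer<MyRecord>(schemaRegistry))
            .Build();
    }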
