Failed integration between C# and Python using the same schema on Schema Registry
Problem Description
We have C# code that produces records to Kafka using an Avro schema. We can consume the same topic from C# as well, but when we try to consume it from Python, we get the following error:
> Message deserialization failed: Message deserialization failed for message at [topic name] [4] offset 17: message does not start with magic byte
Producing and consuming entirely in Python works fine.
P.S. `IAsyncSerializer` is used in the C# code and `AvroConsumer` in the Python code.
We serialize the records in C# and try to deserialize them in Python, both based on the same Avro schema in Schema Registry.
Question:
Could you please help if you have run into this before?
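For context on the error: Confluent's Schema Registry serializers use a wire format in which each message value starts with a magic byte `0x00`, followed by the 4-byte big-endian schema ID, followed by the Avro binary payload. The Python `AvroConsumer` reads that schema ID to fetch the writer schema, and it raises the error above when the first byte is not `0x00`. The sketch below is a hypothetical C# helper (`WireFormat.TryParseHeader` is not a library API) that performs roughly the same header check, which may be useful for inspecting the raw bytes on the topic.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper (not part of Confluent.Kafka): checks whether a Kafka
// message value starts with the Confluent Schema Registry wire-format header.
public static class WireFormat
{
    public const byte MagicByte = 0x00;
    public const int HeaderLength = 5; // 1 magic byte + 4-byte schema ID

    public static bool TryParseHeader(byte[] value, out int schemaId)
    {
        schemaId = 0;
        if (value == null || value.Length < HeaderLength || value[0] != MagicByte)
        {
            // Roughly the situation the Python deserializer reports as
            // "message does not start with magic byte".
            return false;
        }
        // Bytes 1..4 hold the schema ID as a big-endian 32-bit integer.
        schemaId = BinaryPrimitives.ReadInt32BigEndian(value.AsSpan(1, 4));
        return true;
    }
}
```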
Answer 1
Score: 1
This is the C# code the question refers to.
The schema can be retrieved with the following line and then converted to an Avro schema:
GetSchemaRegistry().GetLatestSchemaAsync("SchemaRegistrySubject").Result.Schema
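using System.IO;
using System.Threading.Tasks;
using Avro.IO;
using Avro.Specific;
using Confluent.Kafka;
// Custom value serializer. Note: it writes plain Avro binary only, with no
// Confluent wire-format header (magic byte + schema ID) and no Schema Registry lookup.
public class KafkaAvroSerializer<T> : IAsyncSerializer<T>
where T : class, ISpecificRecord
{
public Task<byte[]> SerializeAsync(T data, SerializationContext context)
{
using (var ms = new MemoryStream())
{
var enc = new BinaryEncoder(ms);
var writer = new SpecificDefaultWriter(data.Schema);
writer.Write(data, enc);
// Serialization is synchronous, so return a completed task instead of using Task.Run.
return Task.FromResult(ms.ToArray());
}
}
}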
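Note that `KafkaAvroSerializer<T>` above writes only the raw Avro body: there is no magic byte and no schema ID, and the `schemaRegistry` client created in `GetProducer()` is never used. That is a likely explanation for why a C# consumer (presumably using a matching custom deserializer) works while Python's `AvroConsumer` fails. The more robust fix is probably to use Confluent's `AvroSerializer<T>` from the `Confluent.SchemaRegistry.Serdes.Avro` package, constructed with the `CachedSchemaRegistryClient`, which looks up or registers the schema and writes the header itself; the C# consumer would then need the matching `AvroDeserializer<T>`. Purely to illustrate what is missing, here is a hypothetical framing helper (`ConfluentFraming.Frame` is not a library API), assuming the schema ID that Schema Registry assigned to the writer schema is already known:

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper: wraps a raw Avro payload (such as the bytes returned by
// KafkaAvroSerializer<T>) in the Confluent wire format.
// Assumption: schemaId is the ID Schema Registry assigned to the writer schema.
public static class ConfluentFraming
{
    public static byte[] Frame(int schemaId, byte[] avroPayload)
    {
        if (avroPayload == null) throw new ArgumentNullException(nameof(avroPayload));

        var framed = new byte[5 + avroPayload.Length];
        framed[0] = 0x00; // magic byte
        BinaryPrimitives.WriteInt32BigEndian(framed.AsSpan(1, 4), schemaId); // schema ID, big-endian
        Buffer.BlockCopy(avroPayload, 0, framed, 5, avroPayload.Length); // Avro binary body
        return framed;
    }
}
```

Hand-framing like this only works if the writer schema is actually registered under that ID, which is why the library serializer is the safer choice.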
using Avro;
using Avro.Specific;
public class MyRecord : ISpecificRecord
{
public Schema Schema =>
Schema.Parse(@"{
""name"": ""name"",
""namespace"": ""namespace"",
""type"": ""record"",
""fields"": [
{
""name"": ""field1"",
""type"": [
""null"",
""int""
],
""default"": null
},
{
""name"": ""field2"",
""type"": [
""null"",
""string""
],
""default"": null
},
{
""name"": ""field3"",
""type"": [
""null"",
""string""
],
""default"": null
}
]
}");
public int field1 { get; set; }
public string field2 { get; set; }
public string field3 { get; set; }
public object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return field1;
case 1: return field2;
case 2: return field3;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()");
};
}
public void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
// Unboxing to int throws if the ["null", "int"] union holds null.
case 0: field1 = (int)fieldValue; break;
case 1: field2 = (string)fieldValue; break;
case 2: field3 = (string)fieldValue; break;
default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()");
};
}
}
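To see why Python complains about the magic byte specifically, it helps to look at the first bytes `KafkaAvroSerializer<MyRecord>` produces. In Avro's binary encoding, each union value starts with its zero-based branch index written as a zigzag varint, so for `["null", "int"]` a non-null `field1` begins with `0x02` (branch 1), not `0x00`. Since `field1` is a non-nullable `int` in `MyRecord`, every message should start with `0x02`. The sketch below hand-encodes the `MyRecord` schema to show this; `AvroBinaryDemo` is purely illustrative and not a substitute for Apache.Avro's `BinaryEncoder`.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Illustrative hand-rolled Avro binary encoding for the MyRecord schema
// (three ["null", T] unions). Not a replacement for Apache.Avro.
public static class AvroBinaryDemo
{
    // Avro int/long: zigzag-encode, then write as a base-128 varint (low bits first).
    static void WriteLong(List<byte> buf, long n)
    {
        ulong z = (ulong)((n << 1) ^ (n >> 63));
        while ((z & ~0x7FUL) != 0)
        {
            buf.Add((byte)((z & 0x7F) | 0x80));
            z >>= 7;
        }
        buf.Add((byte)z);
    }

    // Avro string: byte length as a long, followed by the UTF-8 bytes.
    static void WriteString(List<byte> buf, string s)
    {
        byte[] utf8 = Encoding.UTF8.GetBytes(s);
        WriteLong(buf, utf8.Length);
        buf.AddRange(utf8);
    }

    public static byte[] EncodeMyRecord(int? field1, string field2, string field3)
    {
        var buf = new List<byte>();
        // Each union value starts with its branch index: 0 = "null", 1 = the non-null type.
        if (field1 is int i) { WriteLong(buf, 1); WriteLong(buf, i); } else { WriteLong(buf, 0); }
        if (field2 != null) { WriteLong(buf, 1); WriteString(buf, field2); } else { WriteLong(buf, 0); }
        if (field3 != null) { WriteLong(buf, 1); WriteString(buf, field3); } else { WriteLong(buf, 0); }
        return buf.ToArray();
    }
}
```

For example, `EncodeMyRecord(5, null, "a")` yields `02 0A 00 02 02 61`; the leading `0x02` is the byte the Python deserializer finds where it expects the magic byte `0x00`.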
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
public async Task Publish<T>(T @event)
{
using (var producer = GetProducer())
{
try
{
if (@event == null)
{
return;
}
var kafkaEvent = KafkaEventFactory.Create(@event);
var avroModel = AvroModelFactory.Create(kafkaEvent);
var message = new Message<string, MyRecord>() { Value = avroModel, Key = kafkaEvent.Key };
var deliveryResult = await producer.ProduceAsync(_kafkaOption.Topic, message);
_logger.LogInformation($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
}
catch (ProduceException<string, MyRecord> e)
{
_logger.LogError($"Delivery failed: {e.Error.Reason}");
throw;
}
}
}
private IProducer<string, MyRecord> GetProducer()
{
var config = GetProducerConfig();
// Note: schemaRegistry is created but never passed to the serializer below.
var schemaRegistry = GetSchemaRegistry();
var producer = new ProducerBuilder<string, MyRecord>(config)
.SetErrorHandler((_, error) =>
{
_logger.LogError(error.ToString());
})
.SetValueSerializer(new KafkaAvroSerializer<MyRecord>())
.Build();
return producer;
}
private ProducerConfig GetProducerConfig()
{
return new ProducerConfig
{
BootstrapServers = "BootstrapServers"
};
}
private CachedSchemaRegistryClient GetSchemaRegistry()
{
return new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = "SchemaRegistryUrl" });
}