Problem with Kafka deserialization in Python

Question
I'm new to Kafka and Python, but I need to create a consumer :)

I created a simple consumer and got results, but the data in Kafka is stored in Avro, so I need to deserialize it.

I tried a variant like this:
import os

from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

if __name__ == "__main__":
    class test(object):
        def __init__(self, test_id=None, dep=None, descr=None, stor_key=None, pos=None, time_dt=None):
            self.test_id = test_id
            self.dep = dep
            self.descr = descr
            self.stor_key = stor_key
            self.pos = pos
            self.time_dt = time_dt

    # Converts the decoded Avro dict into a test instance.
    def dict_to_klf(obj, ctx):
        if obj is None:
            return None
        return test(test_id=obj['test_id'],
                    dep=obj['dep'],
                    descr=obj['descr'],
                    stor_key=obj['stor_key'],
                    pos=obj['pos'],
                    time_dt=obj['time_dt'])

    # Load the reader schema from a file next to this script.
    schema = "descr.avsc"
    path = os.path.realpath(os.path.dirname(__file__))
    with open(os.path.join(path, schema)) as f:
        schema_str = f.read()

    sr_conf = {'url': ':8081'}
    schema_registry_client = SchemaRegistryClient(sr_conf)
    avro_deserializer = AvroDeserializer(schema_registry_client,
                                         schema_str,
                                         dict_to_klf)

    consumer_config = {
        "bootstrap.servers": "com:9092",
        "group.id": "descr_events",
        "auto.offset.reset": "earliest"
    }
    consumer = Consumer(consumer_config)
    consumer.subscribe(['descr'])

    while True:
        msg = consumer.poll(1)
        if msg is None:
            continue
        user = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
        print(msg.topic())
        print("-------------------------")
And got this error:
fastavro._schema_common.SchemaParseException: Default value <undefined> must match schema type: long
SCHEMA_PATH = "descr.avsc" looks like this:
{
    "type": "record",
    "name": "klf",
    "namespace": "test_ns",
    "fields": [
        {
            "name": "descr",
            "type": "string",
            "default": "undefined"
        },
        {
            "name": "test_id",
            "type": "long",
            "default": "undefined"
        },
        {
            "name": "dep",
            "type": "string",
            "default": "undefined"
        },
        {
            "name": "stor_key",
            "type": "string",
            "default": "undefined"
        },
        {
            "name": "time_dt",
            "type": "string",
            "default": "undefined"
        },
        {
            "name": "pos",
            "type": "string",
            "default": "undefined"
        }
    ]
}
What do I need to change to get the result with data?
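For context, the exception is raised while parsing the schema itself: Avro requires a field's default value to match the field's type, so the long field test_id cannot default to the string "undefined". A minimal fastavro check, independent of Kafka, sketches the difference; the numeric default 0 here is an assumption, not necessarily the right sentinel for this data:

from fastavro.schema import parse_schema

# A long field whose default is a string -> SchemaParseException.
bad = {"type": "record", "name": "klf", "namespace": "test_ns", "fields": [
    {"name": "test_id", "type": "long", "default": "undefined"}
]}

# A long field whose default is numeric parses cleanly.
good = {"type": "record", "name": "klf", "namespace": "test_ns", "fields": [
    {"name": "test_id", "type": "long", "default": 0}
]}

parse_schema(good)  # succeeds
parse_schema(bad)   # raises fastavro._schema_common.SchemaParseException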
Answer 1

Score: 0
This method helped me:
from confluent_kafka import KafkaError, KafkaException, OFFSET_BEGINNING
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.cimpl import TopicPartition

sr_url = ":8081"
kafka_broker_url = ":9092"
kafka_group_id = "group_id"

# AvroConsumer fetches the writer schema from Schema Registry and
# deserializes message values automatically.
consumer = AvroConsumer({
    'bootstrap.servers': kafka_broker_url,
    'group.id': kafka_group_id,
    'schema.registry.url': sr_url,
    'default.topic.config': {'auto.offset.reset': 'earliest'}
})

# Start reading the topic's partition 0 from the beginning.
topic = 'topic'
partition = 0
tp = TopicPartition(topic, partition, OFFSET_BEGINNING)
consumer.assign([tp])
consumer.seek(tp)

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('End of partition event received for partition {}:{}'.format(msg.topic(), msg.partition()))
            else:
                raise KafkaException(msg.error())
        else:
            value = msg.value()  # already a decoded dict
            print(value)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
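This works because AvroConsumer asks Schema Registry for the writer's schema of each message, so the local .avsc file is never parsed and its invalid defaults never come into play. Newer confluent-kafka releases deprecate AvroConsumer; a rough equivalent keeps the original AvroDeserializer approach but omits the local schema string. This is a sketch under the assumption that your client version allows schema_str to be omitted, and the hosts and topic below are placeholders:

from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

# Placeholder endpoint: fill in your own Schema Registry host.
schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

# With no schema_str, the deserializer uses the writer schema registered
# for each message, so the broken local file is never read.
avro_deserializer = AvroDeserializer(schema_registry_client)

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # placeholder broker
    'group.id': 'group_id',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['topic'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    value = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
    print(value)  # a plain dict, since no from_dict callback is supplied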