Problem with Kafka deserialization in Python

Question

I'm new to Kafka and Python, but I need to create a consumer :)

I created a simple consumer and got results, but the data in Kafka is stored in Avro, so I need to deserialize it.

I tried something like this:

import os
from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer


class test(object):
    """Plain object holding one decoded record."""

    def __init__(self, test_id=None, dep=None, descr=None, stor_key=None, pos=None, time_dt=None):
        self.test_id = test_id
        self.dep = dep
        self.descr = descr
        self.stor_key = stor_key
        self.pos = pos
        self.time_dt = time_dt


def dict_to_klf(obj, ctx):
    """Convert the dict produced by the Avro deserializer into a test object."""
    if obj is None:
        return None

    return test(test_id=obj['test_id'],
                dep=obj['dep'],
                descr=obj['descr'],
                stor_key=obj['stor_key'],
                pos=obj['pos'],
                time_dt=obj['time_dt'])


schema = "descr.avsc"

# Read the schema from a file located next to this script
path = os.path.realpath(os.path.dirname(__file__))
with open(os.path.join(path, schema)) as f:
    schema_str = f.read()

sr_conf = {'url': ':8081'}
schema_registry_client = SchemaRegistryClient(sr_conf)

avro_deserializer = AvroDeserializer(schema_registry_client,
                                     schema_str,
                                     dict_to_klf)

consumer_config = {
    "bootstrap.servers": "com:9092",
    "group.id": "descr_events",
    "auto.offset.reset": "earliest"
}

consumer = Consumer(consumer_config)
consumer.subscribe(['descr'])

while True:
    msg = consumer.poll(1)
    if msg is None:
        continue

    # Decode the Avro-encoded message value into a test object
    user = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))

    print(msg.topic())
    print("-------------------------")

And got this error:

fastavro._schema_common.SchemaParseException: Default value <undefined> must match schema type: long

The schema file (SCHEMA_PATH = "descr.avsc") looks like this:

{
	"type": "record",
	"name": "klf",
	"namespace": "test_ns",
	"fields": [	
		{
			"name": "descr",
			"type": "string",
			"default": "undefined"
		},
		{
			"name": "test_id",
			"type": "long",
			"default": "undefined"
		},
		{
			"name": "dep",
			"type": "string",
			"default": "undefined"
		},
		{
			"name": "stor_key",
			"type": "string",
			"default": "undefined"
		},
		{
			"name": "time_dt",
			"type": "string",
			"default": "undefined"
		},
		{
			"name": "pos",
			"type": "string",
			"default": "undefined"
		}
	]
	
}

What do I need to change to get the data?

Answer 1

Score: 0

This approach worked for me:

from confluent_kafka import KafkaError, KafkaException, OFFSET_BEGINNING, TopicPartition
from confluent_kafka.avro import AvroConsumer

sr_url = ":8081"
kafka_broker_url = ":9092"
kafka_group_id = "group_id"

# AvroConsumer looks up each message's schema in the Schema Registry,
# so no local .avsc file is passed in.
consumer = AvroConsumer({
    'bootstrap.servers': kafka_broker_url,
    'group.id': kafka_group_id,
    'schema.registry.url': sr_url,
    'default.topic.config': {'auto.offset.reset': 'earliest'}
})

topic = 'topic'
partitions = 0

# OFFSET_BEGINNING in the assignment makes the consumer start at the first
# message of the partition; no separate seek() is needed.
tp = TopicPartition(topic, partitions, OFFSET_BEGINNING)
consumer.assign([tp])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print('End of partition event received for partition {}:{}'.format(msg.topic(), msg.partition()))
            else:
                raise KafkaException(msg.error())
        else:
            # The value is already decoded from Avro into a Python dict
            value = msg.value()
            print(value)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
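
The consumer above avoids the local schema file entirely, since AvroConsumer takes the schema from the Schema Registry. The exception in the question, however, points at descr.avsc itself: the test_id field is declared as long but carries the string default "undefined", and Avro expects a field's default to match the field's type. Below is a minimal, standard-library-only sketch of that rule for primitive types. The names find_bad_defaults and _DEFAULT_TYPES are hypothetical helpers written for illustration, not part of fastavro or confluent_kafka, and the nullable union used as a fix is only one possible choice (a numeric default such as 0 would be another).

```python
# Python types accepted as a default for each Avro primitive type.
# Illustrative table only; complex types (records, arrays, maps) are skipped.
_DEFAULT_TYPES = {
    "null": type(None),
    "boolean": bool,
    "int": int,
    "long": int,
    "float": (int, float),
    "double": (int, float),
    "string": str,
    "bytes": str,
}


def find_bad_defaults(schema):
    """Return the names of record fields whose default does not fit the field type."""
    bad = []
    for field in schema.get("fields", []):
        if "default" not in field:
            continue
        ftype = field["type"]
        # Assumption: for a union, the default is checked against the first branch.
        if isinstance(ftype, list):
            ftype = ftype[0]
        if not isinstance(ftype, str) or ftype not in _DEFAULT_TYPES:
            continue  # out of scope for this sketch
        default = field["default"]
        # bool is a subclass of int in Python, so handle it explicitly
        if isinstance(default, bool):
            ok = ftype == "boolean"
        else:
            ok = isinstance(default, _DEFAULT_TYPES[ftype])
        if not ok:
            bad.append(field["name"])
    return bad


# Two fields from the schema in the question, as posted
broken = {
    "type": "record",
    "name": "klf",
    "namespace": "test_ns",
    "fields": [
        {"name": "descr", "type": "string", "default": "undefined"},
        {"name": "test_id", "type": "long", "default": "undefined"},
    ],
}

# One possible fix: make test_id nullable and default it to null
fixed = {
    "type": "record",
    "name": "klf",
    "namespace": "test_ns",
    "fields": [
        {"name": "descr", "type": "string", "default": "undefined"},
        {"name": "test_id", "type": ["null", "long"], "default": None},
    ],
}

print(find_bad_defaults(broken))  # ['test_id']
print(find_bad_defaults(fixed))   # []
```

Once test_id has a default that fits its type, the AvroDeserializer from the question should no longer fail on this particular check. Whether null or a number is the better default depends on how the producer writes the data, which the question does not show.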

Published on February 24, 2023, 00:01:39
Source: https://go.coder-hub.com/75547314.html