英文:
Kafka Connect: Convert message from bytes to Json
问题
我正在尝试使用Google PubSub源连接器从我的Google Cloud抓取数据到Kafka。我确实获取到数据,但消息以字节形式显示。我参考了这里,如其所述,我使用了JSON转换器进行转换。
这是我的连接器代码部分:
name=CPSSourceConnector
connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
tasks.max=10
kafka.topic=test-topic
kafka.topic.replication.factor=1
kafka.key.attribute=message
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
cps.subscription=test-sub
cps.project=sensor-alpha
这是我在Kafka中收到的内容:
{
"schema":{
"type":"struct",
"fields":[
{
"type":"bytes",
"optional":false,
"field":"message"
},
{
"type":"string",
"optional":false,
"field":"subFolder"
},
{
"type":"string",
"optional":false,
"field":"deviceId"
},
{
"type":"string",
"optional":false,
"field":"deviceRegistryLocation"
},
{
"type":"string",
"optional":false,
"field":"projectId"
},
{
"type":"string",
"optional":false,
"field":"deviceNumId"
},
{
"type":"string",
"optional":false,
"field":"deviceRegistryId"
}
],
"optional":false
},
"payload":{
"message":"eyJz...XhdGVkIn0=",
"subFolder":"",
"deviceId":"deviceid",
"deviceRegistryLocation":"region_value",
"projectId":"projectid",
"deviceNumId":"device_num_value",
"deviceRegistryId":"registryid"
}
}
即使在提供连接器详细信息后,我仍然得到字节形式的消息。我是否需要进一步处理才能将其转换为JSON格式?
英文:
I am trying to use a Google PubSub source connector to fetch data from my google cloud to kafka. I do get the data, but the message comes as bytes. I refered here and as mentioned, I have used a JSON convertor to change it.
Here is my connector code part:
name=CPSSourceConnector
connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
tasks.max=10
kafka.topic=test-topic
kafka.topic.replication.factor=1
kafka.key.attribute=message
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
cps.subscription=test-sub
cps.project=sensor-alpha
And this what I get in my kafka:
{
"schema":{
"type":"struct",
"fields":[
{
"type":"bytes",
"optional":false,
"field":"message"
},
{
"type":"string",
"optional":false,
"field":"subFolder"
},
{
"type":"string",
"optional":false,
"field":"deviceId"
},
{
"type":"string",
"optional":false,
"field":"deviceRegistryLocation"
},
{
"type":"string",
"optional":false,
"field":"projectId"
},
{
"type":"string",
"optional":false,
"field":"deviceNumId"
},
{
"type":"string",
"optional":false,
"field":"deviceRegistryId"
}
],
"optional":false
},
"payload":{
"message":"eyJzZW5zb3JfaWQiOiAiYmEwMGQyNjNiNzRiMzBhMGFjM2EzMDlkZWZjZjM0ODMtMzAyIiwgInRfY2Vsc2l1cyI6IDEwLCAicmVnaXN0cnlfaWQiOiAiYmFsZW5hLXJlZ2lzdHJ5IiwgInByZXNzdXJlIjogMTAsICJ0aW1lc3RhbXAiOiAxNTk4NDM2NTk3LjQxNTEwNDYsICJkZXZpY2VfaWQiOiAiYmEwMGQyNjNiNzRiMzBhMGFjM2EzMDlkZWZjZjM0ODMiLCAic3RyaW5nX2JhdHRlcnkiOiAiYmF0dGVyeV9ub3JtYWwiLCAic3RyaW5nX2luZmxhdGUiOiAidGlyZV9vdmVyX2luZmxhdGVkIn0=",
"subFolder":"",
"deviceId":"deviceid",
"deviceRegistryLocation":"region_value",
"projectId":"projectid",
"deviceNumId":"device_num_value",
"deviceRegistryId":"registryid"
}
}
Even after providing the connector, details I get message as byte. Is there something further I should be doing to convert it to json format ?
答案1
得分: 0
云 Pub/Sub Kafka 连接器不会检查或转换接收到的消息中的数据;它只是将数据字段以字节的形式传递,这是 PubsubMessage 中字段的类型。目前还没有办法使连接器本身读取消息的内容并将其转换为 JSON。
英文:
The Cloud Pub/Sub Kafka connector does not inspect or convert the data in messages that it receives; it just passes the data field through as bytes, which is the type of the field in the PubsubMessage. There is currently no way to get the connector itself to read the contents of the message and convert it to JSON.
答案2
得分: 0
检查这个连接器:
{
"transforms": "bytesToString",
"transforms.bytesToString.type": "com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Value",
"transforms.bytesToString.fields": "bytes"
}
链接:https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-transform-common/transformations/examples/BytesToString.struct.html
英文:
check this connector:
{
"transforms" : "bytesToString",
"transforms.bytesToString.type" : "com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Value",
"transforms.bytesToString.fields" : "bytes"
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论