Kafka Connect:将消息从字节转换为Json

huangapple go评论83阅读模式
英文:

Kafka Connect: Convert message from bytes to Json

问题

我正在尝试使用Google PubSub源连接器从我的Google Cloud抓取数据到Kafka。我确实获取到数据,但消息以字节形式显示。我参考了这里,如其所述,我使用了JSON转换器进行转换。

这是我的连接器代码部分:

name=CPSSourceConnector
connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
tasks.max=10
kafka.topic=test-topic
kafka.topic.replication.factor=1
kafka.key.attribute=message
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
cps.subscription=test-sub
cps.project=sensor-alpha

这是我在Kafka中收到的内容:

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"bytes",
            "optional":false,
            "field":"message"
         },
         {
            "type":"string",
            "optional":false,
            "field":"subFolder"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceId"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceRegistryLocation"
         },
         {
            "type":"string",
            "optional":false,
            "field":"projectId"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceNumId"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceRegistryId"
         }
      ],
      "optional":false
   },
   "payload":{
      "message":"eyJz...XhdGVkIn0=",
      "subFolder":"",
      "deviceId":"deviceid",
      "deviceRegistryLocation":"region_value",
      "projectId":"projectid",
      "deviceNumId":"device_num_value",
      "deviceRegistryId":"registryid"
   }
}

即使在提供连接器详细信息后,我仍然得到字节形式的消息。我是否需要进一步处理才能将其转换为JSON格式?

英文:

I am trying to use a Google PubSub source connector to fetch data from my google cloud to kafka. I do get the data, but the message comes as bytes. I refered here and as mentioned, I have used a JSON convertor to change it.

Here is my connector code part:

name=CPSSourceConnector
connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
tasks.max=10
kafka.topic=test-topic
kafka.topic.replication.factor=1
kafka.key.attribute=message
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
cps.subscription=test-sub
cps.project=sensor-alpha

And this what I get in my kafka:

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"bytes",
            "optional":false,
            "field":"message"
         },
         {
            "type":"string",
            "optional":false,
            "field":"subFolder"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceId"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceRegistryLocation"
         },
         {
            "type":"string",
            "optional":false,
            "field":"projectId"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceNumId"
         },
         {
            "type":"string",
            "optional":false,
            "field":"deviceRegistryId"
         }
      ],
      "optional":false
   },
   "payload":{
      "message":"eyJzZW5zb3JfaWQiOiAiYmEwMGQyNjNiNzRiMzBhMGFjM2EzMDlkZWZjZjM0ODMtMzAyIiwgInRfY2Vsc2l1cyI6IDEwLCAicmVnaXN0cnlfaWQiOiAiYmFsZW5hLXJlZ2lzdHJ5IiwgInByZXNzdXJlIjogMTAsICJ0aW1lc3RhbXAiOiAxNTk4NDM2NTk3LjQxNTEwNDYsICJkZXZpY2VfaWQiOiAiYmEwMGQyNjNiNzRiMzBhMGFjM2EzMDlkZWZjZjM0ODMiLCAic3RyaW5nX2JhdHRlcnkiOiAiYmF0dGVyeV9ub3JtYWwiLCAic3RyaW5nX2luZmxhdGUiOiAidGlyZV9vdmVyX2luZmxhdGVkIn0=",
      "subFolder":"",
      "deviceId":"deviceid",
      "deviceRegistryLocation":"region_value",
      "projectId":"projectid",
      "deviceNumId":"device_num_value",
      "deviceRegistryId":"registryid"
   }
}

Even after providing the connector, details I get message as byte. Is there something further I should be doing to convert it to json format ?

答案1

得分: 0

云 Pub/Sub Kafka 连接器不会检查或转换接收到的消息中的数据;它只是将数据字段以字节的形式传递,这是 PubsubMessage 中字段的类型。目前还没有办法使连接器本身读取消息的内容并将其转换为 JSON。

英文:

The Cloud Pub/Sub Kafka connector does not inspect or convert the data in messages that it receives; it just passes the data field through as bytes, which is the type of the field in the PubsubMessage. There is currently no way to get the connector itself to read the contents of the message and convert it to JSON.

答案2

得分: 0

检查这个连接器:

{
  "transforms": "bytesToString",
  "transforms.bytesToString.type": "com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Value",
  "transforms.bytesToString.fields": "bytes"
}

链接:https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-transform-common/transformations/examples/BytesToString.struct.html

英文:

check this connector:

{
  "transforms" : "bytesToString",
  "transforms.bytesToString.type" : "com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Value",
  "transforms.bytesToString.fields" : "bytes"
}

https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-transform-common/transformations/examples/BytesToString.struct.html

huangapple
  • 本文由 发表于 2020年8月26日 18:36:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/63595793.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定