How to use Schema Registry with Kafka Connect and Avro

Question

I recently started exploring Kafka and Kafka Connect and have done some initial setup, but I want to dig further into the Schema Registry part.

My Schema Registry is running. What should I do next?

I have an Avro schema stored in avro_schema.avsc. Here is the schema:

{
  "name": "FSP-AUDIT-EVENT",
  "type": "record",
  "namespace": "com.acme.avro",
  "fields": [
    {
      "name": "ID",
      "type": "string"
    },
    {
      "name": "VERSION",
      "type": "int"
    },
    {
      "name": "ACTION_TYPE",
      "type": "string"
    },
    {
      "name": "EVENT_TYPE",
      "type": "string"
    },
    {
      "name": "CLIENT_ID",
      "type": "string"
    },
    {
      "name": "DETAILS",
      "type": "string"
    },
    {
      "name": "OBJECT_TYPE",
      "type": "string"
    },
    {
      "name": "UTC_DATE_TIME",
      "type": "long"
    },
    {
      "name": "POINT_IN_TIME_PRECISION",
      "type": "string"
    },
    {
      "name": "TIME_ZONE",
      "type": "string"
    },
    {
      "name": "TIMELINE_PRECISION",
      "type": "string"
    },
    {
      "name": "AUDIT_EVENT_TO_UTC_DT",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "AUDIT_EVENT_TO_DATE_PITP",
      "type": "string"
    },
    {
      "name": "AUDIT_EVENT_TO_DATE_TZ",
      "type": "string"
    },
    {
      "name": "AUDIT_EVENT_TO_DATE_TP",
      "type": "string"
    },
    {
      "name": "GROUP_ID",
      "type": "string"
    },
    {
      "name": "OBJECT_DISPLAY_NAME",
      "type": "string"
    },
    {
      "name": "OBJECT_ID",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "USER_DISPLAY_NAME",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "USER_ID",
      "type": "string"
    },
    {
      "name": "PARENT_EVENT_ID",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "NOTES",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "SUMMARY",
      "type": [
        "string",
        "null"
      ]
    }
  ]
}

Is my schema valid? I generated it from JSON with an online converter. I am also not sure where this schema file should be kept. Please guide me through the steps to follow.

I am sending records both from a Lambda function and from a JDBC source.

So how can I enforce the Avro schema and test it? Do I have to change anything in the avro-consumer properties file?

Or is this the correct way to register a schema?

./bin/kafka-avro-console-producer \
  --broker-list b-3.**:9092,b-**:9092,b-**:9092 --topic AVRO-AUDIT_EVENT \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json"     --data '{"schema" : "{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"VERSION\"},{\"type\":\"string\",\"optional\":true,\"field\":\"ACTION_TYPE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"EVENT_TYPE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"CLIENT_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"DETAILS\"},{\"type\":\"string\",\"optional\":true,\"field\":\"OBJECT_TYPE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"UTC_DATE_TIME\"},{\"type\":\"string\",\"optional\":true,\"field\":\"POINT_IN_TIME_PRECISION\"},{\"type\":\"string\",\"optional\":true,\"field\":\"TIME_ZONE\"},{\"type\":\"string\",\"optional\":true,\"field\":\"TIMELINE_PRECISION\"},{\"type\":\"string\",\"optional\":true,\"field\":\"GROUP_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"OBJECT_DISPLAY_NAME\"},{\"type\":\"string\",\"optional\":true,\"field\":\"OBJECT_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"USER_DISPLAY_NAME\"},{\"type\":\"string\",\"optional\":true,\"field\":\"USER_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"PARENT_EVENT_ID\"},{\"type\":\"string\",\"optional\":true,\"field\":\"NOTES\"},{\"type\":\"string\",\"optional\":true,\"field\":\"SUMMARY\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_UTC_DT\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_DATE_PITP\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_DATE_TZ\"},{\"type\":\"string\",\"optional\":true,\"field\":\"AUDIT_EVENT_TO_DATE_TP\"}],\"optional\":false,\"name\":\"test\"}"}' http://localhost:8081/subjects/view/versions

What do I have to do next?

When I try to view my schema, I only get the following:

curl --silent -X GET http://localhost:8081/subjects/AVRO-AUDIT-EVENT/versions/latest

This is the result:

{"subject":"AVRO-AUDIT-EVENT","version":1,"id":161,"schema":"{\"type\":\"string\",\"optional\":false}"}

Why don't I see my full registered schema?

Also, when I try to delete the schema, I get the following error:

{"error_code":405,"message":"HTTP 405 Method Not Allowed"}

I am not sure whether my schema is registered correctly.

Please help me.
Thanks in advance.

Answer 1

Score: 2

> is my schema valid

You can find out by submitting it through the REST API of the Registry and seeing whether it is accepted.
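
As a rough sketch of what submitting it involves: the Registry's REST API takes a POST to /subjects/<subject>/versions whose body carries the schema as a JSON-encoded string. The Python below (standard library only) builds that request without sending it. It first runs a local check of the Avro naming rule, since the Avro specification only allows letters, digits, and underscores in names, so a hyphenated record name such as FSP-AUDIT-EVENT is likely to be rejected. The helper names, the shortened schema, and the subject AVRO-AUDIT_EVENT-value (topic name plus -value, the default subject naming) are assumptions for illustration.

```python
import json
import re
import urllib.request

# Avro names must start with a letter or underscore and then contain only
# letters, digits, or underscores (Avro specification, "Names").
AVRO_NAME = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def invalid_names(schema):
    """Return the record and field names in a parsed schema that break the naming rule."""
    names = [schema["name"]] + [field["name"] for field in schema.get("fields", [])]
    return [name for name in names if not AVRO_NAME.fullmatch(name)]


def registration_request(registry_url, subject, schema):
    """Build (without sending) the POST request that registers a schema under a subject."""
    # The registry expects the schema as a JSON-encoded *string*
    # inside the "schema" property of the request body.
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        url=f"{registry_url}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


# Shortened, hypothetical version of the schema from the question.
schema = {
    "type": "record",
    "name": "FSP-AUDIT-EVENT",
    "namespace": "com.acme.avro",
    "fields": [
        {"name": "ID", "type": "string"},
        {"name": "NOTES", "type": ["string", "null"]},
    ],
}

print(invalid_names(schema))  # names that break the Avro naming rule

# Assumed registry address and subject name.
request = registration_request("http://localhost:8081", "AVRO-AUDIT_EVENT-value", schema)
print(request.get_method(), request.full_url)

# To actually send it (needs a running registry):
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```

If the check reports a name, renaming it (for example to use underscores) before registering is probably the simplest fix.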

> where should i keep this schema file location i am not sure about

It's not clear how you're sending messages.

If you actually wrote Kafka producer code, you store the schema within your code (as a string) or as a resource file. If you are using Java, you can instead use the SchemaBuilder class to create the Schema object.

If you have not already done so, you need to rewrite your producer to use an Avro Schema and the Avro serializers.

> If we create an Avro schema, will it work for JSON as well?

Avro is a binary format, but there is a JSONDecoder for it.

> What should be the URL for our Avro schema properties file?

It needs to be the IP of your Schema Registry, once you figure out how to start it (with schema-registry-start).

> Do I have to change anything in the avro-consumer properties file?

You need to use the Avro Deserializer.
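
For illustration, the settings involved might look like the sketch below. The class names are the Confluent Avro deserializer and the Confluent Avro converter for Kafka Connect. The registry address is assumed to be the localhost one used in the question, and the small to_properties helper is hypothetical, only there to print the settings in properties-file form.

```python
def to_properties(settings):
    """Render a dict as Java-style .properties text, one key=value per line."""
    return "\n".join(f"{key}={value}" for key, value in settings.items())


REGISTRY_URL = "http://localhost:8081"  # assumption: registry address from the question

# Plain Kafka consumer: deserialize record values with the Avro deserializer.
consumer_settings = {
    "value.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    "schema.registry.url": REGISTRY_URL,
}

# Kafka Connect worker or connector: convert record values with the Avro converter.
connect_settings = {
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": REGISTRY_URL,
}

print(to_properties(consumer_settings))
print()
print(to_properties(connect_settings))
```

The first group applies to a consumer you run yourself, the second to Kafka Connect (for example the JDBC source), where the converter rather than your code talks to the Registry.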

> is this correct way to register schema

> ./bin/kafka-avro-console-producer \

Not quite. That's how you produce a message with a schema (and you need to use the correct schema). You must also provide --property schema.registry.url.
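
One detail that is easy to miss when testing this way: the console producer reads each message in Avro's JSON encoding, where a non-null value of a union field such as ["string", "null"] is written as {"string": "..."} rather than as a bare string. Below is a minimal sketch of that wrapping, assuming a flat record with primitive fields and two-branch unions with "null" only. The function name and the three-field schema are hypothetical.

```python
import json


def to_avro_json(record, schema):
    """Encode a plain dict in Avro's JSON encoding for a flat record schema.

    Only covers primitive fields and unions of one primitive type with "null".
    """
    encoded = {}
    for field in schema["fields"]:
        name, field_type = field["name"], field["type"]
        value = record.get(name)
        if isinstance(field_type, list) and value is not None:
            # Non-null union values are wrapped as {"<branch type>": value}.
            branch = next(t for t in field_type if t != "null")
            encoded[name] = {branch: value}
        else:
            # Primitive fields and null union values are written as-is.
            encoded[name] = value
    return json.dumps(encoded)


# Hypothetical three-field subset of the schema in the question.
schema = {
    "type": "record",
    "name": "AuditEvent",
    "fields": [
        {"name": "ID", "type": "string"},
        {"name": "OBJECT_ID", "type": ["string", "null"]},
        {"name": "NOTES", "type": ["string", "null"]},
    ],
}

line = to_avro_json({"ID": "42", "OBJECT_ID": "obj-1", "NOTES": None}, schema)
print(line)  # one line like this is what you would type or pipe into the console producer
```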

You use the REST API of the Registry to register and verify schemas.
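
To make the verification step concrete, here is a small sketch that unpacks the response shown in the question and builds the subject URLs used for reading (GET) and deleting (DELETE). Note that the "schema" field in the response is itself a JSON string and has to be decoded a second time. It is also worth noticing that the question posts to the subject view but reads back AVRO-AUDIT-EVENT, so the two calls are not looking at the same subject. The helper names are hypothetical and the registry address is the one from the question.

```python
import json
from urllib.parse import quote

# Response body copied from the question.
response_text = (
    '{"subject":"AVRO-AUDIT-EVENT","version":1,"id":161,'
    '"schema":"{\\"type\\":\\"string\\",\\"optional\\":false}"}'
)


def registered_schema(text):
    """Return (subject, version, parsed schema) from a registry version response."""
    outer = json.loads(text)
    # The schema is stored as a JSON string, so it is decoded separately.
    return outer["subject"], outer["version"], json.loads(outer["schema"])


def subject_url(registry_url, subject, version=None):
    """Build the URL of a subject, or of one version of it when version is given."""
    url = f"{registry_url}/subjects/{quote(subject, safe='')}"
    if version is not None:
        url += f"/versions/{version}"
    return url


subject, version, schema = registered_schema(response_text)
print(subject, version, schema)

registry = "http://localhost:8081"
print("GET    " + subject_url(registry, "view", "latest"))   # subject used in the POST
print("GET    " + subject_url(registry, subject, "latest"))  # subject used in the GET
print("DELETE " + subject_url(registry, subject))            # removes the whole subject
```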

huangapple
  • Posted on January 6, 2020, 17:57:59
  • When reposting, please keep the link to this article: https://go.coder-hub.com/59609984.html