AvroConverter 无法使用最新模式版本序列化嵌套结构。

huangapple go评论123阅读模式
英文:

AvroConverter fails to serialize Nested Structures using latest schema version

问题

I'm trying to setup a debezium connector to capture the changes on a collection and stream those changes to a kafka topic.

Everything works great (inserts, updates, deletes/tombstones) until I introduced the schema registry and Avro Schemas to the configuration.

I'm trying to use a schema already defined on the topic that contains nested structures, here is an example of the schema registered on schema registry:

{
  "name": "User",
  "namespace": "com.namespace.models",
  "type": "record",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "default": null,
      "name": "version",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "default": null,
      "name": "address",
      "type": [
        "null",
        {
          "fields": [
            {
              "default": null,
              "name": "street",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "default": null,
              "name": "city",
              "type": [
                "null",
                "string"
              ]
            }
          ],
          "name": "Address",
          "namespace": "com.myothernamespace.models",
          "type": "record"
        }
      ]
    },
    {
      "default": null,
      "name": "otherAddress",
      "type": [
        "null",
        "com.myothernamespace.models.Address"
      ]
    }
  ]
}

When the connector tries to write to the topic it fails on the AvroConverter with the following exception

Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic..

I can see in the logs that the connector tries to produce the record using a different schema, not using the correct namespace of the nested structure (address). Also it fails on serializing the otherAddress field as it is null on the mongo collection and it outputs as a union (null, string) on the AvroConverter produced schema. It should be a union of null, Address.

Looking at the ID field it fails as well as the produced schema defines it as optional.

This causes a mismatch on the Schema registered on the Schema Registry and the schema that the producer is trying to use.

Here I share the mismatched schema that the converter is using to produce the records:

{
  "name": "User",
  "namespace": "com.namespace.models",
  "type": "record",
  "fields": [
    {
      "default": null,
      "name": "id",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "default": null,
      "name": "version",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "default": null,
      "name": "address",
      "type": [
        "null",
        {
          "fields": [
            {
              "default": null,
              "name": "street",
              "type": [
                "null",
                "string"
              ]
            },
            {
              "default": null,
              "name": "city",
              "type": [
                "null",
                "string"
              ]
            }
          ],
          "name": "address",        
          "namespace": "<my_topic_name>", # It's using my topic name as namespace
          "type": "record"
        }
      ]
    },
    {
      "default": null,
      "name": "otherAddress",
      "type": [
        "null",
        "string" # not using the correct structure (Adddress)
      ]
    }
  ]
}

I was able to solve the root structure namespace with the SetSchemaMetadata SMT but AFAIK it doesn't support to be applied on nested structures.

Here is my connector config:

{
  "name": "test-connector",
  "config": {
    "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.members.auto.discover": true,
    "mongodb.connection.string": "<redacted>",
    "mongodb.user" : "<redacted>",
    "mongodb.password" : "<redacted>",
    "database.include.list": "<redacted>",
    "collection.include.list": "<redacted>",
    "mongodb.ssl.enabled": true,   
    "publish.full.document.only": true,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.enhanced.avro.schema.support": "true",
    "value.converter.schema.registry.url": "<redacted>",
    "value.converter.auto.register.schemas": "false",
    "value.converter.use.latest.version": "true",
    "value.converter.connect.meta.data": "false",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.basic.auth.user.info": "<redacted>",
    "transforms": "unwrap,ReplaceField,extractIdAsKey,SetValueSchema",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "none",
    "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.ReplaceField.blacklist": "_id",
    "transforms.extractIdAsKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractIdAsKey.field": "id",
    "transforms.SetValueSchema.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
    "transforms.SetValueSchema.schema.name": "com.namespace.models.User",
    "transforms.SetValueSchema.predicate": "isTombstone",
    "transforms.SetValueSchema.negate": "true",
    "predicates": "isTombstone",
    "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
    "topic.prefix": "test",
    "tasks.max" : "1"
  }
}

I was expecting that by using Schema Registry with use.latest.version the AvroConverter would use the schema to produce the records. Does anyone have any ideas on what causes and how to overcome this mismatch between the schema registered on schema registry and the schema that AvroConverter uses?

英文:

I'm trying to setup a debezium connector to capture the changes on a collection and stream those changes to a kafka topic.

Everything works great (inserts, updates, deletes/tombstones) until I introduced the schema registry and Avro Schemas to the configuration.

I'm trying to use a schema already defined on the topic that contains nested structures, here is an example of the schema registered on schema registry:

{
  &quot;name&quot;: &quot;User&quot;,
  &quot;namespace&quot;: &quot;com.namespace.models&quot;,
  &quot;type&quot;: &quot;record&quot;
  &quot;fields&quot;: [
    {
      &quot;name&quot;: &quot;id&quot;,
      &quot;type&quot;: &quot;string&quot;
    },
    {
      &quot;default&quot;: null,
      &quot;name&quot;: &quot;version&quot;,
      &quot;type&quot;: [
        &quot;null&quot;,
        &quot;int&quot;
      ]
    },
    {
      &quot;default&quot;: null,
      &quot;name&quot;: &quot;address&quot;,
      &quot;type&quot;: [
        &quot;null&quot;,
        {
          &quot;fields&quot;: [
            {
              &quot;default&quot;: null,
              &quot;name&quot;: &quot;street&quot;,
              &quot;type&quot;: [
                &quot;null&quot;,
                &quot;string&quot;
              ]
            },
            {
              &quot;default&quot;: null,
              &quot;name&quot;: &quot;city&quot;,
              &quot;type&quot;: [
                &quot;null&quot;,
                &quot;string&quot;
              ]
            }
          ],
          &quot;name&quot;: &quot;Address&quot;,
          &quot;namespace&quot;: &quot;com.myothernamespace.models&quot;,
          &quot;type&quot;: &quot;record&quot;
        }
      ]
    },
    {
      &quot;default&quot;: null,
      &quot;name&quot;: &quot;otherAddress&quot;,
      &quot;type&quot;: [
        &quot;null&quot;,
        &quot;com.myothernamespace.models.Address&quot;
      ]
    }
  ]
}

When the connector tries to write to the topic it fails on the AvroConverter with the following exception

Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic.. 

I can see in the logs that the connector tries to produce the record using a different schema, not using the correct namespace of the nested structure (address). Also it fails on serializing the otherAddress field as it is null on the mongo collection and it outputs as a union (null, string) on the AvroConverter produced schema. It should be a union of null, Address.

Looking at the ID field it fails as well as the produced schema defines it as optional.

This causes a mismatch on the Schema registered on the Schema Registry and the schema that the producer is trying to use.

Here I share the mismatched schema that the converter is using to produce the records:

{
  &quot;name&quot;: &quot;User&quot;,
  &quot;namespace&quot;: &quot;com.namespace.models&quot;,
  &quot;type&quot;: &quot;record&quot;
  &quot;fields&quot;: [
    {
      &quot;default&quot;: null,
      &quot;name&quot;: &quot;id&quot;,
      &quot;type&quot;: [
        &quot;null&quot;,
        &quot;string&quot;
      ]
    },
    {
      &quot;default&quot;: null,
      &quot;name&quot;: &quot;version&quot;,
      &quot;type&quot;: [
        &quot;null&quot;,
        &quot;int&quot;
      ]
    },
    {
      &quot;default&quot;: null,
      &quot;name&quot;: &quot;address&quot;,
      &quot;type&quot;: [
        &quot;null&quot;,
        {
          &quot;fields&quot;: [
            {
              &quot;default&quot;: null,
              &quot;name&quot;: &quot;street&quot;,
              &quot;type&quot;: [
                &quot;null&quot;,
                &quot;string&quot;
              ]
            },
            {
              &quot;default&quot;: null,
              &quot;name&quot;: &quot;city&quot;,
              &quot;type&quot;: [
                &quot;null&quot;,
                &quot;string&quot;
              ]
            }
          ],
          &quot;name&quot;: &quot;address&quot;,        
          &quot;namespace&quot;: &quot;&lt;my_topic_name&gt;&quot;, # It&#39;s using my topic name as namespace
          &quot;type&quot;: &quot;record&quot;
        }
      ]
    },
    {
      &quot;default&quot;: null,
      &quot;name&quot;: &quot;otherAddress&quot;,
      &quot;type&quot;: [
        &quot;null&quot;,
        &quot;string&quot; # not using the correct structure (Adddress)
      ]
    }
  ]
}

I was able to solve the root structure namespace with the SetSchemaMetadata SMT but AFAIK it doesn't support to be applied on nested structures.

Here is my connector config:

{
  &quot;name&quot;: &quot;test-connector&quot;,
  &quot;config&quot;: {
    &quot;connector.class&quot; : &quot;io.debezium.connector.mongodb.MongoDbConnector&quot;,
    &quot;mongodb.members.auto.discover&quot;: true,
    &quot;mongodb.connection.string&quot;: &quot;&lt;redacted&gt;&quot;,
    &quot;mongodb.user&quot; : &quot;&lt;redacted&gt;&quot;,
    &quot;mongodb.password&quot; : &quot;&lt;redacted&gt;&quot;,
    &quot;database.include.list&quot;: &quot;&lt;redacted&gt;&quot;,
    &quot;collection.include.list&quot;: &quot;&lt;redacted&gt;&quot;,
    &quot;mongodb.ssl.enabled&quot;: true,   
    &quot;publish.full.document.only&quot;: true,
    &quot;key.converter&quot;: &quot;org.apache.kafka.connect.storage.StringConverter&quot;,
    &quot;value.converter&quot;: &quot;io.confluent.connect.avro.AvroConverter&quot;,
    &quot;value.converter.enhanced.avro.schema.support&quot;: &quot;true&quot;,
    &quot;value.converter.schema.registry.url&quot;: &quot;&lt;redacted&gt;&quot;,
    &quot;value.converter.auto.register.schemas&quot;: &quot;false&quot;,
    &quot;value.converter.use.latest.version&quot;: &quot;true&quot;,
    &quot;value.converter.connect.meta.data&quot;: &quot;false&quot;,
    &quot;value.converter.basic.auth.credentials.source&quot;: &quot;USER_INFO&quot;,
    &quot;value.converter.basic.auth.user.info&quot;: &quot;&lt;redacted&gt;&quot;,
    &quot;transforms&quot;: &quot;unwrap,ReplaceField,extractIdAsKey,SetValueSchema&quot;,
    &quot;transforms.unwrap.type&quot;: &quot;io.debezium.connector.mongodb.transforms.ExtractNewDocumentState&quot;,
    &quot;transforms.unwrap.drop.tombstones&quot;: &quot;false&quot;,
    &quot;transforms.unwrap.delete.handling.mode&quot;: &quot;none&quot;,
    &quot;transforms.ReplaceField.type&quot;: &quot;org.apache.kafka.connect.transforms.ReplaceField$Value&quot;,
    &quot;transforms.ReplaceField.blacklist&quot;: &quot;_id&quot;,
    &quot;transforms.extractIdAsKey.type&quot;: &quot;org.apache.kafka.connect.transforms.ExtractField$Key&quot;,
    &quot;transforms.extractIdAsKey.field&quot;: &quot;id&quot;,
    &quot;transforms.SetValueSchema.type&quot;: &quot;org.apache.kafka.connect.transforms.SetSchemaMetadata$Value&quot;,
    &quot;transforms.SetValueSchema.schema.name&quot;: &quot;com.namespace.models.User&quot;,
    &quot;transforms.SetValueSchema.predicate&quot;: &quot;isTombstone&quot;,
    &quot;transforms.SetValueSchema.negate&quot;: &quot;true&quot;,
    &quot;predicates&quot;: &quot;isTombstone&quot;,
    &quot;predicates.isTombstone.type&quot;: &quot;org.apache.kafka.connect.transforms.predicates.RecordIsTombstone&quot;,
    &quot;topic.prefix&quot;: &quot;test&quot;,
    &quot;tasks.max&quot; : &quot;1&quot;
  }
}

I was expecting that by using Schema Registry with use.latest.version the AvroConverter would use the schema to produce the records. Does anyone have any ideas on what causes and how to overcome this mismatch between the schema registered on schema registry and the schema that AvroConverter uses?

答案1

得分: 3

我已解决了这个问题并确定了根本原因。问题出在BsonDocumentSourceRecord之间的转换上。据我所知,使用Debezium不可能为此转换提供模式,因此它是使用从ChangeStream(fullDocument)中提取的BsonDocument上的可用数据进行转换的。这就是为什么当它尝试将SourceRecord转换为Avro时会失败。

最终,我使用了MongoDBSourceConnector,因为它支持为BsonDocumentSourceRecord之间的转换提供模式(output.schema.value)。Mongo源连接器在1.7版本上添加了对Avro模式命名空间的支持。

我使用的是1.9.0版本,它还提供了墓碑支持,并且连接器可以使用Schema Registry上注册的最新模式版本正常工作,从BsonDocument转换到Avro也正常工作。

更多信息请参考:https://www.mongodb.com/docs/kafka-connector/current/whats-new/#std-label-kafka-connector-whats-new-1.9

英文:

I've solved this issue and identified the root cause. The problem lies on the conversion between BsonDocument and SourceRecord. AFAIK, with debezium it's not possible to provide a schema to be used on this conversion thus it is converted using the data available on the BsonDocument extracted from the ChangeStream (fullDocument). That's why it fails when it tries to convert the SourceRecord to Avro.

I've ended up using MongoDBSourceConnector as it supports a way to provide the schema to the conversion between BsonDocument and SourceRecord (output.schema.value). Mongo source connector added support for avro schema namespaces on version 1.7.

I've used version 1.9.0 that also ships with tombstone support and got the connector working using the latest schema version registered on Schema Registry and the conversion from BsonDocument until Avro works properly.

https://www.mongodb.com/docs/kafka-connector/current/whats-new/#std-label-kafka-connector-whats-new-1.9

huangapple
  • 本文由 发表于 2023年3月7日 21:13:30
  • 转载请务必保留本文链接:https://go.coder-hub.com/75662442.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定