AvroConverter fails to serialize Nested Structures using latest schema version
Question
I'm trying to set up a Debezium connector to capture changes on a MongoDB collection and stream them to a Kafka topic.
Everything worked well (inserts, updates, deletes/tombstones) until I introduced Schema Registry and Avro schemas into the configuration.
I'm trying to use a schema that is already defined for the topic and contains nested structures. Here is an example of the schema registered in Schema Registry:
{
"name": "User",
"namespace": "com.namespace.models",
"type": "record",
"fields": [
{
"name": "id",
"type": "string"
},
{
"default": null,
"name": "version",
"type": [
"null",
"int"
]
},
{
"default": null,
"name": "address",
"type": [
"null",
{
"fields": [
{
"default": null,
"name": "street",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "city",
"type": [
"null",
"string"
]
}
],
"name": "Address",
"namespace": "com.myothernamespace.models",
"type": "record"
}
]
},
{
"default": null,
"name": "otherAddress",
"type": [
"null",
"com.myothernamespace.models.Address"
]
}
]
}
When the connector tries to write to the topic, it fails in the AvroConverter with the following exception:
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic..
I can see in the logs that the connector tries to produce the record with a different schema, one that does not use the correct namespace for the nested structure (address). It also fails to serialize the otherAddress field: because that field is null in the Mongo collection, the schema produced by the AvroConverter declares it as a union of (null, string), when it should be a union of (null, Address).
The id field fails as well, because the produced schema defines it as optional.
The result is a mismatch between the schema registered in Schema Registry and the schema the producer is trying to use.
Here is the mismatched schema that the converter uses to produce the records:
{
"name": "User",
"namespace": "com.namespace.models",
"type": "record",
"fields": [
{
"default": null,
"name": "id",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "version",
"type": [
"null",
"int"
]
},
{
"default": null,
"name": "address",
"type": [
"null",
{
"fields": [
{
"default": null,
"name": "street",
"type": [
"null",
"string"
]
},
{
"default": null,
"name": "city",
"type": [
"null",
"string"
]
}
],
"name": "address",
"namespace": "<my_topic_name>", # It's using my topic name as namespace
"type": "record"
}
]
},
{
"default": null,
"name": "otherAddress",
"type": [
"null",
"string" # not using the correct structure (Address)
]
}
]
}
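To make the mismatch concrete, here is a minimal Python sketch that compares the top-level fields of the two schemas above and reports the ones that differ. It is only an illustration: the helper names `describe` and `diff_fields` are hypothetical, the schemas are trimmed copies of the ones shown in this question, and this is not how the AvroConverter or Schema Registry performs its own check.

```python
def describe(avro_type):
    """Return a short, comparable description of an Avro type."""
    if isinstance(avro_type, list):  # a union such as ["null", "string"]
        return "union(" + ", ".join(describe(t) for t in avro_type) + ")"
    if isinstance(avro_type, dict):
        if avro_type.get("type") == "record":
            namespace = avro_type.get("namespace")
            name = avro_type["name"]
            return f"{namespace}.{name}" if namespace else name
        return describe(avro_type["type"])
    return avro_type  # primitive type or reference to a named type


def diff_fields(registered, generated):
    """Map each differing field name to (registered type, generated type)."""
    reg = {f["name"]: describe(f["type"]) for f in registered["fields"]}
    gen = {f["name"]: describe(f["type"]) for f in generated["fields"]}
    names = sorted(reg.keys() | gen.keys())
    return {n: (reg.get(n), gen.get(n)) for n in names if reg.get(n) != gen.get(n)}


# Trimmed copy of the schema registered in Schema Registry.
registered = {
    "type": "record", "name": "User", "namespace": "com.namespace.models",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "version", "type": ["null", "int"], "default": None},
        {"name": "address", "default": None, "type": ["null", {
            "type": "record", "name": "Address",
            "namespace": "com.myothernamespace.models", "fields": []}]},
        {"name": "otherAddress", "default": None,
         "type": ["null", "com.myothernamespace.models.Address"]},
    ],
}

# Trimmed copy of the schema the converter tried to use.
generated = {
    "type": "record", "name": "User", "namespace": "com.namespace.models",
    "fields": [
        {"name": "id", "type": ["null", "string"], "default": None},
        {"name": "version", "type": ["null", "int"], "default": None},
        {"name": "address", "default": None, "type": ["null", {
            "type": "record", "name": "address",
            "namespace": "<my_topic_name>", "fields": []}]},
        {"name": "otherAddress", "type": ["null", "string"], "default": None},
    ],
}

diff = diff_fields(registered, generated)
for field, (expected, actual) in diff.items():
    print(f"{field}: registered={expected} generated={actual}")
```

The three fields it reports (id, address, otherAddress) are the same three mismatches described above; version matches in both schemas and is not listed.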
I was able to fix the namespace of the root structure with the SetSchemaMetadata SMT, but AFAIK it cannot be applied to nested structures.
Here is my connector config:
{
"name": "test-connector",
"config": {
"connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.members.auto.discover": true,
"mongodb.connection.string": "<redacted>",
"mongodb.user" : "<redacted>",
"mongodb.password" : "<redacted>",
"database.include.list": "<redacted>",
"collection.include.list": "<redacted>",
"mongodb.ssl.enabled": true,
"publish.full.document.only": true,
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.enhanced.avro.schema.support": "true",
"value.converter.schema.registry.url": "<redacted>",
"value.converter.auto.register.schemas": "false",
"value.converter.use.latest.version": "true",
"value.converter.connect.meta.data": "false",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.basic.auth.user.info": "<redacted>",
"transforms": "unwrap,ReplaceField,extractIdAsKey,SetValueSchema",
"transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "none",
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.blacklist": "_id",
"transforms.extractIdAsKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractIdAsKey.field": "id",
"transforms.SetValueSchema.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.SetValueSchema.schema.name": "com.namespace.models.User",
"transforms.SetValueSchema.predicate": "isTombstone",
"transforms.SetValueSchema.negate": "true",
"predicates": "isTombstone",
"predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
"topic.prefix": "test",
"tasks.max" : "1"
}
}
I was expecting that, with Schema Registry and use.latest.version, the AvroConverter would use the registered schema to produce the records. Does anyone have any idea what causes this mismatch between the schema registered in Schema Registry and the schema that the AvroConverter uses, and how to overcome it?
Answer 1
Score: 3
I've solved this issue and identified the root cause. The problem lies in the conversion between BsonDocument and SourceRecord. AFAIK, with Debezium it's not possible to provide a schema for this conversion, so the record is converted using only the data available in the BsonDocument extracted from the change stream (fullDocument). That's why it fails when it tries to convert the SourceRecord to Avro.
I ended up using MongoDBSourceConnector, as it supports providing the schema for the conversion between BsonDocument and SourceRecord (output.schema.value). The Mongo source connector added support for Avro schema namespaces in version 1.7.
I used version 1.9.0, which also ships with tombstone support, and got the connector working with the latest schema version registered in Schema Registry; the conversion from BsonDocument through to Avro works properly.
More information: https://www.mongodb.com/docs/kafka-connector/current/whats-new/#std-label-kafka-connector-whats-new-1.9
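As a rough illustration of the approach described in this answer, the following Python sketch builds a connector payload in which the Avro schema is embedded as a JSON string under output.schema.value. It is not the exact configuration used above. The helper `build_connector_config` is hypothetical, the schema is a trimmed copy of the one in the question, and the property names (in particular `publish.full.document.only.tombstone.on.delete`) are written from memory of the MongoDB Kafka Connector documentation, so they should be checked against the docs for the connector version in use.

```python
import json

# Trimmed copy of the registered schema from the question.
user_schema = {
    "type": "record",
    "name": "User",
    "namespace": "com.namespace.models",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "version", "type": ["null", "int"], "default": None},
        {"name": "address", "default": None, "type": ["null", {
            "type": "record",
            "name": "Address",
            "namespace": "com.myothernamespace.models",
            "fields": [
                {"name": "street", "type": ["null", "string"], "default": None},
                {"name": "city", "type": ["null", "string"], "default": None},
            ],
        }]},
        {"name": "otherAddress", "default": None,
         "type": ["null", "com.myothernamespace.models.Address"]},
    ],
}


def build_connector_config(name, schema):
    """Build a Kafka Connect payload; the schema is passed as a JSON string."""
    return {
        "name": name,
        "config": {
            "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
            "connection.uri": "<redacted>",
            "database": "<redacted>",
            "collection": "<redacted>",
            "publish.full.document.only": "true",
            # Assumed to be the 1.9 tombstone option; verify in the release notes.
            "publish.full.document.only.tombstone.on.delete": "true",
            # Convert BsonDocument to SourceRecord using the supplied schema.
            "output.format.value": "schema",
            "output.schema.value": json.dumps(schema),
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "<redacted>",
            "value.converter.auto.register.schemas": "false",
            "value.converter.use.latest.version": "true",
        },
    }


config = build_connector_config("test-connector", user_schema)
print(json.dumps(config, indent=2))
```

Serializing the schema with `json.dumps` avoids hand-escaping the quotes when the schema is placed inside the connector's JSON payload.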