Kafka Connect custom SMT Avro serialization exception
Question
I am writing my own SMT that does some cleansing on specific fields. The code structure is straightforward; it compiled and was added to the plugin.path successfully.
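
For context, a minimal sketch of what such a value-side SMT might look like (the actual MyCustomSMT source was not shared, so the package name, cleansing logic, and the nested Value class below are assumptions based on the connector configuration that follows):

package com.example.smt;   // hypothetical package; the real one was not shared

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

public class MyCustomSMT<R extends ConnectRecord<R>> implements Transformation<R> {

    // Matches the "transforms.MyCustomSMT.field" property in the connector config.
    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define("field", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "Name of the field to cleanse");

    private String fieldName;

    @Override
    public void configure(Map<String, ?> props) {
        SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
        fieldName = config.getString("field");
    }

    @Override
    public R apply(R record) {
        // For brevity this mutates the value Struct in place; a production SMT
        // would usually build a new Struct and return record.newRecord(...).
        if (record.value() instanceof Struct) {
            Struct value = (Struct) record.value();
            Object raw = value.get(fieldName);
            if (raw != null) {
                value.put(fieldName, raw.toString().trim()); // placeholder cleansing logic
            }
        }
        return record;
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }

    // The connector config references the transform as MyCustomSMT$Value,
    // i.e. a nested class that operates on the record value.
    public static class Value<R extends ConnectRecord<R>> extends MyCustomSMT<R> {
    }
}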
When I create a connector with the following configuration
{
  "name": "sql-to-kafka",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "127.0.0.1",
    "database.port": "3306",
    "database.user": "username",
    "database.password": "password",
    "database.server.id": "11111",
    "database.include.list": "test",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "schema.history.internal.kafka.topic": "schemahistory.localdb",
    "include.schema.changes": "false",
    "database.encrypt": false,
    "table.include.list": "test.bins",
    "topic.prefix": "localdb",
    "transforms": "unwrap,MyCustomSMT",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": false,
    "transforms.unwrap.delete.handling.mode": "drop",
    "transforms.MyCustomSMT.type": "MyCustomSMT$Value",
    "transforms.MyCustomSMT.field": "segment"
  }
}
I got the below exception in the connector logs
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:220)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:271)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic localdb.test.bins :
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:329)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)
... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{"type":"record","name":"Key","namespace":"localdb.test.bins","fields":[{"name":"id","type":"int"}],"connect.name":"localdb.test.bins.Key"}
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
... 15 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Leader not known.; error code: 50004
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:544)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:532)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:490)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:257)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:366)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:337)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:115)
... 17 more
Not sure why Avro serialization is complaining here. It is also worth mentioning that when I update the connector configuration with the settings below, using JsonConverter for the value and StringConverter for the key, everything works fine; not sure what I am missing here.
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false"
Answer 1
Score: 0
Look at the following URL:
https://debezium.io/documentation/reference/stable/configuration/avro.html#avro-serialization
The documentation remarks:
" If you want records to be serialized with JSON, consider setting the following connector configuration properties to false:
key.converter.schemas.enable
value.converter.schemas.enable
Setting these properties to false excludes the verbose schema information from each record."
"To use Apache Avro serialization, you must deploy a schema registry that manages Avro message schemas and their versions. Available options include the Apicurio API and Schema Registry as well as the Confluent Schema Registry. Both are described here."
This explains the behaviour.
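
For reference, pointing the connector at the Confluent Schema Registry typically involves converter settings along these lines (a sketch only; the registry URL below is an assumption and must point to a healthy, reachable Schema Registry instance):

"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"

The Avro serializer registers schemas with this registry on the fly, which is the step that fails with the "Leader not known" response in the stack trace above.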
Answer 2
Score: 0
The fix for my issue was simply recreating the local environment from scratch, after which the schema-registry exceptions stopped appearing in the logs. The problem was not related to the connector or the SMT configuration.