Kafka Connect custom SMT Avro serialization exception

Question

I am writing my own SMT that does some cleansing on specific fields.

The code structure is straightforward; it compiled and was added to the plugin.path successfully.
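
For context, the SMT follows the standard Kafka Connect Transformation pattern with a nested Value subclass, which is why the config below references MyCustomSMT$Value. A simplified sketch of that structure (not the exact code; the trim call and the operatingValue helper are placeholders for the real cleansing logic):

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

public abstract class MyCustomSMT<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define("field", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "Name of the field to cleanse");

    private String fieldName;

    @Override
    public void configure(Map<String, ?> configs) {
        fieldName = (String) configs.get("field");
    }

    @Override
    public R apply(R record) {
        Object value = operatingValue(record);
        if (value instanceof Struct) {
            Struct struct = (Struct) value;
            Object raw = struct.get(fieldName);
            if (raw != null) {
                // Placeholder cleansing: a real SMT would usually copy the Struct
                // and return record.newRecord(...) instead of mutating in place.
                struct.put(fieldName, raw.toString().trim());
            }
        }
        return record;
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
    }

    // Which part of the record the transform operates on (key or value).
    protected abstract Object operatingValue(R record);

    // Referenced from the connector config as MyCustomSMT$Value.
    public static class Value<R extends ConnectRecord<R>> extends MyCustomSMT<R> {
        @Override
        protected Object operatingValue(R record) {
            return record.value();
        }
    }
}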

When I create a connector with the following configuration

{
    "name": "sql-to-kafka", 
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
        "database.hostname": "127.0.0.1", 
        "database.port": "3306", 
        "database.user": "username", 
        "database.password": "password", 
        "database.server.id": "11111", 
        "database.include.list": "test", 
        "schema.history.internal.kafka.bootstrap.servers": "localhost:9092", 
        "schema.history.internal.kafka.topic": "schemahistory.localdb", 
        "include.schema.changes": "false",
        "database.encrypt": false,
        "table.include.list": "test.bins",
        "topic.prefix":"localdb",
        "transforms":"unwrap,MyCustomSMT",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": false,
        "transforms.unwrap.delete.handling.mode": "drop",
        "transforms.MyCustomSMT.type": "MyCustomSMT$Value",
        "transforms.MyCustomSMT.field": "segment"
    }
}

I get the following exception in the connector logs:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:220)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:329)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:271)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic localdb.test.bins :
	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
	at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:329)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)
	... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{"type":"record","name":"Key","namespace":"localdb.test.bins","fields":[{"name":"id","type":"int"}],"connect.name":"localdb.test.bins.Key"}
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
	at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
	... 15 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Leader not known.; error code: 50004
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:544)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:532)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:490)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:257)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:366)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:337)
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:115)
	... 17 more

I'm not sure why Avro serialization is complaining here. It's also worth mentioning that when I update the connector configuration with the settings below, using JsonConverter for the value and StringConverter for the key, everything works fine. Not sure what I am missing here.

        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable":"false"

Answer 1

Score: 0

Look at the following URL:

https://debezium.io/documentation/reference/stable/configuration/avro.html#avro-serialization

The relevant remarks from that page:

" If you want records to be serialized with JSON, consider setting the following connector configuration properties to false:

key.converter.schemas.enable

value.converter.schemas.enable

Setting these properties to false excludes the verbose schema information from each record."

"To use Apache Avro serialization, you must deploy a schema registry that manages Avro message schemas and their versions. Available options include the Apicurio API and Schema Registry as well as the Confluent Schema Registry. Both are described here."

This explains the behaviour.
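
For reference, if Avro serialization is actually wanted, the converters have to point at a running schema registry. A typical per-connector override looks roughly like this (the registry URL is only a local example):

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081"

Note that the root cause in the trace ("Leader not known", error code 50004) comes from the schema registry itself, so the registry has to be healthy and reachable, not just configured.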

Answer 2

Score: 0

The fix for my issue was simply recreating the local environment from scratch, after which the schema-registry exceptions no longer appear in the logs. Nothing related to the connector or the SMT configuration needed to change.
