如何将SMT应用于Kafka Connect中的单个主题?

huangapple go评论89阅读模式
英文:

How to apply an SMT to a single topic in Kafka connect?

问题

我想对单个主题应用一组单一消息转换(SMT,Single Message Transforms)。

我有以下配置:


connector.class: io.debezium.connector.postgresql.PostgresConnector
name: test
database.server.name: testserver
database.hostname: 127.0.0.1
database.port: 5432
database.dbname: test
database.user: testuser
database.password: "testpass"
schema.whitelist: test

table.whitelist: test.topic1, test.topic2, test.topic3

transforms:unwrap,extractTopic1Key,extractTopic1Value,extractTopic2Key


transforms.unwrap.type: io.debezium.transforms.UnwrapFromEnvelope
transforms.unwrap.delete.handling.mode: rewrite
transforms.unwrap.drop.tombstones: false

transforms.extractTopic1Key.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractTopic1Value.type: org.apache.kafka.connect.transforms.ExtractField$Value

transforms.extractTopic1Key.field: propname
transforms.extractTopic1Value.field: propvalue

transforms.extractTopic1Key.predicate: test_topic1
transforms.extractTopic1Value.predicate: test_topic1

transforms.extractTopic2Key.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractTopic2Key.field: id
transforms.extractTopic2Key.predicate: test_topic2


predicates=test_topic1,test_topic2
predicates.test_topic1.type=org.apache.kafka.connect.predicates.TopicNameMatches
predicates.test_topic1.pattern=.*topic1

predicates.test_topic2.type=org.apache.kafka.connect.predicates.TopicNameMatches
predicates.test_topic2.pattern=.*topic2

key.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false
decimal.handling.mode: string
include.schema.changes: false
#Adds header which says insert,update,delete
transforms.unwrap.operation.header: true
snapshot.mode: initial
heartbeat.interval.ms: 10000
slot.name: test

我遇到了以下错误:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:308)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 11 more

当我移除除了 unwrap 之外的所有转换时,它能够正常工作。我的示例主题数据如下:

Key

{"propname":"my.test.prop"} 

Value

{"propname":"my.test.prop","propvalue":"10","__deleted":"false"}

propname 是表中的主键。

上述配置有什么问题,如何仅将特定的 SMT 应用于指定的主题?

附注:我正在使用 Confluent Platform 5.3.1。

英文:

I would like to apply a set of SMT (Single Message Transforms) to one single topic.

I have the following configuration..


connector.class: io.debezium.connector.postgresql.PostgresConnector
name: test
database.server.name: testserver
database.hostname: 127.0.0.1
database.port: 5432
database.dbname: test
database.user: testuser
database.password: "testpass"
schema.whitelist: test
table.whitelist: test.topic1, test.topic2, test.topic3
transforms:unwrap,extractTopic1Key,extractTopic1Value,extractTopic2Key
transforms.unwrap.type: io.debezium.transforms.UnwrapFromEnvelope
transforms.unwrap.delete.handling.mode: rewrite
transforms.unwrap.drop.tombstones: false
transforms.extractTopic1Key.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractTopic1Value.type: org.apache.kafka.connect.transforms.ExtractField$Value
transforms.extractTopic1Key.field: propname
transforms.extractTopic1Value.field: propvalue
transforms.extractTopic1Key.predicate: test_topic1
transforms.extractTopic1Value.predicate: test_topic1
transforms.extractTopic2Key.type:org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractTopic2Key.field: id
transforms.extractTopic2Key.predicate: test_topic2
predicates=test_topic1,test_topic2
predicates.test_topic1.type=org.apache.kafka.connect.predicates.TopicNameMatches
predicates.test_topic1.pattern=.*topic1
predicates.test_topic2.type=org.apache.kafka.connect.predicates.TopicNameMatches
predicates.test_topic2.pattern=.*topic2
key.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false
decimal.handling.mode: string
include.schema.changes: false
#Adds header which says insert,update,delete
transforms.unwrap.operation.header: true
snapshot.mode: initial
heartbeat.interval.ms: 10000
slot.name: test

I am getting the following error..

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:308)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 11 more

When I remove the all transformations except unwrap, it is working fine.
My sample topic data looks as follows.

Key

{"propname":"my.test.prop"} 

Value

{"propname":"my.test.prop","propvalue":"10","__deleted":"false"}

propname is the primary key in the table.

What is wrong with the above configuration and how do we apply specific SMTs only to specified topics?

P.S: I am using Confluent Platform 5.3.1

答案1

得分: 3

[tag:apache-Kafka]自2.6.0版本起,已经通过KIP-585 - 过滤器和条件性SMT支持有条件地应用SMT。

您正在使用Confluent Platform 5.3.1。5.3.1是Confluent Platform的修复错误版本,为您提供了Apache Kafka 2.3.0,这是Kafka的最新稳定版本。

因此,您需要将Kafka Connect升级到2.6.0版本。

英文:

[tag:apache-Kafka] has included supporting for conditionally applying an SMT since 2.6.0 version through KIP-585 - Filter and Conditional SMTs.

You are using Confluent Platform 5.3.1. 5.3.1 is a bugfix release of Confluent Platform that provides you with Apache Kafka 2.3.0, the latest stable version of Kafka.

So you need to upgrade Kafka Connect to 2.6.0 version.

huangapple
  • 本文由 发表于 2020年10月28日 04:03:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/64562076.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定