英文:
How to set up Kafka-connector to use custom transform in Debezium?
问题
我是你的中文翻译,以下是你提供的翻译好的部分:
"transforms.router.type" 是 Kafka Connect 中用于进行事件转换的配置项,你想要使用它来与你的 Debezium 设置一起使用,但是遇到了问题。你创建了一个特殊的事件转换 Java 类,并为它准备了配置,以便部署到容器中,配置如下:
{
"name": "task-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"slot.name": "task_engine_saga",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "tasks",
"schema.whitelist": "public",
"table.whitelist": "public.task",
"tombstones.on.delete": "false",
"transforms": "router",
"transforms.router.type": "com.task.connect.TaskEventRouter"
}
}
然后,响应中说找不到这个配置。
接着,你将带有事件转换逻辑的 Java 类的 jar 文件复制到容器的 /connect 目录中,但也没有帮助。请问有人可以帮助我,告诉我应该如何使这个自定义的 transforms.router.type 在我的 Debezium 设置中工作吗?
你的容器 docker-compose 设置如下:
version: '3'
services:
pgadmin:
container_name: pgadmin_container
image: dpage/pgadmin4
environment:
PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL:-pgadmin4@pgadmin.org}
PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD:-admin}
volumes:
- pgadmin:/root/.pgadmin
ports:
- "${PGADMIN_PORT:-5050}:80"
restart: unless-stopped
zookeeper:
image: debezium/zookeeper:1.3
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: debezium/kafka:1.3
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
image: debezium/example-postgres:1.3
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- PGDATA=/data/postgres
- POSTGRES_DB=${POSTGRES_DB:-task_engine}
connect:
image: debezium/connect:1.3
ports:
- 8083:8083
links:
- kafka
- postgres
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
volumes:
postgres:
pgadmin:
请提供更多关于问题的详细信息,以便更好地帮助你解决问题。
英文:
I'm new to Kafka and I can't figure out how to use "transforms.router.type" to make it work with my Debezium setup.
So I made special event transformation java class and prepeared configuration for it to deploy to container which looks like this:
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d
{
"name": "task-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"slot.name" : "task_engine_saga",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "tasks",
"schema.whitelist": "public",
"table.whitelist" : "public.task",
"tombstones.on.delete" : "false",
"transforms" : "router",
"transforms.router.type" : "com.task.connect.TaskEventRouter"
}
}
And the response says that this config can't be found.
CREATE kafka connector task-connector....
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1091 100 516 100 575 8322 9274 --:--:-- --:--:-- --:--:-- 17596{"error_code":400,"message":"Connector configuration is invalid and contains the following 3 error(s):\nInvalid value com.task.connect.TaskEventRouter for configuration transforms.router.type: Class com.task.connect.TaskEventRouter could not be found.\nInvalid value null for configuration transforms.router.type: Not a Transformation\nA value is required\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}
Then I copied to container /connect directory my host folder with jar file in which I have my java class with event transformation logic, but it didn't help too. Could someone please help me and tell what should I do to make this custom transforms.router.type work with my Debezium set up?
My containers docker-compose set up:
version: '3'
services:
pgadmin:
container_name: pgadmin_container
image: dpage/pgadmin4
environment:
PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL:-pgadmin4@pgadmin.org}
PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD:-admin}
volumes:
- pgadmin:/root/.pgadmin
ports:
- "${PGADMIN_PORT:-5050}:80"
restart: unless-stopped
zookeeper:
image: debezium/zookeeper:1.3
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: debezium/kafka:1.3
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
postgres:
image: debezium/example-postgres:1.3
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- PGDATA=/data/postgres
- POSTGRES_DB=${POSTGRES_DB:-task_engine}
connect:
image: debezium/connect:1.3
ports:
- 8083:8083
links:
- kafka
- postgres
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
volumes:
postgres:
pgadmin:
答案1
得分: 0
我创建了特殊事件转换的Java类
根据我所了解,您似乎没有将这个已编译的类装载到您的Debezium镜像中,或者没有编辑plugin.path
文件以包含该JAR文件。
如果您已经创建了自己的Docker镜像包含了这些数据,那么您应该修改image: debezium/connect:1.3
。
这可能解释了为什么转换不可用。
英文:
> I made special event transformation java class
From what I can tell, you have not mounted this compiled class into your debezium image or edited the plugin.path
file to include that JAR
If you have created your own Docker image with that data, then you should change image: debezium/connect:1.3
This would explain why the transform is not available
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论