How to set up a Kafka connector to use a custom transform in Debezium?

Question

I'm new to Kafka and I can't figure out how to get "transforms.router.type" working with my Debezium setup.
I wrote a custom event transformation Java class and prepared a connector configuration for it, which I post to the Connect container like this:

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{
  "name": "task-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "slot.name": "task_engine_saga",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "tasks",
    "schema.whitelist": "public",
    "table.whitelist": "public.task",
    "tombstones.on.delete": "false",
    "transforms": "router",
    "transforms.router.type": "com.task.connect.TaskEventRouter"
  }
}'

The response says that the transformation class can't be found:

CREATE kafka connector task-connector....
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1091  100   516  100   575   8322   9274 --:--:-- --:--:-- --:--:-- 17596
{"error_code":400,"message":"Connector configuration is invalid and contains the following 3 error(s):\nInvalid value com.task.connect.TaskEventRouter for configuration transforms.router.type: Class com.task.connect.TaskEventRouter could not be found.\nInvalid value null for configuration transforms.router.type: Not a Transformation\nA value is required\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}

Then I copied the host folder containing the JAR file (which holds my Java class with the event transformation logic) into the container's /connect directory, but that didn't help either. Could someone please tell me what I should do to make this custom transforms.router.type work with my Debezium setup?

My docker-compose setup:

version: '3'
services:
    pgadmin:
        container_name: pgadmin_container
        image: dpage/pgadmin4
        environment:
            PGADMIN_DEFAULT_EMAIL: ${PGADMIN_DEFAULT_EMAIL:-pgadmin4@pgadmin.org}
            PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_DEFAULT_PASSWORD:-admin}
        volumes:
            - pgadmin:/root/.pgadmin
        ports:
            - "${PGADMIN_PORT:-5050}:80"
        restart: unless-stopped

    zookeeper:
        image: debezium/zookeeper:1.3
        ports:
            - 2181:2181
            - 2888:2888
            - 3888:3888
    kafka:
        image: debezium/kafka:1.3
        ports:
            - 9092:9092
        links:
            - zookeeper
        environment:
            - ZOOKEEPER_CONNECT=zookeeper:2181
    postgres:
        image: debezium/example-postgres:1.3
        ports:
            - 5432:5432
        environment:
            - POSTGRES_USER=postgres
            - POSTGRES_PASSWORD=postgres
            - PGDATA=/data/postgres
            - POSTGRES_DB=${POSTGRES_DB:-task_engine}
    connect:
        image: debezium/connect:1.3
        ports:
            - 8083:8083
        links:
            - kafka
            - postgres
        environment:
            - BOOTSTRAP_SERVERS=kafka:9092
            - GROUP_ID=1
            - CONFIG_STORAGE_TOPIC=my_connect_configs
            - OFFSET_STORAGE_TOPIC=my_connect_offsets
            - STATUS_STORAGE_TOPIC=my_connect_statuses

volumes:
    postgres:
    pgadmin:

Answer 1

Score: 0

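> I made special event transformation java class

From what I can tell, you have not mounted the JAR containing this compiled class into your Debezium Connect container at a location covered by the worker's plugin.path setting, nor changed plugin.path to include the directory that holds the JAR. Kafka Connect scans plugin.path for plugins when the worker starts, so the worker also has to be restarted after the JAR is added.

If you have built your own Docker image that already contains the JAR, then you should change image: debezium/connect:1.3 in your docker-compose file to point to that image instead.

Either of these would explain why the transform is not available.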
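
As a minimal sketch of the mounting option: this assumes the debezium/connect image uses /kafka/connect as its plugin directory (worth confirming for your image version; note that this is not the /connect directory mentioned in the question) and that the JAR sits in a hypothetical host folder ./task-router next to the compose file. Under those assumptions, the connect service could be extended like this:

```yaml
    connect:
        image: debezium/connect:1.3
        ports:
            - 8083:8083
        links:
            - kafka
            - postgres
        volumes:
            # Assumption: ./task-router is a hypothetical host folder holding the
            # JAR with com.task.connect.TaskEventRouter; /kafka/connect is assumed
            # to be the image's plugin directory (on plugin.path).
            - ./task-router:/kafka/connect/task-router
        environment:
            - BOOTSTRAP_SERVERS=kafka:9092
            - GROUP_ID=1
            - CONFIG_STORAGE_TOPIC=my_connect_configs
            - OFFSET_STORAGE_TOPIC=my_connect_offsets
            - STATUS_STORAGE_TOPIC=my_connect_statuses
```

After recreating the container (for example with docker-compose up -d connect), the worker should scan the plugin directory at startup, and the connector can then be registered again with the same curl request.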

Posted by huangapple on 2020-08-08 00:47:05
Link: https://go.coder-hub.com/63306072.html