能在一个Kafka集群中运行两个Debezium连接器吗?

huangapple go评论59阅读模式
英文:

Can there be 2 Debezium connector running in one Kafka cluster?

问题

Debezium官方页面上可以看到,有一张图片显示多个Debezium连接器可以连接到同一个Kafka。

所以我有2个数据库,2个Debezium实例,1个运行在docker-compose中的Kafka,但似乎只有1个Debezium将更新发送到Kafka(从Kafdrop监视)。

这是我的docker-compose文件:

version: '3.6'
services:

  hero_db:
    image: postgres:14
    restart: always
    environment:
      POSTGRES_PASSWORD: postgrespassword
    ports:
      - '5432:5432'
    expose:
      - '5432'
    command: [ "postgres", "-c", "wal_level=logical" ]
    volumes:
      - hero_db_data:/var/lib/postgresql/data

  villian_db:
    image: postgres:14
    restart: always
    environment:
      POSTGRES_PASSWORD: postgrespassword
    ports:
      - '2345:2345'
    expose:
      - '2345'
    command: [ "postgres", "-c", "wal_level=logical" ]
    volumes:
      - villian_db_data:/var/lib/postgresql/data

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:5.3.1
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

  kafdrop:
    image: obsidiandynamics/kafdrop
    container_name: kafdrop
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka:9092"
      JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
    depends_on:
      - kafka

  hero_debezium:
    image: debezium/connect:1.9
    ports:
      - 8083:8083
    expose:
      - '8083'
    environment:
      CONFIG_STORAGE_TOPIC: hero_configs
      OFFSET_STORAGE_TOPIC: hero_offsets
      STATUS_STORAGE_TOPIC: hero_statuses
      BOOTSTRAP_SERVERS: kafka:9092
    depends_on: [ zookeeper, kafka, hero_db ]

  villian_debezium:
    image: debezium/connect:1.9
    ports:
      - 8084:8083
    expose:
      - '8084'
    environment:
      CONFIG_STORAGE_TOPIC: villian_configs
      OFFSET_STORAGE_TOPIC: villian_offsets
      STATUS_STORAGE_TOPIC: villian_statuses
      BOOTSTRAP_SERVERS: kafka:9092
    depends_on: [ zookeeper, kafka, villian_db ]

volumes:
  hero_db_data:
  villian_db_data:

这里是hero_dbz和villian_dbz的JSON配置文件:
hero_dbz.json

{
    "name": "hero-postgresql-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "hero_db",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgrespassword",
        "database.dbname": "postgres",
        "database.server.name": "hero_server",
        "table.include.list": "public.heroes",
        "table.whitelist": "public.heroes",
        "topic.prefix": "topic_heroes"
    }
}

villian_dbz.json

{
    "name": "villian-postgresql-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "villian_db",
        "database.port": "2345",
        "database.user": "postgres",
        "database.password": "postgrespassword",
        "database.dbname": "postgres",
        "database.server.name": "villian_server",
        "table.include.list": "public.villians",
        "table.whitelist": "public.villians",
        "topic.prefix": "topic_villian"
    }
}

我使用以下命令配置了hero_dbz和villian_dbz:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 127.0.0.1:8083/connectors/ --data "@hero_dbz.json"
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 127.0.0.1:8084/connectors/ --data "@villian_dbz.json"

这里是从Kafdrop显示的屏幕截图,只显示来自hero_db的数据(hero_server.public.heroes),但没有来自villian_db的数据。

英文:

能在一个Kafka集群中运行两个Debezium连接器吗?
From Debezium official page, there's this picture showing that multiple Debezium connector can connect to the same Kafka.

So I have 2 databases, 2 Debeziums, 1 Kafka running in docker-compose, but it seems like only 1 debezium sent update to kafka (watch from kafdrop).

Here's my docker-compose file:<br>

version: &#39;3.6&#39;
services:
hero_db:
image: postgres:14
restart: always
environment:
POSTGRES_PASSWORD: postgrespassword
ports:
- &#39;5432:5432&#39;
expose:
- &#39;5432&#39;
command: [ &quot;postgres&quot;, &quot;-c&quot;, &quot;wal_level=logical&quot; ]
volumes:
- hero_db_data:/var/lib/postgresql/data
villian_db:
image: postgres:14
restart: always
environment:
POSTGRES_PASSWORD: postgrespassword
ports:
- &#39;2345:2345&#39;
expose:
- &#39;2345&#39;
command: [ &quot;postgres&quot;, &quot;-c&quot;, &quot;wal_level=logical&quot; ]
volumes:
- villian_db_data:/var/lib/postgresql/data
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:5.3.1
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
kafdrop:
image: obsidiandynamics/kafdrop
container_name: kafdrop
ports:
- &quot;9000:9000&quot;
environment:
KAFKA_BROKERCONNECT: &quot;kafka:9092&quot;
JVM_OPTS: &quot;-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify&quot;
depends_on:
- kafka
hero_debezium:
image: debezium/connect:1.9
ports:
- 8083:8083
expose:
- &#39;8083&#39;
environment:
CONFIG_STORAGE_TOPIC: hero_configs
OFFSET_STORAGE_TOPIC: hero_offsets
STATUS_STORAGE_TOPIC: hero_statuses
BOOTSTRAP_SERVERS: kafka:9092
depends_on: [ zookeeper, kafka, hero_db ]
villian_debezium:
image: debezium/connect:1.9
ports:
- 8084:8083
expose:
- &#39;8084&#39;
environment:
CONFIG_STORAGE_TOPIC: villian_configs
OFFSET_STORAGE_TOPIC: villian_offsets
STATUS_STORAGE_TOPIC: villian_statuses
BOOTSTRAP_SERVERS: kafka:9092
depends_on: [ zookeeper, kafka, villian_db ]
volumes:
hero_db_data:
villian_db_data:

Here's a debezium config file in json of hero_dbz and villian_dbz:<br>
hero_dbz.json<br>

{
&quot;name&quot;: &quot;hero-postgresql-connector&quot;,
&quot;config&quot;: {
&quot;connector.class&quot;: &quot;io.debezium.connector.postgresql.PostgresConnector&quot;,
&quot;plugin.name&quot;: &quot;pgoutput&quot;,
&quot;database.hostname&quot;: &quot;hero_db&quot;,
&quot;database.port&quot;: &quot;5432&quot;,
&quot;database.user&quot;: &quot;postgres&quot;,
&quot;database.password&quot;: &quot;postgrespassword&quot;,
&quot;database.dbname&quot;: &quot;postgres&quot;,
&quot;database.server.name&quot;: &quot;hero_server&quot;,
&quot;table.include.list&quot;: &quot;public.heroes&quot;,
&quot;table.whitelist&quot;: &quot;public.heroes&quot;,
&quot;topic.prefix&quot;: &quot;topic_heroes&quot;
}
}

villian_dbz.json<br>

{
&quot;name&quot;: &quot;villian-postgresql-connector&quot;,
&quot;config&quot;: {
&quot;connector.class&quot;: &quot;io.debezium.connector.postgresql.PostgresConnector&quot;,
&quot;plugin.name&quot;: &quot;pgoutput&quot;,
&quot;database.hostname&quot;: &quot;villian_db&quot;,
&quot;database.port&quot;: &quot;2345&quot;,
&quot;database.user&quot;: &quot;postgres&quot;,
&quot;database.password&quot;: &quot;postgrespassword&quot;,
&quot;database.dbname&quot;: &quot;postgres&quot;,
&quot;database.server.name&quot;: &quot;villian_server&quot;,
&quot;table.include.list&quot;: &quot;public.villians&quot;,
&quot;table.whitelist&quot;: &quot;public.villians&quot;,
&quot;topic.prefix&quot;: &quot;topic_villian&quot;
}
}

I config both hero_dbz & villian_dbz with these command:<br>
curl -i -X POST -H &quot;Accept:application/json&quot; -H &quot;Content-Type:application/json&quot; 127.0.0.1:8083/connectors/ --data &quot;@hero_dbz.json&quot;<br>
curl -i -X POST -H &quot;Accept:application/json&quot; -H &quot;Content-Type:application/json&quot; 127.0.0.1:8084/connectors/ --data &quot;@villian_dbz.json&quot;

Here's a screen from Kafdrop showing only data from hero_db (hero_server.public.heroes) but nothing from villian_db.

能在一个Kafka集群中运行两个Debezium连接器吗?

答案1

得分: 1

不应该在运行多个Kafka Connect服务器时出现问题。可能存在配置和设置方面的问题。也许需要查看一下恶棍连接器的日志。

我认为你的恶棍数据库的内部端口设置是错误的:

villian_db:
    image: postgres:14
    restart: always
    environment:
      POSTGRES_PASSWORD: postgrespassword
    ports:
      - '2345:5432'
    expose:
      - '2345'
    command: [ "postgres", "-c", "wal_level=logical" ]
    volumes:
      - villian_db_data:/var/lib/postgresql/data

更新:

请向连接集群添加不同的GROUP_ID环境变量。背景信息如下:

运行Kafka Connect服务时需要此环境变量。将其设置为一个唯一标识Kafka Connect集群、服务及其工作进程所属的ID。

类似于GROUP_ID: 3GROUP_ID: 2等。你的JSON文件应该指向5432,即PostgresDB的内部端口,因此.json文件应该指向5432(两者都是)。

另外,你的Zookeeper配置错误,请像这样修复它:

....
ports:
      - 2181:2181
英文:

Shouldn't be a problem running multiple kafka-connect servers. There might an issue in your configuration and setup. Perhaps look into to logs of the villian connector.

your internal port of villan db is wrong IMHO:

villian_db:
image: postgres:14
restart: always
environment:
POSTGRES_PASSWORD: postgrespassword
ports:
- &#39;2345:5432&#39;
expose:
- &#39;2345&#39;
command: [ &quot;postgres&quot;, &quot;-c&quot;, &quot;wal_level=logical&quot; ]
volumes:
- villian_db_data:/var/lib/postgresql/data

updates:

please add different GROUP_IDs env to the connect clusters. background:
> This environment variable is required when running the Kafka Connect service. Set this to an ID that uniquely identifies the Kafka Connect cluster the service and its workers belong to.

Something like GROUP_ID: 3 and GROUP_ID: 2 for the other. Your json should point to 5432, internal port of PostgresDB, so the .json files should point to 5432 (both)

And Your Zookeeper is wrongly configures, please fix it like that:

....
ports:
- 2181:2181

答案2

得分: 0

在我修复了我的villians_db,使其在端口2345上运行后,连接了英雄(hero)和恶棍(villain)的Debezium到Kafka,通过它们的JSON文件。恶棍数据库(villain_dbz)一直显示以下消息:

2023-02-25 20:43:15 2023-02-25 13:43:15,210 INFO   ||  [Worker clientId=connect-1, groupId=1] Rebalance started   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2023-02-25 20:43:15 2023-02-25 13:43:15,210 INFO   ||  [Worker clientId=connect-1, groupId=1] (Re-)joining group   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2023-02-25 20:43:15 2023-02-25 13:43:15,211 INFO   ||  [Worker clientId=connect-1, groupId=1] Successfully joined group with generation Generation{generationId=6, memberId='connect-1-a2647c32-4c10-48be-b832-a38a37a11cd0', protocol='sessioned'}   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2023-02-25 20:43:15 2023-02-25 13:43:15,211 INFO   ||  [Worker clientId=connect-1, groupId=1] Successfully synced group in generation Generation{generationId=6, memberId='connect-1-a2647c32-4c10-48be-b832-a38a37a11cd0', protocol='sessioned'}   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2023-02-25 20:43:15 2023-02-25 13:43:15,211 INFO   ||  [Worker clientId=connect-1, groupId=1] Joined group at generation 6 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-07842920-d640-4c15-ae17-ad4900077143', leaderUrl='http://172.19.0.8:8083/', offset=5, connectorIds=[hero-postgresql-connector], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2023-02-25 20:43:15 2023-02-25 13:43:15,211 WARN   ||  [Worker clientId=connect-1, groupId=1] Catching up to assignment's config offset.   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2023-02-25 20:43:15 2023-02-25 13:43:15,211 INFO   ||  [Worker clientId=connect-1, groupId 1] Current config state offset -1 is behind group assignment 5, reading to end of config log   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2023-02-25 20:43:15 2023-02-25 13:43:15,212 INFO   ||  [Worker clientId=connect-1, groupId=1] Finished reading to end of log and updated config snapshot, new config log offset: -1   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2023-02-25 20:43:15 2023-02-25 13:43:15,212 INFO   ||  [Worker clientId=connect-1, groupId=1] Current config state offset -1 does not match group assignment 5. Forcing rebalance.   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]

如果只有一个(英雄或恶棍)与Kafka连接,将不会出现此错误。

英文:

After I fixed my villians_db to run on port 2345. Then connect both of hero and villian debezium to Kafka via their JSON file.<br>
Inside the villian_dbz it keep showing this message :

2023-02-25 20:43:15 2023-02-25 13:43:15,210 INFO   ||  [Worker clientId=connect-1, groupId=1] Rebalance started   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2023-02-25 20:43:15 2023-02-25 13:43:15,210 INFO   ||  [Worker clientId=connect-1, groupId=1] (Re-)joining group   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2023-02-25 20:43:15 2023-02-25 13:43:15,211 INFO   ||  [Worker clientId=connect-1, groupId=1] Successfully joined group with generation Generation{generationId=6, memberId=&#39;connect-1-a2647c32-4c10-48be-b832-a38a37a11cd0&#39;, protocol=&#39;sessioned&#39;}   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2023-02-25 20:43:15 2023-02-25 13:43:15,211 INFO   ||  [Worker clientId=connect-1, groupId=1] Successfully synced group in generation Generation{generationId=6, memberId=&#39;connect-1-a2647c32-4c10-48be-b832-a38a37a11cd0&#39;, protocol=&#39;sessioned&#39;}   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2023-02-25 20:43:15 2023-02-25 13:43:15,211 INFO   ||  [Worker clientId=connect-1, groupId=1] Joined group at generation 6 with protocol version 2 and got assignment: Assignment{error=0, leader=&#39;connect-1-07842920-d640-4c15-ae17-ad4900077143&#39;, leaderUrl=&#39;http://172.19.0.8:8083/&#39;, offset=5, connectorIds=[hero-postgresql-connector], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2023-02-25 20:43:15 2023-02-25 13:43:15,211 WARN   ||  [Worker clientId=connect-1, groupId=1] Catching up to assignment&#39;s config offset.   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2023-02-25 20:43:15 2023-02-25 13:43:15,211 INFO   ||  [Worker clientId=connect-1, groupId=1] Current config state offset -1 is behind group assignment 5, reading to end of config log   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2023-02-25 20:43:15 2023-02-25 13:43:15,212 INFO   ||  [Worker clientId=connect-1, groupId=1] Finished reading to end of log and updated config snapshot, new config log offset: -1   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2023-02-25 20:43:15 2023-02-25 13:43:15,212 INFO   ||  [Worker clientId=connect-1, groupId=1] Current config state offset -1 does not match group assignment 5. Forcing rebalance.   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]

This error does not appear if only one (either hero or villian) dbz connect to Kafka.

huangapple
  • 本文由 发表于 2023年2月24日 17:47:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/75554982.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定