AWS MSK Connect使用MSSQL Debezium连接器失败并断开连接。

huangapple go评论94阅读模式
英文:

AWS MSK Connect w/ MSSQL Debezium connector fails with disconnect

问题

我试图使用AWS MSK Connect设置一个mssql debezium连接器,但一直收到以下错误消息:

连接器错误日志:

[Worker-0a949760f6b805d4f] [2023-02-15 19:57:56,122] 警告 [src-connector-014|task-0] [Consumer clientId=dlp.compcare.ccdemo-schemahistory,groupId=dlp.compcare.ccdemo-schemahistory] 引导代理 b-3.stuff.morestuff.c7.kafka.us-east-1.amazonaws.com:9098(id:-2,机架:null)断开连接(org.apache.kafka.clients.NetworkClient:1079)

此错误连续发生一段时间,然后我看到以下错误:

org.apache.kafka.common.errors.TimeoutException: 在获取主题元数据时超时

在集群日志中,当我收到断开连接错误时,我看到相应的错误:

[2023-02-15 20:08:21,627] 信息 [SocketServer listenerType=ZK_BROKER,nodeId=3] 与 /172.32.34.126 (SSL 握手失败) 的身份验证失败(org.apache.kafka.common.network.Selector)

我有一个EC2客户端,我已经设置好以连接到我的集群,并能够使用IAM身份验证连接并运行命令。我已经设置了一个主题,并使用控制台的生产者/消费者从主题中产生和消费。我还验证了当连接器启动时,它会创建 __amazon_msk_connect_status_*__amazon_msk_connect_offsets_* 主题。

我验证了日志中的IP是我连接器分配的IP,通过检查它连接到的弹性网络接口来进行验证。

另外,为了测试目的,我已经从0.0.0.0/0打开了所有流量,以便在运行的SG中运行,并确保IAM角色具有msk*、msk-connect*、kafka和s3

我还验证了RDS上启用了CDC,并且它正常工作。我看到变化被捕获并添加到CDC表中。

我相信问题与IAM身份验证仍然有关,但我不能确定。

集群配置:

  1. auto.create.topics.enable=true
  2. delete.topic.enable=true

工作器配置:

  1. key.converter=org.apache.kafka.connect.storage.StringConverter
  2. value.converter=org.apache.kafka.connect.storage.StringConverter
  3. config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
  4. config.providers=secretManager
  5. config.providers.secretManager.param.aws.region=us-east-1
  6. request.timeout.ms=90000
  7. errors.log.enable=true
  8. errors.log.include.messages=true

连接器配置:

  1. connector.class=io.debezium.connector.sqlserver.SqlServerConnector
  2. tasks.max=1
  3. database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
  4. schema.include.list=dbo
  5. database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  6. database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  7. database.history.consumer.security.protocol=SASL_SSL
  8. database.instance=MSSQLSERVER
  9. topic.prefix=dlp.compcare.ccdemo
  10. schema.history.internal.kafka.topic=dlp.compcare.ccdemo.history
  11. value.converter=org.apache.kafka.connect.json.JsonConverter
  12. key.converter=org.apache.kafka.connect.storage.StringConverter
  13. database.history.sasl.mechanism=AWS_MSK_IAM
  14. database.encrypt=false
  15. database.history.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
  16. database.history.producer.sasl.mechanism=AWS_MSK_IAM
  17. database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
  18. database.user=debezium
  19. database.names=Intermodal_CCDEMO
  20. database.history.producer.security.protocol=SASL_SSL
  21. database.server.name=ccdemo_1
  22. schema.history.internal.kafka.bootstrap.servers=b-1:9098
  23. database.port=1433
  24. database.hostname=my-mssql-rds.rds.amazonaws.com
  25. database.history.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  26. database.password=${secretManager:dlp-compcare:dbpassword}
  27. table.include.list=dbo.EquipmentSetup
  28. database.history.security.protocol=SASL_SSL
  29. database.history.consumer.sasl.mechanism=AWS_MSK_IAM

我能够使用Postgres RDS执行相同的过程而没有任何问题。

我已尝试了我能想到的一切,所以任何帮助都将不胜感激!

在设置集群/连接器时,我还参考了以下内容:

英文:

I am trying to setup a mssql debezium connector with AWS MSK Connect but keep getting the following error messages:

Connector error log:

> [Worker-0a949760f6b805d4f] [2023-02-15 19:57:56,122] WARN [src-connector-014|task-0] [Consumer clientId=dlp.compcare.ccdemo-schemahistory, groupId=dlp.compcare.ccdemo-schemahistory] Bootstrap broker b-3.stuff.morestuff.c7.kafka.us-east-1.amazonaws.com:9098 (id: -2 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1079)

This error happens continuously for a bit then I see this error:

> org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

In the cluster logs I see a corresponding error when I get the disconnect error:

> [2023-02-15 20:08:21,627] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] Failed authentication with /172.32.34.126 (SSL handshake failed) (org.apache.kafka.common.network.Selector)

I have an ec2 client that i've setup to connect to my cluster and am able to connect and run commands against the cluster using IAM auth. I have setup a topic and produced and consumed from the topic using the console producer/consumers. I've also verified that when the connector start up it is creating the __amazon_msk_connect_status_* and __amazon_msk_connect_offsets_* topics.

I've verified that ip in the logs is the ip assigned to my connector by checking the Elastic Network Interface it was attached to.

Also for testing purposes I've opened up all traffic from 0.0.0.0/0 for the SG they are running in and also made sure the IAM role has msk*, msk-connect*, kafka*, and s3*.

I've also verified CDC is enabled on the RDS and that it is working properly. I see changes being picked and added to the CDC tables.

I believe the issue is related to IAM auth still but am not certain.
Cluster Config:

  1. auto.create.topics.enable=true
  2. delete.topic.enable=true

worker config:

  1. key.converter=org.apache.kafka.connect.storage.StringConverter
  2. value.converter=org.apache.kafka.connect.storage.StringConverter
  3. config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
  4. config.providers=secretManager
  5. config.providers.secretManager.param.aws.region=us-east-1
  6. request.timeout.ms=90000
  7. errors.log.enable=true
  8. errors.log.include.messages=true

Connector Config:

  1. connector.class=io.debezium.connector.sqlserver.SqlServerConnector
  2. tasks.max=1
  3. database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
  4. schema.include.list=dbo
  5. database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  6. database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  7. database.history.consumer.security.protocol=SASL_SSL
  8. database.instance=MSSQLSERVER
  9. topic.prefix=dlp.compcare.ccdemo
  10. schema.history.internal.kafka.topic=dlp.compcare.ccdemo.history
  11. value.converter=org.apache.kafka.connect.json.JsonConverter
  12. key.converter=org.apache.kafka.connect.storage.StringConverter
  13. database.history.sasl.mechanism=AWS_MSK_IAM
  14. database.encrypt=false
  15. database.history.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
  16. database.history.producer.sasl.mechanism=AWS_MSK_IAM
  17. database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
  18. database.user=debezium
  19. database.names=Intermodal_CCDEMO
  20. database.history.producer.security.protocol=SASL_SSL
  21. database.server.name=ccdemo_1
  22. schema.history.internal.kafka.bootstrap.servers=b-1:9098
  23. database.port=1433
  24. database.hostname=my-mssql-rds.rds.amazonaws.com
  25. database.history.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  26. database.password=${secretManager:dlp-compcare:dbpassword}
  27. table.include.list=dbo.EquipmentSetup
  28. database.history.security.protocol=SASL_SSL
  29. database.history.consumer.sasl.mechanism=AWS_MSK_IAM

I was able to do this same process but with a postgres rds with no issues.

I've tried everything I can think of so any an all help would be greatly appreciated!

I also referenced the following when setting up the cluster/connector:

答案1

得分: 2

经过大量的研究以及AWS支持代表的帮助,我最终解决了我的问题。这与IAM身份验证有关。这个属性

schema.history.internal.kafka.topic

需要您指定如何进行身份验证以创建模式历史等主题。

为了做到这一点,您需要将以下属性添加到您的连接器

  1. schema.history.internal.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  2. schema.history.internal.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
  3. schema.history.internal.consumer.security.protocol=SASL_SSL
  4. schema.history.internal.consumer.sasl.mechanism=AWS_MSK_IAM
  5. schema.history.internal.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  6. schema.history.internal.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
  7. schema.history.internal.producer.security.protocol=SASL_SSL
  8. schema.history.internal.producer.sasl.mechanism=AWS_MSK_IAM

我已经针对SQL Server和MySQL进行了测试,对我有效。

英文:

After a ton of digging and some help from an AWS support rep I finally figure out my issue. It was related to IAM authentication. This property

schema.history.internal.kafka.topic

Requires you to specify how to authenticate to create the topics for schema history etc.

In order to do that you need to add the following properties to your connector

  1. schema.history.internal.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  2. schema.history.internal.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
  3. schema.history.internal.consumer.security.protocol=SASL_SSL
  4. schema.history.internal.consumer.sasl.mechanism=AWS_MSK_IAM
  5. schema.history.internal.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
  6. schema.history.internal.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
  7. schema.history.internal.producer.security.protocol=SASL_SSL
  8. schema.history.internal.producer.sasl.mechanism=AWS_MSK_IAM

I tested this for SQL Server and MySql and it worked for me.

答案2

得分: 1

我在连接到Aurora MYSQL数据库时遇到了相同的问题。

我正在使用最新版本的Debezium v2.1.2

然而,使用相同的设置,当我使用以前的版本v1.7.0时,一切都正常工作。

我注意到版本v1.7.0接受以下参数:

  1. database.history.kafka.topic=<masked>```
  2. 可以在v1.7的Mysql示例中看到[这里](https://debezium.io/documentation/reference/1.7/connectors/mysql.html#mysql-example-configuration)。
  3. 但最新版本对上述两个参数提出了投诉,并请求一个名为`topic.prefix`的新参数:
  4. ```schema.history.internal.kafka.topic=<masked>
  5. schema.history.internal.kafka.bootstrap.servers=<masked>
  6. topic.prefix=<masked>```
  7. 可以在v2.1的Mysql示例中看到[这里](https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-example-configuration)。
  8. 总之,即使在修复配置更改后,它在v1.7上工作,但在v2.1.2上不工作。这意味着要么是一个错误,要么我们遗漏了一些附加的配置。
  9. 如果我能解决它,我会通知你。
  10. 与此同时,您是否可以尝试使用以前的版本?
  11. <details>
  12. <summary>英文:</summary>
  13. I have been facing the same issue when connecting to the Aurora MYSQL DB.
  14. I was using the latest version of Debezium [v2.1.2](https://debezium.io/releases/2.1/#installation)
  15. However, with the same setup when I used the previous version [v1.7.0](https://debezium.io/releases/1.7/#installation), it worked just fine.
  16. I have noticed version [v1.7.0](https://debezium.io/releases/1.7/#installation) accepts the following parameters:

database.history.kafka.bootstrap.servers=<masked>
database.history.kafka.topic=<masked>

  1. Can be seen in the Mysql example for v1.7 [here](https://debezium.io/documentation/reference/1.7/connectors/mysql.html#mysql-example-configuration).
  2. But the latest version complains about the above two and request a new parameter called `topic.prefix`:

schema.history.internal.kafka.topic=<masked>
schema.history.internal.kafka.bootstrap.servers=<masked>
topic.prefix=<masked>

  1. Which can be seen in the Mysql example for v2.1 [here](https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-example-configuration).
  2. To summarize, even after fixing the configurations changes, it works on v1.7, but doesn&#39;t on v2.1.2. This would mean, either it&#39;s a bug or we are missing some additional configurations.
  3. I will keep you posted if I am able to figure it out.
  4. In the meantime, can you try with a previous version too?
  5. </details>

huangapple
  • 本文由 发表于 2023年2月16日 05:26:23
  • 转载请务必保留本文链接:https://go.coder-hub.com/75465586.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定