AWS MSK Connect w/ MSSQL Debezium connector fails with disconnect

Question

I am trying to set up a SQL Server (MSSQL) Debezium connector with AWS MSK Connect, but I keep getting the following error messages:

Connector error log:

> [Worker-0a949760f6b805d4f] [2023-02-15 19:57:56,122] WARN [src-connector-014|task-0] [Consumer clientId=dlp.compcare.ccdemo-schemahistory, groupId=dlp.compcare.ccdemo-schemahistory] Bootstrap broker b-3.stuff.morestuff.c7.kafka.us-east-1.amazonaws.com:9098 (id: -2 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1079)

This warning repeats continuously for a while, and then I see this error:

> org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

In the cluster logs, I see a corresponding entry whenever I get the disconnect warning:

> [2023-02-15 20:08:21,627] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] Failed authentication with /172.32.34.126 (SSL handshake failed) (org.apache.kafka.common.network.Selector)

I have an EC2 client that I've set up to connect to my cluster, and I am able to connect and run commands against the cluster using IAM auth. I have set up a topic and produced to and consumed from it using the console producer and consumer. I've also verified that when the connector starts up, it creates the __amazon_msk_connect_status_* and __amazon_msk_connect_offsets_* topics.

I've verified that the IP in the logs is the IP assigned to my connector by checking the Elastic Network Interface it is attached to.

Also, for testing purposes, I've opened up all traffic from 0.0.0.0/0 on the security group they are running in, and made sure the IAM role has msk*, msk-connect*, kafka*, and s3* permissions.

I've also verified that CDC is enabled on the RDS instance and that it is working properly. I see changes being picked up and added to the CDC tables.

I believe the issue is still related to IAM auth, but I am not certain.

Cluster config:

auto.create.topics.enable=true
delete.topic.enable=true

Worker config:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
config.providers=secretManager
config.providers.secretManager.param.aws.region=us-east-1
request.timeout.ms=90000
errors.log.enable=true
errors.log.include.messages=true

Connector config:

connector.class=io.debezium.connector.sqlserver.SqlServerConnector
tasks.max=1
database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
schema.include.list=dbo
database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.consumer.security.protocol=SASL_SSL
database.instance=MSSQLSERVER
topic.prefix=dlp.compcare.ccdemo
schema.history.internal.kafka.topic=dlp.compcare.ccdemo.history
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
database.history.sasl.mechanism=AWS_MSK_IAM
database.encrypt=false
database.history.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.producer.sasl.mechanism=AWS_MSK_IAM
database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.user=debezium
database.names=Intermodal_CCDEMO
database.history.producer.security.protocol=SASL_SSL
database.server.name=ccdemo_1
schema.history.internal.kafka.bootstrap.servers=b-1:9098
database.port=1433
database.hostname=my-mssql-rds.rds.amazonaws.com
database.history.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.password=${secretManager:dlp-compcare:dbpassword}
table.include.list=dbo.EquipmentSetup
database.history.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=AWS_MSK_IAM

I was able to go through this same process with a Postgres RDS instance with no issues.

I've tried everything I can think of, so any and all help would be greatly appreciated!

I also referenced the following when setting up the cluster/connector:

Answer 1

Score: 2

After a ton of digging and some help from an AWS support rep, I finally figured out my issue. It was related to IAM authentication. This property

schema.history.internal.kafka.topic

requires you to specify how to authenticate in order to create and use the topics for schema history, etc. The schema history topic is written and read by its own producer and consumer, so they need their own authentication settings.

To do that, you need to add the following properties to your connector:

schema.history.internal.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
schema.history.internal.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
schema.history.internal.consumer.security.protocol=SASL_SSL
schema.history.internal.consumer.sasl.mechanism=AWS_MSK_IAM

schema.history.internal.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
schema.history.internal.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
schema.history.internal.producer.security.protocol=SASL_SSL
schema.history.internal.producer.sasl.mechanism=AWS_MSK_IAM

I tested this with SQL Server and MySQL, and it worked for me.
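
The eight properties above follow a single pattern: the same four IAM client settings, applied once under the `schema.history.internal.consumer.` prefix and once under the `schema.history.internal.producer.` prefix. A minimal Python sketch that generates them, in case you prefer to build the connector config programmatically; the names `IAM_CLIENT_SETTINGS`, `schema_history_iam_properties`, and `to_properties_text` are hypothetical helpers for illustration, and the values are copied from the list above.

```python
# The four IAM client settings, copied from the answer above.
IAM_CLIENT_SETTINGS = {
    "sasl.client.callback.handler.class":
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    "sasl.jaas.config":
        "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "AWS_MSK_IAM",
}


def schema_history_iam_properties(settings=IAM_CLIENT_SETTINGS):
    """Repeat the client settings for the schema history consumer and producer."""
    props = {}
    for role in ("consumer", "producer"):
        for key, value in settings.items():
            props[f"schema.history.internal.{role}.{key}"] = value
    return props


def to_properties_text(props):
    """Render a dict as key=value lines, one per property."""
    return "\n".join(f"{key}={value}" for key, value in props.items())


if __name__ == "__main__":
    # Prints the eight lines to paste into the connector configuration.
    print(to_properties_text(schema_history_iam_properties()))
```

Generating both halves from one dict avoids the easy mistake of setting a property for the consumer but forgetting its producer counterpart.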

Answer 2

Score: 1

I have been facing the same issue when connecting to an Aurora MySQL DB.

I was using the latest version of Debezium, [v2.1.2](https://debezium.io/releases/2.1/#installation).

However, with the same setup, the earlier version [v1.7.0](https://debezium.io/releases/1.7/#installation) worked just fine.

I noticed that version [v1.7.0](https://debezium.io/releases/1.7/#installation) accepts the following parameters:

database.history.kafka.bootstrap.servers=<masked>
database.history.kafka.topic=<masked>

These can be seen in the MySQL example for v1.7 [here](https://debezium.io/documentation/reference/1.7/connectors/mysql.html#mysql-example-configuration).

But the latest version complains about these two parameters and requests a new parameter called `topic.prefix`:

schema.history.internal.kafka.topic=<masked>
schema.history.internal.kafka.bootstrap.servers=<masked>
topic.prefix=<masked>

These can be seen in the MySQL example for v2.1 [here](https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-example-configuration).

To summarize, even after fixing the configuration changes, it works on v1.7 but doesn't on v2.1.2. This means that either it's a bug or we are missing some additional configuration.

I will keep you posted if I am able to figure it out.

In the meantime, can you try with a previous version too?
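
The rename described above (`database.history.kafka.*` in v1.7 becoming `schema.history.internal.kafka.*` in v2.1) can be applied mechanically to an existing config. The following Python sketch does that, under the assumption that every key starting with `database.history.` maps one-to-one onto `schema.history.internal.`; this thread only confirms that mapping for the `kafka.topic` and `kafka.bootstrap.servers` keys, so treat the rest as something to verify against the Debezium documentation for your version. The helper names `parse_properties` and `rename_history_keys` are hypothetical.

```python
OLD_PREFIX = "database.history."
NEW_PREFIX = "schema.history.internal."


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first "=" only, so values may contain "=".
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def rename_history_keys(props):
    """Return a copy of props with the old history prefix swapped for the new one.

    If a key already exists under the new prefix, its value is kept and the
    legacy value is dropped.
    """
    renamed = {}
    for key, value in props.items():
        if key.startswith(OLD_PREFIX):
            new_key = NEW_PREFIX + key[len(OLD_PREFIX):]
            renamed.setdefault(new_key, value)  # never override a new-style key
        else:
            renamed[key] = value  # new-style and unrelated keys pass through
    return renamed


if __name__ == "__main__":
    legacy = parse_properties(
        "database.history.kafka.topic=<masked>\n"
        "database.history.consumer.security.protocol=SASL_SSL\n"
        "topic.prefix=<masked>\n"
    )
    for key, value in rename_history_keys(legacy).items():
        print(f"{key}={value}")
```

Keys that do not start with the old prefix, such as `topic.prefix`, are passed through unchanged.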



huangapple
  • Posted on February 16, 2023 at 05:26:23
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75465586.html