Log more data from Kafka Source Connector


Question


I installed the Kinesis connector plugin in AWS to connect my Kinesis stream to my MSK cluster. It works, but the logging I see in CloudWatch is not very helpful:

[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-0|offsets] WorkerSourceTask{id=msk-connector-kinesis-0} Finished commitOffsets successfully in 8 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,149] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Finished commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)

How can I get more useful logging? For example, I would like to see the number of records written to the cluster per minute.

I read the documentation at https://docs.confluent.io/platform/current/connect/logging.html, and I think what I am seeing is the default logging written to stdout. However, I am not sure how to change the log level of the Kinesis connector so that it displays more information.

This is my current configuration:

name=msk-connector-kinesis
connector.class=io.confluent.connect.kinesis.KinesisSourceConnector
tasks.max=3

kafka.topic=my-topic
kinesis.region=eu-central-1
kinesis.stream=kinesis_stream_eu-central-1

confluent.topic.bootstrap.servers=<server1>:9098,<server2>:9098,<server3>:9098
confluent.topic.replication.factor=3

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
confluent.topic.consumer.security.protocol=SASL_SSL
confluent.topic.consumer.sasl.mechanism=AWS_MSK_IAM
confluent.topic.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
confluent.topic.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
confluent.topic.producer.security.protocol=SASL_SSL
confluent.topic.producer.sasl.mechanism=AWS_MSK_IAM
confluent.topic.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
confluent.topic.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.storage.StringConverter


errors.log.enable=true
errors.log.include.messages=true
errors.tolerance=all


Answer 1

Score: 1


The metric the OP asked for,

> "number of records that are being written into the cluster per minute"

is available in CloudWatch as part of the metrics emitted by MSK Connect. This applies because the connector runs on the MSK Connect service (see the OP's comments on the question).

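Since this is a source connector (it pushes data from a source into MSK), the metrics you are looking for are SourceRecordPollRate and SourceRecordWriteRate: the first tracks records polled from the source (here, Kinesis), the second tracks records written to Kafka.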
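To read these metrics programmatically rather than in the console, CloudWatch's GetMetricData API takes a list of metric queries. The sketch below builds them as plain Python dicts. It assumes the metrics are published in the `AWS/KafkaConnect` namespace with a `ConnectorName` dimension (pass the name the connector has in MSK Connect) and that each value is a per-second average; under that assumption the metric math expression `write_rate * 60` approximates records written per minute. The helper name and query ids are hypothetical.

```python
from typing import Any


def build_metric_data_queries(connector_name: str, period: int = 60) -> list[dict[str, Any]]:
    """Build GetMetricData queries for an MSK Connect source connector.

    Assumes the metrics are published in the "AWS/KafkaConnect" namespace with a
    "ConnectorName" dimension and that each value is a per-second average rate.
    """

    def metric_query(query_id: str, metric_name: str) -> dict[str, Any]:
        return {
            "Id": query_id,
            "MetricStat": {
                "Metric": {
                    "Namespace": "AWS/KafkaConnect",
                    "MetricName": metric_name,
                    "Dimensions": [{"Name": "ConnectorName", "Value": connector_name}],
                },
                "Period": period,  # one datapoint per period; 60 s gives one per minute
                "Stat": "Average",
            },
            "ReturnData": True,
        }

    return [
        metric_query("poll_rate", "SourceRecordPollRate"),
        metric_query("write_rate", "SourceRecordWriteRate"),
        {
            # Metric math: average records per second * 60 = approx. records per minute.
            "Id": "written_per_minute",
            "Expression": "write_rate * 60",
            "Label": "Records written per minute (approx.)",
            "ReturnData": True,
        },
    ]
```

Keeping the per-minute conversion in a metric math expression means CloudWatch does the arithmetic server-side, so the same query list also works for dashboards or alarms.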

An example query for these metrics:

[Image: example CloudWatch query of these metrics]

(taken from the AWS Big Data Blog)
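The same data can also be fetched outside the console. A minimal sketch, assuming a boto3-style CloudWatch client whose `get_metric_data` accepts `MetricDataQueries`, `StartTime`, `EndTime`, `ScanBy` and `NextToken` and returns `MetricDataResults` with parallel `Timestamps`/`Values` lists. The client is passed in so the function does not import boto3 itself; `fetch_metric_series` is a hypothetical helper name.

```python
from datetime import datetime
from typing import Any


def fetch_metric_series(client: Any, queries: list[dict[str, Any]],
                        start: datetime, end: datetime) -> dict[str, list[tuple[datetime, float]]]:
    """Run GetMetricData and collect (timestamp, value) pairs per query Id.

    `client` is expected to behave like boto3.client("cloudwatch").
    """
    series: dict[str, list[tuple[datetime, float]]] = {}
    kwargs: dict[str, Any] = {
        "MetricDataQueries": queries,
        "StartTime": start,
        "EndTime": end,
        "ScanBy": "TimestampAscending",  # oldest datapoints first
    }
    while True:
        response = client.get_metric_data(**kwargs)
        for result in response.get("MetricDataResults", []):
            points = series.setdefault(result["Id"], [])
            # Timestamps and Values are parallel lists in each result.
            points.extend(zip(result.get("Timestamps", []), result.get("Values", [])))
        token = response.get("NextToken")
        if not token:
            return series
        # More datapoints remain: request the next page with the same queries.
        kwargs["NextToken"] = token
```

With boto3 installed and AWS credentials configured, a call could look like `fetch_metric_series(boto3.client("cloudwatch", region_name="eu-central-1"), queries, start, end)`, where `queries` is a list of GetMetricData query dicts (for example one for `SourceRecordWriteRate`) and `start`/`end` are timezone-aware datetimes.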

I don't think the approach the OP originally tried (increasing the log level to get metrics such as the connector's producer rates) is the right one. Nevertheless, it is worth mentioning that MSK Connect pushes the log records generated by the connector with severity levels INFO, WARN, ERROR and FATAL to CloudWatch Logs, Amazon S3, or a Kinesis Data Firehose stream, depending on the log delivery configured for the connector.
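The log destinations are part of the connector's configuration in MSK Connect. As a sketch, assuming the boto3 `kafkaconnect` client's `create_connector` call and my reading of its `logDelivery` structure (a `workerLogDelivery` object with `cloudWatchLogs`, `s3` and `firehose` entries; check the field names against the MSK Connect API reference), the value could be built like this. The helper name and the log group in the usage note are hypothetical. Note that this only selects where logs go; it does not lower the level below INFO.

```python
from typing import Any, Optional


def build_worker_log_delivery(log_group: Optional[str] = None,
                              s3_bucket: Optional[str] = None,
                              s3_prefix: str = "",
                              firehose_stream: Optional[str] = None) -> dict[str, Any]:
    """Build a `logDelivery` value that enables each destination that is given."""
    delivery: dict[str, Any] = {}
    if log_group is not None:
        delivery["cloudWatchLogs"] = {"enabled": True, "logGroup": log_group}
    if s3_bucket is not None:
        s3: dict[str, Any] = {"enabled": True, "bucket": s3_bucket}
        if s3_prefix:
            s3["prefix"] = s3_prefix  # optional key prefix inside the bucket
        delivery["s3"] = s3
    if firehose_stream is not None:
        delivery["firehose"] = {"enabled": True, "deliveryStream": firehose_stream}
    if not delivery:
        # An empty configuration would deliver no worker logs at all.
        raise ValueError("enable at least one log destination")
    return {"workerLogDelivery": delivery}
```

For example, `logDelivery=build_worker_log_delivery(log_group="/msk-connect/msk-connector-kinesis")` would be passed to `create_connector` alongside the connector's other required parameters.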

Based on the logs the OP provided, this is working as expected:

[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-0|offsets] WorkerSourceTask{id=msk-connector-kinesis-0} Finished commitOffsets successfully in 8 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,149] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Finished commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
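These INFO lines only carry per-task offset-commit details: how long a commit took and how many messages were outstanding at commit time. They say nothing about throughput, which is why the metrics are the better source. If you still want to extract those numbers from the log stream, a regex sketch could look like this; the line format is assumed from the samples in this post, and `parse_line` is a hypothetical helper.

```python
import re
from typing import Optional

# Patterns assumed from the sample WorkerSourceTask lines in this post.
_COMMIT = re.compile(
    r"WorkerSourceTask\{id=(?P<task>[^}]+)\} Finished commitOffsets successfully in (?P<ms>\d+) ms"
)
_FLUSH = re.compile(
    r"WorkerSourceTask\{id=(?P<task>[^}]+)\} flushing (?P<n>\d+) outstanding messages"
)


def parse_line(line: str) -> Optional[tuple[str, str, int]]:
    """Return (task_id, kind, number) for commit/flush lines, or None otherwise."""
    if m := _COMMIT.search(line):
        return m["task"], "commit_ms", int(m["ms"])
    if m := _FLUSH.search(line):
        return m["task"], "outstanding", int(m["n"])
    return None
```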

huangapple
  • Posted on August 9, 2023, 16:25:25
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76865883.html