Log more data from Kafka Source Connector
Question
I installed the Kinesis Connector plugin in AWS to connect my Kinesis stream to my MSK cluster. It is working, but the logging that I see in CloudWatch is not very helpful:
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-0|offsets] WorkerSourceTask{id=msk-connector-kinesis-0} Finished commitOffsets successfully in 8 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,149] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Finished commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
How can I get more useful logging? For example, I would like to see the number of records that are written into the cluster per minute.
I saw the documentation at https://docs.confluent.io/platform/current/connect/logging.html and I think that I am seeing the default logs that are written to stdout. However, I am not sure how to change the log level of the Kinesis connector to display more information.
This is my current configuration:
name=msk-connector-kinesis
connector.class=io.confluent.connect.kinesis.KinesisSourceConnector
tasks.max=3
kafka.topic=my-topic
kinesis.region=eu-central-1
kinesis.stream=kinesis_stream_eu-central-1
confluent.topic.bootstrap.servers=<server1>:9098,<server2>:9098,<server3>:9098
confluent.topic.replication.factor=3
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
confluent.topic.consumer.security.protocol=SASL_SSL
confluent.topic.consumer.sasl.mechanism=AWS_MSK_IAM
confluent.topic.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
confluent.topic.producer.security.protocol=SASL_SSL
confluent.topic.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
confluent.topic.producer.sasl.mechanism=AWS_MSK_IAM
confluent.topic.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
confluent.topic.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
errors.log.enable=true
errors.log.include.messages=true
errors.tolerance=all
Answer 1
Score: 1
The metric the OP asks for,
> "number of records that are being written into the cluster per minute"
can be found in CloudWatch as part of the metrics emitted by MSK Connect. This applies because you are running your connector via the MSK Connect feature (see the OP's comments on the question).
Since this is a source connector (it pushes data from a source into MSK), the metrics you are looking for are SourceRecordPollRate and SourceRecordWriteRate.
An example query for these metrics is shown in the AWS Big Data Blog.
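Separately from the blog example, here is a minimal sketch of how these two metrics might be pulled programmatically with boto3. It rests on assumptions that are not stated in this thread: that MSK Connect publishes under the CloudWatch namespace `AWS/KafkaConnect` with a `ConnectorName` dimension, and that both metrics are average per-second rates (so a per-minute figure is roughly the rate times 60). The helper names `build_metric_queries`, `records_per_minute`, and `fetch_rates` are hypothetical. Check the namespace and dimension in your own CloudWatch console before relying on it.

```python
from datetime import datetime, timedelta, timezone

# Assumptions (not from the original post): namespace and dimension name
# under which MSK Connect is expected to publish its metrics.
NAMESPACE = "AWS/KafkaConnect"
DIMENSION_NAME = "ConnectorName"


def build_metric_queries(connector_name, period_seconds=60):
    """Build the MetricDataQueries list for the two source-connector rates."""
    queries = []
    for metric_name in ("SourceRecordPollRate", "SourceRecordWriteRate"):
        queries.append({
            # Query ids must start with a lowercase letter.
            "Id": metric_name.lower(),
            "MetricStat": {
                "Metric": {
                    "Namespace": NAMESPACE,
                    "MetricName": metric_name,
                    "Dimensions": [
                        {"Name": DIMENSION_NAME, "Value": connector_name}
                    ],
                },
                "Period": period_seconds,
                "Stat": "Average",
            },
            "ReturnData": True,
        })
    return queries


def records_per_minute(per_second_rate):
    """Approximate a per-minute count from an average per-second rate."""
    return per_second_rate * 60


def fetch_rates(connector_name, region, minutes=15):
    """Query CloudWatch for recent datapoints; needs boto3 and AWS credentials."""
    import boto3  # imported lazily so the pure helpers work without it

    client = boto3.client("cloudwatch", region_name=region)
    end = datetime.now(timezone.utc)
    response = client.get_metric_data(
        MetricDataQueries=build_metric_queries(connector_name),
        StartTime=end - timedelta(minutes=minutes),
        EndTime=end,
    )
    # Map each query id to its (timestamp, value) pairs.
    return {
        result["Id"]: list(zip(result["Timestamps"], result["Values"]))
        for result in response["MetricDataResults"]
    }
```

With the connector from the question, a call would look like `fetch_rates("msk-connector-kinesis", "eu-central-1")`, and each returned value could be passed through `records_per_minute` to approximate the per-minute figure the OP asked for.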
I don't think the approach the OP originally had in mind (increasing the logging to obtain metrics such as the connector's produce rate) is viable. Nevertheless, it is worth mentioning that MSK Connect pushes the log records generated by the connector, at severity levels INFO, WARN, ERROR and FATAL, to CloudWatch Logs, Amazon S3, or a Kinesis Firehose stream.
Judging by the log output the OP provided, this is working as expected:
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-0|offsets] WorkerSourceTask{id=msk-connector-kinesis-0} Finished commitOffsets successfully in 8 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,145] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
[Worker-07578247b0d45ad42] [2023-08-09 07:51:20,149] INFO [msk-connector-kinesis|task-1|offsets] WorkerSourceTask{id=msk-connector-kinesis-1} Finished commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:583)