How to read from the _confluent-metrics topic created by Kafka in Java

Question

I want to get the value of sink-record-active-count for one of my sink connectors, as described here: <https://docs.confluent.io/current/connect/managing/monitoring.html#sink-task-metrics>

All containers are running in Docker Desktop from a Docker Compose file.
[Screenshot: docker ps output]

I am using the Confluent Metrics Reporter for this.

Following <https://docs.confluent.io/5.4.0/kafka/metrics-reporter.html> and <https://neo4j.com/docs/labs/neo4j-streams/current/examples/#_confluent_with_docker>, I added these environment variables to the Kafka container:

    kafka-service:
      image: confluentinc/cp-enterprise-kafka:5.4.0
      container_name: kafka
      depends_on:
        - zookeeper
      links:
        - zookeeper
      ports:
        - 9092:9092
      expose:
        - "29092"
      environment:
        METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
        CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
        CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
        CONFLUENT_METRICS_REPORTER_TOPIC_CREATE: 'true'
        CONFLUENT_METRICS_ENABLE: 'true'
        CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: http://kafka-service:29092
        KAFKA_BROKER_ID: 1
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-service:29092,PLAINTEXT_HOST://localhost:9092
        KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
        KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
        KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      command:
        - bash
        - -c
        - |
          echo '127.0.0.1 kafka-service' >> /etc/hosts
          /etc/confluent/docker/run
          sleep infinity

The Kafka logs show this message:

INFO Created metrics reporter topic _confluent-metrics (io.confluent.metrics.reporter.ConfluentMetricsReporter)

I am not quite sure how to read from this topic in Java. Also, will this topic contain the metrics I need for the sink connector?

Thirdly, this page <https://docs.confluent.io/current/connect/managing/monitoring.html#sink-task-metrics> lists MBeans, but I am not sure how to use them. In case they require JMX, I tried setting KAFKA_JMX_HOSTNAME: localhost and KAFKA_JMX_PORT: 9010 after following <https://rmoff.net/2018/09/17/accessing-kafka-docker-containers-jmx-from-host/>, but I am not sure how to proceed.

Answer 1

Score: 1

To my knowledge, ConfluentMetricsReporter writes a proprietary binary format that cannot be read without the deserialization libraries that are part of Control Center.

I suggest using Prometheus JMX Exporter + Grafana to visualize this data outside of a Kafka consumer application or Control Center (the Confluent Helm Charts already offer such a setup).
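
For a quick check from Java, the MBeans mentioned in the question can also be read directly over JMX using only the JDK's javax.management API. The following is a minimal sketch, not a definitive implementation. It assumes that JMX is enabled on the Kafka Connect worker (the sink-task MBeans are registered in the Connect worker's JVM, not in the broker's) and that it is reachable at localhost:9010, the port used in the question. The class name SinkMetricReader and its helper methods are hypothetical. It queries every MBean matching kafka.connect:type=sink-task-metrics and prints the sink-record-active-count attribute of each.

```java
import java.io.IOException;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper class; host and port below are assumptions taken from the question.
class SinkMetricReader {

    // Metric name as listed in the Connect sink task metrics documentation.
    static final String ATTRIBUTE = "sink-record-active-count";

    // Builds the standard RMI-based JMX service URL for a host and port.
    static String jmxUrl(String host, int port) {
        return "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
    }

    // Pattern matching the sink task MBeans of every connector and task.
    static ObjectName sinkTaskPattern() {
        try {
            return new ObjectName("kafka.connect:type=sink-task-metrics,*");
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException(e); // the literal above is well-formed
        }
    }

    // Prints the attribute for each sink task MBean visible on the connection.
    static void printActiveCounts(MBeanServerConnection mbeans)
            throws IOException, JMException {
        for (ObjectName name : mbeans.queryNames(sinkTaskPattern(), null)) {
            Object value = mbeans.getAttribute(name, ATTRIBUTE);
            System.out.printf("%s task %s: %s = %s%n",
                    name.getKeyProperty("connector"),
                    name.getKeyProperty("task"),
                    ATTRIBUTE,
                    value);
        }
    }

    public static void main(String[] args) {
        String url = jmxUrl("localhost", 9010); // assumed JMX endpoint of the Connect worker
        try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(url))) {
            printActiveCounts(connector.getMBeanServerConnection());
        } catch (IOException | JMException e) {
            System.err.println("Could not read metrics from " + url + ": " + e);
        }
    }
}
```

Querying by pattern rather than building one exact MBean name avoids having to guess whether the connector and task values are quoted in the registered name. If no MBeans match, nothing is printed, which usually means the connection points at a JVM other than the Connect worker.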


Sidenote: Please don't edit any /etc/hosts files.

Posted by huangapple on July 24, 2020, 03:04:54. Original link: https://go.coder-hub.com/63061417.html