How to read in Java from the _confluent-metrics topic created by Kafka

Question

I want to get the value of sink-record-active-count for one of my sink connectors. The metric is documented here: <https://docs.confluent.io/current/connect/managing/monitoring.html#sink-task-metrics>

All my containers are running in Docker Desktop from a Docker Compose file.

[Screenshot: docker ps output]

I have used the Confluent Metrics Reporter for this task.

Following <https://docs.confluent.io/5.4.0/kafka/metrics-reporter.html> and <https://neo4j.com/docs/labs/neo4j-streams/current/examples/#_confluent_with_docker>, I added these environment variables to the Kafka container:

kafka-service:
    image: confluentinc/cp-enterprise-kafka:5.4.0
    container_name: kafka
    depends_on:
      - zookeeper
    links:
      - zookeeper
    ports:
      - 9092:9092
    expose:
      - "29092"
    environment:
      METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_REPORTER_TOPIC_CREATE: 'true'
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: http://kafka-service:29092
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-service:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
    command:
      - bash
      - -c
      - |
        echo '127.0.0.1 kafka-service' >> /etc/hosts
        /etc/confluent/docker/run
        sleep infinity

In the Kafka logs I see this message:

INFO Created metrics reporter topic _confluent-metrics (io.confluent.metrics.reporter.ConfluentMetricsReporter)

I am not quite sure how to read from this topic in Java. Also, will this topic contain the metrics I need for the sink connector?

Thirdly, the page <https://docs.confluent.io/current/connect/managing/monitoring.html#sink-task-metrics> lists MBeans, but I am not sure how to use them. In case JMX is needed, I tried setting KAFKA_JMX_HOSTNAME to localhost and KAFKA_JMX_PORT to 9010, following <https://rmoff.net/2018/09/17/accessing-kafka-docker-containers-jmx-from-host/>, but I am not sure how to proceed from there.

Answer 1

Score: 1

To my knowledge, ConfluentMetricsReporter writes a proprietary binary format that cannot be read without the deserialization libraries that are part of Control Center.

I suggest using the Prometheus JMX Exporter with Grafana to visualize this data outside of a Kafka consumer application or Control Center (the Confluent Helm Charts already offer such a setup).

Sidenote: Please don't edit any /etc/hosts files.
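
For a one-off check from Java, the MBeans mentioned in the question can also be read directly over JMX. The following is only a minimal sketch under several assumptions: JMX is enabled on the Kafka Connect worker container rather than on the broker (sink task MBeans are registered in the Connect worker's JVM), it is reachable at localhost:9010 as in the question's attempt, and the connector is called my-sink-connector, which is a made-up name. The class and method names are hypothetical too. It uses only the standard javax.management API and looks the MBeans up by pattern, because key values in the MBean name may or may not be quoted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class SinkTaskMetricReader {

    // Matches every sink task MBean registered under the kafka.connect domain.
    static final String SINK_TASK_PATTERN = "kafka.connect:type=sink-task-metrics,*";

    // Key values may appear quoted or plain, so accept both forms.
    public static String stripQuotes(String value) {
        return value != null && value.startsWith("\"") ? ObjectName.unquote(value) : value;
    }

    // Returns a map of task id -> sink-record-active-count for one connector.
    public static Map<String, Object> readActiveCounts(MBeanServerConnection mbeans,
                                                       String connectorName) throws Exception {
        Map<String, Object> counts = new LinkedHashMap<>();
        Set<ObjectName> names = mbeans.queryNames(new ObjectName(SINK_TASK_PATTERN), null);
        for (ObjectName name : names) {
            String connector = stripQuotes(name.getKeyProperty("connector"));
            if (connectorName.equals(connector)) {
                String task = stripQuotes(name.getKeyProperty("task"));
                counts.put(task, mbeans.getAttribute(name, "sink-record-active-count"));
            }
        }
        return counts;
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the Connect worker exposes JMX on localhost:9010.
        JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9010/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            // "my-sink-connector" is a placeholder; use your connector's name.
            Map<String, Object> counts =
                    readActiveCounts(connector.getMBeanServerConnection(), "my-sink-connector");
            counts.forEach((task, value) ->
                    System.out.println("task " + task + ": sink-record-active-count = " + value));
        }
    }
}
```

If the map comes back empty, the likely causes are that the JMX port belongs to the broker instead of the Connect worker, or that the connector name does not match.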

Posted on July 24, 2020 at 03:04:54. Original link: https://go.coder-hub.com/63061417.html