Docker Kafka Avro Console Consumer - Connection Refused

Question

I'm learning Kafka and trying to set it up in a local environment with a Docker Compose file. I'm following this example:

https://docs.confluent.io/5.0.0/installation/docker/docs/installation/connect-avro-jdbc.html

Following this example, I got as far as the second half of step 8.

When I run the following command inside the Kafka Connect container:

kafka-avro-console-consumer --bootstrap-server kafka:9092 --topic quickstart-jdbc-test --from-beginning --max-messages 10

I get the output below, and I can't determine what it is trying to connect to:

[2020-10-07 20:45:44,784] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2020-10-07 20:45:45,431] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = console-consumer-7022
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig)
[2020-10-07 20:45:45,547] INFO Kafka version : 2.0.0-cp1 (org.apache.kafka.common.utils.AppInfoParser)
[2020-10-07 20:45:45,547] INFO Kafka commitId : 4b1dd33f255ddd2f (org.apache.kafka.common.utils.AppInfoParser)
[2020-10-07 20:45:45,731] INFO Cluster ID: 1ia_Zkc1S1efVJ8JbxiqFA (org.apache.kafka.clients.Metadata)
[2020-10-07 20:45:45,733] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] Discovered group coordinator kafka:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-10-07 20:45:45,736] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-10-07 20:45:45,737] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-10-07 20:45:45,815] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-10-07 20:45:45,816] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] Setting newly assigned partitions [quickstart-jdbc-test-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2020-10-07 20:45:45,867] INFO [Consumer clientId=consumer-1, groupId=console-consumer-7022] Resetting offset for partition quickstart-jdbc-test-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher)
Processed a total of 1 messages
[2020-10-07 20:45:46,032] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at sun.net.NetworkClient.doConnect(NetworkClient.java:180)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
at sun.net.www.http.HttpClient.New(HttpClient.java:339)
at sun.net.www.http.HttpClient.New(HttpClient.java:357)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1220)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1156)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1050)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:984)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1564)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1492)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:185)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:409)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:402)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:118)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:191)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:167)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:160)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:152)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:116)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

Docker-Compose File:

version: '3.8'
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    ports:
      - '2181' 
  kafka:
    image: bitnami/kafka:latest
    environment:
      - KAFKA_BROKER_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ZOOKEEPER_PROTOCOL=PLAINTEXT
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9091,CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9091,CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    volumes:
      - "./Components/scripts/files/:/tmp/"
    ports:
      - '9093:9093'
  schema-registry:
    image: confluentinc/cp-schema-registry:5.0.0
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    ports:
      - '8081'
    depends_on:
      - kafka
      - zookeeper
  kafka-connect-avro:
    image: confluentinc/cp-kafka-connect:latest
    environment:
      - CONNECT_BOOTSTRAP_SERVERS=kafka:9092 
      - CONNECT_REST_PORT=8083 
      - CONNECT_GROUP_ID=quickstart-avro 
      - CONNECT_CONFIG_STORAGE_TOPIC=quickstart-avro-config 
      - CONNECT_OFFSET_STORAGE_TOPIC=quickstart-avro-offsets 
      - CONNECT_STATUS_STORAGE_TOPIC=quickstart-avro-status
      - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 
      - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 
      - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 
      - CONNECT_KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
      - CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter 
      - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 
      - CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter 
      - CONNECT_REST_ADVERTISED_HOST_NAME=kafka-connect-avro
      - CONNECT_LOG4J_ROOT_LOGLEVEL=FATAL 
      - CONNECT_PLUGIN_PATH=/usr/share/java/, /etc/kafka-connect/jars
    volumes:
      - "./Components/Avro/jars:/etc/kafka-connect/jars"
      - "./Components/Avro/file:/tmp/quickstart"
      - "./Components/Avro/plugin/confluentinc-kafka-connect-jdbc-5.5.2:/usr/share/java/kafka-connect-jdbc"
    ports:
      - '8083:8083'
    depends_on:
      - kafka
  mysql:
    image: mysql
    environment:
      - MYSQL_ROOT_PASSWORD=confluent 
      - MYSQL_USER=confluent 
      - MYSQL_PASSWORD=confluent 
      - MYSQL_DATABASE=connect_test 
    ports:
      - "3306:3306" 
    volumes:
      - "./volumes/mysql:/var/lib/mysql"

Answer 1

Score: 6

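kafka-avro-console-consumer defaults to http://localhost:8081 for the Schema Registry URL. The stack trace shows the connection being refused in the Schema Registry REST client (RestService.getId), not in the Kafka consumer itself: inside the Kafka Connect container, localhost is the Connect container, where no Schema Registry is running.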

You need to pass an additional argument that points the consumer at the Schema Registry container, such as --property schema.registry.url=http://schema-registry:8081

https://docs.confluent.io/current/schema-registry/serdes-develop/serdes-avro.html#sr-test-drive-avro
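
The extra argument can be sketched as follows, assuming a shell inside the Kafka Connect container and the service names and ports from the Compose file in the question. The consume_avro wrapper and the two variable names are hypothetical, introduced only for illustration; the --property schema.registry.url argument is the one described above.

```shell
# Values taken from the Compose file in the question.
SCHEMA_REGISTRY_URL="http://schema-registry:8081"
BOOTSTRAP_SERVER="kafka:9092"

# Hypothetical helper (not part of any Confluent tooling): wraps the
# consumer call so the Schema Registry URL is always passed explicitly
# instead of falling back to http://localhost:8081.
consume_avro() {
  kafka-avro-console-consumer \
    --bootstrap-server "$BOOTSTRAP_SERVER" \
    --topic "$1" \
    --from-beginning \
    --max-messages 10 \
    --property schema.registry.url="$SCHEMA_REGISTRY_URL"
}

# Usage, inside the Kafka Connect container:
#   consume_avro quickstart-jdbc-test
```

Defining the function does not run anything; the consumer only starts when consume_avro is called with a topic name.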

Alternatively, you can exec into the Schema Registry container and run the same command with no additional arguments, since it will default to the local instance.
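
A rough sketch of that alternative, run from the host in the directory containing the Compose file. It assumes the docker-compose CLI is available and that the service is named schema-registry as in the question; the consume_in_registry wrapper is a hypothetical name used only for illustration.

```shell
# Hypothetical helper: runs the consumer inside the schema-registry
# service, so the default http://localhost:8081 refers to the registry's
# own container. The broker address is still the Compose service name.
consume_in_registry() {
  docker-compose exec schema-registry \
    kafka-avro-console-consumer \
      --bootstrap-server kafka:9092 \
      --topic "$1" \
      --from-beginning \
      --max-messages 10
}

# Usage, from the host:
#   consume_in_registry quickstart-jdbc-test
```

If the connection is still refused here, one possible cause (an assumption, not verified against this setup) is that the registry listens only on the schema-registry hostname set in SCHEMA_REGISTRY_LISTENERS; passing --property schema.registry.url explicitly avoids relying on the default.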

Posted on October 8, 2020 at 05:03:32