Connecting Spring to Kafka using Docker Compose for localhost development

Question
After hours of trying to find a solution for this problem I decided to post it here.

I have the following docker-compose file, which starts ZooKeeper, two Kafka brokers, and Kafdrop:

    version: '2'
    networks:
      kafka-net:
        driver: bridge
    services:
      zookeeper-server:
        image: 'bitnami/zookeeper:latest'
        networks:
          - kafka-net
        ports:
          - '2181:2181'
        environment:
          - ALLOW_ANONYMOUS_LOGIN=yes
      kafka-broker-1:
        image: 'bitnami/kafka:latest'
        networks:
          - kafka-net
        ports:
          - '9092:9092'
          - '29092:29092'
        environment:
          - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181
          - KAFKA_CFG_ADVERTISED_LISTENERS=INSIDE://kafka-broker-1:9092,OUTSIDE://localhost:29092
          - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
          - KAFKA_CFG_LISTENERS=INSIDE://:9092,OUTSIDE://:29092
          - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE
          - KAFKA_CFG_BROKER_ID=1
          - ALLOW_PLAINTEXT_LISTENER=yes
        depends_on:
          - zookeeper-server
      kafka-broker-2:
        image: 'bitnami/kafka:latest'
        networks:
          - kafka-net
        ports:
          - '9093:9092'
          - '29093:29092'
        environment:
          - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181
          - KAFKA_CFG_ADVERTISED_LISTENERS=INSIDE://kafka-broker-2:9093,OUTSIDE://localhost:29093
          - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
          - KAFKA_CFG_LISTENERS=INSIDE://:9093,OUTSIDE://:29093
          - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INSIDE
          - KAFKA_CFG_BROKER_ID=2
          - ALLOW_PLAINTEXT_LISTENER=yes
        depends_on:
          - zookeeper-server
      kafdrop-web:
        image: obsidiandynamics/kafdrop
        networks:
          - kafka-net
        ports:
          - '9000:9000'
        environment:
          - KAFKA_BROKERCONNECT=kafka-broker-1:9092,kafka-broker-2:9093
        depends_on:
          - kafka-broker-1
          - kafka-broker-2

Then I try to connect my Spring command-line app to Kafka. I took the quickstart example from the Spring for Apache Kafka docs.

Properties file

    spring.kafka.bootstrap-servers=localhost:29092,localhost:29093
    spring.kafka.consumer.group-id=cg1
    spring.kafka.consumer.auto-offset-reset=earliest
    logging.level.org.springframework.kafka=debug
    logging.level.org.apache.kafka=debug

And the app code

    @SpringBootApplication
    public class App {

        public static void main(String[] args) {
            SpringApplication.run(App.class, args);
        }
    }

    @Component
    public class Runner implements CommandLineRunner {

        public static Logger logger = LoggerFactory.getLogger(Runner.class);

        @Autowired
        private KafkaTemplate<String, String> template;

        private final CountDownLatch latch = new CountDownLatch(3);

        @Override
        public void run(String... args) throws Exception {
            this.template.send("myTopic", "foo1");
            this.template.send("myTopic", "foo2");
            this.template.send("myTopic", "foo3");
            latch.await(60, TimeUnit.SECONDS);
            logger.info("All received");
        }

        @KafkaListener(topics = "myTopic")
        public void listen(ConsumerRecord<?, ?> cr) throws Exception {
            logger.info(cr.toString());
            latch.countDown();
        }
    }

The logs I get when I run the app

    2020-04-07 01:06:45.074 DEBUG 7560 --- [ main] KafkaListenerAnnotationBeanPostProcessor : 1 @KafkaListener methods processed on bean 'runner': {public void com.vdt.learningkafka.Runner.listen(org.apache.kafka.clients.consumer.ConsumerRecord) throws java.lang.Exception=[@org.springframework.kafka.annotation.KafkaListener(autoStartup=, beanRef=__listener, clientIdPrefix=, concurrency=, containerFactory=, containerGroup=, errorHandler=, groupId=, id=, idIsGroup=true, properties=[], splitIterables=true, topicPartitions=[], topicPattern=, topics=[myTopic])]}
    2020-04-07 01:06:45.284 INFO 7560 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
        bootstrap.servers = [localhost:29092, localhost:29093]
        client.dns.lookup = default
        client.id =
        connections.max.idle.ms = 300000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 120000
        retries = 5
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
    2020-04-07 01:06:45.305 DEBUG 7560 --- [ main] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-1] Setting bootstrap cluster metadata Cluster(id = null, nodes = [localhost:29093 (id: -2 rack: null), localhost:29092 (id: -1 rack: null)], partitions = [], controller = null).
    2020-04-07 01:06:45.411 DEBUG 7560 --- [ main] org.apache.kafka.common.metrics.Metrics : Added sensor with name connections-closed:
    2020-04-07 01:06:45.413 DEBUG 7560 --- [ main] org.apache.kafka.common.metrics.Metrics : Added sensor with name connections-created:
    2020-04-07 01:06:45.414 DEBUG 7560 --- [ main] org.apache.kafka.common.metrics.Metrics : Added sensor with name successful-authentication:
    2020-04-07 01:06:45.414 DEBUG 7560 --- [ main] org.apache.kafka.common.metrics.Metrics : Added sensor with name successful-reauthentication:
    2020-04-07 01:06:45.415 DEBUG 7560 --- [ main] org.apache.kafka.common.metrics.Metrics : Added sensor with name successful-authentication-no-reauth:
    2020-04-07 01:06:45.415 DEBUG 7560 --- [ main] org.apache.kafka.common.metrics.Metrics : Added sensor with name failed-authentication:
    2020-04-07 01:06:45.415 DEBUG 7560 --- [ main] org.apache.kafka.common.metrics.Metrics : Added sensor with name failed-reauthentication:
    2020-04-07 01:06:45.415 DEBUG 7560 --- [ main] org.apache.kafka.common.metrics.Metrics : Added sensor with name reauthentication-latency:
    2020-04-07 01:06:45.416 DEBUG 7560 --- [ main] org.apache.kafka.common.metrics.Metrics : Added sensor with name bytes-sent-received:
    2020-04-07 01:06:45.416 DEBUG 7560 --- [ main] org.apache.kafka.common.metrics.Metrics : Added sensor with name bytes-sent:
    2020-04-07 01:06:45.417 DEBUG 7560 --- [ main] org.apache.kafka.common.metrics.Metrics : Added sensor with name bytes-received:
    2020-04-07 01:06:45.418 DEBUG 7560 --- [ main] org.apache.kafka.common.metrics.Metrics : Added sensor with name select-time:
    2020-04-07 01:06:45.419 DEBUG 7560 --- [ main] org.apache.kafka.common.metrics.Metrics : Added sensor with name io-time:
    2020-04-07 01:06:45.428 INFO 7560 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
    2020-04-07 01:06:45.428 INFO 7560 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
    2020-04-07 01:06:45.428 INFO 7560 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1586210805426
    2020-04-07 01:06:45.430 DEBUG 7560 --- [ main] o.a.k.clients.admin.KafkaAdminClient : [AdminClient clientId=adminclient-1] Kafka admin client initialized
    2020-04-07 01:06:45.433 DEBUG 7560 --- [ main] o.a.k.clients.admin.KafkaAdminClient : [AdminClient clientId=adminclient-1] Queueing Call(callName=describeTopics, deadlineMs=1586210925432) with a timeout 120000 ms from now.
    2020-04-07 01:06:45.434 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Initiating connection to node localhost:29093 (id: -2 rack: null) using address localhost/127.0.0.1
    2020-04-07 01:06:45.442 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--2.bytes-sent
    2020-04-07 01:06:45.443 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--2.bytes-received
    2020-04-07 01:06:45.443 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--2.latency
    2020-04-07 01:06:45.445 DEBUG 7560 --- [| adminclient-1] o.apache.kafka.common.network.Selector : [AdminClient clientId=adminclient-1] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -2
    2020-04-07 01:06:45.569 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Completed connection to node -2. Fetching API versions.
    2020-04-07 01:06:45.569 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Initiating API versions fetch from node -2.
    2020-04-07 01:06:45.576 DEBUG 7560 --- [| adminclient-1] o.apache.kafka.common.network.Selector : [AdminClient clientId=adminclient-1] Connection with localhost/127.0.0.1 disconnected
    java.io.EOFException: null
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:96) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.common.network.Selector.poll(Selector.java:483) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152) ~[kafka-clients-2.3.1.jar:na]
        at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
    2020-04-07 01:06:45.577 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Node -2 disconnected.
    2020-04-07 01:06:45.578 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Initiating connection to node localhost:29092 (id: -1 rack: null) using address localhost/127.0.0.1
    2020-04-07 01:06:45.578 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--1.bytes-sent
    2020-04-07 01:06:45.579 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--1.bytes-received
    2020-04-07 01:06:45.579 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--1.latency
    2020-04-07 01:06:45.580 DEBUG 7560 --- [| adminclient-1] o.apache.kafka.common.network.Selector : [AdminClient clientId=adminclient-1] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
    2020-04-07 01:06:45.580 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Completed connection to node -1. Fetching API versions.
    2020-04-07 01:06:45.580 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Initiating API versions fetch from node -1.
    2020-04-07 01:06:45.586 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Recorded API versions for node -1: (Produce(0): 0 to 8 [usable: 7], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 8], LeaderAndIsr(4): 0 to 4 [usable: 2], StopReplica(5): 0 to 2 [usable: 1], UpdateMetadata(6): 0 to 6 [usable: 5], ControlledShutdown(7): 0 to 3 [usable: 2], OffsetCommit(8): 0 to 8 [usable: 7], OffsetFetch(9): 0 to 6 [usable: 5], FindCoordinator(10): 0 to 3 [usable: 2], JoinGroup(11): 0 to 6 [usable: 5], Heartbeat(12): 0 to 4 [usable: 3], LeaveGroup(13): 0 to 4 [usable: 2], SyncGroup(14): 0 to 4 [usable: 3], DescribeGroups(15): 0 to 5 [usable: 3], ListGroups(16): 0 to 3 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 2], CreateTopics(19): 0 to 5 [usable: 3], DeleteTopics(20): 0 to 4 [usable: 3], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 2 [usable: 1], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 2 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 2 [usable: 1], ElectPreferredLeaders(43): 0 to 2 [usable: 0], IncrementalAlterConfigs(44): 0 to 1 [usable: 0], UNKNOWN(45): 0, UNKNOWN(46): 0, UNKNOWN(47): 0)
    2020-04-07 01:06:45.594 DEBUG 7560 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-1] Updating cluster metadata to Cluster(id = gSypiCeoSlyuSR4ks5qwwA, nodes = [localhost:29092 (id: 1 rack: null), localhost:29093 (id: 2 rack: null)], partitions = [], controller = localhost:29093 (id: 2 rack: null))
    2020-04-07 01:06:45.595 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Initiating connection to node localhost:29093 (id: 2 rack: null) using address localhost/127.0.0.1
    2020-04-07 01:06:45.596 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node-2.bytes-sent
    2020-04-07 01:06:45.597 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node-2.bytes-received
    2020-04-07 01:06:45.597 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node-2.latency
    2020-04-07 01:06:45.598 DEBUG 7560 --- [| adminclient-1] o.apache.kafka.common.network.Selector : [AdminClient clientId=adminclient-1] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2
    2020-04-07 01:06:45.598 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Completed connection to node 2. Fetching API versions.
    2020-04-07 01:06:45.598 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Initiating API versions fetch from node 2.
    2020-04-07 01:06:45.598 DEBUG 7560 --- [| adminclient-1] o.apache.kafka.common.network.Selector : [AdminClient clientId=adminclient-1] Connection with localhost/127.0.0.1 disconnected
    java.io.EOFException: null
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:96) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.common.network.Selector.poll(Selector.java:483) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152) ~[kafka-clients-2.3.1.jar:na]
        at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
    2020-04-07 01:06:45.598 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Node 2 disconnected.
    2020-04-07 01:06:45.598 DEBUG 7560 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-1] Requesting metadata update.
    2020-04-07 01:06:45.598 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Initiating connection to node localhost:29092 (id: 1 rack: null) using address localhost/127.0.0.1
    2020-04-07 01:06:45.599 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node-1.bytes-sent
    2020-04-07 01:06:45.600 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node-1.bytes-received
    2020-04-07 01:06:45.600 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Added sensor with name node-1.latency
    2020-04-07 01:06:45.600 DEBUG 7560 --- [| adminclient-1] o.apache.kafka.common.network.Selector : [AdminClient clientId=adminclient-1] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 1
    2020-04-07 01:06:45.600 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Completed connection to node 1. Fetching API versions.
    2020-04-07 01:06:45.600 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Initiating API versions fetch from node 1.
    2020-04-07 01:06:45.602 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Recorded API versions for node 1: (Produce(0): 0 to 8 [usable: 7], Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5], Metadata(3): 0 to 9 [usable: 8], LeaderAndIsr(4): 0 to 4 [usable: 2], StopReplica(5): 0 to 2 [usable: 1], UpdateMetadata(6): 0 to 6 [usable: 5], ControlledShutdown(7): 0 to 3 [usable: 2], OffsetCommit(8): 0 to 8 [usable: 7], OffsetFetch(9): 0 to 6 [usable: 5], FindCoordinator(10): 0 to 3 [usable: 2], JoinGroup(11): 0 to 6 [usable: 5], Heartbeat(12): 0 to 4 [usable: 3], LeaveGroup(13): 0 to 4 [usable: 2], SyncGroup(14): 0 to 4 [usable: 3], DescribeGroups(15): 0 to 5 [usable: 3], ListGroups(16): 0 to 3 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3 [usable: 2], CreateTopics(19): 0 to 5 [usable: 3], DeleteTopics(20): 0 to 4 [usable: 3], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 2 [usable: 1], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 2], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 1], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 2 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 2 [usable: 1], ElectPreferredLeaders(43): 0 to 2 [usable: 0], IncrementalAlterConfigs(44): 0 to 1 [usable: 0], UNKNOWN(45): 0, UNKNOWN(46): 0, UNKNOWN(47): 0)
    2020-04-07 01:06:45.605 DEBUG 7560 --- [| adminclient-1] o.a.k.c.a.i.AdminMetadataManager : [AdminClient clientId=adminclient-1] Updating cluster metadata to Cluster(id = gSypiCeoSlyuSR4ks5qwwA, nodes = [localhost:29092 (id: 1 rack: null), localhost:29093 (id: 2 rack: null)], partitions = [], controller = localhost:29093 (id: 2 rack: null))
    2020-04-07 01:06:45.639 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Initiating connection to node localhost:29093 (id: 2 rack: null) using address localhost/127.0.0.1
    2020-04-07 01:06:45.640 DEBUG 7560 --- [| adminclient-1] o.apache.kafka.common.network.Selector : [AdminClient clientId=adminclient-1] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2
    2020-04-07 01:06:45.640 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Completed connection to node 2. Fetching API versions.
    2020-04-07 01:06:45.640 DEBUG 7560 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Initiating API versions fetch from node 2.
    2020-04-07 01:06:45.641 DEBUG 7560 --- [| adminclient-1] o.apache.kafka.common.network.Selector : [AdminClient clientId=adminclient-1] Connection with localhost/127.0.0.1 disconnected
    java.io.EOFException: null
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:96) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.common.network.Selector.poll(Selector.java:483) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539) ~[kafka-clients-2.3.1.jar:na]
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152) ~[kafka-clients-2.3.1.jar:na]
        at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

The Docker configuration seems to be fine, since Kafdrop successfully connects to the brokers. Also, from what I can tell from the logs, the Spring application manages to connect to the brokers, but immediately after that the connection is closed.
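This connect-then-EOFException pattern (TCP connect succeeds, then end-of-stream on the first read) means the remote side accepted the connection and closed it without sending anything back; with Docker, one common cause is a published port whose forwarding target has nothing listening, so the proxy accepts and drops the connection. A self-contained sketch of that failure mode, using a throwaway local server rather than a real broker:

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class EofDemo {

    // Connects to localhost:port and returns the first byte read, or -1 when
    // the peer closed the connection without sending anything -- the same
    // end-of-stream condition that kafka-clients surfaces as EOFException.
    static int connectAndRead(int port) throws IOException {
        try (Socket s = new Socket("localhost", port);
             InputStream in = s.getInputStream()) {
            return in.read();
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(0)) { // throwaway local "broker"
            new Thread(() -> {
                try (Socket client = server.accept()) {
                    // accept the TCP connection, then close it immediately
                } catch (IOException ignored) {
                }
            }).start();
            System.out.println(connectAndRead(server.getLocalPort())); // prints -1
        }
    }
}
```

The client-side connect completes (matching "Completed connection to node ..." in the logs) even though the server never speaks the protocol, which is why the failure only shows up on the subsequent read.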

Answer 1 (score: 1)

To connect a Docker-based application to Kafka that is also running inside a Docker container, you have to refer to the Kafka container by its name (an alias for its address):

    spring.kafka.bootstrap-servers=kafka-broker-1:29092,kafka-broker-2:29093

If you try to connect the application inside a Docker container to localhost:29092, it tries to connect to the localhost of that very same container, not to the outer network.
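A quick way to see this is to resolve both names from the process running the app. In this sketch, the hostname kafka-broker-1 is just the compose service name from the question; from the host it typically does not resolve at all, while inside a container attached to kafka-net Docker's embedded DNS resolves it to the broker container:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class ResolveCheck {

    // Returns the resolved address, or null when the name is unknown
    // in the current network namespace.
    static String resolve(String host) {
        try {
            return InetAddress.getByName(host).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Resolves everywhere, but always points at the *local* machine
        // (the host, or the container itself when run inside one).
        System.out.println("localhost      -> " + resolve("localhost"));
        // Usually null on the host; resolves to the broker container
        // only from another container on the kafka-net network.
        System.out.println("kafka-broker-1 -> " + resolve("kafka-broker-1"));
    }
}
```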

> The configuration from docker seems to be fine since Kafdrop successfully connects brokers

Yes, check the kafdrop-web service inside docker-compose.yml and take a look at how it connects to the Kafka brokers. It uses the container names:

    KAFKA_BROKERCONNECT=kafka-broker-1:9092,kafka-broker-2:9093
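Following the same pattern as kafdrop-web, the Spring app could run as another service on kafka-net and point at the brokers by service name. This is a hypothetical sketch (the service name and build context are placeholders); it relies on Spring Boot's relaxed binding, which maps the SPRING_KAFKA_BOOTSTRAP_SERVERS environment variable to spring.kafka.bootstrap-servers:

```yaml
  spring-app:
    build: .            # placeholder: however the app image is actually built
    networks:
      - kafka-net
    environment:
      # Same broker addresses Kafdrop uses (the INSIDE listeners).
      - SPRING_KAFKA_BOOTSTRAP_SERVERS=kafka-broker-1:9092,kafka-broker-2:9093
    depends_on:
      - kafka-broker-1
      - kafka-broker-2
```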

huangapple
  • Published 2020-04-07 06:14:06
  • When reposting, please keep this link: https://go.coder-hub.com/61069792.html