how to access kafka installed in docker with golang on host

I need to access Kafka from Golang, so I installed Kafka and ZooKeeper in Docker.

1. Here is the Kafka install script:

    # pull images
    docker pull wurstmeister/zookeeper
    docker pull wurstmeister/kafka
    # run kafka & zookeeper
    docker run -d --name zookeeper -p 2181 -t wurstmeister/zookeeper
    docker run --name kafka -e HOST_IP=localhost -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_BROKER_ID=1 -e ZK=zk -p 9092:9092 --link zookeeper:zk -t wurstmeister/kafka
    # enter the container
    docker exec -it ${CONTAINER ID} /bin/bash
    cd opt/kafka_2.11-0.10.1.1/
    # create a topic
    bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic mykafka
    # start a producer in terminal-1
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka
    # start a consumer in terminal-2
    bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic mykafka --from-beginning

When I type a message in the producer, the consumer receives it immediately, so I assumed Kafka was working fine.

2. Now I need to create a consumer in Golang to access Kafka.

Here is my Golang demo code:

    package main

    import (
        "fmt"
        "log"
        "os"
        "os/signal"

        cluster "github.com/bsm/sarama-cluster"
    )

    func main() {
        Consumer()
    }

    func Consumer() {
        // init (custom) config, enable errors and notifications
        config := cluster.NewConfig()
        config.Consumer.Return.Errors = true
        config.Group.Return.Notifications = true

        // init consumer
        brokers := []string{"192.168.9.100:9092"}
        topics := []string{"mykafka"}
        consumer, err := cluster.NewConsumer(brokers, "my-group-id", topics, config)
        if err != nil {
            panic(err)
        }
        defer consumer.Close()

        // trap SIGINT to trigger a shutdown
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt)

        // consume messages, watch errors and notifications
        for {
            select {
            case msg, more := <-consumer.Messages():
                if more {
                    fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
                    consumer.MarkOffset(msg, "") // mark message as processed
                }
            case err, more := <-consumer.Errors():
                if more {
                    log.Printf("Error: %s\n", err.Error())
                }
            case ntf, more := <-consumer.Notifications():
                if more {
                    log.Printf("Rebalanced: %+v\n", ntf)
                }
            case <-signals:
                return
            }
        }
    }

Actually, this demo code was copied from the demo in the sarama-cluster GitHub repo.

When running the code, I got an error:

    kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

I did use a port mapping when starting Kafka, but I just can't reach it from Golang.

Is there a way to use curl to access Kafka?
I've tried:

    curl http://192.168.99.10:9092

and Kafka reported an error:

    [2017-08-02 06:39:15,232] WARN Unexpected error from /192.168.99.1; closing connection (org.apache.kafka.common.network.Selector)
    org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:95)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:379)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
        at kafka.network.Processor.poll(SocketServer.scala:499)
        at kafka.network.Processor.run(SocketServer.scala:435)
        at java.lang.Thread.run(Thread.java:748)

BTW:

I'm using Windows 7.

The docker-machine IP is 192.168.99.100.

This is driving me crazy.

Any advice or solutions? Much appreciated!

Answer 1 (score: 2)


If you want to create a consumer that listens to a Kafka topic, try this approach.
I used confluent-kafka-go, following the tutorial at https://github.com/confluentinc/confluent-kafka-go

This is the code in the main.go file:

    package main

    import (
        "fmt"

        "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
    )

    func main() {
        c, err := kafka.NewConsumer(&kafka.ConfigMap{
            "bootstrap.servers": "localhost",
            "group.id":          "myGroup",
            "auto.offset.reset": "earliest",
        })
        if err != nil {
            panic(err)
        }
        defer c.Close()

        c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

        for {
            msg, err := c.ReadMessage(-1)
            if err == nil {
                fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
            } else {
                // The client will automatically try to recover from all errors.
                fmt.Printf("Consumer error: %v (%v)\n", err, msg)
            }
        }
    }

If you build with Docker, follow this comment to add the suitable packages:

For Debian- and Ubuntu-based distros, install librdkafka-dev from the standard repositories or from Confluent's Deb repository.

For Red Hat-based distros, install librdkafka-devel from Confluent's YUM repository.

For macOS, install librdkafka from Homebrew. You may also need pkg-config if you don't already have it: brew install librdkafka pkg-config.

For Alpine: apk add librdkafka-dev pkgconf

confluent-kafka-go is not supported on Windows.

> With Alpine, remember to install the community version, because otherwise you cannot install librdkafka at version 1.1.0.

Good luck!

Answer 2 (score: 0)


I'm not sure whether curl can be used with Kafka, but you can use the kafka-console-consumer:

    kafka-console-consumer.bat --bootstrap-server 192.168.9.100:9092 --topic mykafka --from-beginning

Answer 3 (score: 0)


I've found the reason: the Kafka settings were not correct.

This is server.properties:

    ############################# Socket Server Settings #############################

    # The address the socket server listens on. It will get the value returned from
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    #   FORMAT:
    #     listeners = listener_name://host_name:port
    #   EXAMPLE:
    #     listeners = PLAINTEXT://your.host.name:9092
    #listeners=PLAINTEXT://:9092

    # Hostname and port the broker will advertise to producers and consumers. If not set,
    # it uses the value for "listeners" if configured. Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    #advertised.listeners=PLAINTEXT://your.host.name:9092

If listeners is not set, Kafka will only accept requests on the address returned by java.net.InetAddress.getCanonicalHostName(), which means localhost.

So I should set:

    listeners=PLAINTEXT://0.0.0.0:9092

This works.
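For reference, a common shape for these settings when the broker runs in Docker and clients connect from the host is sketched below. This is an assumption about the poster's setup, using the docker-machine IP 192.168.99.100 from the question; advertised.listeners matters because the broker hands this address back to clients after the initial bootstrap connection:

```
# server.properties (sketch, assumed docker-machine setup)
# bind on all interfaces inside the container
listeners=PLAINTEXT://0.0.0.0:9092
# advertise an address that host-side clients can actually reach
advertised.listeners=PLAINTEXT://192.168.99.100:9092
```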

huangapple
  • Posted on 2017-08-03 09:19:53
  • Please keep this link when reposting: https://go.coder-hub.com/45473299.html