How to access Kafka installed in Docker from Golang on the host

Question
I need to use Golang to access Kafka, so I installed Kafka and ZooKeeper in Docker.
1. Here is the Kafka install script:
# pull images
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
# run kafka & zookeeper
docker run -d --name zookeeper -p 2181 -t wurstmeister/zookeeper
docker run --name kafka -e HOST_IP=localhost -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_BROKER_ID=1 -e ZK=zk -p 9092:9092 --link zookeeper:zk -t wurstmeister/kafka
# enter container
docker exec -it <container-id> /bin/bash
cd opt/kafka_2.11-0.10.1.1/
# create a topic
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic mykafka
# start a producer in terminal-1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka
# start another terminal-2 and start a consumer
bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic mykafka --from-beginning
When I type a message into the producer, the consumer receives it immediately, so I assumed Kafka was working fine.
2. Now I need to create a consumer in Golang to access Kafka.
Here is my Golang demo code:
import (
	"fmt"
	"log"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func Consumer() {
	// init (custom) config, enable errors and notifications
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	// init consumer
	brokers := []string{"192.168.9.100:9092"}
	topics := []string{"mykafka"}
	consumer, err := cluster.NewConsumer(brokers, "my-group-id", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume messages, watch errors and notifications
	for {
		select {
		case msg, more := <-consumer.Messages():
			if more {
				fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
				consumer.MarkOffset(msg, "") // mark message as processed
			}
		case err, more := <-consumer.Errors():
			if more {
				log.Printf("Error: %s\n", err.Error())
			}
		case ntf, more := <-consumer.Notifications():
			if more {
				log.Printf("Rebalanced: %+v\n", ntf)
			}
		case <-signals:
			return
		}
	}
}
Actually, this demo code is copied from the demo in the sarama-cluster GitHub repo.
When running the code, I got an error:
kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
I did use a port mapping when starting Kafka, but I just can't access it from Golang.
Is there a way to use curl to access Kafka?
I've tried:
curl http://192.168.99.10:9092
and Kafka reported an error:
[2017-08-02 06:39:15,232] WARN Unexpected error from /192.168.99.1; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:95)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:379)
at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
at kafka.network.Processor.poll(SocketServer.scala:499)
at kafka.network.Processor.run(SocketServer.scala:435)
at java.lang.Thread.run(Thread.java:748)
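As an aside, the "size" in that exception decodes neatly: Kafka reads the first four bytes of every request as a big-endian length prefix, and curl's HTTP request starts with the bytes "GET ", which interpreted as a 32-bit integer is exactly 1195725856. A small sketch to verify:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// sizeOf interprets the first four bytes of s the way Kafka's wire
// protocol does: as a big-endian request-size prefix.
func sizeOf(s string) uint32 {
	return binary.BigEndian.Uint32([]byte(s)[:4])
}

func main() {
	fmt.Println(sizeOf("GET / HTTP/1.1")) // prints 1195725856
}
```

So the broker is reachable via curl; it just cannot speak HTTP, which means the connectivity problem lies elsewhere.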
BTW:
I use Windows 7.
The docker machine's IP is 192.168.99.100.
It's driving me crazy.
Is there any advice or a solution? I'd appreciate it!
Answer 1 (Score: 2)
If you want to create a consumer that listens to a topic on Kafka, try this approach.
I used confluent-kafka-go, following the tutorial at https://github.com/confluentinc/confluent-kafka-go
This is the code in the main.go file:
package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close() // close on exit; a trailing Close after an infinite loop is unreachable

	c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else {
			// The client will automatically try to recover from all errors.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}
}
If you use Docker to build, follow this comment to add the suitable packages:
For Debian- and Ubuntu-based distros, install librdkafka-dev from the standard repositories or using Confluent's Deb repository.
For Red Hat-based distros, install librdkafka-devel using Confluent's YUM repository.
For macOS, install librdkafka from Homebrew. You may also need to install pkg-config if you don't already have it: brew install librdkafka pkg-config.
For Alpine: apk add librdkafka-dev pkgconf
confluent-kafka-go is not supported on Windows.
> With Alpine, remember to install from the community repository, because otherwise librdkafka cannot be installed at the required version 1.1.0.
Good luck!
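The Alpine notes above can be combined into a build stage. This is a hypothetical Dockerfile sketch, not a tested recipe: the Go image tag and project layout are assumptions, and librdkafka-dev is assumed to be available from Alpine's community repository as described above.

```dockerfile
# Hypothetical Alpine build stage for a confluent-kafka-go consumer.
FROM golang:1.13-alpine

# cgo needs a C toolchain; librdkafka-dev comes from the community repository.
RUN apk add --no-cache gcc musl-dev librdkafka-dev pkgconf

WORKDIR /app
COPY . .

# confluent-kafka-go wraps librdkafka via cgo, so CGO must be enabled.
RUN CGO_ENABLED=1 go build -o consumer .
```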
Answer 2 (Score: 0)
I'm not sure if it is possible to use curl with Kafka, but you can use the kafka-console-consumer:
kafka-console-consumer.bat --bootstrap-server 192.168.9.100:9092 --topic mykafka --from-beginning
Answer 3 (Score: 0)
I've found the reason: the Kafka settings were not correct.
This is server.properties:
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
If listeners is not set, Kafka will only accept requests addressed to the host returned by java.net.InetAddress.getCanonicalHostName(), which means localhost.
So I should set:
listeners=PLAINTEXT://0.0.0.0:9092
This works.
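Binding to 0.0.0.0 makes the broker accept connections from outside the container, but clients also need an advertised address they can actually reach, because the broker hands that address back in metadata responses. A sketch of the relevant server.properties lines, using the Docker machine IP from the question (whether that IP matches your setup is an assumption):

```properties
# Listen on all interfaces inside the container.
listeners=PLAINTEXT://0.0.0.0:9092

# Address the broker advertises to producers and consumers;
# it must be reachable from the Go client on the host.
advertised.listeners=PLAINTEXT://192.168.99.100:9092
```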