golang confluent kafka consumer doesn't receive messages

Question

I was trying to create a simple producer/consumer Kafka duo. The producer works successfully, following the examples on Confluent's GitHub page, but I had trouble implementing the consumer. I use a cloud Kafka broker, Cloudkarafka. The consumer.go code is below:

package main

import (
    "fmt"
    "os"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    config := &kafka.ConfigMap{
        "metadata.broker.list":            "XXXXXXX", // Cloudkarafka提供给我的3个主机
        "security.protocol":               "SASL_SSL",
        "sasl.mechanisms":                 "SCRAM-SHA-256",
        "sasl.username":                   "XXXXXXXX", // Cloudkarafka提供的用户名
        "sasl.password":                   "XXXXXXXX", // Cloudkarafka提供的密码
        "group.id":                        "cloudkarafka-example",
        "go.events.channel.enable":        true,
        "go.application.rebalance.enable": true,
        "default.topic.config":            kafka.ConfigMap{"auto.offset.reset": "earliest"},
        //"debug":                           "generic,broker,security",
    }
    topic := "XXXXXX" + "A" // 用户名 + "A"

    consumer, err := kafka.NewConsumer(config)

    if err != nil {
        panic(fmt.Sprintf("Failed to create consumer: %s", err))
    }

    topics := []string{topic}
    err = consumer.SubscribeTopics(topics, nil)
    run := true
    for run {
        ev := consumer.Poll(0)
        switch e := ev.(type) {
        case *kafka.Message:
            fmt.Printf("%% Message on %s:\n%s\n",
                e.TopicPartition, string(e.Value))
        case kafka.PartitionEOF:
            fmt.Printf("%% Reached %v\n", e)
        case kafka.Error:
            fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
            run = false
        default:
            fmt.Printf("Ignored %v\n", e)
        }
    }

    consumer.Close()
}

The problem I'm having is that even though I produce messages to the same topic, the consumer always lands in the default case and constantly prints "Ignored <nil>". Since I'm a beginner with these topics, any help and suggestions would be appreciated.

P.S.: I use Windows 11. The details say "confluent-kafka-go is not supported on Windows", but the code runs, it just stays in the default case, and the producer part works fine.

producer.go:

config := &kafka.ConfigMap{
    "metadata.broker.list": "XXXXXXXXXX",
    "security.protocol":    "SASL_SSL",
    "sasl.mechanisms":      "SCRAM-SHA-256",
    "sasl.username":        "XXXXXXXXX",
    "sasl.password":        "XXXXXXXXX",
    "group.id":             "cloudkarafka-example",
    "default.topic.config": kafka.ConfigMap{"auto.offset.reset": "earliest"},
    //"debug":                           "generic,broker,security",
}
topic := "XXXXX-" + "A"
p, err := kafka.NewProducer(config)
if err != nil {
    fmt.Printf("Failed to create producer: %s\n", err)
    os.Exit(1)
}
fmt.Printf("Created Producer %v\n", p)
deliveryChan := make(chan kafka.Event)

for i := 0; i < 10; i++ {
    value := fmt.Sprintf("[%d] Hello Go!", i+1)
    err = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value)}, deliveryChan)
    e := <-deliveryChan
    m := e.(*kafka.Message)
    if m.TopicPartition.Error != nil {
        fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
    } else {
        fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
            *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
    }
}
close(deliveryChan)

Answer 1

Score: 1


Poll() will return nil on timeout. Since you are specifying a timeout of 0ms, I suspect that what you are seeing is the behaviour of a consumer with no messages to consume.

That is, you are asking it to wait 0ms for new messages; since there are never any new messages, the Poll() call immediately returns nil every time, all the time. Without a specific nil case, those results are handled by your default case.
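For illustration, giving Poll() a real timeout and handling the nil case explicitly would make that behaviour visible. A sketch reusing the variables from your loop (the 100ms value is just an example):

    ev := consumer.Poll(100) // wait up to 100 ms for an event
    if ev == nil {
        continue // Poll timed out: no event is not an error
    }
    // ...then switch on the event type as before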

  1. Are you SURE you are producing messages to the same topic your consumer is subscribed to? As Andrey pointed out in his comment, either your topic IDs are different or you have obfuscated them differently in your consumer vs. producer example code. It may be more helpful to first try reproducing your problem with a configuration that does not require obfuscation, to avoid uncertainty on such points.

  2. Are you getting any error from Subscribe()? (Why aren't you checking this? See the snippet after this list.)

  3. How long have you waited to see messages consumed? It can take a few seconds for a broker to accept a new consumer into a group; with a 0ms timeout, you may see lots of "no message" events before you eventually start receiving any waiting messages.
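For point 2, a minimal check would look like this (a sketch using the names from your consumer code):

    if err := consumer.SubscribeTopics(topics, nil); err != nil {
        fmt.Fprintf(os.Stderr, "Failed to subscribe: %v\n", err)
        os.Exit(1)
    }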

For a minimal, working example, I'd suggest keeping things as simple as possible:

  • You don't need to configure go.events.channel.enable if you are using Poll() to read messages.

  • You don't need to configure go.application.rebalance.enable if you aren't interested in, and don't need to modify, initial offsets.

  • If you aren't interested in events such as PartitionEOF (and you likely aren't), then you might want to consider using the higher-level consumer.ReadMessage() function rather than Poll(). ReadMessage() returns only messages or errors and ignores all other events. A sketch along those lines follows this list.
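To make that concrete, here is a minimal sketch of such a consumer using ReadMessage(). The broker list, credentials, and topic below are placeholders, so substitute your own Cloudkarafka values; the 10-second timeout is an arbitrary choice:

package main

import (
    "fmt"
    "os"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "metadata.broker.list": "host1:9094,host2:9094,host3:9094", // placeholder brokers
        "security.protocol":    "SASL_SSL",
        "sasl.mechanisms":      "SCRAM-SHA-256",
        "sasl.username":        "XXXXXXXX", // placeholder
        "sasl.password":        "XXXXXXXX", // placeholder
        "group.id":             "cloudkarafka-example",
        "auto.offset.reset":    "earliest",
    })
    if err != nil {
        panic(fmt.Sprintf("Failed to create consumer: %s", err))
    }
    defer consumer.Close()

    // Check the subscribe error instead of discarding it.
    if err := consumer.SubscribeTopics([]string{"XXXXXXXXA"}, nil); err != nil {
        fmt.Fprintf(os.Stderr, "Failed to subscribe: %v\n", err)
        os.Exit(1)
    }

    for {
        msg, err := consumer.ReadMessage(10 * time.Second)
        if err != nil {
            // A timeout just means no message arrived within the window; keep waiting.
            if kErr, ok := err.(kafka.Error); ok && kErr.Code() == kafka.ErrTimedOut {
                continue
            }
            fmt.Fprintf(os.Stderr, "Consumer error: %v\n", err)
            break
        }
        fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
    }
}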

Answer 2

Score: 0


I have faced this exact issue; the problem is documented in the library:

//   go.events.channel.enable (bool, false) - [deprecated] Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled.
//   go.events.channel.size (int, 1000) - Events() channel size
//   go.logs.channel.enable (bool, false) - Forward log to Logs() channel.
//   go.logs.channel (chan kafka.LogEvent, nil) - Forward logs to application-provided channel instead of Logs(). Requires go.logs.channel.enable=true.
//
// WARNING: Due to the buffering nature of channels (and queues in general) the
// use of the events channel risks receiving outdated events and
// messages. Minimizing go.events.channel.size reduces the risk
// and number of outdated events and messages but does not eliminate
// the factor completely. With a channel size of 1 at most one
// event or message may be outdated.

Don't set go.events.channel.enable to true: with the events channel enabled, the Poll() interface is disabled, which is why your Poll() loop only ever sees nil.
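If, on the other hand, you did want to keep go.events.channel.enable set to true, you would have to read from the Events() channel instead of calling Poll(). A sketch of that variant, reusing the consumer variable from the question:

    for ev := range consumer.Events() {
        switch e := ev.(type) {
        case *kafka.Message:
            fmt.Printf("Message on %s: %s\n", e.TopicPartition, string(e.Value))
        case kafka.Error:
            fmt.Fprintf(os.Stderr, "Error: %v\n", e)
        }
    }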
