重新从Goroutine连接到Kafka

huangapple go评论89阅读模式
英文:

Reconnect to kafka from Goroutine

问题

我想使用Golang向Kafka写入消息。我有以下代码。

package kafkaK

import (
	"context"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/snappy"
	"time"
)

var writer *kafka.Writer

func Configure() (w *kafka.Writer, err error) {
	dialer := &kafka.Dialer{
		Timeout:  10 * time.Second,
		ClientID: "123",
	}

	config := kafka.WriterConfig{
		Brokers:          []string{"localhost:9092"},
		Topic:            "test",
		Balancer:         &kafka.LeastBytes{},
		Dialer:           dialer,
		WriteTimeout:     10 * time.Second,
		ReadTimeout:      10 * time.Second,
		CompressionCodec: snappy.NewCompressionCodec(),
	}
	w = kafka.NewWriter(config)
	writer = w
	return w, nil
}

func Push(parent context.Context, key, value []byte) (err error) {
	message := kafka.Message{
		Key:   key,
		Value: value,
		Time:  time.Now(),
	}
	return writer.WriteMessages(parent, message)
}

我在单独的Goroutine中写入到Kafka。

func sendMessage(message string) {
	err := kafkaK.Push(context.Background(), nil, []byte(message))
	if err != nil {
		fmt.Println(err)
	}
}

调用看起来像这样。

go sendMessage("message #" + strconv.Itoa(i))

1)有时候Kafka不可用,我想重新连接到它。如果不同的Goroutine使用相同的kafka对象,并且只有它们知道发生了错误并且可以发起重新连接,应该如何正确地处理?也许还有其他方法吗?
2)使用哪个库更好地与Kafka一起使用?

我认为可以使用通道或上下文,但我对Go还不太熟悉,所以还不太明白如何实现。

英文:

I want to write a message to kafka using Golang. I have the following code.

package kafkaK

import (
	"context"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/snappy"
	"time"
)

var writer *kafka.Writer

func Configure() (w *kafka.Writer, err error) {
	dialer := &kafka.Dialer{
		Timeout:  10 * time.Second,
		ClientID: "123",
	}

	config := kafka.WriterConfig{
		Brokers:          []string{"localhost:9092"},
		Topic:            "test",
		Balancer:         &kafka.LeastBytes{},
		Dialer:           dialer,
		WriteTimeout:     10 * time.Second,
		ReadTimeout:      10 * time.Second,
		CompressionCodec: snappy.NewCompressionCodec(),
	}
	w = kafka.NewWriter(config)
	writer = w
	return w, nil
}

func Push(parent context.Context, key, value []byte) (err error) {
	message := kafka.Message{
		Key:   key,
		Value: value,
		Time:  time.Now(),
	}
	return writer.WriteMessages(parent, message)
}

I write to kafka in separate Goroutines.

func sendMessage(message string) {
	err := kafkaK.Push(context.Background(), nil, []byte(message))
	if err != nil {
		fmt.Println(err)
	}
}

The call looks like

go sendMessage("message #" + strconv.Itoa(i))
  1. It happens that kafka is unavailable, and I would like to reconnect to it. How to do it correctly if different Goroutines use the same kafka object, and only they know that an error has occurred and can initiate reconnection. Perhaps there is some other approach?
  2. Which library is better to use for working with kafka?

I think it's possible to use a channel or context, but I'm new to go, so I don't really understand how it can be implemented yet

答案1

得分: 3

首先,我想指出你在使用kafka-go中使用了一个已弃用的接口。这个文档指定了你可以使用kafkago.Writer{}来提供配置以连接和写入你的kafka集群。现在来回答你的问题:

发生kafka不可用的情况,我想重新连接它。如果不同的Goroutines使用相同的kafka对象,并且只有它们知道发生了错误并可以启动重新连接,应该如何正确地处理?也许还有其他方法吗?

使用像kafka-go这样的高级库,你不必担心处理连接或瞬态错误,比如kafka集群的临时不可用性。当你在kafka writer上使用WriteMessages方法时,它会缓冲你的消息,并在使用你提供的配置连接到代理后定期刷新。因此,只要配置正确,kafka-go会为你处理剩下的事情。

哪个库更适合与kafka一起使用?

kafka-go绝对是广泛使用和积极维护的用于读写kafka的库。

另外,回到你的示例,这是我会如何重构它的代码:

package kafkak

import (
	"context"
	"fmt"
	"github.com/segmentio/kafka-go"
	"sync"
	"time"
)

// 定义一个接口,以便你可以为测试而模拟它。
type kproducer interface {
	WriteMessages(ctx context.Context, msgs ...kafka.Message) error
}

type KafkaK struct {
	p      kproducer
	dataCh chan kafka.Message // 将消息写入此通道,以便单独的goroutine可以处理它们
	errCh  chan error // 从此通道获取错误
	stopCh chan struct{}
}

func NewKafkaK(p kproducer) *KafkaK {
	k := &KafkaK{
		p:      p,
		dataCh: make(chan kafka.Message),
		errCh:  make(chan error),
		stopCh: make(chan struct{}),
	}

	go k.processLoop()

	return k
}

func (k *KafkaK) Push(key, value string) {
	m := kafka.Message{
		Key:   []byte(key),
		Value: []byte(value),
		Time:  time.Now(),
	}

	k.dataCh <- m
}

func (k *KafkaK) processLoop() {
	for {
		select {
		case msg := <-k.dataCh:
			ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*5))
			err := k.p.WriteMessages(ctx, msg)
			k.errCh <- err
		case <-k.stopCh:
			break
		}
	}
}

func (k *KafkaK) Stop() {
	close(k.stopCh)
	close(k.errCh)
}

func (k *KafkaK) Errs() <-chan error {
	return k.errCh
}
英文:

First I want to point out that you're using a deprecated interface in kafka-go. This documentation specifies that you can use kafkago.Writer{} to supply the config to connect and write to you kafka cluster. Now to answer your questions:

> It happens that kafka is unavailable, and I would like to reconnect to it. How to do it correctly if different Goroutines use the same kafka object, and only they know that an error has occurred and can initiate reconnection. Perhaps there is some other approach?

With a high level library like kafka-go, you don't have to worry about handling connections or transient errors like temporary unavailability of your kafka cluster. When you use the WriteMessages method on kafka writer, it buffers your messages and flushes periodically after connecting to your brokers using the config you provide to it. So, as long as the config is correct kafka-go will handle the rest for you.

> Which library is better to use for working with kafka?

kafka-go is definitely widely used and actively maintained for reading and writing to kafka.

Also, back to your example, this is how I would refactor it:

package kafkak
import (
&quot;context&quot;
&quot;fmt&quot;
&quot;github.com/segmentio/kafka-go&quot;
&quot;sync&quot;
&quot;time&quot;
)
// Define an interface so you can mock it for testing.
type kproducer interface {
WriteMessages(ctx context.Context, msgs ...kafka.Message) error
}
type KafkaK struct {
p      kproducer
dataCh chan kafka.Message // Write messages to this channel so a separate go routine can process them
errCh  chan error // Fetch errors from this channel
stopCh chan struct{}
}
func NewKafkaK(p kproducer) *KafkaK {
k := &amp;KafkaK{
p:      p,
dataCh: make(chan kafka.Message),
errCh:  make(chan error),
stopCh: make(chan struct{}),
}
go k.processLoop()
return k
}
func (k *KafkaK) Push(key, value string) {
m := kafka.Message{
Key:   []byte(key),
Value: []byte(value),
Time:  time.Now(),
}
k.dataCh &lt;- m
}
func (k *KafkaK) processLoop() {
for {
select {
case msg := &lt;-k.dataCh:
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*5))
err := k.p.WriteMessages(ctx, msg)
k.errCh &lt;- err
case &lt;-k.stopCh:
break
}
}
}
func (k *KafkaK) Stop() {
close(k.stopCh)
close(k.errCh)
}
func (k *KafkaK) Errs() &lt;-chan error {
return k.errCh
}

huangapple
  • 本文由 发表于 2023年5月4日 03:16:34
  • 转载请务必保留本文链接:https://go.coder-hub.com/76167316.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定