How to read all messages from Kafka using segmentio/kafka-go?

Question

I run the example from the segmentio/kafka-go package documentation, but with it I get one message at a time.

Is there any way to read all the messages that have accumulated in Kafka at once, and parse them immediately into []MyType?


package main

import (
	"context"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// to consume messages
	kafkaBrokerUrl := "localhost:9092"
	topic := "test"

	conn, err := kafka.DialLeader(context.Background(), "tcp", kafkaBrokerUrl, topic, 0)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}
	defer conn.Close()

	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
	defer batch.Close()

	b := make([]byte, 10e3) // 10KB max per message
	for {
		n, err := batch.Read(b)
		if err != nil {
			break
		}
		fmt.Println(string(b[:n]))
	}
}
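For the "parse them immediately into []MyType" part of the question, one approach is to collect the raw values the loop above prints and decode them into a slice afterwards. A minimal sketch of that decoding step, assuming the messages are JSON; `MyType`, its fields, and the `decodeAll` helper are hypothetical names, not part of kafka-go:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MyType stands in for the element type from the question; its fields are illustrative.
type MyType struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// decodeAll turns a slice of raw Kafka message values into []MyType,
// failing fast on the first value that is not valid JSON for MyType.
func decodeAll(raw [][]byte) ([]MyType, error) {
	out := make([]MyType, 0, len(raw))
	for _, b := range raw {
		var m MyType
		if err := json.Unmarshal(b, &m); err != nil {
			return nil, err
		}
		out = append(out, m)
	}
	return out, nil
}

func main() {
	// raw stands in for the values collected from batch.Read above.
	raw := [][]byte{
		[]byte(`{"id":1,"name":"a"}`),
		[]byte(`{"id":2,"name":"b"}`),
	}
	ts, err := decodeAll(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(ts), ts[0].Name) // 2 a
}
```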

Answer 1

Score: 2

Kafka itself doesn't help with batch processing. If you mean something like a sliding-window mechanism that takes a batch of the stream for a particular time or size, it is better to use Apache Kafka with the Apache Flink connector: Flink provides a sliding-window mechanism and HDFS storage, and moreover gives much better fault tolerance via checkpoints. Unfortunately, the Flink Go SDK currently available is unstable.

Answer 2

Score: 0

You can easily submit '[]Type' as the message value instead of 'Type', and likewise decode it back as '[]Type' on the consumer side.

var ts []Type // data

// encode the data
var bu bytes.Buffer
enc := gob.NewEncoder(&bu)
if err := enc.Encode(ts); err != nil {
	log.Fatal("encode error:", err)
}

// submit the message (or messages)
msg := kafka.Message{
	Key:   key,
	Value: bu.Bytes(),
}
kafkaWriter := &kafka.Writer{
	Addr:     kafka.TCP([]string{Broker1}...),
	Topic:    Topic,
	Balancer: &kafka.LeastBytes{},
}
if err := kafkaWriter.WriteMessages(ctx, msg); err != nil {
	return err
}
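The answer above shows only the producer side. The consumer-side mirror is to gob-decode the received message value back into '[]Type'. A minimal round-trip sketch of that encode/decode pair, runnable without a broker; `Type`'s fields and the `encodeSlice`/`decodeSlice` helper names are illustrative assumptions:

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"log"
)

// Type stands in for the element type from the answer; its fields are illustrative.
type Type struct {
	ID   int
	Name string
}

// encodeSlice gob-encodes a whole []Type into a single message value.
func encodeSlice(ts []Type) ([]byte, error) {
	var bu bytes.Buffer
	if err := gob.NewEncoder(&bu).Encode(ts); err != nil {
		return nil, err
	}
	return bu.Bytes(), nil
}

// decodeSlice is the consumer-side mirror: msg.Value back into []Type.
func decodeSlice(value []byte) ([]Type, error) {
	var ts []Type
	if err := gob.NewDecoder(bytes.NewReader(value)).Decode(&ts); err != nil {
		return nil, err
	}
	return ts, nil
}

func main() {
	value, err := encodeSlice([]Type{{1, "a"}, {2, "b"}})
	if err != nil {
		log.Fatal("encode error:", err)
	}
	got, err := decodeSlice(value) // in a real consumer: decodeSlice(msg.Value)
	if err != nil {
		log.Fatal("decode error:", err)
	}
	fmt.Println(len(got), got[1].Name) // 2 b
}
```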

huangapple
  • Posted on 2022-08-17 19:56:30
  • Please keep this link when reposting: https://go.coder-hub.com/73388071.html