How to read all messages from Kafka using segmentio/kafka-go?

I ran the example from the segmentio/kafka-go package documentation, but it only reads one message at a time.

Is there any way to read all the messages that have accumulated in Kafka at once and parse them immediately into []MyType?

    package main

    import (
    	"context"
    	"fmt"
    	"log"

    	"github.com/segmentio/kafka-go"
    )

    func main() {
    	// to consume messages
    	kafkaBrokerUrl := "localhost:9092"
    	topic := "test"
    	conn, err := kafka.DialLeader(context.Background(), "tcp", kafkaBrokerUrl, topic, 0)
    	if err != nil {
    		log.Fatal("failed to dial leader:", err)
    	}
    	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
    	b := make([]byte, 10e3)            // 10KB max per message
    	for {
    		n, err := batch.Read(b)
    		if err != nil {
    			break
    		}
    		fmt.Println(string(b[:n]))
    	}
    	batch.Close()
    	conn.Close()
    }

Answer 1

Score: 2

Kafka itself doesn't help with batch processing. If you mean something like a sliding-window mechanism that collects a batch of the stream over a particular time or size, it is better to use Apache Kafka with the Apache Flink connector: Flink provides sliding windows, HDFS storage, and much better fault tolerance through checkpoints. Unfortunately, the Flink Go SDK currently available is unstable.

Answer 2

Score: 0



You can easily submit '[]Type' as the message value instead of 'Type', and decode it back into '[]Type' on the consumer side.

    var ts []Type // data

    // encode the data
    var bu bytes.Buffer
    enc := gob.NewEncoder(&bu)
    if err := enc.Encode(ts); err != nil {
    	log.Fatal("encode error:", err)
    }

    // submit the message (or messages)
    msg := kafka.Message{
    	Key:   key,
    	Value: bu.Bytes(),
    }
    kafkaWriter := &kafka.Writer{
    	Addr:     kafka.TCP([]string{Broker1}...),
    	Topic:    Topic,
    	Balancer: &kafka.LeastBytes{},
    }
    if err := kafkaWriter.WriteMessages(ctx, msg); err != nil {
    	return err
    }

Posted by huangapple on 2022-08-17 19:56:30
Original link: https://go.coder-hub.com/73388071.html