英文:
How to read all messages from kafka using segmentio/kafka-go?
问题
我运行了segmentio/kafka-go
包文档中的示例,但每次只能获取一条消息。
有没有办法一次读取Kafka中累积的所有消息,并立即解析为[]MyType
类型?
func main() {
// 消费消息
kafkaBrokerUrl := "localhost:9092"
topic := "test"
conn, err := kafka.DialLeader(context.Background(), "tcp", kafkaBrokerUrl, topic, 0)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
batch := conn.ReadBatch(10e3, 1e6) // 每次获取10KB最小,1MB最大
b := make([]byte, 10e3) // 每条消息最大10KB
for {
n, err := batch.Read(b)
if err != nil {
break
}
fmt.Println(string(b[:n]))
}
}
英文:
I run the example from the package documentation segmentio/kafka-go
, but in it I get 1 message at a time.
Is there any way to read all the messages that have accumulated in Kafka at a time and parse them immediately into []MyType
?
func main() {
// to consume messages
kafkaBrokerUrl := "localhost:9092"
topic := "test"
conn, err := kafka.DialLeader(context.Background(), "tcp", kafkaBrokerUrl, topic, 0)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
b := make([]byte, 10e3) // 10KB max per message
for {
n, err := batch.Read(b)
if err != nil {
break
}
fmt.Println(string(b[:n]))
}
}
答案1
得分: 2
Kafka在批处理方面并不起作用,如果你指的是类似滑动窗口机制来获取一段时间内的一批流数据,最好使用Apache Kafka和Apache Flink的连接器。Flink提供了滑动窗口机制、HDFS存储,并且通过检查点提供了更好的容错性。不幸的是,目前可用的Flink Go SDK并不稳定。
英文:
Kafka doesn't help in batch processing , if you mean something like sliding window mechanism to take a batch of streams for a particular time ,size better to use a apache kafka and apache flink connector , flink provides sliding window mechanism , hdfs storage and moreover flink gives a much better fault tolerance by checkpoints . Unfortunately the flink go sdk available is unstable.
答案2
得分: 0
你可以将消息中的'[]Type'替换为'[]类型',并将其解码为'[]类型'。
var ts []类型 // 数据
// 编码数据
var bu bytes.Buffer
enc := gob.NewEncoder(&bu)
err := enc.Encode(ts)
if err != nil {
log.Fatal("编码错误:", err)
}
// 提交消息或多条消息
msg := kafka.Message{
Key: key,
Value: bu.Bytes(),
}
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP([]string{Broker1}...),
Topic: Topic,
Balancer: &kafka.LeastBytes{},
}
err := kafkaWriter.WriteMessages(ctx,
msg,
)
if err != nil {
return err
}
请注意,这只是代码的翻译部分,不包括任何其他内容。
英文:
You can easily submit '[]Type' as the message instead of 'Type'
also, decode it as '[]Type'.
var ts []Type //data
// encode the data
var bu bytes.Buffer
enc := gob.NewEncoder(&bu)
err := enc.Encode(ts)
if err != nil {
log.Fatal("encode error:", err)
}
//Submit the message or messages
msg := kafka.Message{
Key: key,
Value: bu.Bytes(),
}
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP([]string{Broker1}...),
Topic: Topic,
Balancer: &kafka.LeastBytes{},
}
err := kafkaWriter.WriteMessages(ctx,
msg,
)
if err != nil {
return err
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论