Send data in cloudevents format to Kafka topic
Question
Right now I have this code, and it works fine (it sends JSON-formatted data to a Kafka topic):
j, err := json.Marshal(data)
if err != nil {
    log.Fatal(err)
}
msg := &sarama.ProducerMessage{
    Topic: tName,
    Value: sarama.StringEncoder(j),
}
_, _, err = producer.SendMessage(msg)
But somebody wants to have this data in CloudEvents format (https://github.com/cloudevents/sdk-go).
What should I do, given that this Event struct cannot be converted directly to a string?
type Event struct {
    Context     EventContext
    DataEncoded []byte
    // DataBase64 indicates if the event, when serialized, represents
    // the data field using the base64 encoding.
    // In v0.3, this field is superseded by DataContentEncoding
    DataBase64  bool
    FieldErrors map[string]error
}
So this code won't even compile:
j, err := json.Marshal(data)
if err != nil {
    log.Fatal(err)
}
//...
event := cloudevents.NewEvent()
event.SetSource("example/uri")
event.SetType("example.type")
event.SetData(cloudevents.ApplicationJSON, j)
producerMsg := &sarama.ProducerMessage{
    Topic: s.outputTopic,
    Value: sarama.StringEncoder(event),
}
_, _, err = s.producer.SendMessage(producerMsg)
What should I do to send this Event to Kafka? Should I convert event.DataEncoded to a string, or something like that?
By the way, the programming language is Go.
Answer 1
Score: 2
Did you see the section of the docs on serializing an event?
https://github.com/cloudevents/sdk-go#serializedeserialize-a-cloudevent
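event := cloudevents.NewEvent()
event.SetID("example-uuid-32943bac6fea") // id is a required CloudEvents attribute
event.SetSource("example/uri")
event.SetType("example.type")
// data here is a map[string]interface{}, or some other struct type
// representing the "example.type" schema above
if err := event.SetData(cloudevents.ApplicationJSON, data); err != nil {
    log.Fatal(err)
}
bytes, err := json.Marshal(event)
if err != nil {
    log.Fatal(err)
}
producerMsg := &sarama.ProducerMessage{
    Topic: s.outputTopic,
    // The event is already encoded; Value must be a sarama.Encoder,
    // so wrap the raw bytes rather than assigning them directly.
    Value: sarama.ByteEncoder(bytes),
}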
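To make it concrete what ends up in the message value, here is a minimal standard-library sketch of a structured-mode JSON envelope. It does not use the CloudEvents SDK: the `envelope` type and the `marshalEnvelope` helper are hypothetical names made up for illustration, and the attribute names (`specversion`, `id`, `source`, `type`, `datacontenttype`, `data`) are taken from the CloudEvents 1.0 JSON format. In real code the SDK should build this for you.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// envelope is a hypothetical stand-in for the JSON form of a CloudEvent
// (CloudEvents 1.0 attribute names). It is NOT the SDK's Event type.
type envelope struct {
	SpecVersion     string          `json:"specversion"`
	ID              string          `json:"id"`
	Source          string          `json:"source"`
	Type            string          `json:"type"`
	DataContentType string          `json:"datacontenttype,omitempty"`
	Data            json.RawMessage `json:"data,omitempty"`
}

// marshalEnvelope encodes the payload as JSON, wraps it in the envelope,
// and returns the bytes that would become the Kafka message value.
func marshalEnvelope(id, source, eventType string, payload interface{}) ([]byte, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("encode payload: %w", err)
	}
	return json.Marshal(envelope{
		SpecVersion:     "1.0",
		ID:              id,
		Source:          source,
		Type:            eventType,
		DataContentType: "application/json",
		Data:            data,
	})
}

func main() {
	value, err := marshalEnvelope("1", "example/uri", "example.type",
		map[string]string{"hello": "world"})
	if err != nil {
		log.Fatal(err)
	}
	// These bytes are what you would hand to the Kafka producer.
	fmt.Println(string(value))
}
```

The payload is kept as a nested JSON value rather than a quoted string, which is why the helper stores it in a `json.RawMessage`.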
Alternatively, look at the sample code that uses the CloudEvents client: https://github.com/cloudevents/sdk-go/blob/main/samples/kafka/sender/main.go
sender, err := kafka_sarama.NewSender([]string{"127.0.0.1:9092"}, saramaConfig, "test-topic")
if err != nil {
    log.Fatalf("failed to create protocol: %s", err.Error())
}
defer sender.Close(context.Background())
c, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
    log.Fatalf("failed to create client, %v", err)
}
event := cloudevents.NewEvent()
event.Set...
c.Send(..., event)
...
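For context on what a protocol-level sender can put on the wire: besides the structured mode (the whole event as one JSON value), the CloudEvents Kafka protocol binding defines a binary mode, in which the payload is the message value and each context attribute travels in a header prefixed with `ce_`, with the data content type in `content-type`. The sketch below shows only that mapping, using the standard library. The `header` type and `binaryHeaders` function are hypothetical names for illustration, and the sketch makes no claim about which mode the SDK's sender chooses.

```go
package main

import "fmt"

// header is a hypothetical stand-in for a Kafka record header
// (sarama has its own RecordHeader type with byte-slice Key and Value).
type header struct {
	Key, Value string
}

// binaryHeaders maps CloudEvents context attributes to Kafka headers as in
// the binding's binary content mode: "ce_" + attribute name, plus
// "content-type" for the data content type when one is set.
func binaryHeaders(id, source, eventType, contentType string) []header {
	hs := []header{
		{"ce_specversion", "1.0"},
		{"ce_id", id},
		{"ce_source", source},
		{"ce_type", eventType},
	}
	if contentType != "" {
		hs = append(hs, header{"content-type", contentType})
	}
	return hs
}

func main() {
	for _, h := range binaryHeaders("1", "example/uri", "example.type", "application/json") {
		fmt.Printf("%s: %s\n", h.Key, h.Value)
	}
}
```

In binary mode the message value would be just the payload bytes (the `j` from the question), so consumers that do not know about CloudEvents can still read it unchanged.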