Inserting into firehose with Go

Question

I have the following JSON file:

    {
        "@timestamp": "2021-11-19T21:32:55.196Z",
        "@version": "1",
        "message": "Manual test to firehose."
    }

And I have the following Go function:

    func insertIntoFireHose(sess *session.Session, hoseName string) {
        svc := firehose.New(sess, aws.NewConfig().WithRegion("us-east-1"))
        //firehoseData := getObjectFromS3Bucket(sess)
        firehoseData, _ := os.ReadFile("/temp/test.json")
        var rec firehose.Record
        var recInput firehose.PutRecordInput
        dataJson, _ := json.Marshal(firehoseData)
        rec.SetData(dataJson)
        recInput.SetDeliveryStreamName(hoseName)
        recInput.SetRecord(&rec)
        res, err1 := svc.PutRecord(&recInput)
        if err1 != nil {
            log.Fatal(err1)
        }
        fmt.Println(res)
    }

What I want to do is get a file and insert it into the firehose, but I get this error message:

    {"type":"mapper_parsing_exception","reason":"failed to parse","caused_by":
    {"type":"not_x_content_exception",
    "reason":"not_x_content_exception: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"}}

And I'm not quite sure what I'm doing wrong.

Changing the record to get data directly from the file returns this error:

    One or more records are malformed. Please ensure that each record is single valid JSON object and that it does not contain newlines.

Answer 1

Score: 2

    {
        "@timestamp": "2021-11-19T21:32:55.196Z",
        "@version": "1",
        "message": "Manual test to firehose.",
    }

I don't think this is valid JSON: it has a trailing comma after the last field (the "message" line).

This is the valid version:

    {
        "@timestamp": "2021-11-19T21:32:55.196Z",
        "@version": "1",
        "message": "Manual test to firehose."
    }

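Also, firehoseData is already a []byte, so I don't think you need to json.Marshal it again. json.Marshal encodes a []byte as a base64 string, so the record no longer contains the JSON object you read from the file.

This is the result of the marshal.

Code: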

    package main

    import (
        "encoding/json"
        "fmt"
        "os"
    )

    func main() {
        firehoseData, _ := os.ReadFile("./file.json") // same value
        fmt.Printf("%+v\n", string(firehoseData))

        // json.Marshal turns the []byte into a quoted base64 string.
        test, err := json.Marshal(firehoseData)
        fmt.Printf("%+v\n", string(test))
        fmt.Printf("%+v\n", err)
    }

Output:

    {
        "@timestamp": "2021-11-19T21:32:55.196Z",
        "@version": "1",
        "message": "Manual test to firehose.",
    }
    "ew0KICAgICJAdGltZXN0YW1wIjogIjIwMjEtMTEtMTlUMjE6MzI6NTUuMTk2WiIsDQogICAgIkB2ZXJzaW9uIjogIjEiLA0KICAgICJtZXNzYWdlIjogIk1hbnVhbCB0ZXN0IHRvIGZpcmVob3NlLiIsDQp9"
    <nil>
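
Putting the two points above together (valid JSON, no second json.Marshal), here is a minimal sketch of how the file contents could be prepared before they are handed to the record. prepareRecord is a hypothetical helper name, not part of the AWS SDK. It relies only on json.Valid and json.Compact from the standard library to reject malformed input and to strip newlines, which is what the second error message in the question complains about. Whether this is enough for a particular delivery stream destination is an assumption.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// prepareRecord is a hypothetical helper (not an AWS SDK function).
// It rejects input that is not valid JSON and removes insignificant
// whitespace, including newlines, so the result is one single-line document.
func prepareRecord(raw []byte) ([]byte, error) {
	if !json.Valid(raw) {
		return nil, errors.New("input is not valid JSON")
	}
	var buf bytes.Buffer
	if err := json.Compact(&buf, raw); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	// Valid input, spread over several lines like the file in the question.
	valid := []byte("{\r\n    \"@version\": \"1\",\r\n    \"message\": \"Manual test to firehose.\"\r\n}")
	data, err := prepareRecord(valid)
	fmt.Println(string(data), err)

	// Input with a trailing comma is rejected before anything is sent.
	bad := []byte("{\r\n    \"message\": \"Manual test to firehose.\",\r\n}")
	_, err = prepareRecord(bad)
	fmt.Println(err)

	// In the question's function, the compacted bytes would then be passed
	// to the record directly, with no json.Marshal step:
	//     rec.SetData(data)
}
```

Note that json.Valid also accepts top-level arrays and scalars, so if the stream requires a JSON object specifically, an extra check would be needed.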

huangapple
  • Published on 2021-11-23 22:50:28
  • When reposting, please keep the link to this article: https://go.coder-hub.com/70082976.html