英文:
Go Elasticsearch: bulk-insert data arriving from Pulsar
问题
我必须使用goelastic库从Pulsar中批量插入数据。但是我遇到了一个问题。
首先,Pulsar每次以部分批量的方式发送1000条数据。然后,当我插入到Elasticsearch时,有时会出现问题。这个问题如下所示。这个问题导致数据丢失。谢谢回答...
ERROR: circuit_breaking_exception: [parent] Data too large, data for [indices:data/write/bulk
这部分是批量代码。
// InsertElastic bulk-indexes the records in y into enum.IndexName and prints
// a summary comparing how many records arrived with how many Elasticsearch
// acknowledged.
//
// Items rejected by Elasticsearch (for example with a
// circuit_breaking_exception when the node's JVM heap is under pressure) are
// logged by OnFailure and counted as not written; they are NOT retried here.
// To avoid losing them, keep bulk requests small (bulkFlushBytes), give the
// nodes more heap, and/or configure the client to retry rejected requests
// (circuit-breaker rejections are typically reported as HTTP 429).
//
// It panics if the bulk indexer cannot be created or a record cannot be
// marshalled, and exits the process via log.Fatalf if an item cannot be
// added or the indexer fails to close.
func InsertElastic(y []models.CP, ElasticStruct *config.ElasticStruct) {
	// bulkFlushBytes caps the size of each bulk request body. Smaller requests
	// need less heap on the Elasticsearch side per request, which is what the
	// [parent] circuit breaker protects. 5 MB is the esutil default; the
	// previous 10 MB made breaker trips more likely.
	const bulkFlushBytes = 5e+6

	fmt.Println("------------------")
	ctx := context.Background()

	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:      enum.IndexName,
		Client:     ElasticStruct.Client,
		FlushBytes: bulkFlushBytes,
	})
	if err != nil {
		panic(err)
	}

	start := time.Now().UTC()
	for _, doc := range y {
		data, err := json.Marshal(doc)
		if err != nil {
			panic(err)
		}
		err = bi.Add(ctx, esutil.BulkIndexerItem{
			Action: "index",
			Body:   bytes.NewReader(data),
			// Callbacks run on the indexer's worker goroutines, so this one
			// only logs and does not touch shared counters.
			OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
				if err != nil {
					log.Printf("ERROR: %s", err)
				} else {
					log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
				}
			},
		})
		if err != nil {
			log.Fatalf("Unexpected error: %s", err)
		}
	}
	// Close flushes any buffered items and waits for the workers, so the
	// stats read below are final.
	if err := bi.Close(ctx); err != nil {
		log.Fatalf("Unexpected error: %s", err)
	}

	// Count from the indexer's own stats and the input length instead of
	// incrementing a variable from OnSuccess (racy across workers) or the
	// range variable (which counted nothing).
	stats := bi.Stats()
	received := len(y)
	indexed := int(stats.NumFlushed)

	fmt.Println(time.Since(start))
	fmt.Println("Success writing data to elastic : ", indexed)
	fmt.Println("Success incoming data from pulsar : ", received)
	fmt.Println("Difference : ", received-indexed)
	fmt.Println("Now : ", time.Now().UTC().String())
	if indexed < received {
		fmt.Println("FATAL")
	}
	fmt.Println("------------------")
}
英文:
I have to use the Go Elasticsearch library to bulk-insert data coming from Pulsar, but I have a problem.
First, Pulsar sends the data in partial batches of 1000 records each. When I insert them into Elasticsearch, the request sometimes fails with the error shown below, and this causes data loss. Thanks for any answers...
ERROR: circuit_breaking_exception: [parent] Data too large, data for [indices:data/write/bulk
This is the bulk-insert code:
// InsertElastic bulk-indexes the records in y into enum.IndexName and prints
// a summary comparing how many records arrived with how many Elasticsearch
// acknowledged.
//
// Items rejected by Elasticsearch (for example with a
// circuit_breaking_exception when the node's JVM heap is under pressure) are
// logged by OnFailure and counted as not written; they are NOT retried here.
// To avoid losing them, keep bulk requests small (bulkFlushBytes), give the
// nodes more heap, and/or configure the client to retry rejected requests
// (circuit-breaker rejections are typically reported as HTTP 429).
//
// It panics if the bulk indexer cannot be created or a record cannot be
// marshalled, and exits the process via log.Fatalf if an item cannot be
// added or the indexer fails to close.
func InsertElastic(y []models.CP, ElasticStruct *config.ElasticStruct) {
	// bulkFlushBytes caps the size of each bulk request body. Smaller requests
	// need less heap on the Elasticsearch side per request, which is what the
	// [parent] circuit breaker protects. 5 MB is the esutil default; the
	// previous 10 MB made breaker trips more likely.
	const bulkFlushBytes = 5e+6

	fmt.Println("------------------")
	ctx := context.Background()

	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:      enum.IndexName,
		Client:     ElasticStruct.Client,
		FlushBytes: bulkFlushBytes,
	})
	if err != nil {
		panic(err)
	}

	start := time.Now().UTC()
	for _, doc := range y {
		data, err := json.Marshal(doc)
		if err != nil {
			panic(err)
		}
		err = bi.Add(ctx, esutil.BulkIndexerItem{
			Action: "index",
			Body:   bytes.NewReader(data),
			// Callbacks run on the indexer's worker goroutines, so this one
			// only logs and does not touch shared counters.
			OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
				if err != nil {
					log.Printf("ERROR: %s", err)
				} else {
					log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
				}
			},
		})
		if err != nil {
			log.Fatalf("Unexpected error: %s", err)
		}
	}
	// Close flushes any buffered items and waits for the workers, so the
	// stats read below are final.
	if err := bi.Close(ctx); err != nil {
		log.Fatalf("Unexpected error: %s", err)
	}

	// Count from the indexer's own stats and the input length instead of
	// incrementing a variable from OnSuccess (racy across workers) or the
	// range variable (which counted nothing).
	stats := bi.Stats()
	received := len(y)
	indexed := int(stats.NumFlushed)

	fmt.Println(time.Since(start))
	fmt.Println("Success writing data to elastic : ", indexed)
	fmt.Println("Success incoming data from pulsar : ", received)
	fmt.Println("Difference : ", received-indexed)
	fmt.Println("Now : ", time.Now().UTC().String())
	if indexed < received {
		fmt.Println("FATAL")
	}
	fmt.Println("------------------")
}
答案1
得分: 0
Tldr;
看起来你的节点上的JVM堆内存不足。
你正在触发一个断路器,以避免Elasticsearch内存溢出(OOM)。
解决方案
- 增加JVM堆内存;Elasticsearch官方文档中有关于节点容量规划(sizing)的说明。
- 减小批量请求的大小。
英文:
TL;DR
It seems like you do not have enough JVM heap on your node.
You are hitting a circuit breaker that prevents Elasticsearch from running out of memory (OOM).
Solution(s)
- Increase the JVM heap size; the Elasticsearch documentation on sizing nodes explains how to choose it.
- Send smaller bulk requests (for example, lower the indexer's `FlushBytes`).
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论