英文:
Failed to insert bulk data in elasticsearch Olivere Package
问题
我正在尝试批量插入数据。除了响应中的 errors 属性为 true 之外,我没有收到任何错误。以下是我的代码片段。
// BulkInsert enqueues products for indexing through a short-lived bulk
// processor, waits for the queued requests to be committed, and prints the
// processor statistics.
//
// Each document ID is enqueued at most once per call: a product whose Id was
// already seen earlier in the slice is skipped. It returns an error when the
// processor cannot be started, a product cannot be marshaled to JSON, or the
// final flush fails. Per-item indexing failures are not returned; they are
// reported to the after callback.
func BulkInsert(ctx context.Context, products []model.Pt) error {
	Counter++

	// Honor the caller's context instead of discarding it; fall back to a
	// background context so callers that pass nil keep working.
	if ctx == nil {
		ctx = context.Background()
	}

	// Create a new bulk processor from the client.
	bulkProcessor, err := ElasticVar.BulkProcessor().
		Name("Bulk Worker 1").
		Workers(5).
		BulkActions(1000).
		BulkSize(2 << 20).
		FlushInterval(1 * time.Second).
		Stats(true).
		After(after).
		Do(ctx)
	if err != nil {
		log.Println("NewBulkProcessorService err", err)
		return err
	}
	defer bulkProcessor.Close()

	// seen tracks the document IDs already enqueued. The lookup key and the
	// stored key must be the same field, otherwise duplicates are never found.
	// NOTE(review): de-duplicating on product.Id (the document ID) is assumed
	// to be the intent; the previous code stored product.StoreId instead.
	seen := make(map[string]struct{}, len(products))

	// Enqueue the documents.
	for _, product := range products {
		if _, dup := seen[product.Id]; dup {
			continue
		}
		seen[product.Id] = struct{}{}

		dataJSON, err := json.Marshal(product)
		if err != nil {
			log.Println("elastic marshal failed", err)
			return errors.New("elastic marshal failed")
		}
		bulkProcessor.Add(elastic.NewBulkIndexRequest().
			OpType("index").
			Index(Index).
			Type(Type).
			Id(product.Id).
			Doc(string(dataJSON)))
	}

	// Add only queues requests; workers commit them asynchronously. Flush
	// before reading the statistics so the numbers below describe this batch
	// rather than whatever happened to be committed so far.
	if err := bulkProcessor.Flush(); err != nil {
		log.Println("bulk processor flush failed", err)
		return err
	}

	stats := bulkProcessor.Stats()
	fmt.Printf("Number of times flush has been invoked: %d\n", stats.Flushed)
	fmt.Printf("Number of times workers committed reqs: %d\n", stats.Committed)
	fmt.Printf("Number of requests indexed : %d\n", stats.Indexed)
	fmt.Printf("Number of requests reported as created: %d\n", stats.Created)
	fmt.Printf("Number of requests reported as updated: %d\n", stats.Updated)
	fmt.Printf("Number of requests reported as success: %d\n", stats.Succeeded)
	fmt.Printf("Number of requests reported as failed : %d\n", stats.Failed)
	fmt.Printf("\n\n")
	for i, w := range stats.Workers {
		fmt.Printf("Worker %d: Number of requests queued: %d\n", i, w.Queued)
		fmt.Printf(" Last response time : %v\n", w.LastDuration)
	}
	fmt.Printf(`Inserted Count: %d`, Counter)
	return nil
}
// after is the bulk processor's post-commit callback. It logs a transport-level
// commit error, the response's Errors flag, the reason for every failed item,
// and a success line only when the whole commit went through.
//
// response is checked for nil before use because a failed commit may not
// produce a response at all.
func after(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
	if err != nil {
		log.Printf("bulk commit failed, err: %v\n", err)
	}
	if response == nil {
		return
	}

	log.Printf("commit response=%v\n", response.Errors)
	if response.Errors {
		// Errors == true only says that at least one item failed; the actual
		// cause (e.g. a mapping or type mismatch) lives on the individual items.
		for _, item := range response.Failed() {
			if item == nil || item.Error == nil {
				continue
			}
			log.Printf("bulk item failed, index=%s id=%s status=%d type=%s reason=%s\n",
				item.Index, item.Id, item.Status, item.Error.Type, item.Error.Reason)
		}
		return
	}

	if err == nil {
		log.Printf("commit successfully, len(requests)=%d\n", len(requests))
	}
}
我正在批量添加所有数据。
在提交之后的 after 回调函数中,response.Errors 为 true。当我查询数据时,发现没有任何记录被插入。请帮忙解决。
英文:
I am trying to insert data in bulk. I am not getting any error, except that the `errors` property of the response is `true`. Here is my code snippet.
// BulkInsert enqueues products for indexing through a short-lived bulk
// processor, waits for the queued requests to be committed, and prints the
// processor statistics.
//
// Each document ID is enqueued at most once per call: a product whose Id was
// already seen earlier in the slice is skipped. It returns an error when the
// processor cannot be started, a product cannot be marshaled to JSON, or the
// final flush fails. Per-item indexing failures are not returned; they are
// reported to the after callback.
func BulkInsert(ctx context.Context, products []model.Pt) error {
	Counter++

	// Honor the caller's context instead of discarding it; fall back to a
	// background context so callers that pass nil keep working.
	if ctx == nil {
		ctx = context.Background()
	}

	// Create a new bulk processor from the client.
	bulkProcessor, err := ElasticVar.BulkProcessor().
		Name("Bulk Worker 1").
		Workers(5).
		BulkActions(1000).
		BulkSize(2 << 20).
		FlushInterval(1 * time.Second).
		Stats(true).
		After(after).
		Do(ctx)
	if err != nil {
		log.Println("NewBulkProcessorService err", err)
		return err
	}
	defer bulkProcessor.Close()

	// seen tracks the document IDs already enqueued. The lookup key and the
	// stored key must be the same field, otherwise duplicates are never found.
	// NOTE(review): de-duplicating on product.Id (the document ID) is assumed
	// to be the intent; the previous code stored product.StoreId instead.
	seen := make(map[string]struct{}, len(products))

	// Enqueue the documents.
	for _, product := range products {
		if _, dup := seen[product.Id]; dup {
			continue
		}
		seen[product.Id] = struct{}{}

		dataJSON, err := json.Marshal(product)
		if err != nil {
			log.Println("elastic marshal failed", err)
			return errors.New("elastic marshal failed")
		}
		bulkProcessor.Add(elastic.NewBulkIndexRequest().
			OpType("index").
			Index(Index).
			Type(Type).
			Id(product.Id).
			Doc(string(dataJSON)))
	}

	// Add only queues requests; workers commit them asynchronously. Flush
	// before reading the statistics so the numbers below describe this batch
	// rather than whatever happened to be committed so far.
	if err := bulkProcessor.Flush(); err != nil {
		log.Println("bulk processor flush failed", err)
		return err
	}

	stats := bulkProcessor.Stats()
	fmt.Printf("Number of times flush has been invoked: %d\n", stats.Flushed)
	fmt.Printf("Number of times workers committed reqs: %d\n", stats.Committed)
	fmt.Printf("Number of requests indexed : %d\n", stats.Indexed)
	fmt.Printf("Number of requests reported as created: %d\n", stats.Created)
	fmt.Printf("Number of requests reported as updated: %d\n", stats.Updated)
	fmt.Printf("Number of requests reported as success: %d\n", stats.Succeeded)
	fmt.Printf("Number of requests reported as failed : %d\n", stats.Failed)
	fmt.Printf("\n\n")
	for i, w := range stats.Workers {
		fmt.Printf("Worker %d: Number of requests queued: %d\n", i, w.Queued)
		fmt.Printf(" Last response time : %v\n", w.LastDuration)
	}
	fmt.Printf(`Inserted Count: %d`, Counter)
	return nil
}
// after is the bulk processor's post-commit callback. It logs a transport-level
// commit error, the response's Errors flag, the reason for every failed item,
// and a success line only when the whole commit went through.
//
// response is checked for nil before use because a failed commit may not
// produce a response at all.
func after(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
	if err != nil {
		log.Printf("bulk commit failed, err: %v\n", err)
	}
	if response == nil {
		return
	}

	log.Printf("commit response=%v\n", response.Errors)
	if response.Errors {
		// Errors == true only says that at least one item failed; the actual
		// cause (e.g. a mapping or type mismatch) lives on the individual items.
		for _, item := range response.Failed() {
			if item == nil || item.Error == nil {
				continue
			}
			log.Printf("bulk item failed, index=%s id=%s status=%d type=%s reason=%s\n",
				item.Index, item.Id, item.Status, item.Error.Type, item.Error.Reason)
		}
		return
	}

	if err == nil {
		log.Printf("commit successfully, len(requests)=%d\n", len(requests))
	}
}
I am adding all the data in bulk.
In the `after` callback that runs once a commit completes, `response.Errors` is `true`, and when I query the index, none of the records have been inserted. Please help.
答案1
得分: 0
我通过 response.Created() 方法读取每个条目的错误信息,从而调试出了问题所在。
// Print the outcome of every "create" item in the bulk response.
for _, info := range response.Created() {
	fmt.Println("nBulk response created error:", info.Error)
	// Error is nil for items that succeeded, so only read Reason when it is set.
	if info.Error != nil {
		fmt.Println("nBulk response created: error reason4", info.Error.Reason)
	}
	fmt.Println("nBulk response IndexProduct:", info.IndexProduct)
}
问题是我传递的Type是"product"而不是"doc"。
英文:
I was able to debug it by reading the error details of each item returned by the response's `Created()` method:
// Print the outcome of every "create" item in the bulk response.
for _, info := range response.Created() {
	fmt.Println("nBulk response created error:", info.Error)
	// Error is nil for items that succeeded, so only read Reason when it is set.
	if info.Error != nil {
		fmt.Println("nBulk response created: error reason4", info.Error.Reason)
	}
	fmt.Println("nBulk response IndexProduct:", info.IndexProduct)
}
The problem was that I was passing Type = "product" instead of Type = "doc".
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论