在Elasticsearch Olivere包中无法插入批量数据。

huangapple go评论166阅读模式
英文:

Failed to insert bulk data in elasticsearch Olivere Package

问题

我正在尝试插入大量数据。除了响应属性错误:true之外,我没有收到任何错误。以下是我的代码片段。

func BulkInsert(ctx context.Context, products []model.Pt) error {
    Counter++
    // 从客户端创建新的批量处理器
    bulkProcessor, err := ElasticVar.BulkProcessor().
        Name("Bulk Worker 1").
        Workers(5).
        BulkActions(1000).
        BulkSize(2 << 20).
        FlushInterval(1 * time.Second).
        Stats(true).
        After(after).
        Do(context.Background())
    if err != nil {
        log.Println("NewBulkProcessorService err", err)
        return err
    }
    defer bulkProcessor.Close()
    storeIds := map[string]string{}
    // 入队文档
    for _, product := range products {
        if _, ok := storeIds[product.Id]; ok {
            continue
        } else {
            storeIds[product.StoreId] = product.StoreId
        }

        dataJSON, err := json.Marshal(product)
        if err != nil {
            // 查找失败的文档,例如重新提交
            log.Println("elastic marshal failed", err)
            return errors.New("elastic marshal failed")
        }
        bulkProcessor.Add(elastic.NewBulkIndexRequest().
            OpType("index").
            Index(Index).
            Type(Type).
            Id(product.Id).
            Doc(string(dataJSON)))
    }
    stats := bulkProcessor.Stats()
    fmt.Printf("Number of times flush has been invoked: %d\n", stats.Flushed)
    fmt.Printf("Number of times workers committed reqs: %d\n", stats.Committed)
    fmt.Printf("Number of requests indexed            : %d\n", stats.Indexed)
    fmt.Printf("Number of requests reported as created: %d\n", stats.Created)
    fmt.Printf("Number of requests reported as updated: %d\n", stats.Updated)
    fmt.Printf("Number of requests reported as success: %d\n", stats.Succeeded)
    fmt.Printf("Number of requests reported as failed : %d\n", stats.Failed)
    fmt.Printf("\n\n")
    for i, w := range stats.Workers {
        fmt.Printf("Worker %d: Number of requests queued: %d\n", i, w.Queued)
        fmt.Printf("           Last response time       : %v\n", w.LastDuration)
    }
    fmt.Printf(`Inserted Count: %d`, Counter)
    return nil
}

func after(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
    if err != nil {
        log.Printf("bulk commit failed, err: %v\n", err)
    }
    // 在批量提交成功的情况下,可以进行任何操作
    log.Printf("commit response=%v\n", response.Errors)
    log.Printf("commit successfully, len(requests)=%d\n", len(requests))
}

我正在批量添加所有数据。

在响应后的回调函数中,错误为true。当我查询数据时,没有任何记录被插入。请帮忙解决。

英文:

I am trying to insert bulk data. I am not getting any error except for in response property error: true. Here is my code snippet.

func BulkInsert(ctx context.Context, products []model.Pt) error {
	Counter++
	// create new bulk processor from client
	bulkProcessor, err := ElasticVar.BulkProcessor().
		Name(&quot;Bulk Worker 1&quot;).
		Workers(5).
		BulkActions(1000).
		BulkSize(2 &lt;&lt; 20).
		FlushInterval(1 * time.Second).
		Stats(true).
		After(after).
		Do(context.Background())
	if err != nil {
		log.Println(&quot;NewBulkProcessorService err&quot;, err)
		return err
	}
	defer bulkProcessor.Close()
	storeIds := map[string]string{}
	// Enqueue the document
	for _, product := range products {
		if _, ok := storeIds[product.Id]; ok {
			continue
		} else {
			storeIds[product.StoreId] = product.StoreId
		}

		dataJSON, err := json.Marshal(product)
		if err != nil {
			// Look up the failed documents with res.Failed(), and e.g. recommit
			log.Println(&quot;elastic marshal failed&quot;, err)
			return errors.New(&quot;elastic marshal failed&quot;)
		}
		bulkProcessor.Add(elastic.NewBulkIndexRequest().
			OpType(&quot;index&quot;).
			Index(Index).
			Type(Type).
			Id(product.Id).
			Doc(string(dataJSON)))
	}
	stats := bulkProcessor.Stats()
	fmt.Printf(&quot;Number of times flush has been invoked: %d\n&quot;, stats.Flushed)
	fmt.Printf(&quot;Number of times workers committed reqs: %d\n&quot;, stats.Committed)
	fmt.Printf(&quot;Number of requests indexed            : %d\n&quot;, stats.Indexed)
	fmt.Printf(&quot;Number of requests reported as created: %d\n&quot;, stats.Created)
	fmt.Printf(&quot;Number of requests reported as updated: %d\n&quot;, stats.Updated)
	fmt.Printf(&quot;Number of requests reported as success: %d\n&quot;, stats.Succeeded)
	fmt.Printf(&quot;Number of requests reported as failed : %d\n&quot;, stats.Failed)
	fmt.Printf(&quot;\n\n&quot;)
	for i, w := range stats.Workers {
		fmt.Printf(&quot;Worker %d: Number of requests queued: %d\n&quot;, i, w.Queued)
		fmt.Printf(&quot;           Last response time       : %v\n&quot;, w.LastDuration)
	}
	fmt.Printf(`Inserted Count: %d`, Counter)
	return nil
}

func after(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
	if err != nil {
		log.Printf(&quot;bulk commit failed, err: %v\n&quot;, err)
	}
	// do what ever you want in case bulk commit success
	log.Printf(&quot;commit response=%v\n&quot;, response.Errors)
	log.Printf(&quot;commit successfully, len(requests)=%d\n&quot;, len(requests))
}

I am adding all the data in bulk.

In the callback after the response. error is true. When I query the data. None of the records is inserted. Please help.

答案1

得分: 0

我能够通过访问创建的方法中的错误字符串来调试它。

for _, info := range response.Created() {
    fmt.Println("nBulk response created error:", info.Error)
    fmt.Println("nBulk response created: error reason4", info.Error.Reason)
    fmt.Println("nBulk response IndexProduct:", info.IndexProduct)
}

问题是我传递的Type是"product"而不是"doc"。

英文:

I was able to debug it by access the error string through created mether

for _, info := range response.Created() {
	fmt.Println(&quot;nBulk response created error:&quot;, info.Error)
	fmt.Println(&quot;nBulk response created: error reason4&quot;, info.Error.Reason)
	fmt.Println(&quot;nBulk response IndexProduct:&quot;, info.IndexProduct)
}

The problem was the I was passing Type = "product" instead of Type = "doc"

huangapple
  • 本文由 发表于 2022年1月19日 02:37:32
  • 转载请务必保留本文链接:https://go.coder-hub.com/70760753.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定