使用Elasticsearch从传入的Pulsar中插入批量数据。

huangapple go评论96阅读模式
英文:

Go elasticsearch insert bulk data from incoming pulsar

问题

我必须使用goelastic库从Pulsar中批量插入数据。但是我遇到了一个问题。

首先,Pulsar每次以部分批量的方式发送1000条数据。然后,当我插入到Elasticsearch时,有时会出现问题。这个问题如下所示。这个问题导致数据丢失。谢谢回答...
ERROR: circuit_breaking_exception: [parent] Data too large, data for [indices:data/write/bulk

展开收缩
] would be [524374312/500mb], which is larger than the limit of [510027366/486.3mb], real usage: [524323448/500mb], new bytes reserved: [50864/49.6kb], usages [request=0/0b, fielddata=160771183/153.3mb, in_flight_requests=50864/49.6kb, model_inference=0/0b, eql_sequence=0/0b, accounting=6898128/6.5mb]

这部分是批量代码。


func InsertElastic(y []models.CP, ElasticStruct *config.ElasticStruct) {
	fmt.Println("------------------")
	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:      enum.IndexName,
		Client:     ElasticStruct.Client,
		FlushBytes: 10e+6,
	})
	if err != nil {
		panic(err)
	}
	start := time.Now().UTC()

	for _, x := range y {
		data, err := json.Marshal(x)
		if err != nil {
			panic(err)
		}
		err = bi.Add(
			context.Background(),
			esutil.BulkIndexerItem{
				Action: "index",

				Body: bytes.NewReader(data),

				OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
					i++
				},

				OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
					if err != nil {
						log.Printf("ERROR: %s", err)
					} else {
						log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
					}
				},
			},
		)
		if err != nil {
			log.Fatalf("Unexpected error: %s", err)
		}
		x++

	}
	if err := bi.Close(context.Background()); err != nil {
		log.Fatalf("Unexpected error: %s", err)
	}
	dur := time.Since(start)
	fmt.Println(dur)
	fmt.Println("Success writing data to elastic : ", i)
	fmt.Println("Success incoming data from pulsar : ", x)
	fmt.Println("Difference : ", x-i)
	fmt.Println("Now : ", time.Now().UTC().String())
	if i < x {
		fmt.Println("FATAL")
	}
	fmt.Println("------------------")

}
英文:

I have to use goelastic library inserting the datas bulkly from coming pulsar. But i have a problem.

Firstly, pulsar send 1000 datas per partial bulkly. Then when i insert the elastic, there are a problem sometimes. This problem is attached. This problem cause data loss. Thanks for answer...
ERROR: circuit_breaking_exception: [parent] Data too large, data for [indices:data/write/bulk

展开收缩
] would be [524374312/500mb], which is larger than the limit of [510027366/486.3mb], real usage: [524323448/500mb], new bytes reserved: [50864/49.6kb], usages [request=0/0b, fielddata=160771183/153.3mb, in_flight_requests=50864/49.6kb, model_inference=0/0b, eql_sequence=0/0b, accounting=6898128/6.5mb]

This section is bulk code.


func InsertElastic(y []models.CP, ElasticStruct *config.ElasticStruct) {
	fmt.Println(&quot;------------------&quot;)
	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:      enum.IndexName,
		Client:     ElasticStruct.Client,
		FlushBytes: 10e+6,
	})
	if err != nil {
		panic(err)
	}
	start := time.Now().UTC()

	for _, x := range y {
		data, err := json.Marshal(x)
		if err != nil {
			panic(err)
		}
		err = bi.Add(
			context.Background(),
			esutil.BulkIndexerItem{
				Action: &quot;index&quot;,

				Body: bytes.NewReader(data),

				OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
					i++
				},

				OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
					if err != nil {
						log.Printf(&quot;ERROR: %s&quot;, err)
					} else {
						log.Printf(&quot;ERROR: %s: %s&quot;, res.Error.Type, res.Error.Reason)
					}
				},
			},
		)
		if err != nil {
			log.Fatalf(&quot;Unexpected error: %s&quot;, err)
		}
		x++

	}
	if err := bi.Close(context.Background()); err != nil {
		log.Fatalf(&quot;Unexpected error: %s&quot;, err)
	}
	dur := time.Since(start)
	fmt.Println(dur)
	fmt.Println(&quot;Success writing data to elastic : &quot;, i)
	fmt.Println(&quot;Success incoming data from pulsar : &quot;, x)
	fmt.Println(&quot;Difference : &quot;, x-i)
	fmt.Println(&quot;Now : &quot;, time.Now().UTC().String())
	if i &lt; x {
		fmt.Println(&quot;FATAL&quot;)
	}
	fmt.Println(&quot;------------------&quot;)

}

答案1

得分: 0

Tldr;

看起来你的节点上的JVM堆内存不足。

你正在触发一个断路器,以避免Elasticsearch内存溢出(OOM)。

解决方案

  • 增加JVM内存,你可以在这里找到一些关于调整节点大小的文档。
  • 减小批量请求的大小。
英文:

Tldr;

It seems like you do not have enough JVM heap on your node.

You are hitting a circuit breaker to avoid Elasticsearch to be Out Of Memory(OOM).

Solution(s)

  • Increase the JVM memory, you will find here some documentation to size your nodes.
  • Smaller bulk request

huangapple
  • 本文由 发表于 2022年9月21日 13:29:35
  • 转载请务必保留本文链接:https://go.coder-hub.com/73795759.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定