Index Flux (Spring Webflux) to ElasticSearch using ElasticSearch-Java


Question

What I am trying to achieve:

1 - From a Flux (Spring Webflux, Reactor)

2 - Index/Save/Insert each element of the flux inside ElasticSearch, using the bulk API (not one by one) of elasticsearch-java https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/current/indexing-bulk.html#_indexing_application_objects (not using spring data elasticsearch)

3 - Retain the flux from step 2 in order to do some further processing

What did I try:

@Service
public final class SomeService implements CommandLineRunner {

    @Autowired
    ElasticsearchClient elasticsearchClient;

    @Override
    public void run(final String[] args) throws IOException {
        Flux<Product> productFlux = Flux.interval(Duration.ofSeconds(1)).map(i -> new Product(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC), String.valueOf(i))); //Actual would be an aggressive flux from Kafka or http
        Flux<Product> savedFlux = saveAll(productFlux);
        Flux<CompletedProduct> completedProductFlux = savedFlux.map(oneSavedProduct -> toOneCompletedProduct(oneSavedProduct)); //some kind of post processing which happens after it is saved
        completedProductFlux.subscribe();
    }

    private Flux<Product> saveAll(Flux<Product> productFlux) throws IOException {
        BulkRequest.Builder br = new BulkRequest.Builder();
        for (Product product : productFlux.toIterable()) { // changed here
            br.operations(op -> op
                    .index(idx -> idx
                            .index("products")
                            .document(product)
                    )
            );
        }
        BulkResponse result = elasticsearchClient.bulk(br.build());
        System.out.println("I do care about the response " + result.ingestTook());
        if (result.errors()) {
            System.out.println("I do care about the response Bulk had errors");
            for (BulkResponseItem item: result.items()) {
                if (item.error() != null) {
                    System.out.println(item.error().reason());
                }
            }
        }
        return productFlux;
    }

    private CompletedProduct toOneCompletedProduct(Product oneSavedProduct) {
        return // transform a product to a completed product AFTER saving to elasticsearch;
    }
}

Note:

The flux from step 1 would be coming from Kafka or HTTP; it is an aggressive (high-throughput) flux

For step 2, I do care about the response (doing some logging in case of errors)

Issue:

With the above code, I am able to generate the flux of products, but this is not inserting/saving/indexing anything inside elasticsearch.

Question:

How can I save all the elements from the flux using the bulk API of elasticsearch-java within a Reactor pipeline (Spring WebFlux)?

Answer 1

Score: 3


The code you have written consumes a Flux as an Iterable, and that is why it is not working as expected.

In your saveAll() method, you use the toIterable method from the Flux API to convert the Flux into an Iterable. This is not recommended because (reading "Reactor Core Features") Flux is designed to be non-blocking and backpressure-aware, while Iterable is a blocking API: the for loop only ends once the Flux completes.

Here the source is Flux.interval(...), which never completes. The loop therefore never ends, elasticsearchClient.bulk(...) is never reached, and nothing is indexed into Elasticsearch. Note that ElasticsearchClient.bulk(...) is itself a blocking call, so even with a finite Flux the method would block the calling thread until the last element arrived, send one bulk request, and then return the original Flux untouched.

Instead of consuming a Flux as an Iterable, you should use the Flux API to chain operations together in a non-blocking way. In this case, you could use the collectList method to buffer all the elements of the Flux into a List, then use the map method to transform the List into a BulkRequest.


For example:

private Flux<Product> saveAll(Flux<Product> productFlux) {
    return productFlux.collectList().flatMap(products -> {
        BulkRequest.Builder br = new BulkRequest.Builder();
        for (Product product : products) {
            br.operations(op -> op
                    .index(idx -> idx
                            .index("products")
                            .document(product)
                    )
            );
        }
        return Mono.just(br.build());
    }).flatMapMany(bulkRequest -> { // flatMapMany, because a Flux is returned from a Mono
        try {
            // blocking call: returns once Elasticsearch has answered
            BulkResponse result = elasticsearchClient.bulk(bulkRequest);
            System.out.println("print " + result.ingestTook());
            if (result.errors()) {
                System.out.println("Bulk had errors");
                for (BulkResponseItem item: result.items()) {
                    if (item.error() != null) {
                        System.out.println(item.error().reason());
                    }
                }
            }
            return productFlux;
        } catch (IOException e) {
            return Flux.error(e);
        }
    });
}

This version of the saveAll method uses the collectList method to buffer all the elements of the Flux into a List, then the flatMap method to transform the List into a BulkRequest. The flatMapMany method is then used to send the BulkRequest to Elasticsearch, handle the response, and switch from the Mono back to a Flux (Mono.flatMap can only return another Mono).

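Please note that this is a simplified example: it assumes that the Flux is finite and that its elements fit into memory. collectList() emits its List only when the source completes, so with an unbounded source such as the Flux.interval(...) of the question (or a Kafka topic) no bulk request would ever be sent. If the Flux can have a large or unbounded number of elements, consider using the buffer method to split the Flux into smaller chunks and process each chunk separately.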
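To make the chunking idea concrete, here is a minimal plain-JDK sketch of what splitting into fixed-size batches means. It is only an illustration: the class and method names (Batches, chunk) are hypothetical, and in a real pipeline Reactor's buffer(int maxSize) or bufferTimeout(int maxSize, Duration maxTime) would produce these batches as the elements arrive, with each batch becoming one BulkRequest.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Reactor or elasticsearch-java.
final class Batches {
    private Batches() {}

    /** Splits items into consecutive chunks of at most maxSize elements. */
    static <T> List<List<T>> chunk(List<T> items, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxSize) {
            int end = Math.min(start + maxSize, items.size());
            // Copy the slice so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }
}
```

With five elements and a maximum size of two, this yields three batches, the last one holding the single remaining element. The batch size is a tuning knob: too small and the bulk API brings little benefit, too large and a single request becomes heavy for both sides.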

Also, note that the Elasticsearch bulk API is designed to be used with a large number of requests. If you are only sending a few requests at a time, you might not see any performance benefit from using the bulk API.

Problem: this version still returns the original Flux after sending the BulkRequest. The source is therefore subscribed a second time, and nothing ties the elements emitted by the returned Flux to the elements that were actually indexed.


Another more robust approach: you could try using Flux.collectList() to gather all the products into a list, and then send this list to Elasticsearch using the bulk API. However, keep in mind that this would also accumulate all the data in memory before sending it, which might not be what you want if the Flux contains a lot of elements.

That would use a Mono this time:

private Mono<Void> saveAll(Flux<Product> productFlux) {
    return productFlux.collectList().flatMap(products -> {
        BulkRequest.Builder br = new BulkRequest.Builder();
        for (Product product : products) {
            br.operations(op -> op
                    .index(idx -> idx
                            .index("products")
                            .document(product)
                    )
            );
        }
        // ElasticsearchClient.bulk(...) is a blocking call that returns a BulkResponse,
        // so it is wrapped in a Mono and run on a scheduler meant for blocking work.
        return Mono.fromCallable(() -> elasticsearchClient.bulk(br.build()))
                .subscribeOn(Schedulers.boundedElastic())
                .then();
    });
}

The flatMap() operator is used to perform the bulk operation on the list of products. Mono.fromCallable() wraps the blocking elasticsearchClient.bulk(br.build()) call into a Mono<BulkResponse>, and subscribeOn(Schedulers.boundedElastic()) keeps that blocking call off the non-blocking threads. The then() operator transforms the Mono<BulkResponse> into a Mono<Void>, effectively "ignoring" the BulkResponse (inspect it before then() if you need to log errors). The returned Mono completes only when all products have been sent to Elasticsearch.

The saveAll() method now returns a Mono<Void>, which represents the completion of the bulk indexing operation. You can use this to ensure that further processing only happens after all products have been indexed:

Flux<Product> productFlux = ...;
Mono<Void> saved = saveAll(productFlux);
Flux<CompletedProduct> completedProductFlux = saved.thenMany(productFlux.map(this::toOneCompletedProduct));
completedProductFlux.subscribe();

Mono.thenMany() is used to create a new Flux<CompletedProduct> that will start emitting items only after the saveAll() operation has completed.

As you see in this second approach, saveAll() now returns a Mono<Void> instead of a Flux<Product> because the operation of indexing all products to Elasticsearch is a single, asynchronous operation that either completes or fails. This is exactly what a Mono represents: a single, asynchronous computation. The Void type parameter indicates that the Mono does not emit any value upon completion. It only signals the completion or failure of the operation.

In contrast, a Flux represents a sequence of zero or more items, and is used for operations that can produce multiple results over time.

When saveAll() was returning a Flux<Product>, it was actually just returning the original Flux passed to it. This didn't reflect the fact that all products have been indexed to Elasticsearch. It was possible for the returned Flux to be consumed before the indexing operation was completed, which could lead to unexpected behavior.

By returning a Mono<Void>, saveAll() now correctly represents the operation it performs: a single, asynchronous indexing operation that either completes or fails. The caller can use the returned Mono to chain further processing that should happen after the indexing operation is completed, as shown in the previous example.
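The "complete the save first, then continue" contract can also be illustrated with the JDK's CompletableFuture, which is the type returned by ElasticsearchAsyncClient, the asynchronous counterpart of ElasticsearchClient. The sketch below is an assumption-laden stand-in, not the client's API: blockingBulk is a hypothetical function playing the role of elasticsearchClient.bulk(...), and saveThenProcess is a made-up name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration; none of these names come from Reactor or elasticsearch-java.
final class SaveThenProcess {
    private SaveThenProcess() {}

    /**
     * Runs the blocking bulk call on the given executor and, only after it
     * has completed normally, applies postProcess to every item.
     * If the bulk call throws, the returned future fails and postProcess never runs.
     */
    static <T, R> CompletableFuture<List<R>> saveThenProcess(
            List<T> items,
            Function<List<T>, Integer> blockingBulk, // stand-in for the blocking bulk call
            Function<T, R> postProcess,
            ExecutorService executor) {
        return CompletableFuture
                .supplyAsync(() -> blockingBulk.apply(items), executor)
                .thenApply(indexedCount ->
                        items.stream().map(postProcess).collect(Collectors.toList()));
    }
}
```

The post-processing works on the same list that was handed to the bulk call, so the source is read only once. In Reactor terms, a CompletableFuture like this can be adapted with Mono.fromFuture(...).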

The end result (as a simplified example):

import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public final class SomeService implements CommandLineRunner {

    @Autowired
    ElasticsearchClient elasticsearchClient;

    @Override
    public void run(final String[] args) throws IOException {
        Flux<Product> productFlux = Flux.interval(Duration.ofSeconds(1)).map(i -> new Product(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC), String.valueOf(i)));
        Mono<Void> saveOperation = saveAll(productFlux);
        Flux<CompletedProduct> completedProductFlux = saveOperation.thenMany(productFlux.map(this::toOneCompletedProduct));
        completedProductFlux.subscribe();
    }

    private Mono<Void> saveAll(Flux<Product> productFlux) {
        return productFlux.collectList().flatMap(products -> {
            BulkRequest.Builder br = new BulkRequest.Builder();
            for (Product product : products) {
                br.operations(op -> op
                        .index(idx -> idx
                                .index("products")
                                .document(product)
                        )
                );
            }
            return Mono.just(br.build());
        }).flatMap(bulkRequest -> {
            try {
                // blocking call: returns once Elasticsearch has answered
                BulkResponse result = elasticsearchClient.bulk(bulkRequest);
                System.out.println("print " + result.ingestTook());
                if (result.errors()) {
                    System.out.println("Bulk had errors");
                    for (BulkResponseItem item: result.items()) {
                        if (item.error() != null) {
                            System.out.println(item.error().reason());
                        }
                    }
                }
                return Mono.empty();
            } catch (IOException e) {
                return Mono.error(e);
            }
        });
    }

    private CompletedProduct toOneCompletedProduct(Product oneSavedProduct) {
        return // transform a product to a completed product AFTER saving to elasticsearch;
    }
}

You have:

  • The run() method creates a Flux<Product>, calls saveAll() to save all products to Elasticsearch, and then uses the returned Mono<Void> to create a Flux<CompletedProduct> that starts emitting items only after the save operation is completed.

  • The saveAll() method collects all products into a List, creates a BulkRequest for them, sends the BulkRequest to Elasticsearch, and returns a Mono<Void> that represents the completion of this operation.

  • The toOneCompletedProduct() method transforms a saved product into a completed product. This method is used after the save operation is completed, as indicated by the thenMany() operator in the run() method.

By "simplified example", I mean for instance that error handling is quite minimal in this example. In a real-world scenario, you might want to add more comprehensive error handling to deal with potential failures of the Elasticsearch operation.
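As one possible direction for more thorough error handling, the sketch below keeps only the products whose bulk operation succeeded, so that post-processing does not run on documents that were never indexed. It relies on the fact that a bulk response lists its items in the same order as the submitted operations. Everything else is an assumption made for the illustration: the errorReasons list is a simplified stand-in for calling item.error() on each BulkResponseItem (null meaning success), and BulkOutcome and succeeded are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of elasticsearch-java.
final class BulkOutcome {
    private BulkOutcome() {}

    /**
     * Returns the submitted elements whose bulk operation reported no error.
     * errorReasons.get(i) is the failure reason of the i-th operation, or null on success.
     */
    static <T> List<T> succeeded(List<T> submitted, List<String> errorReasons) {
        if (submitted.size() != errorReasons.size()) {
            throw new IllegalArgumentException("one response item is expected per submitted element");
        }
        List<T> ok = new ArrayList<>();
        for (int i = 0; i < submitted.size(); i++) {
            if (errorReasons.get(i) == null) {
                ok.add(submitted.get(i));
            }
        }
        return ok;
    }
}
```

The failed elements could be logged, retried, or routed to a dead-letter destination instead of being silently dropped; which of these fits depends on the application.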

Answer 2

Score: 0

The sample code uses a regular (blocking) Elasticsearch client to send the bulk request to ES. When working with reactive code, we should use a reactive REST client. ReactiveRestClient still requires a BulkRequest, but internally uses the reactive WebClient to post the bulk request and hence returns a Mono<Response>. So the basic code should look like:

productFlux
  .buffer() //to create sufficient load for bulk
  .flatMap(productList -> {
    // serialize to BulkRequest & send to Reactiveclient
  });

There are other important considerations when doing this:

  1. Each bulk request gets bound to a single thread on the ES side, as far as I understand its architecture. So we need to choose the buffer such that our bulk request size is neither too small nor too big.
  2. We need to control the rate at which we send the data. For example, if we simply subscribe to our product flux with infinite demand (the default subscribers), we are likely to hit a scenario where ES rejects our requests because all its internal threads are busy processing our previous requests. If we instead create a custom subscriber with a demand of, say, 1 (or another low number), we unnecessarily slow down the indexing process. An alternative is to create back-pressure based on the response from ES. We can do this by adding an operator like delayWhen to our pipeline to delay exponentially when we get specific errors (like rejected execution).
  3. We should add retry operators for intermittent network exceptions and the like. We need to ensure that our publisher is able to handle retries. This is important when creating a Flux over HTTP or Kafka. If we use an operator like flatMap, it has internal buffers which allow retry operators downstream to work.
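The exponential delay mentioned in point 2 can be sketched in plain Java as follows. This is only an illustration of the delay schedule, with hypothetical names (Backoff, delayFor) and arbitrary values. In a Reactor pipeline the same idea is usually expressed with retryWhen(Retry.backoff(maxAttempts, minBackoff)) rather than computed by hand.

```java
import java.time.Duration;

// Hypothetical helper showing a capped exponential backoff schedule.
final class Backoff {
    private Backoff() {}

    /**
     * Delay before retry number attempt (0-based): base, 2*base, 4*base, ...
     * never exceeding max.
     */
    static Duration delayFor(int attempt, Duration base, Duration max) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must not be negative");
        }
        Duration delay = base;
        for (int i = 0; i < attempt; i++) {
            if (delay.compareTo(max) >= 0) {
                return max; // already at the cap, no need to keep doubling
            }
            delay = delay.multipliedBy(2);
        }
        return delay.compareTo(max) > 0 ? max : delay;
    }
}
```

With a base of 100 ms and a cap of 5 s, the delays grow as 100 ms, 200 ms, 400 ms, 800 ms and so on until they reach 5 s. Such a schedule would typically be applied only to retryable failures, for example when Elasticsearch rejects a bulk request because its write queue is full.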

huangapple
  • Posted on May 29, 2023, 22:14:43
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76358083.html