Implementing a while loop in Reactor to fetch the latest Elasticsearch index

Question

My Elasticsearch index names are as follows:

logs-2020.08.18
logs-2020.08.17
logs-2020.08.16

A new index is created every day.

I want to find the latest index name and fetch the logs from it using ReactiveElasticsearchClient or Spring Data.
Is that possible?

I tried the following in my Spring WebFlux application. The snippet below walks back one day at a time until it finds an index that exists:

public Flux<Log> getLogFromLatestIndex(String serialId) {
    Calendar cal = Calendar.getInstance();
    String currentIndex = StringUtils.EMPTY;
    boolean indexExists = false;
    while (!indexExists) {
        currentIndex = String.format("logs-%s", format(cal.getTime(), "yyyy.MM.dd"));
        indexExists = isIndexExists(currentIndex).block(); // blocking call, fails on a reactor thread
        cal.add(Calendar.DATE, -1); // Decrease day 1 until you find index
    }

    SearchQuery searchQuery = new NativeSearchQueryBuilder()
            .withQuery(matchQuery("serialId", serialId))
            .withIndices(currentIndex)
            .build();

    return reactiveElasticsearchTemplate.find(searchQuery, Log.class);
}

public Mono<Boolean> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName));
}

How do I get the boolean value here without calling block()?

indexExists = isIndexExists(currentIndex).block();

As expected, the call to block() produces the following error:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2

Answer 1

Score: 3

You can use Flux.generate together with the takeUntil/takeWhile/skipUntil/skipWhile operators to express a while loop in Reactor.

Notes:

  • Calendar is replaced with LocalDate, which is immutable and fits better with functional/reactive programming.
  • The isIndexExists method returns a Tuple2 so that the index name stays available next to the boolean result; it can be replaced with a more descriptive class as needed.

public Flux<Log> getLog(String serialId) {
    return Flux.generate(LocalDate::now, this::generateNextDate)
               .map(day -> String.format("logs-%s", day.format(DateTimeFormatter.ofPattern("yyyy.MM.dd"))))
               .concatMap(this::isIndexExists) // check one index at a time, newest first
               .skipUntil(Tuple2::getT2) // check index exists boolean and drop non-existing ones
               .next() // takes first existing
               .flatMapMany(tuple -> findLogs(tuple.getT1(), serialId));
}

// Emits the current day and returns the previous day as the next state.
private LocalDate generateNextDate(LocalDate currentDay, SynchronousSink<LocalDate> sink) {
    sink.next(currentDay);
    return currentDay.minusDays(1);
}

private Mono<Tuple2<String, Boolean>> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName))
            .map(exists -> Tuples.of(indexName, exists));
}

private Flux<Log> findLogs(String index, String serialId) {
    // your other ES query here, for example the search from the question
    SearchQuery searchQuery = new NativeSearchQueryBuilder()
            .withQuery(matchQuery("serialId", serialId))
            .withIndices(index)
            .build();
    return reactiveElasticsearchTemplate.find(searchQuery, Log.class);
}
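
The shape of the pipeline above (generate dates lazily, map each one to an index name, drop names until one exists, take the first) can be tried out without Reactor or Elasticsearch on the classpath. The following is a minimal synchronous sketch that uses java.util.stream as a stand-in for Flux; it is not the Reactor code itself. The class name LatestIndexSketch, the maxDaysBack limit and the Predicate standing in for the existence check are assumptions made for illustration. The limit is included because a pipeline of this shape never completes if no matching index exists at all.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class LatestIndexSketch {

    // Same pattern as in the answer; built once and reused.
    private static final DateTimeFormatter INDEX_DATE = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    static String indexNameFor(LocalDate day) {
        return "logs-" + day.format(INDEX_DATE);
    }

    // indexExists is a hypothetical stand-in for the real existence check.
    static Optional<String> findLatestIndex(LocalDate start, int maxDaysBack, Predicate<String> indexExists) {
        return Stream.iterate(start, day -> day.minusDays(1)) // lazy sequence of days, newest first
                .limit(maxDaysBack)                           // assumed safety bound on the look-back
                .map(LatestIndexSketch::indexNameFor)
                .filter(indexExists)                          // drop names that do not exist
                .findFirst();                                 // stop at the first existing one
    }

    public static void main(String[] args) {
        Set<String> existing = Set.of("logs-2020.08.18", "logs-2020.08.17", "logs-2020.08.16");
        LocalDate today = LocalDate.of(2020, 8, 20);
        System.out.println(findLatestIndex(today, 30, existing::contains)); // Optional[logs-2020.08.18]
        System.out.println(findLatestIndex(today, 2, existing::contains));  // Optional.empty
    }
}
```

In the Reactor version, a comparable bound could be expressed by limiting the generated dates (for example with take) before the existence check, so that the Flux completes empty instead of running indefinitely.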

Answer 2

Score: 0

application.yml

spring.data.elasticsearch.client.reactive.endpoints: ip1:9200,ip2:9200,ip3:9200
spring.data.elasticsearch.rest.uris: ip1:9200,ip2:9200,ip3:9200

Configuration.java

@Configuration
public class ElasticSearchConfig extends AbstractReactiveElasticsearchConfiguration {

    @Value("${spring.data.elasticsearch.client.reactive.endpoints}")
    private String elasticSearchEndpoint;

    @Override
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
            .connectedTo(elasticSearchEndpoint.split(","))
            .withWebClientConfigurer(webClient -> {
                ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                        .codecs(configurer -> configurer.defaultCodecs()
                                .maxInMemorySize(-1))
                        .build();
                return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
            })
            .build();
        return ReactiveRestClients.create(clientConfiguration);
    }
}

Controller.java

@GetMapping("/getLog/{serialId}")
public Flux<Log> getLog(@PathVariable String serialId) {
    return loggerService.getLog(serialId);
}

The rest is your code; I only added a print of the index name inside map. Although the index logs-2020.08.21 exists in Elasticsearch, it keeps printing index names such as logs-2020.08.20, logs-2020.08.19, logs-2020.08.18 and so on, and eventually throws an error.

Note: When I tried with a single IP in application.yml, I got the same error.

public Flux<Log> getLog(String serialId) {
    return Flux.generate(LocalDate::now, this::generateNextDate)
           .map(day -> {
                String indexName = String.format("logs-%s", day.format(DateTimeFormatter.ofPattern("yyyy.MM.dd")));
                System.out.println(indexName);
                return indexName;
            })
           .flatMap(this::isIndexExists)
           .skipUntil(Tuple2::getT2) // check index exists boolean and drop non-existing ones
           .next() // takes first existing
           .flatMapMany(tuple -> findLogs(tuple.getT1(), serialId));
}

private LocalDate generateNextDate(LocalDate currentDay, SynchronousSink<LocalDate> sink) {
    sink.next(currentDay);
    return currentDay.minusDays(1);
}

private Mono<Tuple2<String, Boolean>> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName))
        .map(exists -> Tuples.of(indexName, exists));
}

private Flux<Log> findLogs(String index, String serialId) {
    // your other ES query here
}
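
One difference from Answer 1 is that this version uses flatMap where Answer 1 uses concatMap. In Reactor, flatMap subscribes to inner publishers eagerly and emits results as they arrive, while concatMap subscribes to them one at a time and in order, so only concatMap guarantees that the newest index is checked and answered first. Both operators also request upstream elements ahead of time, so names printed inside map are not necessarily names that have been checked yet. Whether this accounts for the error reported here is not established by the post. The sequential, non-blocking behaviour can be sketched with the JDK's CompletableFuture as a stand-in for Mono (this is not Reactor code); the class name, the maxDaysBack limit and the Function standing in for the existence check are assumptions for illustration.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class SequentialIndexCheckSketch {

    private static final DateTimeFormatter INDEX_DATE = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // Checks one index at a time, newest first. The next check is started
    // only after the previous one has answered "does not exist".
    static CompletableFuture<Optional<String>> findLatestIndex(
            LocalDate day, int maxDaysBack, Function<String, CompletableFuture<Boolean>> indexExists) {
        if (maxDaysBack <= 0) {
            return CompletableFuture.completedFuture(Optional.empty());
        }
        String name = "logs-" + day.format(INDEX_DATE);
        return indexExists.apply(name).thenCompose(exists -> {
            if (exists) {
                return CompletableFuture.completedFuture(Optional.of(name));
            }
            return findLatestIndex(day.minusDays(1), maxDaysBack - 1, indexExists);
        });
    }

    public static void main(String[] args) {
        Set<String> existing = Set.of("logs-2020.08.18", "logs-2020.08.17");
        List<String> checked = Collections.synchronizedList(new ArrayList<>());
        // Hypothetical stand-in for the real existence check; records the order of calls.
        Function<String, CompletableFuture<Boolean>> fakeExists = name -> {
            checked.add(name);
            return CompletableFuture.supplyAsync(() -> existing.contains(name));
        };
        // join() is used only to print the result from the demo's main thread.
        Optional<String> latest = findLatestIndex(LocalDate.of(2020, 8, 20), 30, fakeExists).join();
        System.out.println(latest);  // Optional[logs-2020.08.18]
        System.out.println(checked); // [logs-2020.08.20, logs-2020.08.19, logs-2020.08.18]
    }
}
```

The recorded call order shows that no index older than the first existing one is ever queried, which is the property concatMap provides in the pipeline from Answer 1.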

Posted by huangapple on 2020-08-20 18:52:45
When reposting, please keep the link to this article: https://go.coder-hub.com/63503519.html