Implement while loop in Reactor to fetch latest Elasticsearch index
Question
My index names in Elasticsearch (accessed through the reactive client) are as follows:
logs-2020.08.18
logs-2020.08.17
logs-2020.08.16
A new index is created every day.
I want to fetch the latest index name and get the logs using reactiveElasticsearchClient or Spring Data.
Is it possible?
I tried the following in my Spring WebFlux application.
I have the code snippet below to check whether an index exists:
public Flux<Log> getLogFromLatestIndex(String serialId) {
    Calendar cal = Calendar.getInstance();
    String currentIndex = StringUtils.EMPTY;
    boolean indexExists = false;
    while (!indexExists) {
        currentIndex = String.format("logs-%s", format(cal.getTime(), "yyyy.MM.dd"));
        indexExists = isIndexExists(currentIndex).block();
        cal.add(Calendar.DATE, -1); // Step back one day until an existing index is found
    }
    SearchQuery searchQuery = new NativeSearchQueryBuilder()
            .withQuery(matchQuery("serialId", serialId))
            .withIndices(currentIndex)
            .build();
    return reactiveElasticsearchTemplate.find(searchQuery, Log.class);
}

public Mono<Boolean> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName));
}
How do I fetch the boolean value without using block() here?
    indexExists = isIndexExists(currentIndex).block();
As expected, calling block() gives the following error:
    java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
Answer 1
Score: 3
You can use Flux.generate and (take/skip)(Until/While) to do a while loop in Reactor.
Notes:
- Replaced Calendar with LocalDate, as that is immutable and fits better with functional/reactive programming.
- The isIndexExists method returns a Tuple to keep a reference to the index name, but it can be replaced with a more descriptive class as needed.
public Flux<Log> getLog(String serialId) {
    return Flux.generate(LocalDate::now, this::generateNextDate)
            .map(day -> String.format("logs-%s", day.format(DateTimeFormatter.ofPattern("yyyy.MM.dd"))))
            .concatMap(this::isIndexExists)
            .skipUntil(Tuple2::getT2) // check index exists boolean and drop non-existing ones
            .next() // takes first existing
            .flatMapMany(tuple -> findLogs(tuple.getT1(), serialId));
}

private LocalDate generateNextDate(LocalDate currentDay, SynchronousSink<LocalDate> sink) {
    sink.next(currentDay);
    return currentDay.minusDays(1);
}

private Mono<Tuple2<String, Boolean>> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName))
            .map(exists -> Tuples.of(indexName, exists));
}

private Flux<Log> findLogs(String index, String serialId) {
    // your other ES query here
}
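One caveat with the generator above: it never completes on its own, so if no logs-* index exists at all, the search keeps walking backwards indefinitely. A bound on the number of days is a cheap safeguard; in Reactor that could be a take(n) placed before concatMap. The newest-first candidate names can be sketched with plain JDK streams, so it runs without Reactor on the classpath. The class name IndexNames and the maxDays parameter are made up for illustration and are not part of the answer.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class IndexNames {
    // Same pattern as the daily indices in the question, e.g. logs-2020.08.18
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    public static String forDate(LocalDate day) {
        return "logs-" + day.format(DAY);
    }

    // Returns at most maxDays names, starting at `start` and stepping back one day at a time.
    public static List<String> candidates(LocalDate start, int maxDays) {
        return Stream.iterate(start, d -> d.minusDays(1))
                .limit(maxDays)
                .map(IndexNames::forDate)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical bound of 3 days, starting from the newest index shown in the question.
        candidates(LocalDate.of(2020, 8, 18), 3).forEach(System.out::println);
    }
}
```

With a bound in place, a search that finds nothing ends with an empty result instead of running forever.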
Answer 2
Score: 0
application.yml
spring.data.elasticsearch.client.reactive.endpoints: ip1:9200,ip2:9200,ip3:9200
spring.data.elasticsearch.rest.uris: ip1:9200,ip2:9200,ip3:9200
Configuration.java
@Configuration
public class ElasticSearchConfig extends AbstractReactiveElasticsearchConfiguration {
    @Value("${spring.data.elasticsearch.client.reactive.endpoints}")
    private String elasticSearchEndpoint;

    @Override
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                .connectedTo(elasticSearchEndpoint.split(","))
                .withWebClientConfigurer(webClient -> {
                    ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                            .codecs(configurer -> configurer.defaultCodecs()
                                    .maxInMemorySize(-1))
                            .build();
                    return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
                })
                .build();
        return ReactiveRestClients.create(clientConfiguration);
    }
}
Controller.java
@GetMapping("/getLog/{serialId}")
public Flux<Log> getLog(@PathVariable String serialId) {
    return loggerService.getLog(serialId);
}
The rest is your code; I only added a print of the index name inside map. Although the index logs-2020.08.21 exists in Elasticsearch, it keeps printing indices such as logs-2020.08.20, logs-2020.08.19, logs-2020.08.18 and so on, and eventually throws an error.
Note: When I tried with a single IP in application.yml, I got the same error.
public Flux<Log> getLog(String serialId) {
    return Flux.generate(LocalDate::now, this::generateNextDate)
            .map(day -> {
                System.out.println(String.format("logs-%s", day.format(DateTimeFormatter.ofPattern("yyyy.MM.dd"))));
                return String.format("logs-%s", day.format(DateTimeFormatter.ofPattern("yyyy.MM.dd")));
            })
            .flatMap(this::isIndexExists)
            .skipUntil(Tuple2::getT2) // check index exists boolean and drop non-existing ones
            .next() // takes first existing
            .flatMapMany(tuple -> findLogs(tuple.getT1(), serialId));
}

private LocalDate generateNextDate(LocalDate currentDay, SynchronousSink<LocalDate> sink) {
    sink.next(currentDay);
    return currentDay.minusDays(1);
}

private Mono<Tuple2<String, Boolean>> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName))
            .map(exists -> Tuples.of(indexName, exists));
}

private Flux<Log> findLogs(String index, String serialId) {
    // your other ES query here
}
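One difference from Answer 1 stands out in this snippet: it uses flatMap where the answer uses concatMap. flatMap subscribes to many inner publishers at once (up to 256 by default) and emits their results in completion order, so against an unbounded generator it fires a burst of existence checks, and skipUntil no longer sees the results newest-first. concatMap runs the checks one at a time, in order. Whether this accounts for the error reported here is only a guess, since the error itself is not shown. The sequential behaviour can be sketched without Reactor using the JDK's CompletableFuture; SequentialLookup, findLatest, inMemory, and the in-memory existence check are hypothetical stand-ins for the Elasticsearch call.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class SequentialLookup {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // Checks one index at a time: the next check is only started after the
    // previous one has answered "false". Gives up after daysLeft attempts.
    public static CompletableFuture<Optional<String>> findLatest(
            LocalDate day, int daysLeft, Function<String, CompletableFuture<Boolean>> exists) {
        if (daysLeft <= 0) {
            return CompletableFuture.completedFuture(Optional.empty());
        }
        String index = "logs-" + day.format(DAY);
        return exists.apply(index).thenCompose(found -> {
            if (found) {
                return CompletableFuture.completedFuture(Optional.of(index));
            }
            return findLatest(day.minusDays(1), daysLeft - 1, exists);
        });
    }

    // Hypothetical stand-in for the Elasticsearch existence check; records every probed name.
    public static Function<String, CompletableFuture<Boolean>> inMemory(Set<String> existing, List<String> probed) {
        return name -> {
            probed.add(name);
            return CompletableFuture.completedFuture(existing.contains(name));
        };
    }

    public static void main(String[] args) {
        List<String> probed = new ArrayList<>();
        Optional<String> latest = findLatest(
                LocalDate.of(2020, 8, 18), 30,
                inMemory(Collections.singleton("logs-2020.08.16"), probed)).join();
        System.out.println(latest.orElse("none") + " after probing " + probed);
    }
}
```

Only the names up to and including the first existing index are probed, which is the behaviour the while loop in the question was after.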