Zero-downtime indexing with Hibernate-Search
Question
I am trying to implement zero-downtime reindexing as advised in the Hibernate Search documentation: https://docs.jboss.org/hibernate/search/6.2/reference/en-US/html_single/#backend-elasticsearch-indexlayout-strategy-simple.
Dependency details:
- springBootVersion=3.1.1
- hibernateVersion=6.1.7.Final
- hibernate_search=6.2.0.Beta1
- backend: Elasticsearch
Below is the pseudo code that we have implemented. Functionally, everything works.
However, I would like some suggestions on how to initiate the MassIndexer (method processIndex). The approach we have opted for is to run one MassIndexer per entity.
I tried to keep the MassIndexer in a class-level field, but that makes the application fail at startup.
Thanks in advance!
Here is the pseudo code for MassIndexServiceImpl.java:
import MassIndexService;
import org.apache.commons.****;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.hibernate.search.backend.elasticsearch.***;
import org.hibernate.search.mapper.orm.***;
import org.hibernate.search.mapper.pojo.massindexing.***;
import org.slf4j.*;
import org.springframework.****;
import java.*****;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.indices.IndexState;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import jakarta.persistence.EntityManager;
@Service
@Transactional
public class MassIndexServiceImpl implements MassIndexService {

    private final Logger log = LoggerFactory.getLogger(MassIndexServiceImpl.class);

    private final EntityManager entityManager;
    private final MassIndexingMonitor monitor;
    private final ElasticsearchClient client;
    private final SearchSession searchSession;

    public MassIndexServiceImpl(EntityManager entityManager) {
        this.entityManager = entityManager;
        // Create the low-level client
        RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();
        // Create the transport with a Jackson mapper
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        // And create the API client
        client = new ElasticsearchClient(transport);
        monitor = new PojoMassIndexingLoggingMonitor(1000);
        searchSession = Search.session(entityManager);
    }

    /**
     * Zero-downtime mass indexing of all indexed entities in the running instance.
     */
    @Async
    @Override
    public void reindexAliases() {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        SearchMapping mapping = Search.mapping(entityManager.getEntityManagerFactory());
        AtomicInteger successful = new AtomicInteger();
        AtomicInteger failures = new AtomicInteger();
        mapping
            .allIndexedEntities()
            .stream()
            .forEach(entity -> {
                processIndex(successful, failures, entity);
            });
        stopWatch.stop();
        log.info(
            "Parallel reIndexing complete - in {} with {} successful and {} failed indexes of the Total: {}",
            stopWatch.formatTime(),
            successful.get(),
            failures.get(),
            mapping.allIndexedEntities().size()
        );
    }

    /**
     * Zero-downtime mass indexing of a selected subset of entities in the running instance.
     *
     * @param types JPA entity names of the types to reindex
     */
    @Async
    @Override
    public void reindexAliasList(List<String> types) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        SearchMapping mapping = Search.mapping(entityManager.getEntityManagerFactory());
        AtomicInteger successful = new AtomicInteger();
        AtomicInteger failures = new AtomicInteger();
        mapping
            .allIndexedEntities()
            .stream()
            .filter(sEntity -> types.stream().anyMatch(e -> e.equals(sEntity.jpaName())))
            .forEach(entity -> {
                processIndex(successful, failures, entity);
            });
        stopWatch.stop();
        log.info(
            "Parallel reIndexing complete - in {} with {} successful and {} failed indexes of the Total: {}",
            stopWatch.formatTime(),
            successful.get(),
            failures.get(),
            mapping.allIndexedEntities().size()
        );
    }

    /**
     * Zero-downtime mass indexing utility for a single entity type in the running instance.
     *
     * @param successful counter incremented when the type is reindexed successfully
     * @param failures counter incremented when reindexing of the type fails
     * @param entity the indexed entity type to process
     */
    public void processIndex(AtomicInteger successful, AtomicInteger failures, SearchIndexedEntity<?> entity) {
        // Get the alias names from the entity's ElasticsearchIndexDescriptor
        ElasticsearchIndexDescriptor descriptor = entity.indexManager().unwrap(ElasticsearchIndexManager.class).descriptor();
        String readName = descriptor.readName();
        String writeName = descriptor.writeName();
        try {
            // Get the original index name from the Elasticsearch client
            Optional<String> iName = MapUtils
                .emptyIfNull(client.indices().getAlias(b -> b.name(writeName)).result())
                .entrySet()
                .stream()
                .findAny()
                .map(entry -> entry.getKey());
            iName.ifPresent(origIndexName -> {
                try {
                    String newIndexName = StringUtils.substringBefore(origIndexName, "-") + "-" + Instant.now().getEpochSecond();
                    IndexState indexState = client.indices().get(g -> g.index(origIndexName)).get(origIndexName);
                    // Remove the write alias from the current index
                    client
                        .indices()
                        .updateAliases(ub -> ub.actions(ab -> ab.remove(rb -> rb.index(origIndexName).alias(writeName))))
                        .acknowledged();
                    // Create the new index, copying all settings, mappings and aliases
                    client
                        .indices()
                        .create(ci ->
                            ci
                                .index(newIndexName)
                                .mappings(m -> m.allField(indexState.mappings().allField()))
                                .mappings(indexState.mappings())
                                .settings(sb ->
                                    sb
                                        .analysis(indexState.settings().index().analysis())
                                        .mapping(m ->
                                            m
                                                .totalFields(t -> t.limit(200000))
                                                .depth(t -> t.limit(20))
                                                .nestedFields(t -> t.limit(200000))
                                                .nestedObjects(t -> t.limit(20000))
                                        )
                                )
                                .aliases(indexState.aliases())
                        )
                        .index();
                    log.debug("indexing for: {}", entity.javaClass());
                    // Initiate mass indexing
                    MassIndexer massIndexer = searchSession.massIndexer(entity.javaClass());
                    massIndexer
                        .monitor(monitor)
                        .batchSizeToLoadObjects(500)
                        .threadsToLoadObjects(2)
                        .typesToIndexInParallel(1)
                        .start()
                        .thenRun(() -> {
                            try {
                                // 1. Remove the read alias from the original index
                                // 2. Delete the original index
                                client
                                    .indices()
                                    .updateAliases(ub -> ub.actions(ab -> ab.remove(rb -> rb.index(origIndexName).alias(readName))))
                                    .acknowledged();
                                client.indices().delete(d -> d.index(List.of(origIndexName))).acknowledged();
                                successful.addAndGet(1);
                                log.debug("Indexing successful for: {}", entity.javaClass().getName());
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                        })
                        .exceptionally(throwable -> {
                            failures.addAndGet(1);
                            try {
                                // 1. Delete the new index
                                // 2. Point the write alias back at the original index
                                client.indices().delete(d -> d.index(List.of(newIndexName))).acknowledged();
                                client
                                    .indices()
                                    .updateAliases(ub -> ub.actions(ab -> ab.add(rb -> rb.index(origIndexName).alias(writeName))))
                                    .acknowledged();
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            // A trailing Throwable argument is logged by SLF4J with its stack trace
                            log.error("Mass indexing failed!: {}", entity.javaClass().getName(), throwable);
                            return null;
                        });
                } catch (IOException e) {
                    log.error("Mass indexing attempt failed!: {}", entity.javaClass().getName(), e);
                    throw new RuntimeException(e);
                }
            });
        } catch (IOException e) {
            log.error("Mass indexing attempt failed!: {}", entity.javaClass().getName(), e);
            throw new RuntimeException(e);
        }
    }
}
Answer 1
Score: 0
This will effectively reindex all your types in parallel, in the background, since MassIndexer#start does not block your thread. You're starting as many mass indexers in parallel as you have types, so yes, this will lead to heavily parallel reindexing, regardless of typesToIndexInParallel().
If you want to index one type at a time, use MassIndexer#startAndWait().
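The difference between the two calls can be illustrated without Hibernate Search at all. The following is a minimal sketch using only the JDK: `indexType`, `start` and the class name are hypothetical stand-ins (not Hibernate Search API) that model a non-blocking `start()` returning a future, and a blocking call that waits for that future before moving on to the next type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class StartVsStartAndWait {

    // Number of simulated indexing jobs currently running, and the highest value seen.
    private static final AtomicInteger running = new AtomicInteger();
    private static final AtomicInteger peak = new AtomicInteger();

    // Hypothetical stand-in for the work done by one mass indexer.
    private static void indexType(String type) {
        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
        try {
            Thread.sleep(200); // pretend to index documents
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            running.decrementAndGet();
        }
    }

    // Models a non-blocking start(): schedules the work and returns immediately.
    private static CompletableFuture<Void> start(String type, ExecutorService pool) {
        return CompletableFuture.runAsync(() -> indexType(type), pool);
    }

    // Calls start() in a loop, as in the question. Returns the peak number of concurrent jobs.
    public static int peakWhenStartingInLoop(List<String> types) {
        running.set(0);
        peak.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, types.size()));
        try {
            List<CompletableFuture<Void>> jobs = new ArrayList<>();
            for (String type : types) {
                jobs.add(start(type, pool)); // does not wait for the job to finish
            }
            CompletableFuture.allOf(jobs.toArray(new CompletableFuture[0])).join();
            return peak.get();
        } finally {
            pool.shutdown();
        }
    }

    // Waits for each job before starting the next, like a blocking startAndWait().
    public static int peakWhenWaitingInLoop(List<String> types) {
        running.set(0);
        peak.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, types.size()));
        try {
            for (String type : types) {
                start(type, pool).join(); // blocks until this type is done
            }
            return peak.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> types = List.of("Book", "Author", "Publisher");
        System.out.println("peak concurrency, start() in a loop: " + peakWhenStartingInLoop(types));
        System.out.println("peak concurrency, waiting in a loop: " + peakWhenWaitingInLoop(types));
    }
}
```

In the waiting variant at most one job runs at any time, which is the behavior the question's `typesToIndexInParallel(1)` setting appears to be aiming for.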
If you want to index N types at a time, in parallel, then split your code differently:
- Create new indexes for all types
- Execute the mass indexer on all types (.massIndexer(Object.class)) with typesToIndexInParallel(N)
- Restore aliases for all types
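The three steps above can be sketched as a small orchestration routine. This is only an outline under stated assumptions: the `Phases` interface and its method names are hypothetical placeholders for the Elasticsearch alias and index calls from the question, and the rollback on failure mirrors what the question's `exceptionally` branch does rather than anything the answer prescribes. In a real application, `massIndexAll` would wrap a single blocking mass indexer run over all types, along the lines of `.massIndexer(Object.class)` with `typesToIndexInParallel(N)` and `startAndWait()`.

```java
import java.util.ArrayList;
import java.util.List;

public class ThreePhaseReindex {

    /** Hypothetical callbacks; each would delegate to the Elasticsearch client or the mass indexer. */
    public interface Phases {
        // Detach the write alias from the old index and create the new index for this type.
        void createNewIndex(String type) throws Exception;

        // Run ONE mass indexer over all types and block until it finishes.
        void massIndexAll(int typesToIndexInParallel) throws Exception;

        // Detach the read alias from the old index and delete the old index for this type.
        void switchAliases(String type) throws Exception;

        // Delete the new index and point the write alias back at the old index.
        void rollBack(String type) throws Exception;
    }

    /** Returns true if every type was reindexed and switched over, false if the run was rolled back. */
    public static boolean reindex(List<String> types, int typesToIndexInParallel, Phases phases) throws Exception {
        List<String> prepared = new ArrayList<>();
        try {
            // Phase 1: create new indexes for all types
            for (String type : types) {
                phases.createNewIndex(type);
                prepared.add(type);
            }
            // Phase 2: a single mass indexer handles all types, N at a time
            phases.massIndexAll(typesToIndexInParallel);
        } catch (Exception e) {
            // Undo only what was actually prepared
            for (String type : prepared) {
                phases.rollBack(type);
            }
            return false;
        }
        // Phase 3: restore aliases for all types
        for (String type : types) {
            phases.switchAliases(type);
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        Phases printing = new Phases() {
            public void createNewIndex(String type) { System.out.println("create new index for " + type); }
            public void massIndexAll(int n) { System.out.println("mass index all types, " + n + " at a time"); }
            public void switchAliases(String type) { System.out.println("switch aliases for " + type); }
            public void rollBack(String type) { System.out.println("roll back " + type); }
        };
        reindex(List.of("Book", "Author"), 2, printing);
    }
}
```

Keeping the mass indexing in one blocking step means the degree of parallelism is controlled in a single place instead of emerging from how many indexers happen to be started.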