Zero-downtime indexing with Hibernate-Search

huangapple go评论60阅读模式
英文:

Zero-downtime indexing with Hibernate-Search

问题

尝试实现根据以下建议进行的零停机重建索引的逻辑:https://docs.jboss.org/hibernate/search/6.2/reference/en-US/html_single/#backend-elasticsearch-indexlayout-strategy-simple。
依赖详情:

  1. springBootVersion=3.1.1
  2. hibernateVersion=6.1.7.Final
  3. hibernate_search=6.2.0.Beta1
  4. 后端:Elastic Search

以下是我们实现的伪代码。一切都在功能上执行。

然而,在如何启动MassIndexer(方法processIndex)方面需要一些建议。我们选择的方法是为每个实体执行一次MassIndexer

尝试将massindexer作为一个类级别的变量字段,但它会导致应用程序启动失败。

提前致谢!

以下是MassIndexServiceImpl.java的伪代码:

// 代码省略...
@Service
@Transactional
public class MassIndexServiceImpl implements MassIndexService {

    private final Logger log = LoggerFactory.getLogger(MassIndexServiceImpl.class);

    private final EntityManager entityManager;

    private final MassIndexingMonitor monitor;

    private final ElasticsearchClient client;

    private final SearchSession searchSession;

    public MassIndexServiceImpl(EntityManager entityManager) {
        // 代码省略...
    }

    @Async
    @Override
    public void reindexAliases() {
        // 代码省略...
    }

    @Async
    @Override
    public void reindexAliasList(List<String> types) {
        // 代码省略...
    }

    public void processIndex(AtomicInteger successful, AtomicInteger failures, SearchIndexedEntity<?> entity) {
        // 代码省略...
    }
}

希望对您有所帮助。

英文:

Trying to implement the logic for Zero-downtime reindexing as advised on: https://docs.jboss.org/hibernate/search/6.2/reference/en-US/html_single/#backend-elasticsearch-indexlayout-strategy-simple.
Dependency details:

  1. springBootVersion=3.1.1
  2. hibernateVersion=6.1.7.Final
  3. hibernate_search=6.2.0.Beta1
  4. backend: Elastic Search

Below is the pseudo code that we have implemented. Everything executes functionally.

However need some suggestions in implementing on how to initiate MassIndexer (method processIndex). The approach we have opted is executing MassIndexer one per each entity.

Tried to use massindexer as a class level variable field but it fails the application start.

Thanks in advance!!

Here is the pseudo code for MassIndexServiceImpl.java:

import MassIndexService;
import org.apache.commons.****;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.hibernate.search.backend.elasticsearch.***;
import org.hibernate.search.mapper.orm.***;
import org.hibernate.search.mapper.pojo.massindexing.***;
import org.slf4j.*;
import org.springframework.****;
import java.*****;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.indices.IndexState;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import jakarta.persistence.EntityManager;
@Service
@Transactional
public class MassIndexServiceImpl implements MassIndexService {
private final Logger log = LoggerFactory.getLogger(MassIndexServiceImpl.class);
private final EntityManager entityManager;
private final MassIndexingMonitor monitor;
private final ElasticsearchClient client;
private final SearchSession searchSession;
public MassIndexServiceImpl(EntityManager entityManager) {
this.entityManager = entityManager;
// Create the low-level client
RestClient restClient = RestClient.builder(new HttpHost(&quot;localhost&quot;, 9200)).build();
// Create the transport with a Jackson mapper
ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
// And create the API client
client = new ElasticsearchClient(transport);
monitor = new PojoMassIndexingLoggingMonitor(1000);
searchSession = Search.session(entityManager);
}
/**
* Zero-Downtime Implementation Method for MassIndexing All Entities in the running instance
*
* @param
* @return void.
*/
@Async
@Override
public void reindexAliases() {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
SearchMapping mapping = Search.mapping(entityManager.getEntityManagerFactory());
AtomicInteger successful = new AtomicInteger();
AtomicInteger failures = new AtomicInteger();
mapping
.allIndexedEntities()
.stream()
.forEach(entity -&gt; {
processIndex(successful, failures, entity);
});
stopWatch.stop();
log.error(
&quot;Parallel reIndexing complete - in {} with {} successful and {} failed indexes of the Total: {}&quot;,
stopWatch.formatTime(),
successful.get(),
failures.get(),
mapping.allIndexedEntities().size()
);
}
/**
* Zero-Downtime Implementation Method for MassIndexing Selected few Entities in the running instance
*
* @param types
* @return void.
*/
@Async
@Override
public void reindexAliasList(List&lt;String&gt; types) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
SearchMapping mapping = Search.mapping(entityManager.getEntityManagerFactory());
AtomicInteger successful = new AtomicInteger();
AtomicInteger failures = new AtomicInteger();
mapping
.allIndexedEntities()
.stream()
.filter(sEntity -&gt; types.stream().anyMatch(e -&gt; e.equals(sEntity.jpaName())))
.forEach(entity -&gt; {
processIndex(successful, failures, entity);
});
stopWatch.stop();
log.error(
&quot;Parallel reIndexing complete - in {} with {} successful and {} failed indexes of the Total: {}&quot;,
stopWatch.formatTime(),
successful.get(),
failures.get(),
mapping.allIndexedEntities().size()
);
}
/**
* Zero-Downtime Implementation Utility for MassIndexing in the running instance
*
* @param successful
* @param failures
* @param entity
* @return void.
*/
public void processIndex(AtomicInteger successful, AtomicInteger failures, SearchIndexedEntity&lt;?&gt; entity) {
/**
* Get the Alias names from the entity descriptor {@Link: ElasticsearchIndexDescriptor}
*/
ElasticsearchIndexDescriptor descriptor = entity.indexManager().unwrap(ElasticsearchIndexManager.class).descriptor();
String readName = descriptor.readName();
String writeName = descriptor.writeName();
try {
/**
* Get the Original index name from Elastic search client {@link: co.elastic.clients.elasticsearch.ElasticsearchClient}
*/
Optional&lt;String&gt; iName = MapUtils
.emptyIfNull(client.indices().getAlias(b -&gt; b.name(writeName)).result())
.entrySet()
.stream()
.findAny()
.map(entry -&gt; entry.getKey());
iName.ifPresent(origIndexName -&gt; {
try {
String newIndexName = StringUtils.substringBefore(origIndexName, &quot;-&quot;) + &quot;-&quot; + Instant.now().getEpochSecond();
IndexState indexState = client.indices().get(g -&gt; g.index(origIndexName)).get(origIndexName);
/**
* Remove the alias association for write index to present index
*/
client
.indices()
.updateAliases(ub -&gt; ub.actions(ab -&gt; ab.remove(rb -&gt; rb.index(origIndexName).alias(writeName))))
.acknowledged();
/**
* Create new Index by copying all settings, mappings and aliases to new Index
*/
client
.indices()
.create(ci -&gt;
ci
.index(newIndexName)
.mappings(m -&gt; m.allField(indexState.mappings().allField()))
.mappings(indexState.mappings())
.settings(sb -&gt;
sb
.analysis(indexState.settings().index().analysis())
.mapping(m -&gt;
m
.totalFields(t -&gt; t.limit(200000))
.depth(t -&gt; t.limit(20))
.nestedFields(t -&gt; t.limit(200000))
.nestedObjects(t -&gt; t.limit(20000))
)
)
.aliases(indexState.aliases())
)
.index();
log.debug(&quot;indexing for: {}&quot;, entity.javaClass());
/**
* Initiate massIndexing
*/
MassIndexer massIndexer = searchSession.massIndexer(entity.javaClass());
massIndexer
.monitor(monitor)
.batchSizeToLoadObjects(500)
.threadsToLoadObjects(2)
.typesToIndexInParallel(1)
.start()
.thenRun(() -&gt; {
try {
/**
* 1. Remove the alias association for read index to present index
* 2. Delete the original Index
*/
client
.indices()
.updateAliases(ub -&gt; ub.actions(ab -&gt; ab.remove(rb -&gt; rb.index(origIndexName).alias(readName))))
.acknowledged();
client.indices().delete(d -&gt; d.index(List.of(origIndexName))).acknowledged();
successful.addAndGet(1);
log.debug(&quot;Indexing successful for: {}&quot;, entity.javaClass().getName());
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.exceptionally(throwable -&gt; {
failures.addAndGet(1);
try {
/**
* 1. Delete the new index
* 2. Create alias association back for write alias to the original Index
*/
client.indices().delete(d -&gt; d.index(List.of(newIndexName))).acknowledged();
client
.indices()
.updateAliases(ub -&gt; ub.actions(ab -&gt; ab.add(rb -&gt; rb.index(origIndexName).alias(writeName))))
.acknowledged();
} catch (IOException e) {
throw new RuntimeException(e);
}
log.error(&quot;Mass indexing failed!: {} {}&quot;, entity.javaClass().getName(), throwable);
return null;
});
} catch (IOException e) {
log.error(&quot;Mass indexing attempt failed!: {} {}&quot;, entity.javaClass().getName(), e);
throw new RuntimeException(e);
}
});
} catch (IOException e) {
log.error(&quot;Mass indexing attempt failed!: {} {}&quot;, entity.javaClass().getName(), e);
throw new RuntimeException(e);
}
}
}

答案1

得分: 0

这将在后台以并行方式有效地重新索引所有类型,因为 MassIndexer#start 不会阻塞您的线程。您将同时启动与类型数量相等的大量索引器,因此无论 typesToIndexInParallel() 如何设置,都将导致高度并行的重新索引。

如果您想一次只索引一个类型,请使用 MassIndexer#startAndWait()

如果您想同时并行索引 N 种类型,请按以下方式拆分您的代码:

  1. 为所有类型创建新的索引
  2. 对所有类型执行大量索引器(.massIndexer(Object.class))并使用 typesToIndexInParallel(N)
  3. 为所有类型恢复别名
英文:

This will effectively reindex all your types in parallel, in the background, since MassIndexer#start does not block your thread. You're starting as many mass indexers in parallel as you have types, so yes this will lead to heavily parallel reindexing, regardless of of typesToIndexInParallel().

If you want to index one type at a time, use MassIndexer#startAndWait().

If you want to index N types at a time, in parallel, then split your code differently:

  1. Create new indexes for all types
  2. Execute the mass indexer on all types (.massIndexer(Object.class)) with typesToIndexInParallel(N)
  3. Restore aliases for all types

huangapple
  • 本文由 发表于 2023年7月6日 16:03:58
  • 转载请务必保留本文链接:https://go.coder-hub.com/76626722.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定