Batch calls in a WebFlux API when the number of records is not known
Question
I am trying to call an API in batches. For example, the first batch is called with offset 0 and limit 10,000 (records 0 - 10,000), the second with offset 10,000 and limit 10,000 (records 10,000 - 20,000), and the third with offset 20,000 and limit 10,000 (records 20,000 - 30,000). The sequence should stop once all records have been fetched, but I see more calls than expected.
Sample code:
    AtomicBoolean makeNextCall = new AtomicBoolean(true);
    Flux.fromStream(Stream.iterate(0, i -> i + 1))
        .takeWhile(integer -> {
            LOGGER.withTask(GET_TRANSACTIONS)
                .withMessage(String.format("Batch =[%s] and MaxResultsReturned = [%s]", integer, makeNextCall.get()))
                .info();
            return makeNextCall.get();
        }).concatMap(counter -> {
            int histOffset = counter * batchSize;
            return bbTransactionRepository.accountTransactions(transactionContext, histOffset, batchSize)
                .flatMap(tranList -> {
                    int size = ((List<BBTransaction>) tranList).size();
                    LOGGER.withTask(GET_TRANSACTIONS)
                        .withAttribute(RESULT, size)
                        .withAttribute(HIST_OFFSET, histOffset)
                        .withAttribute(HIST_LIMIT, batchSize)
                        .withAttribute(BATCH, counter)
                        .withMessage("fetching bb transactions in batches")
                        .info();
                    boolean shouldContinue = size >= batchSize;
                    makeNextCall.set(shouldContinue);
                    return Mono.just(tranList);
                });
        })
        .flatMap(Flux::fromIterable)
        .collectList()
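So for 26,000 records there should be 3 calls, after which the sequence should stop, because the third call returns only 6,000 records, which is fewer than the batch size of 10,000.
However, I see around 33 calls in the UAT environment, although it works correctly in my local environment.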
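The expected call count can be checked with a small plain-Java sketch of the stop rule used above (continue while a page is full, stop at the first short page). The class and method names here are made up for illustration and are not part of the original code. The sketch also shows a consequence of the `size >= batchSize` rule: when the total is an exact multiple of the batch size, one extra call that returns an empty page is needed before the loop can stop.

```java
import java.util.ArrayList;
import java.util.List;

public class PagingPlan {

    /**
     * Sizes of the pages an offset/limit API would return for a data set of
     * totalRecords, stopping at the first page shorter than batchSize.
     * (Hypothetical helper, used only to illustrate the stop rule.)
     */
    static List<Integer> pageSizes(int totalRecords, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Integer> sizes = new ArrayList<>();
        int offset = 0;
        while (true) {
            // A page holds at most batchSize records and never fewer than zero.
            int size = Math.max(0, Math.min(batchSize, totalRecords - offset));
            sizes.add(size);
            if (size < batchSize) {
                break; // short page: everything has been fetched
            }
            offset += batchSize;
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(pageSizes(26_000, 10_000)); // [10000, 10000, 6000]
        System.out.println(pageSizes(30_000, 10_000)); // [10000, 10000, 10000, 0]
    }
}
```

So three calls is the right expectation for 26,000 records, and anything near 33 points at how the pipeline requests batches rather than at the stop rule itself.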
Answer 1
Score: 1
I'm not sure I fully understand the code, but the best way to validate the flow is to write a test using StepVerifier.
As for batch processing, I would suggest simplifying the code: use Flux.buffer to group the data into batches and Flux.takeUntil to cancel the publisher once the condition matches.
Here is a snippet that processes the data:
    private Flux<List<Integer>> processInBatch(int batchSize) {
        // Running offset, advanced by one batch per request
        AtomicInteger offset = new AtomicInteger();
        return Flux.range(0, Integer.MAX_VALUE)
            .buffer(batchSize)
            // concatMap runs the requests one at a time, in order
            .concatMap(batch -> {
                var histOffset = offset.getAndAdd(batch.size());
                log.info("offset: {}, batch: {}", histOffset, batch.size());
                return accountTransactions(histOffset, batch.size());
            })
            .doOnNext(res -> log.info("res: {}", res.size()))
            // Emit the first short page, then cancel upstream
            .takeUntil(res -> res.size() < batchSize);
    }
And here is a test to verify the flow:
    @Test
    void validateBuffer() {
        // Assumes accountTransactions is stubbed to serve 26,000 records in total
        StepVerifier.create(processInBatch(10000))
            .expectNextCount(3)
            .verifyComplete();
    }
23:00:11.341 [Test worker] INFO - offset: 0, batch: 10000
23:00:11.369 [Test worker] INFO - res: 10000
23:00:11.370 [Test worker] INFO - offset: 10000, batch: 10000
23:00:11.371 [Test worker] INFO - res: 10000
23:00:11.372 [Test worker] INFO - offset: 20000, batch: 10000
23:00:11.372 [Test worker] INFO - res: 6000
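A possible explanation for the roughly 33 calls in the question, offered as a hypothesis rather than a confirmed diagnosis: `concatMap` requests items from upstream ahead of time (its default prefetch is 32 in Reactor, as far as I know). When the repository call completes asynchronously, `takeWhile` has already let about 32 counters through before the first response flips the `AtomicBoolean`, and each of those counters still triggers a call. When the call completes synchronously, as it may in a local setup, the flag is updated before the next counter is tested, so the sequence stops on time.

One way to avoid depending on a shared flag is `Mono.expand`, which derives the next request from the previous response. The sketch below assumes Java 11+ and reactor-core on the classpath; `Page`, `fetchPage`, `TOTAL` and the call counter are hypothetical stand-ins for the real repository and are not part of the original code.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import reactor.core.publisher.Mono;

public class ExpandPaging {

    /** One fetched page plus the offset it was requested at (hypothetical type). */
    static final class Page {
        final int offset;
        final List<Integer> items;

        Page(int offset, List<Integer> items) {
            this.offset = offset;
            this.items = items;
        }
    }

    static final int TOTAL = 26_000;                        // assumed data set size
    static final AtomicInteger calls = new AtomicInteger(); // counts simulated remote calls

    /** Hypothetical stand-in for the repository call: serves records [offset, offset + limit). */
    static Mono<Page> fetchPage(int offset, int limit) {
        return Mono.fromCallable(() -> {
            calls.incrementAndGet();
            int end = Math.min(TOTAL, offset + limit);
            // IntStream.range yields an empty stream when offset >= end
            List<Integer> items = IntStream.range(offset, end).boxed().collect(Collectors.toList());
            return new Page(offset, items);
        });
    }

    /** Fetches pages one after another until a page shorter than batchSize arrives. */
    static Mono<List<Integer>> fetchAll(int batchSize) {
        return fetchPage(0, batchSize)
            .expand(page -> {
                if (page.items.size() < batchSize) {
                    return Mono.<Page>empty(); // short page: stop expanding
                }
                return fetchPage(page.offset + batchSize, batchSize);
            })
            .flatMapIterable(page -> page.items)
            .collectList();
    }

    public static void main(String[] args) {
        List<Integer> all = fetchAll(10_000).block();
        System.out.println(all.size() + " records in " + calls.get() + " calls");
    }
}
```

Because the next page is only requested after the previous one has arrived and been inspected, the number of calls does not depend on prefetch settings or on whether the repository responds synchronously.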