Batch calls in a WebFlux API when the number of records is not known

Question

I am trying to call an API in batches. For example, the first batch is called with offset 0 and limit 10,000 (records 0 to 10,000), the second with offset 10,000 and limit 10,000 (records 10,000 to 20,000), and the third with offset 20,000 and limit 10,000 (records 20,000 to 30,000). The calls should stop once all records have been fetched, but I see more calls than expected.

Sample code:

AtomicBoolean makeNextCall = new AtomicBoolean(true);
Flux.fromStream(Stream.iterate(0, i -> i + 1))
        .takeWhile(integer -> {
            LOGGER.withTask(GET_TRANSACTIONS)
                    .withMessage(String.format("Batch =[%s] and MaxResultsReturned = [%s]", integer, makeNextCall.get()))
                    .info();
            return makeNextCall.get();
        })
        .concatMap(counter -> {
            int histOffset = counter * batchSize;
            return bbTransactionRepository.accountTransactions(transactionContext, histOffset, batchSize)
                    .flatMap(tranList -> {
                        int size = ((List<BBTransaction>) tranList).size();
                        LOGGER.withTask(GET_TRANSACTIONS)
                                .withAttribute(RESULT, size)
                                .withAttribute(HIST_OFFSET, histOffset)
                                .withAttribute(HIST_LIMIT, batchSize)
                                .withAttribute(BATCH, counter)
                                .withMessage("fetching bb transactions in batches")
                                .info();
                        boolean shouldContinue = size >= batchSize;
                        makeNextCall.set(shouldContinue);
                        return Mono.just(tranList);
                    });
        })
        .flatMap(Flux::fromIterable)
        .collectList()

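So for 26,000 records there should be three calls, and then the loop should stop, because the third call returns 6,000 records, which is fewer than the batch size of 10,000.
However, I see around 33 calls in the UAT environment, although it works correctly in my local environment.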
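
The expected count of three follows from the stop condition in the sample code: fetching continues only while a page comes back full, so the number of calls is the number of full pages plus one. A minimal plain-Java sketch of that arithmetic is shown here; the class and method names are hypothetical and no Reactor types are involved.

```java
// Hypothetical helper (not part of the question's code): how many
// offset/limit calls a loop makes when it stops on the first page
// that is smaller than batchSize.
final class ExpectedCalls {

    static int expectedCalls(int totalRecords, int batchSize) {
        if (totalRecords < 0 || batchSize <= 0) {
            throw new IllegalArgumentException("totalRecords must be >= 0 and batchSize > 0");
        }
        // Each full page triggers one more call; the first short
        // (possibly empty) page ends the loop.
        return totalRecords / batchSize + 1;
    }

    public static void main(String[] args) {
        System.out.println(expectedCalls(26_000, 10_000)); // 3
        System.out.println(expectedCalls(30_000, 10_000)); // 4
    }
}
```

When the total is an exact multiple of the batch size, one extra call that returns an empty page is needed to detect the end, which is why 30,000 records take four calls under this stop condition.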

Answer 1

Score: 1

I am not sure I fully understand the code, but the best way to validate the flow is to write a test using StepVerifier.

As for the batch processing, I would suggest simplifying the code: use Flux.buffer to process the data in batches and Flux.takeUntil to cancel the publisher once the condition matches.

private Flux<List<Integer>> processInBatch(int batchSize) {
    // Running offset; advanced by one batch for every call.
    AtomicInteger offset = new AtomicInteger();

    return Flux.range(0, Integer.MAX_VALUE)
            .buffer(batchSize)
            // concatMap subscribes to one inner publisher at a time, so calls stay sequential.
            .concatMap(batch -> {
                var histOffset = offset.getAndAdd(batch.size());

                log.info("offset: {}, batch: {}", histOffset, batch.size());
                return accountTransactions(histOffset, batch.size());
            })
            .doOnNext(res -> log.info("res: {}", res.size()))
            // takeUntil emits the matching (short) page and then cancels upstream.
            .takeUntil(res -> res.size() < batchSize);
}

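Here is a test to verify the flow:

@Test
void validateBuffer() {
    // The original call was processInBatch(26000, 10000), which does not match the
    // one-argument signature above. Assumption: accountTransactions is stubbed to
    // serve 26,000 records in total.
    StepVerifier.create(processInBatch(10000))
        .expectNextCount(3)
        .verifyComplete();
}

Log output from the test run: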
23:00:11.341  [Test worker] INFO - offset: 0, batch: 10000
23:00:11.369  [Test worker] INFO - res: 10000
23:00:11.370  [Test worker] INFO - offset: 10000, batch: 10000
23:00:11.371  [Test worker] INFO - res: 10000
23:00:11.372  [Test worker] INFO - offset: 20000, batch: 10000
23:00:11.372  [Test worker] INFO - res: 6000
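
The answer does not show accountTransactions. The log above is consistent with a stub that serves 26,000 records by offset and limit. The following plain-Java sketch reproduces the same page sizes under that assumption; fetchPageSize and TOTAL are hypothetical, and the blocking loop is only a stand-in for the concatMap/takeUntil pipeline, not Reactor code.

```java
import java.util.ArrayList;
import java.util.List;

final class PagingSimulation {

    // Assumed size of the backing data set (taken from the question's example).
    static final int TOTAL = 26_000;

    // Hypothetical stand-in for accountTransactions: returns only the page size.
    static int fetchPageSize(int offset, int limit) {
        return Math.max(0, Math.min(limit, TOTAL - offset));
    }

    // Sizes of the pages in call order. Like takeUntil, the first short
    // page is included and then the loop stops.
    static List<Integer> pageSizes(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be > 0");
        }
        List<Integer> sizes = new ArrayList<>();
        int offset = 0;
        int size;
        do {
            size = fetchPageSize(offset, batchSize);
            sizes.add(size);
            offset += batchSize;
        } while (size >= batchSize);
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(pageSizes(10_000)); // [10000, 10000, 6000]
    }
}
```

In a reactive version, the stub would presumably wrap each page in a Mono, so that concatMap can subscribe to it.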

huangapple
  • Posted on February 16, 2023, 12:44:56
  • Link to this post: https://go.coder-hub.com/75467942.html