如何在 Observable 中循环并限制每次获取的项目数量

huangapple go评论74阅读模式
英文:

How to loop and limit the number of items fetched each time in an Observable

问题

以下是您要翻译的内容:

我有以下的 Observable,它接收 Kafka 消费者记录并将它们插入到数据库中。
目前它能够正常工作,我可以在消费者中按预期接收数据,并从中提取数据进行一些映射,然后放入一个列表中。
然后,这个列表中的数据将被插入到数据库中。

按照当前的编写方式,它将尝试同时插入所有内容。Kafka 记录可能会包含 10 万到 100 万条记录。
我希望能够将这些记录分割,每次仅获取 1000 个项目从消费者记录中,插入到数据库中,然后对下一个 1000 个项目重复此过程,直到记录为空。这种做法可行吗?

我尝试过使用 take、takeUntil 结合 repeat 的变种,但它们无法工作。一旦我订阅后,调用立即结束,甚至在执行这些操作时都不进入 Observable。

我能否得到一些建议,关于如何编写代码以从 Kafka 记录中获取 1000 条记录,插入到数据库中,并在处理完所有 Kafka 记录之前持续进行此操作?谢谢。

请注意,我正在使用 RXJava 1,需要坚持使用此版本。

private final static AtomicInteger INSERT_COUNT = new AtomicInteger(1000);
private final static AtomicInteger RECORD_COUNT = new AtomicInteger();
private final static AtomicInteger REMAINDER = new AtomicInteger();
private final static AtomicInteger REPEAT_COUNT = new AtomicInteger();

public Observable<KafkaConsumerRecord<String, CustomObj>> dbInsert(KafkaConsumerRecords<String, CustomObj> records) {

    return Observable.just(records.getDelegate().records())
            // 尝试基于以下计数循环。不是首选方法,但不确定是否有更好的方法。
            // 此处捕获的数字目前无关紧要,因为无法使用 takeUntil 和 repeat 使其正常工作。
            .doOnSubscribe(() -> {
                RECORD_COUNT.set(records.getDelegate().records().count());
                REMAINDER.set(RECORD_COUNT.get() % INSERT_COUNT.get() == 0 ? 0 : 1);
                REPEAT_COUNT.set((RECORD_COUNT.get() / INSERT_COUNT.get()) + REMAINDER.get());
            })
            .map(consumerRecords -> consumerRecords.records("Topic name"))
            .map(it -> {
                List<CustomRequest> requests = new ArrayList<>();
                it.forEach(r -> {
                    ConsumerRecord<String, SomeObj> record = (ConsumerRecord<String, SomeObj>) r;
                    CustomRequest request = new CustomRequest(
                        new String(record.headers().headers("id").iterator().next().value(), StandardCharsets.UTF_8),
                        Long.parseLong(new String(record.headers().headers("code").iterator().next().value(), StandardCharsets.UTF_8)),
                        record.value()
                    );
                    requests.add(request);
                });
                return requests;
            })
            // 如果取消注释以下部分,什么都不会发生。
            // .takeUntil(customRequests -> customRequests.size() == INSERT_COUNT.get())
            // .repeat(REPEAT_COUNT.get())
            .doOnNext(customRequests -> {
                // 在这里进行每次事务中的 1000 次数据库插入操作。
            })
            .doOnCompleted(() -> System.out.println("Completed"));
}
英文:

I have the following Observable which receives kafka consumer records and inserts them into a database.
It is currently working where I can receive the data as expected in the consumer and extracting those to perform some mapping and put it into a list.
Data in this list will then be inserted into the DB.

The way it is written right now, it is going to attempt to insert everything at the same time. There are chances for the kafka record to hold between 100k - 1 Million records.
I am looking for a way to break this up such that I only take 1000 items from the consumer records, insert into DB and repeat again for the next 1000 items and keep going till the records is empty. Is this possible?

I attempted to use variations of take, takeuntil with repeat, but they do not work. As in after I subscribe, the call just ends, does not even enter the observable when I do these.

Could I get some advice on how I could write this such that I can fetch 1000 records from the kafka records, insert them to db and keep doing this until done with all kafka records? Thanks.

Please note I am using RXJava 1 and need to stick to this version.

private final static AtomicInteger INSERT_COUNT = new AtomicInteger(1000);
private final static AtomicInteger RECORD_COUNT = new AtomicInteger();
private final static AtomicInteger REMAINDER = new AtomicInteger();
private final static AtomicInteger REPEAT_COUNT = new AtomicInteger();
public Observable&lt;KafkaConsumerRecord&lt;String, CustomObj&gt;&gt; dbInsert(KafkaConsumerRecords&lt;String, CustomObj&gt; records) {
return Observable.just(records.getDelegate().records())
// attempting to loop based on following counts. Not preferred but unsure of a better way.
// the figures captured here are correct.
// plus this doesn&#39;t currently matter anyway cos not able to get it to work using takeUntil, repeat. 
.doOnSubscribe(() -&gt; {
RECORD_COUNT.set(records.getDelegate().records().count());
REMAINDER.set(RECORD_COUNT.get() % INSERT_COUNT.get() == 0 ? 0 : 1);
REPEAT_COUNT.set((RECORD_COUNT.get() / INSERT_COUNT.get()) + REMAINDER.get());
})
.map(consumerRecords -&gt; consumerRecords.records(&quot;Topic name&quot;))
.map(it -&gt; {
List&lt;CustomRequest&gt; requests = new ArrayList&lt;&gt;();
it.forEach(r -&gt; {
ConsumerRecord&lt;String, SomeObj&gt; record = (ConsumerRecord&lt;String, SomeObj&gt;) r;
CustomRequest request = new CustomRequest (
new String(record.headers().headers(&quot;id&quot;).iterator().next().value(), StandardCharsets.UTF_8),
Long.parseLong(new String(record.headers().headers(&quot;code&quot;).iterator().next().value(), StandardCharsets.UTF_8)),
record.value()
);
requests.add(request);
});
return requests;
})
// nothing happens if I uncomment these. 
// .takeUntil(customRequests -&gt; customRequests.size() == INSERT_COUNT.get())
// .repeat(REPEAT_COUNT.get())
.doOnNext(customRequests -&gt; {
// planning to do some db inserts here in a transaction of 1000 inserts at a time. 
})
.doOnCompleted(() -&gt; System.out.println(&quot;Completed&quot;));
}

答案1

得分: 1

以下内容适用于 RxJava 1.3.8:

rx.Observable.from(List.of(1, 2, 3, 4, 5, 6))
  .buffer(2)
  .doOnNext(r -> System.out.println(r))
  .subscribe();

输出如下:

[1, 2]
[3, 4]
[5, 6]

我使用以下版本来测试上述代码:

<dependency>
  <groupId>io.reactivex</groupId>
  <artifactId>rxjava</artifactId>
  <version>1.3.8</version>
</dependency>
英文:

The following should work with RxJava 1.3.8

rx.Observable.from(List.of(1, 2, 3, 4, 5, 6))
.buffer(2)
.doOnNext(r -&gt; System.out.println(r))
.subscribe();

following was the output -

[1, 2]
[3, 4]
[5, 6]

I used following version to test the above code -

&lt;dependency&gt;
&lt;groupId&gt;io.reactivex&lt;/groupId&gt;
&lt;artifactId&gt;rxjava&lt;/artifactId&gt;
&lt;version&gt;1.3.8&lt;/version&gt;
&lt;/dependency&gt;

huangapple
  • 本文由 发表于 2020年7月27日 01:59:23
  • 转载请务必保留本文链接:https://go.coder-hub.com/63103786.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定