反应器 – 按组分组,不同组的并行执行

huangapple go评论116阅读模式
英文:

Reactor - Group By, Parallel execution of different groups

问题

我正在尝试使用Spring Project Reactor实现以下行为:

  1. 基于一个列表,我想根据一个属性进行分组。
  2. 我希望确保在组内顺序进行处理,但在组之间并行处理。

如何实现这个目标?我在SO上看到了几个类似的问题,但没有一个回答我的问题,而且示例也不能正常工作。

List<CurrencyWithValue> pairWithValueList = Arrays.asList(
    new CurrencyWithValue("EUR", 1.0),
    new CurrencyWithValue("USD", 1.0),
    // 其他货币数据...
);

Flux.fromIterable(pairWithValueList)
    .groupBy(v -> v.getCurrency())
    .flatMap(f -> f)
    .parallel(3)
    .runOn(Schedulers.newBoundedElastic(3, 1000, "k-task"))
    .subscribe(i -> System.out.println("Thread:" + Thread.currentThread().getName() + "::Data:" + i.getCurrency() + "::" + i.getAmount()));

每次执行结果都可能不同 - 有时println按顺序执行,有时不是。这是一次运行的结果:

Thread:k-task-3::Data:CHF::1.0
Thread:k-task-2::Data:USD::1.0
Thread:k-task-1::Data:EUR::1.0
Thread:k-task-2::Data:SEK::2.0
Thread:k-task-3::Data:EUR::2.0
Thread:k-task-2::Data:CHF::2.0
Thread:k-task-1::Data:SEK::1.0
Thread:k-task-2::Data:USD::3.0
Thread:k-task-3::Data:SEK::3.0
Thread:k-task-2::Data:USD::4.0
Thread:k-task-1::Data:USD::2.0
Thread:k-task-2::Data:SEK::5.0
Thread:k-task-3::Data:CHF::3.0
Thread:k-task-1::Data:EUR::3.0
Thread:k-task-3::Data:CHF::4.0
Thread:k-task-1::Data:EUR::4.0
Thread:k-task-1::Data:SEK::4.0

我尝试将flatMap替换为concatMapflatMapSequential,但结果仍然相同。我对Project Reactor是新手,正在努力实现我的期望行为。

有什么想法如何解决这个问题?感谢!

英文:

I am trying to use Spring Project Reactor to achieve the following behavior:

  1. Based on an List, I want to do grouping based on a property
  2. I want to make sure that the processing happens sequentially WITHIN a group, but parallel BETWEEN groups.

How can this be achieved? I have seen several similar questions on SO, but none of them answer my question and the examples are not working properly.

List&lt;CurrencyWithValue&gt; pairWithValueList = Arrays.asList(
                new CurrencyWithValue(&quot;EUR&quot;, 1.0),
                new CurrencyWithValue(&quot;USD&quot;, 1.0),
                new CurrencyWithValue(&quot;CHF&quot;, 1.0),
                new CurrencyWithValue(&quot;SEK&quot;, 1.0),
                new CurrencyWithValue(&quot;SEK&quot;, 2.0),
                new CurrencyWithValue(&quot;EUR&quot;, 2.0),
                new CurrencyWithValue(&quot;USD&quot;, 2.0),
                new CurrencyWithValue(&quot;CHF&quot;, 2.0),
                new CurrencyWithValue(&quot;SEK&quot;, 3.0),
                new CurrencyWithValue(&quot;EUR&quot;, 3.0),
                new CurrencyWithValue(&quot;USD&quot;, 3.0),
                new CurrencyWithValue(&quot;CHF&quot;, 3.0),
                new CurrencyWithValue(&quot;EUR&quot;, 4.0),
                new CurrencyWithValue(&quot;USD&quot;, 4.0),
                new CurrencyWithValue(&quot;CHF&quot;, 4.0),
                new CurrencyWithValue(&quot;SEK&quot;, 4.0),
                new CurrencyWithValue(&quot;SEK&quot;, 5.0)
        );


Flux.fromIterable(pairWithValueList).groupBy(v -&gt; v.getCurrency())
.flatMap(f -&gt; f)
.parallel(3)
.runOn(Schedulers.newBoundedElastic(3, 1000, &quot;k-task&quot;))
.subscribe(i -&gt; System.out.println(&quot;Thread:&quot; + Thread.currentThread().getName() + &quot;::Data:&quot; + i.getCurrency() + &quot;::&quot; + i.getAmount()));

The results are kinda different on every execution - sometimes the println happens in order, sometimes not.
Here is the result of one run:

Thread:k-task-3::Data:CHF::1.0
Thread:k-task-2::Data:USD::1.0
Thread:k-task-1::Data:EUR::1.0
Thread:k-task-2::Data:SEK::2.0
Thread:k-task-3::Data:EUR::2.0
Thread:k-task-2::Data:CHF::2.0
Thread:k-task-1::Data:SEK::1.0
Thread:k-task-2::Data:USD::3.0
Thread:k-task-3::Data:SEK::3.0
Thread:k-task-2::Data:USD::4.0
Thread:k-task-1::Data:USD::2.0
Thread:k-task-2::Data:SEK::5.0
Thread:k-task-3::Data:CHF::3.0
Thread:k-task-1::Data:EUR::3.0
Thread:k-task-3::Data:CHF::4.0
Thread:k-task-1::Data:EUR::4.0
Thread:k-task-1::Data:SEK::4.0

I have tried replacing flatMap with concatMap or flatMapSequential but the result is still the same.
I am new to Project Reactor and I am struggling to achieve my desired behavior.

Any ideas how i can solve this?
Thanks a lot

答案1

得分: 1

首先,你需要学会如何正确使用项目反应器。

例如,由 groupBy 创建的组会立即展开,因此从技术上讲,这类似于通过运算符添加效果,然后下一个运算符立即将其移除,以便之前根本不存在。

然后,parallel 正常工作,但它不能保证以完全顺序的方式处理所有相同的货币。

要解决问题,你只需要使用 groupBy 运算符和 publishOn,而不是 parallel + runOn


Flux.fromIterable(pairWithValueList)
    .groupBy(v -&gt; v.getCurrency())
    .flatMap(f -&gt; 
         f.publishOn(Schedulers.parallel())
          .doOnNext(i -&gt; System.out.println(&quot;Thread:&quot; + Thread.currentThread().getName() + &quot;::Data:&quot; + i.getCurrency() + &quot;::&quot; + i.getAmount())
    )
    .subscribe();

上述解决方案会起作用,但这里有多个注意事项。

可能出现的问题是 flatMap 运算符中存在的并发限制。默认情况下,并发设置为 256,因此您需要确保通过 groupBy 分配的组的数量小于或等于该限制。否则,您必须调整 flatMap 的并发性,并将其设置为可能的货币数量。

如果货币的数量不可预测(假设它是无限的),那么您需要定义一个不同的分组机制。例如,您可以使用首字母创建组(或计算一些哈希码,然后按以下方式操作 hash % 256):


Flux.fromIterable(pairWithValueList)
    .groupBy(v -&gt; v.getCurrency().charAt(0))
    .flatMap(f -&gt; 
         f.publishOn(Schedulers.parallel())
          .doOnNext(i -&gt; System.out.println(&quot;Thread:&quot; + Thread.currentThread().getName() + &quot;::Data:&quot; + i.getCurrency() + &quot;::&quot; + i.getAmount())
    )
    .subscribe();

其他注意事项

  1. 在上述分组时,您必须确保随着时间的推移,所有组的元素数量大致相同。确保您的分组算法是平衡的
  2. Schedulers.parallel() 的线程数受限于可用核心数。您可能希望创建自己的专用调度程序或使用 boundedElastic。
英文:

First of all, you need to learn how to use the project reactor properly.

For example, the groups created by groupBy are immediately flattened, so technically it is similar to adding an effect by an operator, and then the next operator removing it right away so the previous was not there at all.

Then, parallel is working properly but it does not offer you any guarantee that all the same currencies are processed sequentially.

To solve your problem you just need the groupBy operator and publishOn, instead of parallel + runOn:


Flux.fromIterable(pairWithValueList)
    .groupBy(v -&gt; v.getCurrency())
    .flatMap(f -&gt; 
         f.publishOn(Schedulers.parallel())
          .doOnNext(i -&gt; System.out.println(&quot;Thread:&quot; + Thread.currentThread().getName() + &quot;::Data:&quot; + i.getCurrency() + &quot;::&quot; + i.getAmount())
    )
    .subscribe();

The above solution is going to work, however, there are multiple caveats here.

The problem that may emerge is the concurrency limit that is present in the flatMap operator. By default, concurrency is set to 256, so you need to make sure that the number of allocated groups by groupBy is less or equal to that limit. Otherwise, you have to adjust the concurrency of flatMap and set it equal to the number of possible currencies.

If the number of currencies is unpredictable (let's assume it is an infinite number) then you need to define a different grouping mechanism. For example, you can use the first letter instead to create a group, (or calculate some hash code, and then do as follows hash % 256):


Flux.fromIterable(pairWithValueList)
    .groupBy(v -&gt; v.getCurrency().charAt(0))
    .flatMap(f -&gt; 
         f.publishOn(Schedulers.parallel())
          .doOnNext(i -&gt; System.out.println(&quot;Thread:&quot; + Thread.currentThread().getName() + &quot;::Data:&quot; + i.getCurrency() + &quot;::&quot; + i.getAmount())
    )
    .subscribe();

Other caveats

  1. When you do grouping as above, you have to make sure that all the groups have more or less the same number of elements within them over time. Make sure your grouping algorithm is balanced
  2. Schedulers.parallel() has a limited number of threads equal to a number of cores available. You may want to create your own dedicated scheduler or use boundedElastic instead

huangapple
  • 本文由 发表于 2023年6月16日 00:09:37
  • 转载请务必保留本文链接:https://go.coder-hub.com/76483566.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定