Akka Streams 在条件失败后仍继续处理下一个元素

huangapple go评论109阅读模式
英文:

Akka Streams takeWhile processing next element even after condition fails

问题

以下是翻译好的内容:

我有一个非常简单的演员只是打印数字-

public class PrintLineActor extends AbstractLoggingActor {

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(Integer.class, i -> {
          System.out.println("Processing: " + i);
          sender().tell(i, self());
        }).build();
  }
}


现在我有一个流来打印偶数直到遇到奇数元素为止-

@Test
public void streamsTest() throws Exception {

  ActorSystem system = ActorSystem.create("testSystem");
  ActorRef printActor = system.actorOf(Props.create(PrintLineActor.class));

  Integer[] intArray = new Integer[]{2,4,6,8,9,10,12};
  CompletionStage<List<Integer>> result = Source.from(Arrays.asList(intArray))
      .ask(1, printActor, Integer.class, Timeout.apply(10, TimeUnit.SECONDS))
      .takeWhile(i -> i != 9)
      .runWith(Sink.seq(), ActorMaterializer.create(system));

  List<Integer> result1 = result.toCompletableFuture().get();
  System.out.println("Result :- ");
  result1.forEach(System.out::println);
}

**不希望**9之后的任何元素被处理也就是不会发送给演员但是我看到数字"10"也被演员处理了但没有12),如下输出所示-

Processing: 2
Processing: 4
Processing: 6
Processing: 8
Processing: 9
Processing: 10 //为什么演员会处理这个?
  
Result :- 
2
4
6
8

为什么会处理10如何停止这个

编辑

我尝试通过记录事件的时间戳来调试只是想看看10是否在9完全完成之前被处理但是10是在9完全处理后被处理的以下是日志-

在请求之前2时间戳为1596035906509
演员内部处理2时间戳为1596035906509
在TakeWhile中2时间戳为1596035906509

在请求之前4时间戳为1596035906609
演员内部处理4时间戳为1596035906610
在TakeWhile中4时间戳为1596035906610

在请求之前6时间戳为1596035906712
演员内部处理6时间戳为1596035906712
在TakeWhile中6时间戳为1596035906712

在请求之前8时间戳为1596035906814
演员内部处理8时间戳为1596035906814
在TakeWhile中8时间戳为1596035906815

在请求之前9时间戳为1596035906915
演员内部处理9时间戳为1596035906915
在TakeWhile中9时间戳为1596035906916

在请求之前10时间戳为1596035907017 //因此,在完全处理9后,才处理了10
演员内部处理10时间戳为1596035907017

Result :- 
2
4
6
8

另外如果我用直接的.map(print..)替换.ask那么10就不会被打印所以为什么在涉及演员.ask时会发生这种情况对我来说很奇怪
英文:

I have a very simple actor that just prints the number :-

public class PrintLineActor extends AbstractLoggingActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Integer.class, i -&gt; {
System.out.println(&quot;Processing: &quot; + i);
sender().tell(i, self());
}).build();
}
}

Now, I have a stream to print the even numbers until I encounter an odd element:-

  @Test
public void streamsTest() throws Exception {
ActorSystem system = ActorSystem.create(&quot;testSystem&quot;);
ActorRef printActor = system.actorOf(Props.create(PrintLineActor.class));
Integer[] intArray = new Integer[]{2,4,6,8,9,10,12};
CompletionStage&lt;List&lt;Integer&gt;&gt; result = Source.from(Arrays.asList(intArray))
.ask(1, printActor, Integer.class, Timeout.apply(10, TimeUnit.SECONDS))
.takeWhile(i -&gt; i != 9)
.runWith(Sink.seq(), ActorMaterializer.create(system));
List&lt;Integer&gt; result1 = result.toCompletableFuture().get();
System.out.println(&quot;Result :- &quot;);
result1.forEach(System.out::println);
}

I do NOT expect any element after 9 being processed aka being sent to actor. However, I see the number "10" also being processed by actor (but not 12) as seen in below output

Processing: 2
Processing: 4
Processing: 6
Processing: 8
Processing: 9
Processing: 10 //WHY IS THIS BEING PROCESSED BY ACTOR??
Result :- 
2
4
6
8

Why is 10 being processed by actor? How to stop this?

EDIT:

I have tried debugging by recording timestamp of events, just to see if 10 is being processed before 9 actually completes, but no, 10 is taken after 9 is processed fully. here are the logs :-

Before Ask: 2 in 1596035906509
Processing inside Actor: 2 at 1596035906509
Inside TakeWhile 2 at  in 1596035906509
Before Ask: 4 in 1596035906609
Processing inside Actor: 4 at 1596035906610
Inside TakeWhile 4 at  in 1596035906610
Before Ask: 6 in 1596035906712
Processing inside Actor: 6 at 1596035906712
Inside TakeWhile 6 at  in 1596035906712
Before Ask: 8 in 1596035906814
Processing inside Actor: 8 at 1596035906814
Inside TakeWhile 8 at  in 1596035906815
Before Ask: 9 in 1596035906915
Processing inside Actor: 9 at 1596035906915
Inside TakeWhile 9 at  in 1596035906916
Before Ask: 10 in 1596035907017 //so 10 is taken much after the 9 is processed fully
Processing inside Actor: 10 at 1596035907017
Result :- 
2
4
6
8

Also, if i replace the .ask with a direct .map(print..), then 10 does not get printed. So why this happens when actor.ask is involved is very strange to me.

答案1

得分: 1

因为您异步地向printActor发出了请求,而不是同步打印。在您的确切运行中:

  • 消息9到达PrintActor,打印“处理中:9”
  • 消息10到达PrintActor,打印“处理中:10”
  • 您的Akka流接收到来自PrintActor的消息9的响应消息,完成Akka流,因此在结果中既没有9也没有10。

要解决确切的问题,请移除异步请求并改为同步打印。但不确定PrintActor是否只是一个类比,让我知道。

英文:

Because you ask printActor asynchronously rather than print synchronously. In your exact run:

  • message 9 arrives at PrintActor, prints "Processing: 9"
  • message 10 arrives at PrintActor, prints "Processing: 10"
  • Your Akka stream receives the response message for message 9 from PrintActor, complete the Akka stream, therefore in the result there is neither 9 nor 10.

To solve the exact problem, remove the async ask and print synchronously instead. But not sure if PrintActor is just an analogy, let me know.

答案2

得分: 0

The akka streams are buffering the values thru the different streams. Please note that 10 was precessed, but it is not part of the result. If you wish, you can configure the buffer size:

.ask(1, printActor, Integer.class, Timeout.apply(10, TimeUnit.SECONDS)).buffer(1, OverflowStrategy.backpressure)
英文:

The akka streams are buffering the values thru the different streams. Please note that 10 was precessed, but it is not part of the result. If you wish, you can configure the buffer size:

.ask(1, printActor, Integer.class, Timeout.apply(10, TimeUnit.SECONDS)).buffer(1, OverflowStrategy.backpressure)

huangapple
  • 本文由 发表于 2020年7月29日 22:57:00
  • 转载请务必保留本文链接:https://go.coder-hub.com/63156501.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定