如何在akka streams中停止处理后续元素?

huangapple go评论69阅读模式
英文:

How to stop processing further elements in akka streams?

问题

我有一个整数列表 {2,4,6,8,9,10,12}

为了简化我的问题,我的目标是获取所有偶数整数,直到遇到奇数。所以我的结果应该是 -> {2,4,6,8, 9}

此外,我有一个演员说一个数字是偶数还是奇数(为了简单起见)

我已经完成了以下工作:-

CompletionStage<List<Integer>> result = Source.from(integerList)
      .ask(oddEvenActor, OddEvenResponse.class, Timeout.apply(1, TimeUnit.SECONDS))
      .map(oddEvenResult -> if(oddEvenResult.isOdd()){
                                //停止进一步处理元素
                            }
                            else {
                                return oddEvenResult.number();
                            })
     .runWith(Sink.seq(), materializer)

那么,当我遇到奇数元素时,如何停止进一步处理元素?

CompletionStage "result" 应该在流完成后包含 2,4,6,8,9。

我查看了 statefulMapConcat (https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/statefulMapConcat.html)
然而,这仍然会处理 9 之后的其他元素,因为演员仍然会被 "asked"。

当然,我可以做以下事情:-

  1. 有一个 resultList 变量(全局),我执行 resultList.add(oddEvenResult.number()),然后一旦遇到奇数就抛出异常。我必须编写一个自定义异常类来携带这个全局的 resultList。

  2. 使用 takeWhile,如 @Jeffrey Chung 建议的,但 OddEvenActoor 仍然会被 "asked" 处理元素 10 和 12。这是没有意义的。

有没有更简洁的方法来实现这一点?

英文:

I have a list of Integers {2,4,6,8,9,10,12}

To simplify my problem, My goal is to get all even integers until I encounter an odd number. So my result should be -> {2,4,6,8, 9}

Also, i have an actor that says if a number is even or odd (for simplicity)

I have done the following :-

CompletionStage<List<Integer>> result = Source.from(integerList)
      .ask(oddEvenActor, OddEvenResponse.class, Timeout.apply(1, TimeUnit.SECONDS))
      .map(oddEvenResult -> if(oddEvenResult.isOdd()){
                                //stop processing further elements
                            }
                            else {
                                return oddEvenResult.number();
                            })
     .runWith(Sink.seq(), materializer)

So how can i stop the proceessing of further elements as soon as i encounter an odd element?

The CompletionStage "result" should contain 2,4,6,8,9 once the stream is complete.

I checked out the statefulMapConcat (https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/statefulMapConcat.html)
However, this will still process the other elements after 9 as the actor will still be "asked"

Of course I can do the following :-

  1. Have a resultList variable (global) and i do a resultList.add(oddEvenResult.number()) and then Throw an exception once i encounter odd number. I have to write a custom exception class to piggy back this global resultList.

  2. Use takeWhile as suggested by @Jeffrey Chung, but the OddEvenActoor is still "asked" to process elements 10 and 12. This is pointless.

Is there a cleaner way to achieve this?

答案1

得分: 3

使用 takeWhile。在Scala中,这会类似于以下内容:

implicit val timeout: akka.util.Timeout = 3.seconds

val result: Future[Seq[Int]] =
  Source(List(2, 4, 6, 8, 9, 10, 12))
    .ask[OddEvenResponse](oddEvenActor)
    .takeWhile(resp => !resp.isOdd, true)
    .map(_.number)
    .runWith(Sink.seq)

请注意,在调用 takeWhile 时使用了 inclusive 布尔标志,如果您想保留第一个奇数数值,这是必要的。

Java 等效代码会类似。

英文:

Use takeWhile. In Scala, this would be something like the following:

implicit val timeout: akka.util.Timeout = 3.seconds

val result: Future[Seq[Int]] =
  Source(List(2, 4, 6, 8, 9, 10, 12))
    .ask[OddEvenResponse](oddEvenActor)
    .takeWhile(resp => !resp.isOdd, true)
    .map(_.number)
    .runWith(Sink.seq)

Note the use of the inclusive boolean flag in the invocation of takeWhile, which is necessary if you want to keep the first odd number.

The Java equivalent would look similar.

答案2

得分: 0

以下是代码部分的翻译:

如果您想保留您的演员可以实现它例如在Scala中

implicit val system = ActorSystem("StopOnOdd")
implicit val materializer = ActorMaterializer()

class StopOnOdd extends Actor with ActorLogging {
  override def receive: Receive = {
    case x: Int if x % 2 == 0 =>
      log.info(s"Just received an even int: $x")
      sender() ! x
    case x: Int if x % 2 == 1 =>
      log.info(s"Just received an odd number: $x 停止处理。")
      context.become(dontProcess)
    case _ =>
  }

  private def dontProcess: Receive = {
    case x =>
      log.info(s"Dropping $x 因为接收到奇数。")
  }
}

def main(args: Array[String]): Unit = {
  val stopOnOdd = system.actorOf(Props[StopOnOdd], "simpleActor")

  val source = Source(List(2,4,6,8,9,10,12))
  implicit val timeout: Timeout = Timeout(2.seconds)
  val stopOnOddFlow = Flow[Int].ask[Int](parallelism = 1)(stopOnOdd)

  source.via(stopOnOddFlow).to(Sink.foreach[Int](number => println(s"Got number: $number"))).run()
}

输出为:

[INFO] [07/29/2020 16:37:14.618] [StopOnOdd-akka.actor.default-dispatcher-4] 
[akka://StopOnOdd/user/simpleActor] 刚刚收到一个偶数:2
得到数字:2
[INFO] [07/29/2020 16:37:14.625] [StopOnOdd-akka.actor.default-dispatcher-2] 
[akka://StopOnOdd/user/simpleActor] 刚刚收到一个偶数:4
得到数字:4
得到数字:6
[INFO] [07/29/2020 16:37:14.627] [StopOnOdd-akka.actor.default-dispatcher-4] 
[akka://StopOnOdd/user/simpleActor] 刚刚收到一个偶数:6
得到数字:8
[INFO] [07/29/2020 16:37:14.627] [StopOnOdd-akka.actor.default-dispatcher-4] 
[akka://StopOnOdd/user/simpleActor] 刚刚收到一个偶数:8
[INFO] [07/29/2020 16:37:14.628] [StopOnOdd-akka.actor.default-dispatcher-4] 
[akka://StopOnOdd/user/simpleActor] 刚刚收到一个奇数:9 停止处理。
英文:

If you'd like to keep your actor you can implement it, for example as the following(in Scala):

implicit val system = ActorSystem("StopOnOdd")
implicit val materializer = ActorMaterializer()
class StopOnOdd extends Actor with ActorLogging {
override def receive: Receive = {
case x: Int if x % 2 == 0 =>
log.info(s"Just received an even int: $x")
sender() ! x
case x: Int if x % 2 == 1 =>
log.info(s"Just received an odd number: $x Stop processing.")
context.become(dontProcess)
case _ =>
}
private def dontProcess: Receive = {
case x =>
log.info(s"Dropping $x because odd number was received.")
}
}
def main(args: Array[String]): Unit = {
val stopOnOdd = system.actorOf(Props[StopOnOdd], "simpleActor")
val source = Source(List(2,4,6,8,9,10,12))
implicit val timeout: Timeout = Timeout(2.seconds)
val stopOnOddFlow = Flow[Int].ask[Int](parallelism = 1)(stopOnOdd)
source.via(stopOnOddFlow).to(Sink.foreach[Int](number => println(s"Got number: $number"))).run()
}

The output is:

[INFO] [07/29/2020 16:37:14.618] [StopOnOdd-akka.actor.default-dispatcher-4] 
[akka://StopOnOdd/user/simpleActor] Just received an even int: 2
Got number: 2
[INFO] [07/29/2020 16:37:14.625] [StopOnOdd-akka.actor.default-dispatcher-2] 
[akka://StopOnOdd/user/simpleActor] Just received an even int: 4
Got number: 4
Got number: 6
[INFO] [07/29/2020 16:37:14.627] [StopOnOdd-akka.actor.default-dispatcher-4] 
[akka://StopOnOdd/user/simpleActor] Just received an even int: 6
Got number: 8
[INFO] [07/29/2020 16:37:14.627] [StopOnOdd-akka.actor.default-dispatcher-4] 
[akka://StopOnOdd/user/simpleActor] Just received an even int: 8
[INFO] [07/29/2020 16:37:14.628] [StopOnOdd-akka.actor.default-dispatcher-4] 
[akka://StopOnOdd/user/simpleActor] Just received an odd number: 9 Stop processing.

huangapple
  • 本文由 发表于 2020年7月29日 20:40:24
  • 转载请务必保留本文链接:https://go.coder-hub.com/63153839.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定