英文:
How to stop processing further elements in akka streams?
问题
我有一个整数列表 {2,4,6,8,9,10,12}
为了简化我的问题,我的目标是获取所有偶数整数,直到遇到奇数。所以我的结果应该是 -> {2,4,6,8, 9}
此外,我有一个演员说一个数字是偶数还是奇数(为了简单起见)
我已经完成了以下工作:-
CompletionStage<List<Integer>> result = Source.from(integerList)
.ask(oddEvenActor, OddEvenResponse.class, Timeout.apply(1, TimeUnit.SECONDS))
.map(oddEvenResult -> if(oddEvenResult.isOdd()){
//停止进一步处理元素
}
else {
return oddEvenResult.number();
})
.runWith(Sink.seq(), materializer)
那么,当我遇到奇数元素时,如何停止进一步处理元素?
CompletionStage "result" 应该在流完成后包含 2,4,6,8,9。
我查看了 statefulMapConcat (https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/statefulMapConcat.html)
然而,这仍然会处理 9 之后的其他元素,因为演员仍然会被 "asked"。
当然,我可以做以下事情:-
-
有一个 resultList 变量(全局),我执行 resultList.add(oddEvenResult.number()),然后一旦遇到奇数就抛出异常。我必须编写一个自定义异常类来携带这个全局的 resultList。
-
使用 takeWhile,如 @Jeffrey Chung 建议的,但 OddEvenActoor 仍然会被 "asked" 处理元素 10 和 12。这是没有意义的。
有没有更简洁的方法来实现这一点?
英文:
I have a list of Integers {2,4,6,8,9,10,12}
To simplify my problem, My goal is to get all even integers until I encounter an odd number. So my result should be -> {2,4,6,8, 9}
Also, i have an actor that says if a number is even or odd (for simplicity)
I have done the following :-
CompletionStage<List<Integer>> result = Source.from(integerList)
.ask(oddEvenActor, OddEvenResponse.class, Timeout.apply(1, TimeUnit.SECONDS))
.map(oddEvenResult -> if(oddEvenResult.isOdd()){
//stop processing further elements
}
else {
return oddEvenResult.number();
})
.runWith(Sink.seq(), materializer)
So how can i stop the proceessing of further elements as soon as i encounter an odd element?
The CompletionStage "result" should contain 2,4,6,8,9 once the stream is complete.
I checked out the statefulMapConcat (https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/statefulMapConcat.html)
However, this will still process the other elements after 9 as the actor will still be "asked"
Of course I can do the following :-
-
Have a resultList variable (global) and i do a resultList.add(oddEvenResult.number()) and then Throw an exception once i encounter odd number. I have to write a custom exception class to piggy back this global resultList.
-
Use takeWhile as suggested by @Jeffrey Chung, but the OddEvenActoor is still "asked" to process elements 10 and 12. This is pointless.
Is there a cleaner way to achieve this?
答案1
得分: 3
使用 takeWhile
。在Scala中,这会类似于以下内容:
implicit val timeout: akka.util.Timeout = 3.seconds
val result: Future[Seq[Int]] =
Source(List(2, 4, 6, 8, 9, 10, 12))
.ask[OddEvenResponse](oddEvenActor)
.takeWhile(resp => !resp.isOdd, true)
.map(_.number)
.runWith(Sink.seq)
请注意,在调用 takeWhile
时使用了 inclusive
布尔标志,如果您想保留第一个奇数数值,这是必要的。
Java 等效代码会类似。
英文:
Use takeWhile
. In Scala, this would be something like the following:
implicit val timeout: akka.util.Timeout = 3.seconds
val result: Future[Seq[Int]] =
Source(List(2, 4, 6, 8, 9, 10, 12))
.ask[OddEvenResponse](oddEvenActor)
.takeWhile(resp => !resp.isOdd, true)
.map(_.number)
.runWith(Sink.seq)
Note the use of the inclusive
boolean flag in the invocation of takeWhile
, which is necessary if you want to keep the first odd number.
The Java equivalent would look similar.
答案2
得分: 0
以下是代码部分的翻译:
如果您想保留您的演员,可以实现它,例如在Scala中:
implicit val system = ActorSystem("StopOnOdd")
implicit val materializer = ActorMaterializer()
class StopOnOdd extends Actor with ActorLogging {
override def receive: Receive = {
case x: Int if x % 2 == 0 =>
log.info(s"Just received an even int: $x")
sender() ! x
case x: Int if x % 2 == 1 =>
log.info(s"Just received an odd number: $x 停止处理。")
context.become(dontProcess)
case _ =>
}
private def dontProcess: Receive = {
case x =>
log.info(s"Dropping $x 因为接收到奇数。")
}
}
def main(args: Array[String]): Unit = {
val stopOnOdd = system.actorOf(Props[StopOnOdd], "simpleActor")
val source = Source(List(2,4,6,8,9,10,12))
implicit val timeout: Timeout = Timeout(2.seconds)
val stopOnOddFlow = Flow[Int].ask[Int](parallelism = 1)(stopOnOdd)
source.via(stopOnOddFlow).to(Sink.foreach[Int](number => println(s"Got number: $number"))).run()
}
输出为:
[INFO] [07/29/2020 16:37:14.618] [StopOnOdd-akka.actor.default-dispatcher-4]
[akka://StopOnOdd/user/simpleActor] 刚刚收到一个偶数:2
得到数字:2
[INFO] [07/29/2020 16:37:14.625] [StopOnOdd-akka.actor.default-dispatcher-2]
[akka://StopOnOdd/user/simpleActor] 刚刚收到一个偶数:4
得到数字:4
得到数字:6
[INFO] [07/29/2020 16:37:14.627] [StopOnOdd-akka.actor.default-dispatcher-4]
[akka://StopOnOdd/user/simpleActor] 刚刚收到一个偶数:6
得到数字:8
[INFO] [07/29/2020 16:37:14.627] [StopOnOdd-akka.actor.default-dispatcher-4]
[akka://StopOnOdd/user/simpleActor] 刚刚收到一个偶数:8
[INFO] [07/29/2020 16:37:14.628] [StopOnOdd-akka.actor.default-dispatcher-4]
[akka://StopOnOdd/user/simpleActor] 刚刚收到一个奇数:9 停止处理。
英文:
If you'd like to keep your actor you can implement it, for example as the following(in Scala):
implicit val system = ActorSystem("StopOnOdd")
implicit val materializer = ActorMaterializer()
class StopOnOdd extends Actor with ActorLogging {
override def receive: Receive = {
case x: Int if x % 2 == 0 =>
log.info(s"Just received an even int: $x")
sender() ! x
case x: Int if x % 2 == 1 =>
log.info(s"Just received an odd number: $x Stop processing.")
context.become(dontProcess)
case _ =>
}
private def dontProcess: Receive = {
case x =>
log.info(s"Dropping $x because odd number was received.")
}
}
def main(args: Array[String]): Unit = {
val stopOnOdd = system.actorOf(Props[StopOnOdd], "simpleActor")
val source = Source(List(2,4,6,8,9,10,12))
implicit val timeout: Timeout = Timeout(2.seconds)
val stopOnOddFlow = Flow[Int].ask[Int](parallelism = 1)(stopOnOdd)
source.via(stopOnOddFlow).to(Sink.foreach[Int](number => println(s"Got number: $number"))).run()
}
The output is:
[INFO] [07/29/2020 16:37:14.618] [StopOnOdd-akka.actor.default-dispatcher-4]
[akka://StopOnOdd/user/simpleActor] Just received an even int: 2
Got number: 2
[INFO] [07/29/2020 16:37:14.625] [StopOnOdd-akka.actor.default-dispatcher-2]
[akka://StopOnOdd/user/simpleActor] Just received an even int: 4
Got number: 4
Got number: 6
[INFO] [07/29/2020 16:37:14.627] [StopOnOdd-akka.actor.default-dispatcher-4]
[akka://StopOnOdd/user/simpleActor] Just received an even int: 6
Got number: 8
[INFO] [07/29/2020 16:37:14.627] [StopOnOdd-akka.actor.default-dispatcher-4]
[akka://StopOnOdd/user/simpleActor] Just received an even int: 8
[INFO] [07/29/2020 16:37:14.628] [StopOnOdd-akka.actor.default-dispatcher-4]
[akka://StopOnOdd/user/simpleActor] Just received an odd number: 9 Stop processing.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论