How to recover from an Exception thrown in an Akka Streams Sink?

Question

How can I recover from an exception thrown in the Sink of an Akka Streams stream?

Simple example:

    Source<Integer, NotUsed> integerSource = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));

    integerSource.runWith(Sink.foreach(x -> {
      if (x == 4) {
        throw new Exception("Error Occurred");
      }
      System.out.println("Sink: " + x);
    }), system);

Output:

    Sink: 1
    Sink: 2
    Sink: 3

How can I handle the exception and move on to the next elements from the source (i.e. 5, 6, 7, 8, 9)?

Answer 1

Score: 3

By default, the supervision strategy stops the stream when an exception is thrown. To drop the element that caused the exception and continue with the next one, switch to the "resume" strategy. For example (`integerSource` and `system` are the ones from the question):

    import akka.Done;
    import akka.japi.function.Function; // Akka's Function, not java.util.function.Function
    import akka.stream.ActorAttributes;
    import akka.stream.Supervision;
    import akka.stream.javadsl.Keep;
    import akka.stream.javadsl.RunnableGraph;
    import akka.stream.javadsl.Sink;
    import java.util.concurrent.CompletionStage;

    // Decider that resumes on every exception: the failing element is dropped.
    final Function<Throwable, Supervision.Directive> decider =
      exc -> {
        return Supervision.resume();
      };

    final Sink<Integer, CompletionStage<Done>> printSink =
      Sink.foreach(x -> {
        if (x == 4) {
          throw new Exception("Error Occurred");
        }
        System.out.println("Sink: " + x);
      });

    final RunnableGraph<CompletionStage<Done>> runnableGraph =
      integerSource.toMat(printSink, Keep.right());

    // Attach the supervision strategy to the whole graph before running it.
    final RunnableGraph<CompletionStage<Done>> withResumingSupervision =
      runnableGraph.withAttributes(ActorAttributes.withSupervisionStrategy(decider));

    final CompletionStage<Done> result = withResumingSupervision.run(system);
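
To make the difference between stopping and resuming concrete, here is a minimal plain-Java model that needs no Akka dependency. It is only a sketch of the behaviour described above, not how Akka implements supervision: `SupervisionModel`, `Directive`, `ThrowingFunction`, `run` and `label` are hypothetical names invented for this illustration. One simplification to keep in mind: a real Akka stream that stops is completed with a failure, whereas this model simply stops processing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of stream supervision. Nothing here is Akka API:
// every name in this class is hypothetical and exists only for illustration.
final class SupervisionModel {

  enum Directive { RESUME, STOP }

  @FunctionalInterface
  interface ThrowingFunction<A, B> {
    B apply(A a) throws Exception;
  }

  // Applies `stage` to each element in order. When the stage throws, the
  // decider chooses: RESUME drops the offending element and continues,
  // STOP ends processing and the remaining elements are never handled.
  static <A, B> List<B> run(List<A> elements,
                            ThrowingFunction<A, B> stage,
                            Function<Throwable, Directive> decider) {
    List<B> emitted = new ArrayList<>();
    for (A element : elements) {
      try {
        emitted.add(stage.apply(element));
      } catch (Exception e) {
        if (decider.apply(e) == Directive.STOP) {
          break;
        }
        // RESUME: skip this element and move on to the next one.
      }
    }
    return emitted;
  }

  // Same failing step as in the question: element 4 throws.
  static String label(Integer x) throws Exception {
    if (x == 4) {
      throw new Exception("Error Occurred");
    }
    return "Sink: " + x;
  }

  public static void main(String[] args) {
    List<Integer> input = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9);
    // Stops at the failing element: only 1, 2 and 3 are handled.
    System.out.println(run(input, SupervisionModel::label, e -> Directive.STOP));
    // Drops element 4 and continues with 5 through 9.
    System.out.println(run(input, SupervisionModel::label, e -> Directive.RESUME));
  }
}
```

With the stopping decider the model handles only 1, 2 and 3, which matches the output in the question. With the resuming decider it skips 4 and handles 5 through 9 as well.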

You can also return a different directive for each kind of exception, for example resuming only on one specific exception type (`MySpecificException` is a placeholder) and stopping on everything else:

    final Function<Throwable, Supervision.Directive> decider =
      exc -> {
        if (exc instanceof MySpecificException) return Supervision.resume();
        else return Supervision.stop();
      };
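
When only the function passed to the sink can fail, another option, which is not part of the answer above, is to catch the exception inside that function so that the stream never sees a failure at all. The sketch below shows the idea in plain Java without an Akka dependency. `SafeSinkFunction`, `ThrowingConsumer`, `safe` and `demo` are hypothetical helpers written for this example, not Akka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of Akka: wraps a throwing per-element
// function so that failures are handled locally instead of propagating.
final class SafeSinkFunction {

  @FunctionalInterface
  interface ThrowingConsumer<T> {
    void accept(T t) throws Exception;
  }

  // Returns a consumer that runs `body` and hands any exception to `onError`.
  static <T> Consumer<T> safe(ThrowingConsumer<T> body, Consumer<Exception> onError) {
    return element -> {
      try {
        body.accept(element);
      } catch (Exception e) {
        onError.accept(e);
      }
    };
  }

  // Feeds 1..9 through the wrapped function; element 4 fails as in the question.
  static List<String> demo() {
    List<String> log = new ArrayList<>();
    Consumer<Integer> printer = safe(
        x -> {
          if (x == 4) {
            throw new Exception("Error Occurred");
          }
          log.add("Sink: " + x);
        },
        e -> log.add("Skipped: " + e.getMessage()));
    List.of(1, 2, 3, 4, 5, 6, 7, 8, 9).forEach(printer);
    return log;
  }

  public static void main(String[] args) {
    demo().forEach(System.out::println);
  }
}
```

With Akka on the classpath, a consumer wrapped this way could presumably be handed to `Sink.foreach` as a method reference such as `printer::accept`. The trade-off compared with a supervision strategy is that the handling lives next to the failing code and applies only to that one function, while a decider attached through attributes covers every supervised operator in the graph.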

huangapple
  • Posted on 2020-03-16 22:31:36
  • When reposting, please keep the link to this article: https://go.coder-hub.com/60707839.html