Akka流,条件性divertLeft

huangapple go评论64阅读模式
英文:

Akka streams, conditional divertLeft

问题

我有一个名为divertLeft的函数,基于这里的代码:https://bszwej.medium.com/akka-streams-error-handling-7ff9cc01bc12,该函数将Left值导向指定的汇聚器,并将Right值传递给下游的消费者。

def divertLeft(to: Graph[SinkShape[(L, CO)], Mat]): FlowWithContext[I, CI, R, CO, Mat] = {
  flow.via {
    Flow[(Either[L, R], CO)]
      .divertTo(
        Flow[(Either[L, R], CO)]
          .collect { case (Left(element), c) => (element, c) }
          .to(to),
        _._1.isLeft
      )
      .collect { case (Right(element), c) => (element, c) }
  }
}

实际上,上游的处理在某些情况下返回Left(new Exception(...)),在其他情况下返回Left(new Error(...)),我希望对它们进行不同的处理。

这可能不太理想,但我希望它可能会起作用...

def divertLeftIgnoreError(to: Graph[SinkShape[(L, CO)], Mat]): FlowWithContext[I, CI, R, CO, Mat] = {
  /* 与上面相同,但如果Left值是Error,则忽略它而不是导向给定的目标 */
  flow.via {
    Flow[(Either[L, R], CO)]
      .divertTo(
        Flow[(Either[L, R], CO)]
          .collect { case (Left(element), c) if element.isInstanceOf[Error] =>  (element, c) }
          .to(Sink.ignore),
        _._1.isLeft
      )
      .divertTo(
        Flow[(Either[L, R], CO)]
          .collect { case (Left(element), c) if !element.isInstanceOf[Error] => (element, c) }
          .to(to),
        _._1.isLeft
      )
      .collect { case (Right(element), c) => (element, c) }
  }
}

... 但它似乎不起作用。所有的Left值似乎都被忽略了,可能是因为.collect没有达到我期望的效果,因此消息只是从函数的末端掉落而没有被处理。您能够以这种方式组合divertTo吗?

我也考虑过编写一个谓词(传递给divertTo),以便“_._1.isLeft以及该Left的内容是一个Exception”,但我无法弄清楚其语法。

尝试处理不同类型的Left值是否基本上是不切实际的?如果是的话,我应该使用什么模式来处理这个问题?

英文:

I have a divertLeft function, based on the code here: https://bszwej.medium.com/akka-streams-error-handling-7ff9cc01bc12, which will divert Lefts to the specified sink, and pass Rights onward to the downstream consumer.

def divertLeft(to: Graph[SinkShape[(L, CO)], Mat]): FlowWithContext[I, CI, R, CO, Mat] = {
  flow.via {
    Flow[(Either[L, R], CO)]
      .divertTo(
        Flow[(Either[L, R], CO)]
          .collect { case (Left(element), c) => (element, c) }
          .to(to),
        _._1.isLeft
      )
      .collect { case (Right(element), c) => (element, c) }
  }
}

The actual processing upstream, returns Left(new Exception(...)) in some circumstances, and Left(new Error(...)) in others, and I'd like to handle these differently.

This is rather unlovable, but I'd hoped it might work...

def divertLeftIgnoreError(to: Graph[SinkShape[(L, CO)], Mat]): FlowWithContext[I, CI, R, CO, Mat] = {
  /* As above, but if the Left value is an Error, then ignore it instead of diverting to the given destination */
  flow.via {
    Flow[(Either[L, R], CO)]
      .divertTo(
        Flow[(Either[L, R], CO)]
          .collect { case (Left(element), c) if element.isInstanceOf[Error] =>  (element, c) }
          .to(Sink.ignore),
        _._1.isLeft
      )
      .divertTo(
        Flow[(Either[L, R], CO)]
          .collect { case (Left(element), c) if !element.isInstanceOf[Error] => (element, c) }
          .to(to),
        _._1.isLeft
      )
      .collect { case (Right(element), c) => (element, c) }
  }
}

... it doesn't. All the Lefts seem to be ignored, possibly because the .collect doesn't do what I think it does, and so the messages just fall off the end of the function without being handled. Can you even compose divertTo in that way?

I'd also considered writing a predicate (to pass to divertTo) for "_._1.isLeft and also the contents of that Left is an Exception", but I can't figure out the syntax for that.

Is trying to handle different kinds of Left just fundamentally ill-conceived? If so, what pattern should I be using to handle this?

答案1

得分: 3

你的问题出在divertTo的谓词函数,就像你想的那样。与谓词匹配的任何元素都会被重定向。

在你的情况下,首先divertTo会将所有的Left都重定向。然后,接收器只会收集错误并将它们发送到忽略接收器。其他的Left会被collect过滤掉。

你需要的确实是一个更精确的谓词,就像你在collectcase中写的那样:

item => item._1 match {
  case Left(e) if e.isInstanceOf[Error] => true
  case _ => false
}

(这只是一个编写它的示例,实际上还有其他的编写方式,比如更内联的_._1.left.exists(_.isInstanceOf[Error]),选择你喜欢的方式)。

对每个谓词都做同样的事情,它应该能按预期工作。

请注意,如果你要忽略Left(Error(..),你可以先过滤掉它们,然后保留一个单独的divertTo

英文:

Your issue is in the predicate function of divertTo as you thought. Any element that match the predicate will be diverted.

In your case, first divertTo diverts all Left. Then the sink collects only the errors and send them to ignore sink. The other Lefts are filtered out by collect.

What you want is indeed a predicate more precise, like the one you wrote in the case of collect:

item => item._1 match {
  case Left(e) if e.isInstanceOf[Error] => true
  case _ => false
}

(This is only an example of how to write it, there are actually other ways to write it like more inline _._1.left.exists(_.isInstanceOf[Error]), pick the one you prefer).

Do the same for each predicate and it should work as expected.

Note that if you're going to ignore the Left(Error(..), you could filter them our first and keep a single divertTo.

huangapple
  • 本文由 发表于 2023年7月11日 01:43:55
  • 转载请务必保留本文链接:https://go.coder-hub.com/76656138.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定