在 Akka Streams Sink 之后获取原始元素的引用?

huangapple go评论71阅读模式
英文:

Get reference of original element after Akka Streams Sink?

问题

我正在尝试将Amqp alpakka连接器用作源和接收器。

Source<CommittableReadResult, NotUsed> AMQP_SOURCE  
      -> processAndGetResponse 
      -> Sink<ByteString, CompletionStage<Done>> AMQP_SINK 

我希望在 Sink 操作成功后,确认从 Amqp 队列获取的消息。就像这样:

amqp_source(committableReadResult) 
    -> processAndGetResponse 
    -> amqp_sink 
    -> IfSinkOperationSuccess.Then(committableReadResult.ack())

我该如何实现这一点?基本上我希望在接收器操作成功后才标记消息为已确认。

英文:

I am trying to use the Amqp alpakka connector as a source and sink.

Source&lt;CommittableReadResult, NotUsed&gt; AMQP_SOURCE  
      -&gt; processAndGetResponse 
      -&gt; Sink&lt;ByteString, CompletionStage&lt;Done&gt;&gt; AMQP_SINK 

I want to acknowledge the message obtained from the Amqp queue, after the Sink's operation is successful. like this:

amqp_source(committableReadResult) 
    -&gt; processAndGetResponse 
    -&gt; amqp_sink 
    -&gt; IfSinkOperationSuccess.Then(committableReadResult.ack())

How can I achieve this? I basically want to mark the message as acknowledge only after the sink operation is successful.

答案1

得分: 0

一般来说,如果您有一个能够转化为 Future[Done](Scala API,在Java中对应的是 CompletionStage[Done])的 Sink,您可以在 mapAsync 阶段中使用该 Sink 来运行一个子流。只有在成功时数据才会通过。

从对 Alpakka AMQP API 的快速查阅来看,所有的 Sink 都是以这种方式转化的,所以这种方法是可行的。

假设您的 doSomethingWithMessage 函数会产生 CommittableReadResultWriteMessage,在 Scala 中可以使用类似以下的方式:

def doSomethingWithMessage(in: CommittableReadResult): (CommittableReadResult, WriteMessage) = ???

amqpSource
  .map(doSomethingWithMessage)
  .mapAsync(parallelism) { tup =>
    val (crr, wm) = tup
    val fut = Source.single(crr, wm).runWith(amqpSink)
    fut.flatMap(_ => crr.ack().map(_ => crr.message))
  }.runWith(Sink.ignore)
英文:

In general, if you have a Sink which materializes into a Future[Done] (Scala API, I believe the Java equivalent is CompletionStage[Done]), you can just run a substream with that Sink within a mapAsync stage. Data will only pass through when successful.

From a quick perusal of the Alpakka AMQP API, the Sinks all materialize that way, so that approach will work.

Assuming that your doSomethingWithMessage function results in both the CommittableReadResult and a WriteMessage, something like the following in Scala will work:

def doSomethingWithMessage(in: CommittableReadResult): (CommittableReadResult, WriteMessage) = ???

amqpSource
  .map(doSomethingWithMessage)
  .mapAsync(parallelism) { tup =&gt;
    val (crr, wm) = tup
    val fut = Source.single(crr, wm).runWith(amqpSink)
    fut.flatMap(_ =&gt; crr.ack().map(_ =&gt; crr.message))
  }.runWith(Sink.ignore)

huangapple
  • 本文由 发表于 2020年3月16日 23:52:07
  • 转载请务必保留本文链接:https://go.coder-hub.com/60709142.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定