英文:
Get reference of original element after Akka Streams Sink?
问题
我正在尝试将Amqp alpakka连接器用作源和接收器。
Source<CommittableReadResult, NotUsed> AMQP_SOURCE
-> processAndGetResponse
-> Sink<ByteString, CompletionStage<Done>> AMQP_SINK
我希望在 Sink 操作成功后,确认从 Amqp 队列获取的消息。就像这样:
amqp_source(committableReadResult)
-> processAndGetResponse
-> amqp_sink
-> IfSinkOperationSuccess.Then(committableReadResult.ack())
我该如何实现这一点?基本上我希望在接收器操作成功后才标记消息为已确认。
英文:
I am trying to use the Amqp alpakka connector as a source and sink.
Source<CommittableReadResult, NotUsed> AMQP_SOURCE
-> processAndGetResponse
-> Sink<ByteString, CompletionStage<Done>> AMQP_SINK
I want to acknowledge the message obtained from the Amqp queue, after the Sink's operation is successful. like this:
amqp_source(committableReadResult)
-> processAndGetResponse
-> amqp_sink
-> IfSinkOperationSuccess.Then(committableReadResult.ack())
How can I achieve this? I basically want to mark the message as acknowledge only after the sink operation is successful.
答案1
得分: 0
一般来说,如果您有一个能够转化为 Future[Done]
(Scala API,在Java中对应的是 CompletionStage[Done]
)的 Sink
,您可以在 mapAsync
阶段中使用该 Sink
来运行一个子流。只有在成功时数据才会通过。
从对 Alpakka AMQP API 的快速查阅来看,所有的 Sink
都是以这种方式转化的,所以这种方法是可行的。
假设您的 doSomethingWithMessage
函数会产生 CommittableReadResult
和 WriteMessage
,在 Scala 中可以使用类似以下的方式:
def doSomethingWithMessage(in: CommittableReadResult): (CommittableReadResult, WriteMessage) = ???
amqpSource
.map(doSomethingWithMessage)
.mapAsync(parallelism) { tup =>
val (crr, wm) = tup
val fut = Source.single(crr, wm).runWith(amqpSink)
fut.flatMap(_ => crr.ack().map(_ => crr.message))
}.runWith(Sink.ignore)
英文:
In general, if you have a Sink
which materializes into a Future[Done]
(Scala API, I believe the Java equivalent is CompletionStage[Done]
), you can just run a substream with that Sink
within a mapAsync
stage. Data will only pass through when successful.
From a quick perusal of the Alpakka AMQP API, the Sink
s all materialize that way, so that approach will work.
Assuming that your doSomethingWithMessage
function results in both the CommittableReadResult
and a WriteMessage
, something like the following in Scala will work:
def doSomethingWithMessage(in: CommittableReadResult): (CommittableReadResult, WriteMessage) = ???
amqpSource
.map(doSomethingWithMessage)
.mapAsync(parallelism) { tup =>
val (crr, wm) = tup
val fut = Source.single(crr, wm).runWith(amqpSink)
fut.flatMap(_ => crr.ack().map(_ => crr.message))
}.runWith(Sink.ignore)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论