“pass through mechanism with fs2 Stream” 可以翻译为 “使用fs2 Stream的传递机制”。

huangapple go评论56阅读模式
英文:

pass through mechanism with fs2 Stream

问题

请问有人知道如何使用fs2 Stream复制通过机制(类似于Akka Stream),但使用fs2 Stream?

我想要能够将对象的值分割成两个子流,当这两个子流发出值时将它们重新合并。

其中一个子流将什么都不做,只是等待另一个流发出一个值,然后将两个值一起压缩。

case class Message(offset: Int, record: String)

Stream[Message] -> (在此处插入offset)Pipe[Int, Int]        -> (压缩输出)Pipe[Message, Message]
                -> (在此处插入record)Pipe[String, String]
英文:

Would anyone know how to reproduce a pass through mechanism (as with Akka Stream) but with fs2 Stream ?

I would like to be able to split the values of on Object into 2 substreams, zipping them back together when the 2 substreams emit values.

One of the substreams would do nothing, only waiting for the other stream to emit a value and zip both values together.

case class Message(offset: Int, record: String)

Stream[Message] ->(offset goes here)Pipe[Int, Int]        -> (zip outputs)Pipe[Message, Message]
                -> (record goes here)Pipe[String, String]

答案1

得分: 1

Here is the translated content:

很遗憾,从流的角度来看,您想要做的事情是没有意义的。
首先,您不能将Stream拆分为两个独立的流,因为其中一个的生命周期依赖于另一个的生命周期。
其次,如果将整个Stream暴露给用户,那么他们将能够调用许多可能增加或减少元素数量的操作,因此您将失去一对一的语义。

我可以想到三种解决您问题的方法。

1. 不要这样做

我知道,这不是一个真正的解决方案,但这是一个有效的选项。
看看像fs2-kafka这样的库,它们不隐藏偏移量,而是强制用户跟踪它。

2. 创建自己的流投影

创建一个自定义类,其中内部包含一个Stream[Message],但只公开一对一的方法并隐藏偏移量。然后,您可以请求使用该数据类型的函数。

3. 使用队列进行并发(危险)

我知道拆分流的唯一方法是使用Queuescats-effect的那种)。
思路很简单,首先,您将使用evalMap将每个偏移量发送到一个Queue,然后只需投影消息。然后,您可以在该流上使用用户提供的Pipe使用through。最后,您可以再次使用evalMapQueue中检索偏移量。

同样,如果用户违反一对一的语义,这可能导致数据损坏。


无论如何,我建议您也在typelevel Discord服务器上咨询,那里的人可能会提供不同/更好的建议。

英文:

Sadly what you want to do doesn't make sense from the stream point of view.<br>
First, you can't split a Stream into two independent ones since the life cycle of one depends on the life cycle of the other.<br>
Second, if you expose the whole Stream to your users, then they would be able to call a lot of things that may increase or reduce the number of elements, thus you would lose the one-to-one semantics.

I can think of three "solutions" to your problem.

1. Don't do it

I know, I know. This is not a solution per se, but is a valid option.<br>
Take a look to libraries like fs2-kafka they don't hide the off-set, rather they force their users to keep track of it.

2. Create your own Stream projection

Create a custom class that internally contains a Stream[Message] but that exposes only one-to-one methods and hides the offset. Then you can ask for a function using that data type.

3. Concurrency with Queues (dangerous)

The only way I know of splitting streams is by using Queues (the cats-effect ones).

The idea is simple, first, you are going to use evalMap to send each offset into a Queue and then just project the message. Then, you can use through on that stream using the Pipe the user provided. Finally, you use another evalMap to retrieve the offset again from the Queue.

Again, this may lead to corruption if users break the one-to-one semantics.


Anyways, I would recommend also asking on the typelevel Discord server, folks there may have a different / better advice.

huangapple
  • 本文由 发表于 2023年5月17日 19:26:08
  • 转载请务必保留本文链接:https://go.coder-hub.com/76271574.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定