是的,有一个使用当前协程范围的shareIn()版本吗?

huangapple go评论58阅读模式
英文:

Is there a version of shareIn() that uses the current coroutine scope?

问题

fun fetchData(scope: CoroutineScope){
    val net = flow {
        emit(expensiveNetworkCall())
    }.shareIn(scope, SharingStarted.Lazily)

    val computeFlow0 = net.map{ compute0(it) }
    val computeFlow = net.map{ compute(it) }
    return merge(computeFlow0, computeFlow)
}
英文:

I'm trying to convert the following Rxjava code to Kt flows:

fun fetchData(){
    val net = Observable.fromCallable {
            expensiveNetworkCall()
          }
          .share()
    
    val computeFlow0= net.map{ compute0(it) }
    val computeFlow = net.map{ compute(it) }
    return Observable.merge(computeFlow0, computeFlow)
}

I see that Kotlin has shareIn() which is basically the same as Rxjava's share(), but it needs a CoroutineScope.
I could add a parameter to fetchData(CoroutineScope) to mirror shareIn(), but is there a better way to do this? Is there a version of shareIn() that assumes the same CoroutineScope as the original Flow?

答案1

得分: 2

关于你上一个问题:普通的冷流(Plain cold Flows)和可变共享流(MutableSharedFlows)没有 CoroutineScopes,所以没有可以访问的内容。而不是可变共享流的共享流(SharedFlows)也没有提供任何访问其运行范围的方式。

我对 Rx 不太了解,只是浏览了其文档,但我认为你可以在 Kotlin 中构建等效的代码,而不使用 CoroutineScope 参数。使用 flow 构建器创建一个用于发射项的暂停上下文。这最终将产生一个使用收集它的任何内容的范围的冷流。lambda 中的代码将在每次收集器开始收集此冷流时运行。

fun fetchData() = flow {
    val net = expensiveNetworkCall() // 如果函数阻塞,请使用 withContext 包装
    val computeFlow0 = flow { emit(compute0(net)) }
    val computeFlow = flow { emit(compute(net)) }
    emitAll(merge(computeFlow0, computeFlow))
}

另外,我认为你可以更简单地使用 channelFlow 构建它,而无需创建要合并的流。channelFlow 的 lambda 接收者是一个 CoroutineScope,因此你可以在其中启动子协程来发射项目。在 channelFlow 内部,你可以通过使用 send() 发射项目到结果流中。

fun fetchData() = channelFlow {
    val net = expensiveNetworkCall() // 如果函数阻塞,请使用 withContext 包装
    launch { send(compute0(net)) }
    launch { send(compute(net)) } // 此行上的 launch { } 是可选的
}

假设你不只是需要进行单个昂贵的调用,而是一整个昂贵的流程:

fun fetchData() = channelFlow {
    someColdFlow().collect {
        launch { send(compute0(it)) }
        launch { send(compute(it)) } // 此行上的 launch { } 是可选的
    }
}
英文:

Regarding your last question: Plain cold Flows and MutableSharedFlows do not have CoroutineScopes, so there would be nothing to access. And SharedFlows that are not MutableSharedFlows don't provide any way to access the scope they are running in.

I don't know Rx beyond browsing its documentation, but I think this is how you could build the equivalent in Kotlin without using a CoroutineScope parameter. Use a flow builder to create a suspending context for emitting the items. This will ultimately produce a cold flow that uses the scope of whatever collects it. The code in the lambda will be run each time a collector starts collecting this cold flow.

fun fetchData() = flow {
    val net = expensiveNetworkCall() // wrap in withContext if fun blocks
    val computeFlow0 = flow { emit(compute0(net)) }
    val computeFlow = flow { emit(compute(net)) }
    emitAll(merge(computeFlow0, computeFlow))
}

Alternatively, I think you could build it using channelFlow a little more simply without creating flows to merge. channelFlow's lambda receiver is a CoroutineScope, so you can launch child coroutines inside it to emit the items. Inside a channelFlow, you emit items to the resulting flow by using send().

fun fetchData() = channelFlow {
    val net = expensiveNetworkCall() // wrap in withContext if fun blocks
    launch { send(compute0(net)) }
    launch { send(compute(net)) } // launch { } is optional on this line
}

Supposing you had not just a single expensive call to make, but a whole expensive flow of stuff:

fun fetchData() = channelFlow {
    someColdFlow().collect {
        launch { send(compute0(it)) }
        launch { send(compute(it)) } // launch { } is optional on this line
    }
}

huangapple
  • 本文由 发表于 2023年5月11日 05:17:42
  • 转载请务必保留本文链接:https://go.coder-hub.com/76222611.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定