英文:
Is there a version of shareIn() that uses the current coroutine scope?
问题
fun fetchData(scope: CoroutineScope){
val net = flow {
emit(expensiveNetworkCall())
}.shareIn(scope, SharingStarted.Lazily)
val computeFlow0 = net.map{ compute0(it) }
val computeFlow = net.map{ compute(it) }
return merge(computeFlow0, computeFlow)
}
英文:
I'm trying to convert the following Rxjava code to Kt flows:
fun fetchData(){
val net = Observable.fromCallable {
expensiveNetworkCall()
}
.share()
val computeFlow0= net.map{ compute0(it) }
val computeFlow = net.map{ compute(it) }
return Observable.merge(computeFlow0, computeFlow)
}
I see that Kotlin has shareIn()
which is basically the same as Rxjava's share()
, but it needs a CoroutineScope.
I could add a parameter to fetchData(CoroutineScope)
to mirror shareIn()
, but is there a better way to do this? Is there a version of shareIn() that assumes the same CoroutineScope as the original Flow?
答案1
得分: 2
关于你上一个问题:普通的冷流(Plain cold Flows)和可变共享流(MutableSharedFlows)没有 CoroutineScopes,所以没有可以访问的内容。而不是可变共享流的共享流(SharedFlows)也没有提供任何访问其运行范围的方式。
我对 Rx 不太了解,只是浏览了其文档,但我认为你可以在 Kotlin 中构建等效的代码,而不使用 CoroutineScope 参数。使用 flow
构建器创建一个用于发射项的暂停上下文。这最终将产生一个使用收集它的任何内容的范围的冷流。lambda 中的代码将在每次收集器开始收集此冷流时运行。
fun fetchData() = flow {
val net = expensiveNetworkCall() // 如果函数阻塞,请使用 withContext 包装
val computeFlow0 = flow { emit(compute0(net)) }
val computeFlow = flow { emit(compute(net)) }
emitAll(merge(computeFlow0, computeFlow))
}
另外,我认为你可以更简单地使用 channelFlow
构建它,而无需创建要合并的流。channelFlow
的 lambda 接收者是一个 CoroutineScope,因此你可以在其中启动子协程来发射项目。在 channelFlow
内部,你可以通过使用 send()
发射项目到结果流中。
fun fetchData() = channelFlow {
val net = expensiveNetworkCall() // 如果函数阻塞,请使用 withContext 包装
launch { send(compute0(net)) }
launch { send(compute(net)) } // 此行上的 launch { } 是可选的
}
假设你不只是需要进行单个昂贵的调用,而是一整个昂贵的流程:
fun fetchData() = channelFlow {
someColdFlow().collect {
launch { send(compute0(it)) }
launch { send(compute(it)) } // 此行上的 launch { } 是可选的
}
}
英文:
Regarding your last question: Plain cold Flows and MutableSharedFlows do not have CoroutineScopes, so there would be nothing to access. And SharedFlows that are not MutableSharedFlows don't provide any way to access the scope they are running in.
I don't know Rx beyond browsing its documentation, but I think this is how you could build the equivalent in Kotlin without using a CoroutineScope parameter. Use a flow
builder to create a suspending context for emitting the items. This will ultimately produce a cold flow that uses the scope of whatever collects it. The code in the lambda will be run each time a collector starts collecting this cold flow.
fun fetchData() = flow {
val net = expensiveNetworkCall() // wrap in withContext if fun blocks
val computeFlow0 = flow { emit(compute0(net)) }
val computeFlow = flow { emit(compute(net)) }
emitAll(merge(computeFlow0, computeFlow))
}
Alternatively, I think you could build it using channelFlow
a little more simply without creating flows to merge. channelFlow
's lambda receiver is a CoroutineScope, so you can launch child coroutines inside it to emit the items. Inside a channelFlow
, you emit items to the resulting flow by using send()
.
fun fetchData() = channelFlow {
val net = expensiveNetworkCall() // wrap in withContext if fun blocks
launch { send(compute0(net)) }
launch { send(compute(net)) } // launch { } is optional on this line
}
Supposing you had not just a single expensive call to make, but a whole expensive flow of stuff:
fun fetchData() = channelFlow {
someColdFlow().collect {
launch { send(compute0(it)) }
launch { send(compute(it)) } // launch { } is optional on this line
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论