What is the effect of buffer(0) in the collectLatest implementation in Kotlin?
Question
The explanation of the implementation of collectLatest in the Kotlin source code is as follows:
public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
    /*
     * Implementation note:
     * buffer(0) is inserted here to fulfil user's expectations in sequential usages, e.g.:
     * ```
     * flowOf(1, 2, 3).collectLatest {
     *     delay(1)
     *     println(it) // Expect only 3 to be printed
     * }
     * ```
     *
     * It's not the case for intermediate operators which users mostly use for interactive UI,
     * where performance of dispatch is more important.
     */
    mapLatest(action).buffer(0).collect()
}
It says "buffer(0) is inserted here to fulfil user's expectations in sequential usages", but the results I get when I don't use buffer(0) are the same as when I use buffer(0):
flowOf(1, 2, 3).mapLatest {
    delay(1)
    println(it)
}.buffer(0).collect() // print 3

flowOf(1, 2, 3).mapLatest {
    delay(1)
    println(it)
}.collect() // print 3 too
So I guess buffer(0) has no effect here. Can someone help explain it?
When I looked at the source code of mapLatest, there was a comment saying "This operator is buffered by default and size of its output buffer can be changed by applying subsequent buffer operator".
/**
 * Returns a flow that emits elements from the original flow transformed by [transform] function.
 * When the original flow emits a new value, computation of the [transform] block for previous value is cancelled.
 *
 * For example, the following flow:
 * ```
 * flow {
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * }.mapLatest { value ->
 *     println("Started computing $value")
 *     delay(200)
 *     "Computed $value"
 * }
 * ```
 * will print "Started computing a" and "Started computing b", but the resulting flow will contain only "Computed b" value.
 *
 * This operator is [buffered][buffer] by default and size of its output buffer can be changed by applying subsequent [buffer] operator.
 */
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.mapLatest(@BuilderInference transform: suspend (value: T) -> R): Flow<R> =
    transformLatest { emit(transform(it)) }
I don't quite understand why it says mapLatest's buffer can be changed by applying a subsequent buffer operator, because when a new value arrives, the computation for the previous value is cancelled. What does changing the buffer mean here?
Maybe this question should be "what is the effect of the buffer in mapLatest/transformLatest?".
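For reference, the example from the mapLatest KDoc quoted above can be turned into a small runnable program. This is only a sketch: the function name computed is made up for illustration, and it assumes kotlinx-coroutines-core is on the classpath.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical wrapper around the KDoc example: returns everything the
// resulting flow emits, so the cancellation of "a" becomes visible.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun computed(): List<String> =
    flow {
        emit("a")
        delay(100)
        emit("b") // arrives while "a" is still being computed
    }.mapLatest { value ->
        println("Started computing $value")
        delay(200) // longer than the gap between emissions
        "Computed $value"
    }.toList()

fun main() = runBlocking {
    println(computed())
}
```

As the KDoc describes, both "Started computing" lines are printed, but the collected list contains only "Computed b", because the computation for "a" is cancelled when "b" arrives.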
Answer 1
Score: 1
A potential hint is that buffer somehow creates another coroutine.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main(): Unit = runBlocking { // this: CoroutineScope
    val flow1 = flowOf(1, 2, 3)
    // not cancelled (cancelling this: CoroutineScope would cancel the children as well)
    launch {
        while (true) {
            println("still running")
            delay(1000)
        }
    }
    flow1.collectLatest { // it: Int
        delay(10) // delays the collector coroutine
        println(it)
    }
}
still running
3
still running
still running
...
The same thing happens with mapLatest.
flow1.mapLatest { // it: Int
    delay(10) // delays the collector coroutine
    println(it)
}.collect()
I think that the normal collect delays the current coroutine.
val start = System.currentTimeMillis()
flow1.collect {
    delay(1000)
    println("$it: ${System.currentTimeMillis() - start}")
}
println("${System.currentTimeMillis() - start}")
1: 1018
2: 2023
3: 3034
3034
However, I am not able to print a different thread to confirm it.
val start = System.currentTimeMillis()
println("${Thread.currentThread()} $coroutineContext")
flow1.collectLatest {
    delay(1000)
    println("$it: ${System.currentTimeMillis() - start} ${Thread.currentThread()} $coroutineContext")
}
println("${System.currentTimeMillis() - start}")
Thread[main,5,main] [BlockingCoroutine{Active}@532760d8, BlockingEventLoop@57fa26b7]
3: 1095 Thread[main,5,main] [BlockingCoroutine{Active}@532760d8, BlockingEventLoop@57fa26b7]
1107
I tried having delays inside the flow. The emission is affected by the time it takes to collect.
val start = System.currentTimeMillis()
fun elapsedTime() = System.currentTimeMillis() - start
val flow1 = flow {
    for (i in 1..5) {
        emit(i)
        println("EMIT $i: ${Thread.currentThread()} ${elapsedTime()}")
        delay(1000)
    }
}
flow1.collect {
    println("COLLECT $it: ${Thread.currentThread()} ${elapsedTime()}")
    delay(2000)
}
COLLECT 1: Thread[main,5,main] 24
EMIT 1: Thread[main,5,main] 2053
COLLECT 2: Thread[main,5,main] 3064
EMIT 2: Thread[main,5,main] 5067
With collectLatest, the flow doesn't wait for the collector.
COLLECT 1: Thread[main,5,main] 247
EMIT 1: Thread[main,5,main] 258
COLLECT 2: Thread[main,5,main] 1270
EMIT 2: Thread[main,5,main] 1270
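The answer does not show the code for this collectLatest run. A minimal sketch of what it might look like follows, with shorter delays and an event log instead of timestamps. The function name collectLatestEvents and the "DONE" marker are hypothetical additions for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: records what the emitter and the collectLatest
// action do, in order, so the lack of back-pressure can be inspected.
suspend fun collectLatestEvents(): List<String> {
    val events = mutableListOf<String>()
    val flow1 = flow {
        for (i in 1..3) {
            emit(i)
            events += "EMIT $i" // reached without waiting for the action to finish
            delay(100)
        }
    }
    flow1.collectLatest {
        events += "COLLECT $it"
        delay(200) // slower than the emitter, so earlier actions get cancelled
        events += "DONE $it" // only reached if no newer value arrived
    }
    return events
}

fun main() = runBlocking {
    collectLatestEvents().forEach(::println)
}
```

Because each new value cancels the action that is still suspended in delay(200), the log should contain "DONE 3" but neither "DONE 1" nor "DONE 2", while the emitter proceeds at its own pace.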
I came up with an example to understand what buffer capacity does.
flow1.onEach {
    println("ON_EACH $it: ${elapsedTime()}")
}.mapLatest {
    println("MAP $it: ${elapsedTime()}")
    it
}.buffer(0).collect {
    println("COLLECT $it: ${elapsedTime()} ")
    delay(2000)
}
ON_EACH 1: 116
MAP 1: 120
ON_EACH 2: 123
MAP 2: 124
ON_EACH 3: 124
COLLECT 1: 130
MAP 3: 141
COLLECT 3: 2147
With a capacity of 3, everything gets collected.
ON_EACH 1: 131
MAP 1: 134
ON_EACH 2: 138
MAP 2: 140
ON_EACH 3: 140
MAP 3: 140
COLLECT 1: 147
COLLECT 2: 2158
COLLECT 3: 4172
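Without specifying a capacity of 0, it seems that mapLatest uses a much larger capacity. Its KDoc says it is buffered by default, which as far as I know means the default channel buffer (64 elements) rather than an unlimited one; this run is too short to tell the difference.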
flow1.onEach {
    println("ON_EACH $it: ${elapsedTime()}")
}.mapLatest {
    println("MAP $it: ${elapsedTime()}")
    it
}.collect {
    println("COLLECT $it: ${elapsedTime()} ")
    delay(2000)
}
ON_EACH 10: 247
MAP 10: 247
COLLECT 1: 257
COLLECT 2: 2270
COLLECT 3: 4274
COLLECT 4: 6284
COLLECT 5: 8298
COLLECT 6: 10311
COLLECT 7: 12316
COLLECT 8: 14323
COLLECT 9: 16329
COLLECT 10: 18341
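The three runs above can be condensed into one sketch that compares the capacities side by side. The helper collectedWith is hypothetical; passing null stands for "no buffer operator applied", i.e. whatever mapLatest does by default.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: pushes flowOf(1, 2, 3) through mapLatest and a slow
// collector, and returns the values the collector actually saw.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun collectedWith(capacity: Int?): List<Int> {
    val seen = mutableListOf<Int>()
    val mapped = flowOf(1, 2, 3).mapLatest { it }
    // null means: keep mapLatest's default output buffer
    val flow = if (capacity == null) mapped else mapped.buffer(capacity)
    flow.collect {
        seen += it
        delay(100) // slow collector, so values pile up in the buffer (if any)
    }
    return seen
}

fun main() = runBlocking {
    println("buffer(0): ${collectedWith(0)}")
    println("buffer(3): ${collectedWith(3)}")
    println("default:   ${collectedWith(null)}")
}
```

With room for all three values, every mapped result is handed to the buffer before the next value arrives, so nothing is cancelled and the collector sees 1, 2, 3. With buffer(0) a mapped value can be cancelled while it waits for the collector, so some values may be skipped; the last one always arrives.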
What's confusing is that the buffer affects the collector, but in the code of collectLatest, the action is part of the flow: mapLatest(action).buffer(0).collect().
Answer 2
Score: 0
I must admit I am not familiar with this code, but the buffer(0) part is likely preventing any data from being buffered before getting processed.
For example, I was working on a bash script yesterday that redirected the output of a program into a file. Instead of appearing live, as they would if I were watching the program in the console, the redirected lines showed up a few at a time, once the program's output had filled the buffer.
Since I was also reading this file in another program, this would lead to a delay in the input being picked up in the second program and was causing issues.
I suspect that, as in my case, this may not show up all of the time and may only appear in certain cases.