What is the effect of buffer(0) in the collectLatest implementation in Kotlin?


Question

In the Kotlin coroutines source code, collectLatest is implemented as follows, with this explanatory note:

public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit) {
    /*
     * Implementation note:
     * buffer(0) is inserted here to fulfil user's expectations in sequential usages, e.g.:
     * ```
     * flowOf(1, 2, 3).collectLatest {
     *     delay(1)
     *     println(it) // Expect only 3 to be printed
     * }
     * ```
     *
     * It's not the case for intermediate operators which users mostly use for interactive UI,
     * where performance of dispatch is more important.
     */
    mapLatest(action).buffer(0).collect()
}

It says "buffer(0) is inserted here to fulfil user's expectations in sequential usages", but the results I get when I don't use buffer(0) are the same as when I use buffer(0):

    flowOf(1, 2, 3).mapLatest {
        delay(1)
        println(it)
    }.buffer(0).collect() // print 3

    flowOf(1, 2, 3).mapLatest {
        delay(1)
        println(it)
    }.collect() // print 3 too

So I guess buffer(0) has no effect here. Can someone help explain this?

When I looked at the source code of mapLatest, there was a comment saying "This operator is buffered by default and size of its output buffer can be changed by applying subsequent buffer operator".

/**
 * Returns a flow that emits elements from the original flow transformed by [transform] function.
 * When the original flow emits a new value, computation of the [transform] block for previous value is cancelled.
 *
 * For example, the following flow:
 * ```
 * flow {
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * }.mapLatest { value ->
 *     println("Started computing $value")
 *     delay(200)
 *     "Computed $value"
 * }
 * ```
 * will print "Started computing a" and "Started computing b", but the resulting flow will contain only "Computed b" value.
 *
 * This operator is [buffered][buffer] by default and size of its output buffer can be changed by applying subsequent [buffer] operator.
 */
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.mapLatest(@BuilderInference transform: suspend (value: T) -> R): Flow<R> =
    transformLatest { emit(transform(it)) }
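For reference, the "size of its output buffer" in that comment is a channel capacity, and `buffer(0)` is the same as `buffer(Channel.RENDEZVOUS)`. As far as I can tell from the kotlinx.coroutines sources, `mapLatest`/`transformLatest` start out with `Channel.BUFFERED`, and a directly following `buffer(n)` fuses with them and replaces that capacity instead of adding a second channel. The helper `describeCapacity` below is purely illustrative (it is not a library function); it only maps the `Channel` capacity constants to what they mean.

```kotlin
import kotlinx.coroutines.channels.Channel

// Illustrative helper (not part of kotlinx.coroutines): explains a capacity value
// as accepted by Flow.buffer(capacity).
fun describeCapacity(capacity: Int): String = when (capacity) {
    // buffer(0): no slots, every send suspends until a receiver takes the value
    Channel.RENDEZVOUS -> "RENDEZVOUS (no buffer, send suspends until received)"
    Channel.CONFLATED -> "CONFLATED (keeps only the latest value)"
    // the starting capacity of mapLatest/transformLatest
    Channel.BUFFERED -> "BUFFERED (default size, 64 on the JVM unless overridden)"
    Channel.UNLIMITED -> "UNLIMITED (send never suspends)"
    else -> "fixed buffer of $capacity elements"
}

fun main() {
    println(describeCapacity(0))
    println(describeCapacity(Channel.BUFFERED))
    println(describeCapacity(3))
}
```

The default of 64 can be changed on the JVM with the `kotlinx.coroutines.channels.defaultBuffer` system property.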

I don't quite understand why it says mapLatest's buffer can be changed by applying a subsequent buffer operator, because when a new value arrives, the computation for the previous value is cancelled anyway. What does changing the buffer actually change?

Maybe this question should be "what is the effect of the buffer in mapLatest/transformLatest?".

Answer 1

Score: 1

A potential hint is that buffer somehow creates another coroutine.

fun main(): Unit = runBlocking { // this: CoroutineScope
    val flow1 = flowOf(1, 2, 3)

    // not cancelled (cancelling this: CoroutineScope would cancel the children as well)
    launch {
        while (true){
            println("still running")
            delay(1000)
        }
    }

    flow1.collectLatest {// it: Int
        delay(10) // delays the collector coroutine
        println(it)
    }
}
still running
3
still running
still running
...

The same thing happens with mapLatest.

flow1.mapLatest { // it: Int
    delay(10) // delays the collector coroutine
    println(it)
}.collect()

I think that the normal collect delays the current coroutine.

val start = System.currentTimeMillis()
flow1.collect {
    delay(1000)
    println("$it: ${System.currentTimeMillis() - start}")
}
println("${System.currentTimeMillis() - start}")
1: 1018
2: 2023
3: 3034
3034

However, I am not able to print a different thread to confirm it.

val start = System.currentTimeMillis()
println("${Thread.currentThread()} $coroutineContext")
flow1.collectLatest {
    delay(1000)
    println("$it: ${System.currentTimeMillis() - start} ${Thread.currentThread()} $coroutineContext")
}
println("${System.currentTimeMillis() - start}")
Thread[main,5,main] [BlockingCoroutine{Active}@532760d8, BlockingEventLoop@57fa26b7]
3: 1095 Thread[main,5,main] [BlockingCoroutine{Active}@532760d8, BlockingEventLoop@57fa26b7]
1107
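The thread cannot tell the coroutines apart here, because `runBlocking` runs all of them on its single event-loop thread. The printed context is misleading as well: inside the `collectLatest` lambda, which has no `CoroutineScope` receiver of its own, a bare `coroutineContext` resolves to `runBlocking`'s `CoroutineScope.coroutineContext` property, so it always shows `BlockingCoroutine`. `currentCoroutineContext()` returns the context of the coroutine that is actually executing. The sketch below uses a hypothetical helper, `jobSeenBy`, to compare the `Job` seen inside a plain `collect` block with the one seen inside a `collectLatest` action; on my reading of the library, the first is the caller's own `Job` and the second is a child coroutine launched by `mapLatest`.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: returns the Job observed from inside the collector block.
// currentCoroutineContext() is used on purpose; a bare `coroutineContext` inside
// runBlocking would resolve to runBlocking's own CoroutineScope property.
suspend fun jobSeenBy(useCollectLatest: Boolean): Job? {
    var seen: Job? = null
    if (useCollectLatest) {
        flowOf(1, 2, 3).collectLatest {
            delay(10)
            seen = currentCoroutineContext()[Job] // a child coroutine started by mapLatest
        }
    } else {
        flowOf(1, 2, 3).collect {
            delay(10)
            seen = currentCoroutineContext()[Job] // the caller's coroutine
        }
    }
    return seen
}

fun main() = runBlocking {
    val caller = currentCoroutineContext()[Job]
    println("thread: ${Thread.currentThread().name}")
    println("collect block runs in the caller's Job: ${jobSeenBy(useCollectLatest = false) === caller}")
    println("collectLatest action runs in the caller's Job: ${jobSeenBy(useCollectLatest = true) === caller}")
}
```

Both blocks still run on the same thread; only the `Job` comparison reveals that `collectLatest` moved the action into another coroutine.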

I tried having delays inside the flow. The emission is affected by the time it takes to collect.

val start = System.currentTimeMillis()
fun elapsedTime() = System.currentTimeMillis() - start

val flow1 = flow {
    for (i in 1..5) {
        emit(i)
        println("EMIT $i: ${Thread.currentThread()} ${elapsedTime()}")
        delay(1000)
    }
}

flow1.collect {
    println("COLLECT $it: ${Thread.currentThread()} ${elapsedTime()}")
    delay(2000)
}
COLLECT 1: Thread[main,5,main] 24
EMIT 1: Thread[main,5,main] 2053
COLLECT 2: Thread[main,5,main] 3064
EMIT 2: Thread[main,5,main] 5067

With collectLatest, the flow doesn't wait for the collector.

COLLECT 1: Thread[main,5,main] 247
EMIT 1: Thread[main,5,main] 258
COLLECT 2: Thread[main,5,main] 1270
EMIT 2: Thread[main,5,main] 1270
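The difference in timing comes from where the emitter runs. With a plain `collect`, the collector block is executed inside `emit`, in the same coroutine, so the emitter waits for it. A channel-based operator such as `buffer()` moves the upstream into a separate producer coroutine that keeps emitting while the collector is busy; `collectLatest` goes one step further and cancels the busy action instead of queueing the value. A minimal sketch that records the interleaving (`eventOrder` is a made-up name, and the orders in the comments assume `runBlocking`'s single-threaded event loop):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Records the order of emissions and collections for a flow of 1..3.
// withBuffer = false: emitter and collector share one coroutine, so each emit
//                     returns only after the collector block has finished.
// withBuffer = true:  buffer() runs the emitter in a producer coroutine that
//                     fills the channel while the collector is still busy.
suspend fun eventOrder(withBuffer: Boolean): List<String> {
    val log = mutableListOf<String>()
    val source = flow {
        for (i in 1..3) {
            log += "emit $i"
            emit(i)
        }
    }
    val pipeline = if (withBuffer) source.buffer() else source
    pipeline.collect {
        log += "collect $it"
        delay(10) // simulate a slow collector
    }
    return log
}

fun main() = runBlocking {
    // [emit 1, collect 1, emit 2, collect 2, emit 3, collect 3]
    println(eventOrder(withBuffer = false))
    // [emit 1, emit 2, emit 3, collect 1, collect 2, collect 3]
    println(eventOrder(withBuffer = true))
}
```

Unlike `collectLatest`, `buffer()` never drops anything here: every value is still collected, just later.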

I came up with an example to understand what buffer capacity does.

flow1.onEach {
    println("ON_EACH $it: ${elapsedTime()}")
}.mapLatest {
    println("MAP $it: ${elapsedTime()}")
    it
}.buffer(0).collect {
    println("COLLECT $it: ${elapsedTime()} ")
    delay(2000)
}
ON_EACH 1: 116
MAP 1: 120
ON_EACH 2: 123
MAP 2: 124
ON_EACH 3: 124
COLLECT 1: 130 
MAP 3: 141
COLLECT 3: 2147 
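This output also shows what "changing the buffer" of `mapLatest` means. The block that `mapLatest` may cancel is not just the transform but `emit(transform(it))`, and that `emit` sends the result into `mapLatest`'s internal channel. With `buffer(0)` (rendezvous) the send of 2 suspends because the collector is still busy with 1, so when 3 arrives the still-suspended block for 2 is cancelled and 2 is lost. With a free buffer slot, the send returns at once, the block for 2 completes, and there is nothing left to cancel. A sketch comparing capacities, assuming a hypothetical helper `collectedThroughMapLatest`; the results in the comments are traced for `runBlocking`'s single-threaded event loop and match the ordering above (ON_EACH 3 before COLLECT 1, MAP 3 after it):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects flowOf(1, 2, 3).mapLatest { it } with a slow collector.
// capacity == null keeps mapLatest's default buffer; otherwise buffer(capacity) is
// applied, which fuses with mapLatest and replaces the size of its internal channel.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun collectedThroughMapLatest(capacity: Int?): List<Int> {
    val mapped = flowOf(1, 2, 3).mapLatest { it }
    val tuned = if (capacity == null) mapped else mapped.buffer(capacity)
    val received = mutableListOf<Int>()
    tuned.collect {
        received += it
        delay(100) // slow collector: the channel is not drained while this runs
    }
    return received
}

fun main() = runBlocking {
    println("buffer(0): ${collectedThroughMapLatest(0)}")    // [1, 3]
    println("buffer(3): ${collectedThroughMapLatest(3)}")    // [1, 2, 3]
    println("default:   ${collectedThroughMapLatest(null)}") // [1, 2, 3]
}
```

In `collectLatest` the user action runs inside that same block, before its `Unit` result is emitted, so there a rendezvous buffer can only make the trailing `emit(Unit)` wait for the no-op collector; whether the action itself is cancelled is decided before that point.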

With a capacity of 3, everything gets collected.

ON_EACH 1: 131
MAP 1: 134
ON_EACH 2: 138
MAP 2: 140
ON_EACH 3: 140
MAP 3: 140
COLLECT 1: 147 
COLLECT 2: 2158 
COLLECT 3: 4172 

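Without buffer(0), it looks as if mapLatest used an unlimited capacity. Its default is actually Channel.BUFFERED (64 elements on the JVM unless overridden), which is more than this 10-element flow needs, so nothing gets dropped.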

flow1.onEach {
    println("ON_EACH $it: ${elapsedTime()}")
}.mapLatest {
    println("MAP $it: ${elapsedTime()}")
    it
}.collect {
    println("COLLECT $it: ${elapsedTime()} ")
    delay(2000)
}
ON_EACH 10: 247
MAP 10: 247
COLLECT 1: 257 
COLLECT 2: 2270 
COLLECT 3: 4274 
COLLECT 4: 6284 
COLLECT 5: 8298 
COLLECT 6: 10311 
COLLECT 7: 12316 
COLLECT 8: 14323 
COLLECT 9: 16329 
COLLECT 10: 18341 

What's confusing is that in these examples the buffer affects the collector, whereas in the code of collectLatest the action is part of the flow itself: mapLatest(action).buffer(0).collect().

Answer 2

Score: 0

I must admit I am not familiar with this code, but the buffer(0) part is likely preventing any data from being buffered before getting processed.

For example, yesterday I was working on a bash script that redirected a program's output into a file. Instead of the redirected lines showing up live, as they do when I watch the program in the console, they appeared a few lines at a time, whenever the program's output filled the buffer.

Since I was also reading this file from another program, this delayed the input reaching the second program and caused issues.

I suspect that, as in my case, the effect may not show up all the time, only in certain cases.

huangapple
  • Posted on 2023-05-28 11:44:44
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76349848.html