Ensure that a Stream is closed after converting to Sequence (or: why do "use" and "finally" not work in a sequence)

huangapple go评论60阅读模式
英文:

Ensure that a Stream is closed after converting to Sequence (or: why do "use" and "finally" not work in a sequence)

问题

I have some Java code which makes heavy use of the Stream API. It is critical that these streams are closed when we are finished consuming them, but we are struggling to come up with a robust solution.

I had an idea: this is already a mixed Java + Kotlin project, so let's try Kotlin's Sequence.

And so I came up with this extension function which looks like it does just what we need:

fun <T> Stream<T>.asCloseableSequence() = sequence {
    this@asCloseableSequence.use {
        yieldAll(it.asSequence())
    }
}

This works okay. The original Stream is closed after we finish processing the Sequence. However, if an exception occurs during processing, then the Stream is not closed.

What am I doing wrong here? My understanding of the use function is that it should close the underlying resource even if an exception occurs. My thought was that the exception must be occurring even before use is called, but if we add some prints

sequence {
    println("entering sequence")

    this@asCloseableSequence.use {
        println("entering use")
        yieldAll(it.asSequence())
    }
}

then we can see entering use is indeed printed, and yet the Stream is not closed.

The same thing happens if I use try/finally instead of the use function.

Here is a complete, (close to) minimal, reproducible example. (Note that the built-in asSequence function does not work even if no exception occurs, and use does work if it is not used inside a sequence scope.)

import java.util.stream.Stream
import kotlin.streams.asSequence
import kotlin.test.Test
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue

class StreamClosingTests {

    /**
     * First, let's see if the built-in function does what we want.
     */
    @Test
    fun `using asSequence`() {

        // Given a Stream that has been converted to a Sequence,
        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }
        val sequence = stream.asSequence()

        // When we perform a terminal operation on the Sequence,
        sequence.forEach { println(it) }

        // Then the underlying Stream should be closed.
        assertTrue(closed[0]) // Fails!
    }

    /**
     * Since the above test fails, lets try using sequence scope instead
     */
    @Test
    fun `using SequenceScope and iterator`() {

        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }
        val sequence = sequence {
            stream.use {
                yieldAll(it.iterator())
            }
        }

        sequence.forEach { println(it) }

        assertTrue(closed[0]) // Passes!
    }

    @Test
    fun `using SequenceScope and iterator and Exception occurs`() {

        // Given a Stream that has been converted to a Sequence,
        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }
        val sequence = sequence {
            stream.use {
                yieldAll(it.iterator())
            }
        }

        // When we perform a terminal operation on the Sequence and an exception occurs
        assertFailsWith<RuntimeException> {
            sequence.forEach { throw RuntimeException() }
        }

        // Then the underlying Stream should be closed.
        assertTrue(closed[0]) // Fails!
    }

    /**
     * Let's remove sequence and see if use works with just a plain old stream.
     */
    @Test
    fun `use without sequence`() {

        // Given a Stream,
        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }

        // When we perform a terminal operation on the Stream and an exception occurs,
        assertFailsWith<RuntimeException> {
            stream.use {
                it.forEach { throw RuntimeException() }
            }
        }

        // Then the Stream should be closed.
        assertTrue(closed[0]) // Passes!
    }
}
英文:

I have some Java code which makes heavy use of the Stream API. It is critical that these streams are closed when we are finished consuming them, but we are struggling to come up with a robust solution.

I had an idea: this is already a mixed Java + Kotlin project, so let's try Kotlin's Sequence

And so I came up with this extension function which looks like it does just what we need:

fun &lt;T&gt; Stream&lt;T&gt;.asCloseableSequence() = sequence {
    this@asCloseableSequence.use {
        yieldAll(it.asSequence())
    }
}

This works okay. The original Stream is closed after we finish processing the Sequence. However, if an exception occurs during processing, then the Stream is not closed.

What am I doing wrong here? My understanding of the use function is that it should close the underlying resource even if an exception occurs. My thought was that the exception must be occurring even before use is called, but if we add some prints

sequence {
    println(&quot;entering sequence&quot;)

    this@asCloseableSequence.use {
        println(&quot;entering use&quot;)
        yieldAll(it.asSequence())
    }
}

then we can see entering use is indeed printed, and yet the Stream is not closed.

The same thing happens if I use try/finally instead of the use function.

Here is a complete, (close to) minimal, reproducible example. (Note that the built in asSequence function does not work even if no exception occurs, and use does work if it is not used inside a sequence scope.)

import java.util.stream.Stream
import kotlin.streams.asSequence
import kotlin.test.Test
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue

class StreamClosingTests {

    /**
     * First, let&#39;s see if the built-in function does what we want.
     */
    @Test
    fun `using asSequence`() {

        // Given a Stream that has been converted to a Sequence,
        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }
        val sequence = stream.asSequence()

        // When we perform a terminal operation on the Sequence,
        sequence.forEach { println(it) }

        // Then the underlying Stream should be closed.
        assertTrue(closed[0]) // Fails!
    }

    /**
     * Since the above test fails, lets try using sequence scope instead
     */
    @Test
    fun `using SequenceScope and iterator`() {

        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }
        val sequence = sequence {
            stream.use {
                yieldAll(it.iterator())
            }
        }

        sequence.forEach { println(it) }

        assertTrue(closed[0]) // Passes!
    }

    @Test
    fun `using SequenceScope and iterator and Exception occurs`() {

        // Given a Stream that has been converted to a Sequence,
        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }
        val sequence = sequence {
            stream.use {
                yieldAll(it.iterator())
            }
        }

        // When we perform a terminal operation on the Sequence and an exception occurs
        assertFailsWith&lt;RuntimeException&gt; {
            sequence.forEach { throw RuntimeException() }
        }

        // Then the underlying Stream should be closed.
        assertTrue(closed[0]) // Fails!
    }

    /**
     * Let&#39;s remove sequence and see if use works with just a plain old stream.
     */
    @Test
    fun `use without sequence`() {

        // Given a Stream,
        val closed = mutableListOf(false)
        val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }

        // When we perform a terminal operation on the Stream and an exception occurs,
        assertFailsWith&lt;RuntimeException&gt; {
            stream.use {
                it.forEach { throw RuntimeException() }
            }
        }

        // Then the Stream should be closed.
        assertTrue(closed[0]) // Passes!
    }
}

(Side note: it is very possible that Streams and Sequences are poorly suited to our use case. But even so, I am very interested in why this doesn't work as I expect.)

答案1

得分: 4

以下是您要求的中文翻译:

"Stream" 和 "Sequence" 都没有真正的停止消耗的概念。它们不知道是否有人仍在读取它们或已经完成,我们无法区分一个与另一个。此外,Kotlin 序列通常支持多次消耗它们(尽管这不是必需的),因此我们不能假设它们在消耗所有项后应始终关闭。

您的解决方案不起作用,因为它只检测到我们在消耗最后一项时才退出 "use()"。如果我们在到达最后一项之前停止消耗,它只会在 "yieldAll()" 行中永远等待(好吧,不完全是这样)。我们实际上不必抛出异常。使用诸如 "first()"、"take()" 或 "find()" 之类的方法,您将遇到相同的问题,因为我们没有到达流的最后一项,因此我们永远不会退出 "use()"。

假设我们失去了对序列的所有引用(这在大多数情况下是正确的),我认为它最终会关闭流。当垃圾回收器决定销毁序列时,它可能会取消其底层协程,结果我们也将退出 "use()",关闭流。但这不会立即发生在我们停止消耗序列之后。

我不确定您是否注意到了,即使 "Stream" 是 "AutoCloseable" 的子类型,它也不会自动调用 "close()"。"stream.forEach { println(it) }" 不会关闭流。流只是自动可关闭的,以允许人们通过在 try-with-resources 中使用它们来显式关闭它们。就像您在 "use without sequence" 示例中所做的那样。

我建议使用与序列完全相同的方法。创建一个可关闭的序列,然后期望您的代码的用户以正确和显式的方式关闭序列,例如使用 "use()"。

如果您想强制代码的用户关闭序列,常见的解决方案是不直接提供对序列的访问,而只在 lambda 内部提供访问,并在完成时自动关闭序列:

第一个示例看起来与第一个示例非常相似,但不同之处在于 "ClosingProvider" 本身不是序列,因此用户无法在 "use()" 之外消耗它。

英文:

Both Stream and Sequence don't really have a concept of stopping consuming. They don't know if someone still reads from them or finished already, we can't distinguish one from another. Additionally, Kotlin sequences generally support consuming them multiple times (although this is not required), so we can't assume they should be always closed after consuming all items.

Your solution doesn't work, because it detects only the case when we consumed the last item - only then it exits the use(). If we stop consuming before the last item, it just waits in yieldAll() line forever (well, not exactly). We don't really have to throw an exception. Use things like first(), take() or find() and you will have the same problem - because we didn't get to the last item in the stream and we never exited use().

Assuming we lost all references to the sequence (which is true in most cases), I think it will eventually close the stream. When garbage collector decides to destroy the sequence, it will probably cancel its underlying coroutine and as a result we will exit use() as well, closing the stream. But it won't happen straight after we stopped consuming the sequence.

I'm not sure if you noticed that, but even if Stream is a subtype of AutoCloseable, it also doesn't automatically call close(). stream.forEach { println(it) } won't close the stream. Streams are auto-closeable only to allow people to close them explicitly, by using them in try-with-resources. Which you did in your use without sequence example.

I suggest using exactly the same approach with sequences. Create a closeable sequence and then expect users of your code to properly and explicitly close the sequence, for example by using use().

@Test
fun `use with closeable sequence`() {

    // Given a Stream,
    val closed = mutableListOf(false)
    val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }

    stream.asCloseableSequence().use { seq -&gt;
        seq.take(1).forEach { println(it) }
    }
    // Then the Stream should be closed.
    assertTrue(closed[0]) // Passes!
}

interface CloseableSequence&lt;T&gt; : Sequence&lt;T&gt;, AutoCloseable

fun &lt;T&gt; Stream&lt;T&gt;.asCloseableSequence() = object : CloseableSequence&lt;T&gt; {
    override fun iterator(): Iterator&lt;T&gt; = this@asCloseableSequence.iterator()
    override fun close() = this@asCloseableSequence.close()
}

If you want to force users of your code to close the sequence, common solution is to not provide the access to the sequence directly, but only inside a lambda and close the sequence automatically upon finishing:

@Test
fun `use with closing provider`() {

    // Given a Stream,
    val closed = mutableListOf(false)
    val stream = Stream.of(1, 2, 3).onClose { closed[0] = true }

    stream.asSequenceClosingProvider().use { seq -&gt;
        seq.take(1).forEach { println(it) }
    }
    // Then the Stream should be closed.
    assertTrue(closed[0]) // Passes!
}

interface ClosingProvider&lt;T&gt; {
    fun &lt;R&gt; use(block: (T) -&gt; R): R
}

fun &lt;T&gt; Stream&lt;T&gt;.asSequenceClosingProvider() = object : ClosingProvider&lt;Sequence&lt;T&gt;&gt; {
    override fun &lt;R&gt; use(block: (Sequence&lt;T&gt;) -&gt; R): R = this@asSequenceClosingProvider.use { block(it.asSequence()) }
}

It looks very similar to the first example, but the difference is that the ClosingProvider isn't a sequence itself, so users can't consume it outside of use().

答案2

得分: 2

免责声明:这是一种“技巧”,用于简化管理Java Stream关闭,而不是直接解决序列问题的直接响应。有关详细信息,请参阅Broot答案

可以通过使用flatMap自动关闭内部自身的流。这个技巧包括以下步骤:

  1. 创建一个只包含一个值的流。
  2. "flatMapping"这个值,以创建有效打开所需资源的流。
  3. 这就是全部。flatMap负责在错误或完成时关闭底层流。

演示:

import java.util.stream.Stream

/** @return a stream that must be manually closed */
fun closeableStream() :Stream<String> {
    print("OPEN")
    return Stream.of("a", "b", "c")
                 .run { onClose { println("..CLOSE") } }
}

/** @return a stream that open/close automatically underlying resource */
fun autoCloseableStream() = Stream.of("whatever").flatMap { closeableStream() }

fun main() {
    println("execute stream")
    autoCloseableStream().forEach { print("..$it") }

    println("partially consume stream")
    autoCloseableStream().limit(2).forEach { print("..$it") }

    println("fail stream")
    autoCloseableStream().forEach {
        require(it != "b") { "Failure" }
        print("..$it")
    }
}

输出:

execute stream
OPEN..a..b..c..CLOSE
partially consume stream
OPEN..a..b..CLOSE
fail stream
OPEN..a..CLOSE
Exception in thread "main" java.lang.IllegalArgumentException: Failure
    at ...

P.S: 我受到了project-reactor的Flux.using操作符的启发。

英文:

Disclaimer : This is a "trick" to ease managing java Stream closing, not a direct response for the problem with sequences. For it, see Broot answer.

It is possible to provide streams that close themselves internally automatically, by using flatMap. The trick consists in:

  1. Creating a stream with only a single value
  2. "flatMapping" this value, to create the stream that effectively open wanted resource
  3. That's all. flatMap is responsible for closing the underlying stream upon error or completion.

Demonstration:

import java.util.stream.Stream

/** @return a stream that must be manually closed */
fun closeableStream() :Stream&lt;String&gt; {
    print(&quot;OPEN&quot;)
    return Stream.of(&quot;a&quot;, &quot;b&quot;, &quot;c&quot;)
                 .run { onClose { println(&quot;..CLOSE&quot;) } }
}

/** @return a stream that open/close automatically underlying resource */
fun autoCloseableStream() = Stream.of(&quot;whatever&quot;).flatMap { closeableStream() }

fun main() {
    println(&quot;execute stream&quot;)
    autoCloseableStream().forEach { print(&quot;..$it&quot;) }

    println(&quot;partially consume stream&quot;)
    autoCloseableStream().limit(2).forEach { print(&quot;..$it&quot;) }

    println(&quot;fail stream&quot;)
    autoCloseableStream().forEach {
        require(it != &quot;b&quot;) { &quot;Failure&quot; }
        print(&quot;..$it&quot;)
    }
}

Output:

execute stream
OPEN..a..b..c..CLOSE
partially consume stream
OPEN..a..b..CLOSE
fail stream
OPEN..a..CLOSE
Exception in thread &quot;main&quot; java.lang.IllegalArgumentException: Failure
at ...

P.S: I've been inspired by project-reactor Flux.using operator for this.

huangapple
  • 本文由 发表于 2023年6月9日 01:37:06
  • 转载请务必保留本文链接:https://go.coder-hub.com/76434387.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定