Cancelling the task of a URLSession.AsyncBytes doesn't seem to work

Question


I want to download a large file, knowing the number of bytes transferred, and be able to cancel the download if necessary.

I know that this can be done having a URLSessionDownloadTask and conforming to the URLSessionDownloadDelegate, but I wanted to achieve it through an async/await mechanism, so I used URLSession.shared.bytes(from: url) and then a for-await-in loop to handle each byte.

The issue comes when trying to cancel the ongoing task, as even though the URLSession.AsyncBytes's Task has been cancelled, the for-await-in loop keeps processing bytes, so I'm assuming that the download is still ongoing.

I've tested it with this piece of code in a playground.

    let url = URL(string: "https://example.com/large_file.zip")!
        
    let (asyncBytes, _) = try await URLSession.shared.bytes(from: url)
    
    DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
        asyncBytes.task.cancel()
    }

    var data = Data()
    
    for try await byte in asyncBytes {
        
        data.append(byte)
        print(data.count)
    }

I would have expected that, as soon as the task is cancelled, the download would have been stopped and, therefore, the for-await-in would stop processing bytes.

What am I missing here? Can these tasks not be effectively cancelled?

Answer 1

Score: 2


Canceling a URLSessionDataTask works fine with AsyncBytes. That having been said, even if the URLSessionDataTask is canceled, the AsyncBytes will continue to iterate through the bytes received prior to cancelation. But the data task does stop.

Consider experiment1:

    @MainActor
    class ViewModel: ObservableObject {
        private let url: URL = …
        private let session: URLSession = …
        private var cancelButtonTapped = false
        private var dataTask: URLSessionDataTask?

        @Published var bytesBeforeCancel = 0
        @Published var bytesAfterCancel = 0

        func experiment1() async throws {
            let (asyncBytes, _) = try await session.bytes(from: url)
            dataTask = asyncBytes.task

            var data = Data()

            for try await byte in asyncBytes {
                if cancelButtonTapped {
                    bytesAfterCancel += 1
                } else {
                    bytesBeforeCancel += 1
                }
                data.append(byte)
            }
        }

        func cancel() {
            dataTask?.cancel()
            cancelButtonTapped = true
        }
    }

So, I canceled after 1 second (at which point I had iterated through 2,022 bytes), and it continued to iterate through the remaining 14,204 bytes that had been received prior to the cancelation of the URLSessionDataTask. But the download does stop successfully. (In my example, the actual asset being downloaded was 74 MB.) When using URLSession, the data comes in packets, so it takes AsyncBytes a little time to get through everything that was actually received before the URLSession request was canceled.
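
If the goal is to stop consuming right away rather than drain what was already buffered, one option is to break out of the loop on the same flag. The following is only a minimal sketch: `collectBytes(from:until:)` and its `shouldStop` closure are hypothetical names, not part of URLSession, and the function is written against any `AsyncSequence` of `UInt8` so that it can be tried without a network connection.

```swift
import Foundation

/// Hypothetical helper: collects bytes from any async byte sequence until
/// `shouldStop()` returns true. Bytes that were already buffered but not yet
/// iterated are simply never read.
func collectBytes<S: AsyncSequence>(
    from bytes: S,
    until shouldStop: () -> Bool
) async throws -> Data where S.Element == UInt8 {
    var data = Data()
    for try await byte in bytes {
        // Checked before every byte, so the loop exits on the first byte
        // delivered after the stop condition becomes true.
        if shouldStop() { break }
        data.append(byte)
    }
    return data
}
```

With a real download, this could be called as `try await collectBytes(from: asyncBytes) { cancelButtonTapped }`, followed by `asyncBytes.task.cancel()` so that the URLSessionDataTask is stopped as well.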

You might consider canceling the Swift concurrency Task, rather than the URLSessionDataTask. (I really wish they did not use the same word, “task”, to refer to entirely different concepts!)

Consider experiment2:

    @MainActor
    class ViewModel: ObservableObject {
        private let url: URL = …
        private let session: URLSession = …
        private var cancelButtonTapped = false
        private var task: Task<Void, Error>?

        @Published var bytesBeforeCancel = 0
        @Published var bytesAfterCancel = 0

        func experiment2() async throws {
            task = Task { try await download() }
            try await task?.value
        }

        func cancel() {
            task?.cancel()
            cancelButtonTapped = true
        }

        func download() async throws {
            let (asyncBytes, _) = try await session.bytes(from: url)

            var data = Data()

            for try await byte in asyncBytes {
                try Task.checkCancellation()

                if cancelButtonTapped {        // this whole `if` statement is no longer needed, but I've kept it here for comparison to the previous example
                    bytesAfterCancel += 1
                } else {
                    bytesBeforeCancel += 1
                }

                data.append(byte)
            }
        }
    }

Without the try Task.checkCancellation() line, the behavior is almost the same as in experiment1. The cancelation of the Task with the AsyncBytes will result in the cancelation of the underlying URLSessionDataTask (but the sequence will continue to iterate through the bytes in the packets that were successfully received prior to cancelation). But with try Task.checkCancellation(), it will exit as soon as the Task is canceled.
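
The effect of `try Task.checkCancellation()` can be reproduced without any networking. The sketch below is an illustration under stated assumptions, not the code from the experiments: `countBytes(in:)`, `EndlessZeros`, and `cancellationOutcome()` are hypothetical names, and `EndlessZeros` is a stand-in byte source that never reacts to cancellation by itself, so the only way out of the loop is the explicit check.

```swift
import Foundation

/// Counts bytes from any async byte sequence. Throws `CancellationError` on the
/// first iteration after the surrounding Swift concurrency Task is cancelled.
func countBytes<S: AsyncSequence>(in bytes: S) async throws -> Int where S.Element == UInt8 {
    var count = 0
    for try await _ in bytes {
        try Task.checkCancellation()
        count += 1
    }
    return count
}

/// Hypothetical stand-in for a download: an endless byte source that does not
/// stop on its own when the task is cancelled.
struct EndlessZeros: AsyncSequence, AsyncIteratorProtocol {
    typealias Element = UInt8

    mutating func next() async -> UInt8? {
        await Task.yield()   // give other tasks a chance to run
        return 0
    }

    func makeAsyncIterator() -> EndlessZeros { self }
}

/// Starts the loop in a Swift concurrency Task, cancels it, and reports how it ended.
func cancellationOutcome() async -> String {
    let task = Task { try await countBytes(in: EndlessZeros()) }
    task.cancel()
    do {
        let count = try await task.value
        return "finished after \(count) bytes"
    } catch is CancellationError {
        return "cancelled"
    } catch {
        return "failed: \(error)"
    }
}
```

Because the source here never ends, the loop can only exit through the thrown `CancellationError`; removing the `checkCancellation()` line would leave this particular demo running forever, which is why the check matters when the sequence itself keeps delivering bytes.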


Answer 2

Score: 0


TL;DR: Read Rob's answer, but the iterator code and the partial download code are still handy, so I'm leaving this answer with corrections.

Okay, so I spent some time on this because I'm about to try to write my own cancellable URL stream object. <strike>and it appears that asyncBytes.task.cancel() is more along the lines of URLSession's finishTasksAndInvalidate() than invalidateAndCancel().</strike> Since you are pointing your streaming task at a file that isn't really that large, the URLSessionDataTask had already gotten the bytes into the buffer.

You can see this when you change up the function a bit (see Rob's example as well):


    func test_funcCondition(timeOut: TimeInterval, url: URL, session: URLSession) async throws {

        let (asyncBytes, _) = try await session.bytes(from: url)

        let deadLine = Date.now + timeOut
        var data = Data()

        // True once the given deadline has passed.
        func someConditionCheck(_ deadline: Date) -> Bool {
            Date.now > deadline
        }

        for try await byte in asyncBytes {
            if someConditionCheck(deadLine) {
                // Cancels the URLSessionDataTask; bytes already buffered are still delivered.
                asyncBytes.task.cancel()
                print("trying to cancel...")
            }
            // Wrong type of task! Should not work: if Task.isCancelled { print("cancelled") }
            data.append(byte)
            // just to reduce the amount of printing
            if data.count % 100 == 0 {
                print(data.count)
            }
        }
    }

If you point the URL at "https://example.com/large_file.zip" like your example and make the time interval very short, the function will print "trying to cancel..." between the time your marker hits and the file completes. <strike>It does NOT however, ever print "cancelled".</strike> (The task being cancelled is a URLSessionDataTask, not a Swift concurrency Task, so that line never would have worked.)

<strike>If you point either what you wrote or this function at a Server-Sent-Event stream it will cancel out just fine.</strike> (While true, it's not in contrast to the other behavior, which also works just fine. There are just bigger pauses in SSE data.)

<strike>If that isn't what you want, if you want to be able to start-stop streams mid-chunk, maybe explore a custom delegate (something I haven't done yet myself), or go work with AVFoundation if that's an option because they've thought a lot about working with large streaming files. I did not check making my own session and running session.invalidateAndCancel() on it instead, because that seems kind of extreme, but may be the way to go if you want to flush the buffer immediately. </strike>

The below will work to stop caring about the buffer immediately. It involves making a custom iterator. <strike>but it seems kind of quirky and may not in fact arrest the downloading (still cost users data rates and power). I haven't looked into how the stream protocol relates to the network protocol on that lower level, if you stop asking does it stop getting? I don't know. </strike> The cancel will arrest the stream allowing through the bytes that are already in the buffer, but your code won't get them. On my todo-list now is to look into how to change buffering policies.

Rob's code seems a nice way to go and takes advantage of a concurrency Task.

    func test_customIterator(timeOut: TimeInterval, url: URL, session: URLSession) async throws {

        let (asyncBytes, _) = try await session.bytes(from: url)

        let deadLine = Date.now + timeOut
        var data = Data()

        // True once the given deadline has passed.
        func someConditionCheck(_ deadline: Date) -> Bool {
            Date.now > deadline
        }

        // could also be asyncBytes.lines.makeAsyncIterator(), etc.
        var iterator = asyncBytes.makeAsyncIterator()
        while !someConditionCheck(deadLine) {
            // await Task.yield()
            // next() returns nil when the stream ends, so stop instead of force-unwrapping.
            guard let byte = try await iterator.next() else { break }
            data.append(byte)
            print(data.count)
        }
        // make sure to still tell URLSession you aren't listening anymore.
        // It may auto-close but that's not how I roll.
        asyncBytes.task.cancel()

    }

    let tap_out: TimeInterval = 0.0005
    try await test_customIterator(timeOut: tap_out, url: URL(string: "https://example.com/large_file.zip")!, session: URLSession.shared)

<strike>Interesting flavor of behavior. Thanks for pointing it out. Also I didn't know that the task was already available (asyncBytes.task). Thanks for that.</strike> Incorrect. The asyncBytes.task is a URLSessionDataTask, not a concurrency Task.

UPDATED TO ADD:

To get part of the file explicitly

    // https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests
    func requestInChunks(data: inout Data, url: URL, session: URLSession, offset: Int, length: Int) async throws {
        var urlRequest = URLRequest(url: url)
        // Byte ranges are inclusive on both ends, hence the "- 1".
        urlRequest.addValue("bytes=\(offset)-\(offset + length - 1)", forHTTPHeaderField: "Range")

        let (asyncBytes, response) = try await
            session.bytes(for: urlRequest, delegate: nil)

        guard (response as? HTTPURLResponse)?.statusCode == 206 else { // NOT 200!!
            // APIngError is the answer author's own error type; its definition is not shown here.
            throw APIngError("The server responded with an error.")
        }

        for try await byte in asyncBytes {
            data.append(byte)
            if data.count % 100 == 0 {
                print(data.count)
            }
        }
    }
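
To fetch a whole file piece by piece with a function like this, the offsets and lengths have to be computed somewhere. The following is a rough sketch of that arithmetic only; `rangeHeaderValue(offset:length:)` and `chunkRanges(totalSize:chunkSize:)` are hypothetical helper names, and the total size is assumed to be known up front (for example from a previous response's Content-Length).

```swift
import Foundation

/// Builds the value of an HTTP `Range` header for `length` bytes starting at `offset`.
/// Byte ranges are inclusive on both ends, hence the `- 1`.
func rangeHeaderValue(offset: Int, length: Int) -> String {
    precondition(offset >= 0 && length > 0, "offset must be >= 0 and length must be > 0")
    return "bytes=\(offset)-\(offset + length - 1)"
}

/// Splits a resource of `totalSize` bytes into (offset, length) pairs of at most
/// `chunkSize` bytes each. The last chunk may be shorter than the others.
func chunkRanges(totalSize: Int, chunkSize: Int) -> [(offset: Int, length: Int)] {
    precondition(totalSize >= 0 && chunkSize > 0, "totalSize must be >= 0 and chunkSize must be > 0")
    return stride(from: 0, to: totalSize, by: chunkSize).map { offset in
        (offset: offset, length: min(chunkSize, totalSize - offset))
    }
}
```

Each pair could then be passed to `requestInChunks` in a loop, with a `try Task.checkCancellation()` between chunks so that a cancelled Swift concurrency Task stops before the next request is issued.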

I still think that if my task at hand were about file downloading, session.download would be my go-to, but then there is file cleanup, etc., so I get why not to go there.

huangapple
  • Posted on February 16, 2023, 02:40:04
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75464142.html