How to start and await multiple I/O operations at the same time without wasting threads in F#?

Question

Option 1 — Async.Sequential

let insertProductsAsync (products : DbModels.Product seq) =
    async {
        return!
            products
            |> Seq.map insertProductAsync
            |> Async.Sequential // each async block probably awaiting the previous one to complete, probably not what I want
            |> Async.Ignore
    }

Option 1 does not match the workflow you described. Async.Sequential starts each asynchronous operation only after the previous one has completed, so the inserts never overlap. It does not block a thread while an I/O operation is pending, but the total running time is the sum of all the individual operations, which is not the desired behavior.

Option 2 — Async.Parallel

let insertProductsAsync (products : DbModels.Product seq) =
    async {
        return!
            products
            |> Seq.map insertProductAsync
            |> Async.Parallel // wasting threads probably, I guess
            |> Async.Ignore
    }

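Option 2 is not an exact match either. Async.Parallel runs all the asynchronous operations concurrently and completes when every one of them has finished, but it queues each computation to the thread pool, so it does not guarantee that they are started one after another in order. For truly asynchronous I/O it does not hold a thread while an operation is pending; a thread is needed only to start each computation and to run its continuation.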
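As a rough illustration of the Async.Parallel behavior, here is a minimal sketch. fakeInsertAsync is a hypothetical stub, not part of the question, and Async.Sleep stands in for non-blocking database I/O. The results come back in input order, and the elapsed time should be close to a single 200 ms wait rather than twenty of them, although the exact figure depends on the machine.

```fsharp
open System.Diagnostics

// Hypothetical stub: Async.Sleep stands in for non-blocking I/O
let fakeInsertAsync (id : int) : Async<int> =
    async {
        do! Async.Sleep 200   // timer-based wait, no thread is held here
        return id
    }

let stopwatch = Stopwatch.StartNew()

// Run all twenty computations concurrently and wait for every result
let results =
    [ 1 .. 20 ]
    |> List.map fakeInsertAsync
    |> Async.Parallel
    |> Async.RunSynchronously

printfn "Completed %d operations in %d ms" results.Length stopwatch.ElapsedMilliseconds
printfn "Results (in input order): %A" results
```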

Option 3 — Custom Implementation

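open System.Threading.Tasks

let insertProductsAsync (products : DbModels.Product seq) =
    async {
        // Start every insert in order on the current thread; each one becomes a running Task
        let tasks =
            products
            |> Seq.map (fun product -> insertProductAsync product |> Async.StartImmediateAsTask)
            |> Seq.toArray // force the lazy sequence so that all inserts are started here
        // Wait for all of the tasks without blocking a thread
        do! Task.WhenAll(tasks) |> Async.AwaitTask |> Async.Ignore
    }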

Option 3 is a custom implementation that matches the workflow you described. It starts the insertProductAsync operations one after another as tasks and does not block the current thread while they are pending. Task.WhenAll then returns a task that completes once every individual insert has completed.

Provided insertProductAsync performs truly asynchronous I/O, no thread is held while the operations are in flight, which fulfills the requirements you specified.
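One way to sketch Option 3 end to end, assuming insertProductAsync returns Async<unit> as in the question, is to start each computation with Async.StartImmediateAsTask and await the resulting tasks with Task.WhenAll. The Product record, the started list, and the Async.Sleep stub below are hypothetical stand-ins added only to make the example runnable; they are not the asker's code.

```fsharp
open System.Threading.Tasks

// Hypothetical stand-in for DbModels.Product
type Product = { Id : int }

// Records the order in which inserts were started (illustration only)
let started = System.Collections.Generic.List<int>()

// Hypothetical stub: Async.Sleep stands in for non-blocking database I/O
let insertProductAsync (product : Product) : Async<unit> =
    async {
        started.Add(product.Id)   // runs synchronously when the computation is started
        do! Async.Sleep 100       // timer-based wait, no thread is held here
    }

let insertProductsAsync (products : Product seq) : Async<unit> =
    async {
        let tasks =
            products
            |> Seq.map (fun p -> insertProductAsync p |> Async.StartImmediateAsTask)
            |> Seq.toArray        // forces the lazy sequence, so every insert is started here
        // Completes when all of the started tasks have completed
        do! Task.WhenAll(tasks) |> Async.AwaitTask |> Async.Ignore
    }

[ 1 .. 5 ]
|> List.map (fun i -> { Id = i })
|> insertProductsAsync
|> Async.RunSynchronously

printfn "Start order: %A" (List.ofSeq started)
```

Because Async.StartImmediateAsTask runs each computation on the calling thread up to its first asynchronous wait, the recorded start order follows the input order, and all five waits overlap.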

Original (English):

We know there is no point in running I/O operations in separate threads since they are "truly" asynchronous, meaning that they are non-blocking without requiring an extra thread to be run on.

Consider this scenario: I have an I/O function insertProductAsync : DbModels.Product -> Async<unit> and another I/O function insertProductsAsync : DbModels.Product seq -> Async<unit>. Both do exactly what they say they do and there is one condition: insertProductsAsync must use insertProductAsync. This is an educational example, so no need to discuss SQL batch operations and similar concepts.

The workflow I want for insertProductsAsync is as follows:

  1. For each product, start an insertProductAsync task sequentially.
  2. While the I/O operations execute, do not use or block the current thread, or any thread.
  3. Finally, return a task or similar structure that completes when all the individual insertProductAsync tasks have completed.

I believe in C# this could be achieved by doing the following:

public Task InsertProductAsync(DbModels.Product product)
{
    // pure 'async'/non-CPU-bound operation
}

public Task InsertProductsAsync(IEnumerable<DbModels.Product> products)
{
    IEnumerable<Task> tasks = products.Select(InsertProductAsync);  // tasks already started to execute, no extra threads used
    return Task.WhenAll(tasks);
}

Which of the following options (if any) match the workflow I described and why? If none, please provide your own option with explanation. When explaining, please provide a good description of what happens behind the scenes.

Option 1 — Async.Sequential

let insertProductsAsync (products : DbModels.Product seq) =
    async {
        return!
            products
            |> Seq.map insertProductAsync
            |> Async.Sequential // each async block probably awaiting the previous one to complete, probably not what I want
            |> Async.Ignore
    }

Option 2 — Async.Parallel

let insertProductsAsync (products : DbModels.Product seq) =
    async {
        return!
            products
            |> Seq.map insertProductAsync
            |> Async.Parallel // wasting threads probably, I guess
            |> Async.Ignore
    }

Option 3 — ???

Your option here.

Answer 1

Score: 1

To answer your final question, operations that execute in parallel do not block the thread that starts them. Blocking it would be synchronous behavior. You can see this in the following example:

open System
open System.Threading

let doSomething timeout =
    async {
        Thread.Sleep(timeout : int)
        printfn $"Slept {timeout} milliseconds on thread ID: {Thread.CurrentThread.ManagedThreadId}"
    }

printfn $"Main thread ID: {Thread.CurrentThread.ManagedThreadId}"
doSomething 1000 |> Async.Start

printfn $"Main thread ID: {Thread.CurrentThread.ManagedThreadId}"
doSomething 900 |> Async.Start

printfn $"Main thread ID: {Thread.CurrentThread.ManagedThreadId}"
doSomething 800 |> Async.Start

printfn $"Main thread ID: {Thread.CurrentThread.ManagedThreadId}"
doSomething 700 |> Async.Start

printfn ""
Console.ReadLine() |> ignore

Output:

Main thread ID: 1
Main thread ID: 1
Main thread ID: 1
Main thread ID: 1

Slept 700 milliseconds on thread ID: 10
Slept 800 milliseconds on thread ID: 7
Slept 900 milliseconds on thread ID: 6
Slept 1000 milliseconds on thread ID: 4

Note that the main thread (ID 1) doesn't block during execution.
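In the example above, each Thread.Sleep call still blocks the thread-pool thread that runs the computation, even though the main thread stays free. A variation that is closer to truly asynchronous I/O replaces it with Async.Sleep, which waits on a timer and releases the thread in the meantime. This is a sketch only: doSomethingAsync is a hypothetical name, and the thread IDs it prints will vary from run to run.

```fsharp
open System.Threading

// Variation on doSomething: Async.Sleep waits on a timer instead of blocking a thread
let doSomethingAsync (timeout : int) : Async<int> =
    async {
        let before = Thread.CurrentThread.ManagedThreadId
        do! Async.Sleep timeout   // the thread is released while the timer runs
        let after = Thread.CurrentThread.ManagedThreadId
        printfn "Waited %d ms (thread ID before: %d, after: %d)" timeout before after
        return timeout
    }

// Start all four waits together and collect their results in input order
let finished =
    [ 1000; 900; 800; 700 ]
    |> List.map doSomethingAsync
    |> Async.Parallel
    |> Async.RunSynchronously

printfn "Finished: %A" finished
```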

huangapple
  • Posted on July 4, 2023 at 23:40:45
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76614159.html