How to implement the BlockingCollection.TakeFromAny equivalent for Channels?
Question
I am trying to implement an asynchronous method that takes an array of ChannelReader<T>s and takes a value from any of the channels that has an item available. Its functionality is similar to that of the BlockingCollection<T>.TakeFromAny method, which has this signature:
public static int TakeFromAny(BlockingCollection<T>[] collections, out T item,
    CancellationToken cancellationToken);
This method returns the index in the collections array from which the item was removed. An async method cannot have out parameters, so the API that I am trying to implement is this:
public static Task<(T Item, int Index)> TakeFromAnyAsync<T>(
    ChannelReader<T>[] channelReaders,
    CancellationToken cancellationToken = default);
The TakeFromAnyAsync<T> method should asynchronously read an item and return the consumed item along with the index of the associated channel in the channelReaders array. If all the channels are completed (either successfully or with an error), or all become completed during the await, the method should asynchronously throw a ChannelClosedException.
My question is: how can I implement the TakeFromAnyAsync<T> method? The implementation looks quite tricky. Obviously, under no circumstances should the method consume more than one item from the channels. It should also not leave behind fire-and-forget tasks or leave disposable resources undisposed. The method will typically be called in a loop, so it should also be reasonably efficient. Its complexity should be no worse than O(n), where n is the number of channels.
For insight into where this method can be useful, take a look at the select statement of the Go language. From the tour:
> The select statement lets a goroutine wait on multiple communication operations.
>
> A select blocks until one of its cases can run, then it executes that case. It chooses one at random if multiple are ready.
select {
case msg1 := <-c1:
    fmt.Println("received", msg1)
case msg2 := <-c2:
    fmt.Println("received", msg2)
}
In the above example, either a value will be taken from the channel c1 and assigned to the variable msg1, or a value will be taken from the channel c2 and assigned to the variable msg2. The Go select statement is not restricted to reading from channels. It can include multiple heterogeneous cases, such as writing to bounded channels, waiting for timers, etc. Replicating the full functionality of the Go select statement is beyond the scope of this question.
Answer 1
Score: 3
I came up with something like this:
public static async Task<(T Item, int Index)> TakeFromAnyAsync<T>(
    ChannelReader<T>[] channelReaders,
    CancellationToken cancellationToken = default)
{
    if (channelReaders == null)
    {
        throw new ArgumentNullException(nameof(channelReaders));
    }
    if (channelReaders.Length == 0)
    {
        throw new ArgumentException("The list cannot be empty.", nameof(channelReaders));
    }
    if (channelReaders.Length == 1)
    {
        return (await channelReaders[0].ReadAsync(cancellationToken), 0);
    }
    // First attempt to read an item synchronously
    for (int i = 0; i < channelReaders.Length; ++i)
    {
        if (channelReaders[i].TryRead(out var item))
        {
            return (item, i);
        }
    }
    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
    {
        var waitToReadTasks = channelReaders
            .Select(it => it.WaitToReadAsync(cts.Token).AsTask())
            .ToArray();
        var pendingTasks = new List<Task<bool>>(waitToReadTasks);
        while (pendingTasks.Count > 1)
        {
            var t = await Task.WhenAny(pendingTasks);
            if (t.IsCompletedSuccessfully && t.Result)
            {
                int index = Array.IndexOf(waitToReadTasks, t);
                var reader = channelReaders[index];
                // Attempt to read an item synchronously
                if (reader.TryRead(out var item))
                {
                    if (pendingTasks.Count > 1)
                    {
                        // Cancel the pending "wait to read" operations on the
                        // remaining readers, then wait for them to complete
                        try
                        {
                            cts.Cancel();
                            await Task.WhenAll((IEnumerable<Task>)pendingTasks);
                        }
                        catch { }
                    }
                    return (item, index);
                }
                // Due to a race condition the item is no longer available
                if (!reader.Completion.IsCompleted)
                {
                    // .. but the channel appears to be still open, so we retry
                    var waitToReadTask = reader.WaitToReadAsync(cts.Token).AsTask();
                    waitToReadTasks[index] = waitToReadTask;
                    pendingTasks.Add(waitToReadTask);
                }
            }
            // Remove all completed tasks that could not yield an item
            pendingTasks.RemoveAll(tt => tt == t ||
                tt.IsCompletedSuccessfully && !tt.Result ||
                tt.IsFaulted || tt.IsCanceled);
        }
        int lastIndex = 0;
        if (pendingTasks.Count > 0)
        {
            lastIndex = Array.IndexOf(waitToReadTasks, pendingTasks[0]);
            await pendingTasks[0];
        }
        var lastItem = await channelReaders[lastIndex].ReadAsync(cancellationToken);
        return (lastItem, lastIndex);
    }
}
Answer 2
Score: 1
Here is another approach. This implementation is conceptually the same as alexm's implementation, up to the point where no channel has an item available immediately. From there it differs by avoiding the Task.WhenAny-in-a-loop pattern, and instead starts an asynchronous loop for each channel. All loops race to update a shared ValueTuple<T, int, bool> consumed variable, which is updated inside a critical region to prevent consuming an element from more than one channel.
/// <summary>
/// Takes an item asynchronously from any one of the specified channel readers.
/// </summary>
public static async Task<(T Item, int Index)> TakeFromAnyAsync<T>(
    ChannelReader<T>[] channelReaders,
    CancellationToken cancellationToken = default)
{
    ArgumentNullException.ThrowIfNull(channelReaders);
    if (channelReaders.Length == 0) throw new ArgumentException(
        $"The {nameof(channelReaders)} argument is a zero-length array.");
    foreach (var cr in channelReaders) if (cr is null) throw new ArgumentException(
        $"The {nameof(channelReaders)} argument contains at least one null element.");
    cancellationToken.ThrowIfCancellationRequested();
    // Fast path (at least one channel has an item available immediately)
    for (int i = 0; i < channelReaders.Length; i++)
        if (channelReaders[i].TryRead(out var item))
            return (item, i);
    // Slow path (all channels are currently empty)
    using var linkedCts = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);
    (T Item, int Index, bool HasValue) consumed = default;
    Task[] tasks = channelReaders.Select(async (channelReader, index) =>
    {
        while (true)
        {
            try
            {
                if (!await channelReader.WaitToReadAsync(linkedCts.Token)
                    .ConfigureAwait(false)) break;
            }
            // Only the exceptional cases below are normal.
            catch (OperationCanceledException)
                when (linkedCts.IsCancellationRequested) { break; }
            catch when (channelReader.Completion.IsCompleted
                && !channelReader.Completion.IsCompletedSuccessfully) { break; }
            // This channel has an item available now.
            lock (linkedCts)
            {
                if (consumed.HasValue)
                    return; // An item has already been consumed from another channel.
                if (!channelReader.TryRead(out var item))
                    continue; // We lost the race to consume the available item.
                consumed = (item, index, true); // We consumed an item successfully.
            }
            linkedCts.Cancel(); // Cancel the other tasks.
            return;
        }
    }).ToArray();
    // The tasks should never fail. If a task ever fails, we have a bug.
    try { foreach (var task in tasks) await task.ConfigureAwait(false); }
    catch (Exception ex) { Debug.Fail("Unexpected error", ex.ToString()); throw; }
    if (consumed.HasValue)
        return (consumed.Item, consumed.Index);
    cancellationToken.ThrowIfCancellationRequested();
    Debug.Assert(channelReaders.All(cr => cr.Completion.IsCompleted));
    throw new ChannelClosedException();
}
It should be noted that this solution, as well as alexm's solution, depends on canceling en masse all pending WaitToReadAsync operations when an element has been consumed. Unfortunately, this triggers the infamous memory leak issue that affects .NET channels with idle producers. When any async operation on a channel is canceled, the canceled operation remains in memory, attached to the internal structures of the channel, until an element is written to the channel. This behavior has been triaged by Microsoft as by-design, although the possibility of improving it has not been ruled out. Interestingly, this ambiguity makes the effect ineligible for documentation. So the only way to learn about it is by chance: either by reading about it in unofficial sources or by running into it.
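
The mechanism both solutions rely on can be seen in isolation. The following is only a minimal sketch, not part of either answer: the class and method names (CancellationDemo, CancelPendingWaitAsync) are made up for illustration. It starts a WaitToReadAsync on an empty channel, cancels it, and reports whether the pending wait ended with an OperationCanceledException. It does not measure the memory retention described above.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class CancellationDemo
{
    // Returns true when the pending wait was ended by the cancellation.
    public static async Task<bool> CancelPendingWaitAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        using var cts = new CancellationTokenSource();

        // The channel is empty, so this wait stays pending.
        Task<bool> pending = channel.Reader.WaitToReadAsync(cts.Token).AsTask();

        // This is what both answers do once an item has been consumed elsewhere.
        cts.Cancel();

        try
        {
            await pending;
            return false;
        }
        catch (OperationCanceledException)
        {
            return true;
        }
    }
}
```

Nothing is ever written to the channel in this sketch, which is exactly the idle-producer situation the paragraph above warns about.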
4: https://stackoverflow.com/questions/67573683/channels-with-cancellationtokensource-with-timeout-memory-leak-after-dispose "Channels with CancellationTokenSource with timeout memory leak after dispose"
5: https://github.com/dotnet/runtime/issues/761#issuecomment-564566525 "Possible CancellationTokenRegistration leak in AsyncOperation<T>"
6: https://github.com/dotnet/runtime/issues/761#issuecomment-1131583049
Answer 3
Score: 0
The problem is a lot easier if channels are used the way they're used in Go: Channel(Readers) as input, Channel(Readers) as output.
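IEnumerable<ChannelReader<T>> sources = ....;
// A ChannelReader<T> is not itself enumerable; ReadAllAsync exposes it as IAsyncEnumerable<T>.
await foreach (var msg in sources.TakeFromAny(token).ReadAllAsync(token))
{
    ....
}
or
var merged = sources.TakeFromAny(token);
...
var msg = await merged.ReadAsync(token);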
In this case, the input from all channel readers is copied to a single output channel. The return value of the method is the ChannelReader of this channel.
CopyToAsync helper
A CopyToAsync function can be used to copy messages from an input source to the output channel:
static async Task CopyToAsync<T>(
    this ChannelReader<T> input,
    ChannelWriter<T> output,
    CancellationToken token = default)
{
    while (await input.WaitToReadAsync(token).ConfigureAwait(false))
    {
        // Early exit if cancellation is requested
        while (!token.IsCancellationRequested && input.TryRead(out T? msg))
        {
            await output.WriteAsync(msg, token);
        }
    }
}
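This code is similar to ReadAllAsync but exits immediately if cancellation is requested. ReadAllAsync will return all available items even if cancellation is requested. WaitToReadAsync and TryRead don't throw when the input channel completes normally, which makes error handling a lot easier. WriteAsync would throw a ChannelClosedException on a completed output channel, but here the output is completed only after every copy task has finished.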
Error Handling and Railway-oriented programming
WaitToReadAsync does throw if the source faults, and that exception will be propagated to the calling method and, through Task.WhenAll, to the output channel.
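
The difference between a source that completes normally and one that faults can be checked with a small experiment. This is a sketch under stated assumptions rather than part of the answer: CompletionDemo and ObserveCompletionAsync are hypothetical names, and an unbounded channel is used only to keep the example short.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class CompletionDemo
{
    // Completes a fresh channel (optionally with an error) and reports what
    // WaitToReadAsync does: "closed" when it returns false, or the
    // exception message when it throws.
    public static async Task<string> ObserveCompletionAsync(Exception? error)
    {
        var channel = Channel.CreateUnbounded<int>();
        channel.Writer.Complete(error);
        try
        {
            bool canRead = await channel.Reader.WaitToReadAsync();
            return canRead ? "readable" : "closed";
        }
        catch (Exception ex)
        {
            return ex.Message;
        }
    }
}
```

Passing null models a normal completion, which ends the copy loop quietly. Passing an exception models a faulted source, which is the case that surfaces in the calling method.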
This can be a bit messy because it interrupts the entire pipeline. To avoid this, the error could be swallowed or logged inside CopyToAsync. An even better option would be to use Railway-oriented programming and wrap all messages in a Result<TMsg,TError> class, e.g.:
// Result<TMsg, TError> is not a built-in type; it is assumed here to expose
// FromValue and FromError factory methods.
static async Task CopyToAsync<T>(
    this ChannelReader<T> input,
    ChannelWriter<Result<T, Exception>> output,
    CancellationToken token = default)
{
    try
    {
        while (await input.WaitToReadAsync(token).ConfigureAwait(false))
        {
            // Early exit if cancellation is requested
            while (!token.IsCancellationRequested && input.TryRead(out T? msg))
            {
                var newMsg = Result<T, Exception>.FromValue(msg);
                await output.WriteAsync(newMsg, token);
            }
        }
    }
    catch (Exception exc)
    {
        // Forward the failure as a message instead of faulting the pipeline
        output.TryWrite(Result<T, Exception>.FromError(exc));
    }
}
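
The answer does not show the Result type itself. One possible shape is sketched below, purely as an assumption: the member names (Value, Error, IsSuccess, FromValue, FromError, Match) are illustrative and do not come from any specific library.

```csharp
#nullable enable
using System;

// Hypothetical Result type for illustration; not part of the .NET base class library.
public sealed class Result<TValue, TError>
{
    private Result(TValue? value, TError? error, bool isSuccess)
    {
        Value = value;
        Error = error;
        IsSuccess = isSuccess;
    }

    public TValue? Value { get; }
    public TError? Error { get; }
    public bool IsSuccess { get; }

    // Wraps a successfully produced message.
    public static Result<TValue, TError> FromValue(TValue value) =>
        new(value, default, true);

    // Wraps a failure so it can travel through the channel as data.
    public static Result<TValue, TError> FromError(TError error) =>
        new(default, error, false);

    // Forces the consumer to handle both tracks.
    public TOut Match<TOut>(Func<TValue, TOut> onValue, Func<TError, TOut> onError) =>
        IsSuccess ? onValue(Value!) : onError(Error!);
}
```

With a type like this, a consumer of the merged channel can call Match on each message and decide per message whether to continue, log, or stop, instead of having the whole pipeline torn down by one faulted source.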
TakeFromAny
TakeFromAny (MergeAsync may be a better name) can be:
static ChannelReader<T> TakeFromAny<T>(
    this IEnumerable<ChannelReader<T>> inputs,
    CancellationToken token = default)
{
    var outChannel = Channel.CreateBounded<T>(1);
    var readers = inputs.Select(rd => CopyToAsync(rd, outChannel.Writer, token));
    // Complete the output once every copy task has finished,
    // passing along any exception they produced
    _ = Task.WhenAll(readers)
        .ContinueWith(t => outChannel.Writer.TryComplete(t.Exception));
    return outChannel.Reader;
}
Using a bounded capacity of 1 ensures the backpressure behavior of downstream code doesn't change.
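
The effect of the capacity of 1 can be observed directly. The following is a minimal sketch with hypothetical names (BackpressureDemo, Run). It relies on the default full mode of a bounded channel, under which TryWrite is rejected while the single slot is occupied.

```csharp
using System.Threading.Channels;

// Hypothetical helper, for illustration only.
public static class BackpressureDemo
{
    // Reports whether each write was accepted: the first one, the second one
    // while the buffer is full, and a third one after the consumer reads.
    public static (bool First, bool Second, bool AfterRead) Run()
    {
        var channel = Channel.CreateBounded<int>(1);

        bool first = channel.Writer.TryWrite(1);      // the single slot is free
        bool second = channel.Writer.TryWrite(2);     // the slot is taken, so the write is rejected
        channel.Reader.TryRead(out _);                // the consumer frees the slot
        bool afterRead = channel.Writer.TryWrite(3);  // accepted again

        return (first, second, afterRead);
    }
}
```

In CopyToAsync the copy loops use WriteAsync rather than TryWrite, so instead of being rejected they wait until the consumer has read the buffered item. That is how a slow consumer slows down every source.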
Adding a source index
This can be adjusted to emit the index of the source as well:
static async Task CopyToAsync<T>(
    this ChannelReader<T> input, int index,
    ChannelWriter<(T, int)> output,
    CancellationToken token = default)
{
    while (await input.WaitToReadAsync(token).ConfigureAwait(false))
    {
        while (!token.IsCancellationRequested && input.TryRead(out T? msg))
        {
            await output.WriteAsync((msg, index), token);
        }
    }
}
static ChannelReader<(T, int)> TakeFromAny<T>(
    this IEnumerable<ChannelReader<T>> inputs,
    CancellationToken token = default)
{
    // The tuple order must match the writer used by CopyToAsync: (item, index)
    var outChannel = Channel.CreateBounded<(T, int)>(1);
    var readers = inputs.Select((rd, idx) => CopyToAsync(rd, idx, outChannel.Writer, token));
    _ = Task.WhenAll(readers)
        .ContinueWith(t => outChannel.Writer.TryComplete(t.Exception));
    return outChannel.Reader;
}