Producer Consumer Queue best practices and performance
I'm building a producer-consumer queue in C# and I was reading up to find the best approach in terms of robustness and performance.
For years I have always used BlockingCollection, but I have discovered TPL Dataflow and Channels.
I have been doing some benchmarking and I have seen that both TPL Dataflow and Channels are much faster at dequeuing elements.
My requirements are:
- Queue behaviour (maintain item ordering)
- Multiple threads can Enqueue elements
- One thread reading elements (to maintain order)
IProducerConsumer interface
public interface IProducerConsumer
{
void Enqueue(Action item);
void Stop();
void StartDequeing();
}
Blocking Collection Implementation
public class BlockingCollectionQueue : IProducerConsumer
{
private readonly BlockingCollection<Action> _defaultQueue;
private Task _dequeTask;
public BlockingCollectionQueue()
{
_defaultQueue = new BlockingCollection<Action>(new ConcurrentQueue<Action>());
}
public void Enqueue(Action item)
{
if (!_defaultQueue.IsAddingCompleted)
{
_defaultQueue.Add(item);
}
}
public void Stop()
{
_defaultQueue.CompleteAdding();
}
public void StartDequeing()
{
Task.Run(DequeueTask);
}
private void DequeueTask()
{
foreach (var item in _defaultQueue.GetConsumingEnumerable())
{
item?.Invoke();
}
}
}
Channel Implementation
public class ChannelQueue : IProducerConsumer
{
private readonly Channel<Action> _channel;
private readonly ChannelWriter<Action> _channelWriter;
private readonly ChannelReader<Action> _channelReader;
public ChannelQueue()
{
_channel = Channel.CreateUnbounded<Action>(new UnboundedChannelOptions() { SingleReader = true });
_channelWriter = _channel.Writer;
_channelReader = _channel.Reader;
}
public void Enqueue(Action item)
{
_channelWriter.TryWrite(item);
}
public void StartDequeing()
{
Task.Run(DequeueTask);
}
private async Task DequeueTask()
{
while (await _channelReader.WaitToReadAsync())
{
while (_channelReader.TryRead(out var job))
{
job?.Invoke();
}
}
}
public void Stop()
{
_channelWriter.Complete();
}
}
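One detail worth noting about the Channel implementation above: Stop only completes the writer; it does not wait for the reader loop to finish draining the remaining items. If callers need to know when everything has been processed, a hedged sketch (reusing the same fields as ChannelQueue; StopAsync is a hypothetical addition, not part of the original IProducerConsumer interface) could keep the dequeue task and await the reader's Completion:

```csharp
// Sketch only: a drain-aware Stop for the ChannelQueue above.
private Task _dequeTask = Task.CompletedTask;

public void StartDequeing()
{
    _dequeTask = Task.Run(DequeueTask);
}

public async Task StopAsync()
{
    _channelWriter.Complete();        // no more items will be accepted
    await _channelReader.Completion;  // completes once the channel is completed and empty
    await _dequeTask;                 // the reader loop has exited
}
```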
TPLDataFlow using BufferBlock implementation
public class DataFlowQueue : IProducerConsumer
{
private readonly BufferBlock<Action> _bufferBlock;
private Task _dequeTask;
public DataFlowQueue()
{
var dataflowOptions = new DataflowBlockOptions() { EnsureOrdered = true };
_bufferBlock = new BufferBlock<Action>(dataflowOptions);
}
public void Enqueue(Action item)
{
_bufferBlock.Post(item);
}
public void StartDequeing()
{
_dequeTask = Task.Run(DequeueTask);
}
private async Task DequeueTask()
{
while (await _bufferBlock.OutputAvailableAsync())
{
while(_bufferBlock.TryReceive(out var item))
{
item?.Invoke();
}
}
}
public void Stop()
{
_bufferBlock.Complete();
}
}
TPLDataFlow using ActionBlock
public class ActionBlockQueue : IProducerConsumer
{
private readonly ActionBlock<Action> _actionBlock;
private Task _dequeTask;
public ActionBlockQueue()
{
var dataflowOptions = new ExecutionDataflowBlockOptions() { EnsureOrdered = true, MaxDegreeOfParallelism = 1 };
_actionBlock = new ActionBlock<Action>(item=> item?.Invoke(), dataflowOptions);
}
public void Enqueue(Action item)
{
_actionBlock.Post(item);
}
public void StartDequeing()
{
}
public void Stop()
{
_actionBlock.Complete();
}
}
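Since ActionBlock starts consuming as soon as it is constructed, StartDequeing is legitimately empty here. Note that Stop above only calls Complete and returns immediately; a hedged sketch of a draining stop (StopAndDrainAsync is a hypothetical addition, not part of the interface) would also await the block's Completion task:

```csharp
// Sketch: complete the block and await its Completion task so that
// all already-posted items have been processed before returning.
public async Task StopAndDrainAsync()
{
    _actionBlock.Complete();        // stop accepting new items
    await _actionBlock.Completion;  // finishes when the internal buffer is empty
}
```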
Benchmark using BenchmarkDotNet
As you can see, all of the implementations store Action delegates in the queues, and I'm using an AutoResetEvent to signal when the last element is dequeued.
public class MultipleJobBenchMark
{
private AutoResetEvent _autoResetEvent;
public MultipleJobBenchMark()
{
_autoResetEvent = new AutoResetEvent(false);
}
[Benchmark]
public void BlockingCollectionQueue()
{
DoMultipleJobs(new BlockingCollectionQueue());
}
[Benchmark]
public void DataFlowQueue()
{
DoMultipleJobs(new DataFlowQueue());
}
[Benchmark]
public void ActionBlockQueue()
{
DoMultipleJobs(new ActionBlockQueue());
}
[Benchmark]
public void ChannelQueue()
{
DoMultipleJobs(new ChannelQueue());
}
private void DoMultipleJobs(IProducerConsumer producerConsumerQueue)
{
producerConsumerQueue.StartDequeing();
int jobs = 100000;
for (int i = 0; i < jobs - 1; i++)
{
producerConsumerQueue.Enqueue(() => { });
}
producerConsumerQueue.Enqueue(() => _autoResetEvent.Set());
_autoResetEvent.WaitOne();
producerConsumerQueue.Stop();
}
}
Results
- BlockingCollection: Mean 21.5 ms
- BufferBlock Queue: Mean 14.937 ms
- ActionBlock Queue: Mean 6.007 ms
- Channel: Mean 4.781 ms
Questions and Conclusions
By doing this exercise I have seen that, at this time, BlockingCollection may not be the best option.
I don't understand why there is such a big difference between BufferBlock and ActionBlock. I did both implementations because my interface defines a StartDequeing() method, which is not possible with ActionBlock, since dequeuing is wired up in the ActionBlock constructor.
Is my BufferBlock implementation optimal?
I wanted to post my results here to learn which producer-consumer queue is the most accepted at the moment, and why I have seen such a big difference between ActionBlock and BufferBlock.
Answer 1 (score: 1)
As your benchmarks reveal, Channel<T> is a relatively more performant producer-consumer queue than BlockingCollection<T>. This is reasonable, since Channel<T> is a newer component (2019) and takes advantage of the ValueTask<T> technology that was non-existent when BlockingCollection<T> was introduced (2010). For this to have any measurable effect, you must be passing a very large number of items per second through the queue. In that case it might be a good idea to consider processing the items in batches/chunks, instead of passing each item individually through the queue.
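The batching idea can be sketched as follows, assuming an unbounded channel like the one in the question but carrying lists of items instead of single items (the chunk size of 128 is an arbitrary illustration, not a recommendation):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Sketch: pass chunks through the queue instead of individual items,
// so each channel operation is amortized over many work items.
var channel = Channel.CreateUnbounded<List<Action>>(
    new UnboundedChannelOptions { SingleReader = true });

// Producer side: accumulate locally, then write one chunk at a time.
var chunk = new List<Action>();
for (int i = 0; i < 1000; i++)
{
    chunk.Add(() => { /* work */ });
    if (chunk.Count == 128)  // arbitrary illustrative chunk size
    {
        channel.Writer.TryWrite(chunk);
        chunk = new List<Action>();
    }
}
if (chunk.Count > 0) channel.Writer.TryWrite(chunk);
channel.Writer.Complete();

// Consumer side: one read yields many items.
while (await channel.Reader.WaitToReadAsync())
{
    while (channel.Reader.TryRead(out var batch))
    {
        foreach (var action in batch) action?.Invoke();
    }
}
```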
In general I think that BlockingCollection<T> is still a good option when your producer-consumer system is synchronous, i.e. when the producer and the consumer are running on dedicated threads. Channel<T> is a natural choice when you want to build an asynchronous system, i.e. you are calling asynchronous APIs and you want to make efficient use of threads. As for the components found in the TPL Dataflow library, they are a valid option when you want to build an asynchronous system that can run on older versions of .NET. There are very few reasons to prefer the older BufferBlock<T> over the newer Channel<T> when both are available. Channel<T> has a cleaner and more expressive API, and offers more options, like the ability to drop old items when new items are added and the maximum capacity has been reached.
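For example, that drop-old-items behavior is configured through BoundedChannelOptions (the capacity of 100 here is an arbitrary illustration):

```csharp
using System;
using System.Threading.Channels;

// A bounded channel that discards the oldest buffered item
// when a write would exceed the capacity, instead of blocking the writer.
var channel = Channel.CreateBounded<Action>(new BoundedChannelOptions(capacity: 100)
{
    SingleReader = true,
    FullMode = BoundedChannelFullMode.DropOldest
});
```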
A rare scenario where you may want to avoid Channel<T> is when your producer, or the consumer, or both, uses cancellation tokens in each and every asynchronous write/read operation, and those tokens are routinely canceled. This usage can trigger a memory leak in Channel<T>, but not in BufferBlock<T>. See [this question][3] for details.
[3]: https://stackoverflow.com/questions/67573683/channels-with-cancellationtokensource-with-timeout-memory-leak-after-dispose "Channels with CancellationTokenSource with timeout memory leak after dispose"