When it comes to multi-threading, one of the first patterns that emerges is the producer-consumer pattern.
The producer-consumer pattern suits buffered asynchronous communication, which separates the work that needs to be done from the execution of that work.
Since .NET 4.0, the standard way to implement the pattern has been the BlockingCollection<T> class, which wraps an IProducerConsumerCollection<T> implementation. It has been in the wild for a long time and was covered earlier by Yacoub Massad in his article.
But it has a major issue: it doesn’t support the async/await paradigm.
Async Await in Producer Consumer Pattern
It’s possible to create an async/await version by using DataFlow blocks, but it’s less elegant and significantly less performant.
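For comparison, an async producer-consumer loop built on a DataFlow block might look roughly like the sketch below. It is only an illustration: the BufferBlock<int> item type, the capacity of 10 and the summing consumer are assumptions made up for the example, and the code uses top-level statements (C# 9 or later) together with the System.Threading.Tasks.Dataflow library.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical setup: a bounded buffer of ints (the capacity of 10 is arbitrary).
var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 10 });

// Consumer: waits asynchronously for output, then drains whatever is buffered.
var consumer = Task.Run(async () =>
{
    var sum = 0;
    while (await buffer.OutputAvailableAsync())
    {
        while (buffer.TryReceive(out var item))
        {
            sum += item;
        }
    }
    return sum;
});

// Producer: SendAsync waits asynchronously while the buffer is full.
for (var i = 1; i <= 100; i++)
{
    await buffer.SendAsync(i);
}
buffer.Complete();

var total = await consumer;
Console.WriteLine(total); // 5050
```

The shape is similar to a channel-based loop, but the producer and consumer operations come from extension methods on the block interfaces rather than from dedicated reader and writer objects.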
The new Channels API was developed as part of .NET Core, which focuses heavily on performance improvements. It is published as a separate NuGet package, System.Threading.Channels. It has an intuitive interface and was designed for use in asynchronous code. However, unlike BlockingCollection, it has its own storage mechanism, a FIFO queue, which cannot be customised.
Channel Class
The API entry point is the Channel class which exposes a few static factory methods that can create two categories of channels:
Bounded channels
public static Channel<T> CreateBounded<T>(int capacity);
public static Channel<T> CreateBounded<T>(BoundedChannelOptions options);
Users can choose the channel capacity and tailor how a write operation behaves when the channel is full. The following options are available:
1) Wait until space is available (default option).
2) Drop the oldest item in the channel.
3) Drop the newest item in the channel.
4) Drop the item being written.
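The four options above correspond to the BoundedChannelFullMode values Wait, DropOldest, DropNewest and DropWrite. As a minimal sketch of how one of them behaves, the example below uses DropOldest with a capacity of 2; the item type, the capacity and the values written are arbitrary choices for illustration, and the code assumes top-level statements (C# 9 or later).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Capacity of 2 with DropOldest: writes never wait, the oldest buffered item is evicted instead.
var options = new BoundedChannelOptions(capacity: 2)
{
    FullMode = BoundedChannelFullMode.DropOldest
};
var channel = Channel.CreateBounded<int>(options);

for (var i = 1; i <= 5; i++)
{
    // TryWrite succeeds even when the channel is full, because an old item is dropped to make room.
    channel.Writer.TryWrite(i);
}
channel.Writer.Complete();

// Items that are still buffered can be read after the writer has completed.
var remaining = new List<int>();
while (channel.Reader.TryRead(out var item))
{
    remaining.Add(item);
}

Console.WriteLine(string.Join(", ", remaining)); // 4, 5
```

With the default Wait mode, the third TryWrite would instead return false, and WriteAsync would stay pending until a consumer frees a slot.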
Unbounded channels
public static Channel<T> CreateUnbounded<T>();
public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
Unbounded channels should be used cautiously, as they can lead to an OutOfMemoryException if consumers are slower than producers.
Each Channel exposes two properties: Writer and Reader for producer and consumer concerns respectively.
The writer accepts new items until the channel is completed with the Complete or TryComplete method. New items can be written synchronously with the TryWrite method or asynchronously with the WriteAsync method. The WaitToWriteAsync method lets a producer await until the channel is available for writing; it returns a Boolean that is false once the channel has been completed and no further writes will be accepted.
The reader mirrors the writer’s methods: it exposes the TryRead method for synchronous reading and the ReadAsync method for asynchronous reading. The WaitToReadAsync method lets a consumer await until new items are available; it returns false once the channel has been completed and drained. The reader also exposes the Completion property. It returns a task that completes when no more data will ever be available to be read from the channel (that is, when Complete or TryComplete has been called on the respective writer and the remaining items have been read).
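The writer methods can be tried out with a small sketch like the one below. It assumes a bounded channel of strings with a capacity of 1 and the default Wait mode, so that the difference between TryWrite and WriteAsync is visible; the item values and variable names are hypothetical, and the code uses top-level statements (C# 9 or later).

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<string>(capacity: 1);
var writer = channel.Writer;

// Synchronous write: succeeds only while there is free space.
var firstWrite = writer.TryWrite("a");  // true
var secondWrite = writer.TryWrite("b"); // false, the channel is full

// Asynchronous write: stays pending until a consumer frees a slot.
var pendingWrite = writer.WriteAsync("b").AsTask();
var wasPending = !pendingWrite.IsCompleted; // true

channel.Reader.TryRead(out _); // removes "a", which lets the pending write through
await pendingWrite;

// Completing the channel: only the first call reports success.
var completed = writer.TryComplete();      // true
var completedAgain = writer.TryComplete(); // false

// After completion, WaitToWriteAsync reports that no more writes will be accepted.
var canStillWrite = await writer.WaitToWriteAsync(); // false

Console.WriteLine($"{firstWrite} {secondWrite} {wasPending} {completed} {completedAgain} {canStillWrite}");
```

Complete behaves like TryComplete, except that it throws when the channel has already been completed instead of returning false.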
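The reader side, including the behaviour of the Completion task, can be sketched as follows. The unbounded channel of three integers is an arbitrary setup chosen for illustration, and the code assumes top-level statements (C# 9 or later).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
for (var i = 1; i <= 3; i++)
{
    channel.Writer.TryWrite(i);
}

// Asynchronous read: completes immediately here because an item is already buffered.
var first = await channel.Reader.ReadAsync(); // 1

channel.Writer.Complete();

// Items 2 and 3 are still buffered, so Completion has not finished yet.
var completedWhileItemsRemain = channel.Reader.Completion.IsCompleted; // false

// Typical consumer loop: wait for data, then drain synchronously with TryRead.
var drained = new List<int>();
while (await channel.Reader.WaitToReadAsync())
{
    while (channel.Reader.TryRead(out var item))
    {
        drained.Add(item);
    }
}

// The loop ended because the channel is completed and empty, so this await finishes.
await channel.Reader.Completion;

var drainedText = string.Join(",", drained);
Console.WriteLine($"first={first}, drained={drainedText}"); // first=1, drained=2,3
```

Calling ReadAsync on a channel that is both completed and empty throws a ChannelClosedException, which is one reason the WaitToReadAsync/TryRead loop is a common way to consume a channel.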
The following code snippet demonstrates how to use the library.
To make it easier to compare with the BlockingCollection approach, I deliberately created an async version of the ProcessDocumentsUsingProducerConsumerPattern example described by Yacoub Massad in his post.
using System.Diagnostics;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static async Task ProcessDocumentsUsingProducerConsumerPattern()
{
    const int boundedQueueSize = 500;
    // Item types are assumed from the variable names: translated documents and document IDs.
    var savingChannel = Channel.CreateBounded<Document>(boundedQueueSize);
    var translationChannel = Channel.CreateUnbounded<string>();

    // Single consumer that saves translated documents.
    var saveDocumentTask = Task.Run(async () =>
    {
        while (await savingChannel.Reader.WaitToReadAsync())
        {
            while (savingChannel.Reader.TryRead(out var document))
            {
                Debug.WriteLine($"Saving document: {document}");
                await SaveDocumentToDestinationStore(document);
            }
        }
    });

    var translateDocumentTasks = Enumerable
        .Range(0, 7) // 7 translation tasks
        .Select(_ => Task.Run(async () =>
        {
            while (await translationChannel.Reader.WaitToReadAsync())
            {
                while (translationChannel.Reader.TryRead(out var documentId))
                {
                    Debug.WriteLine($"Reading and translating document {documentId}");
                    var document = await ReadAndTranslateDocument(documentId);
                    await savingChannel.Writer.WriteAsync(document);
                }
            }
        }))
        .ToArray();

    var allItemsAreWrittenIntoTranslationChannel = GetDocumentIdsToProcess()
        .Select(id => translationChannel.Writer.TryWrite(id))
        .All(_ => _);
    // All items should be written successfully into the unbounded channel.
    Debug.Assert(allItemsAreWrittenIntoTranslationChannel);
    translationChannel.Writer.Complete();

    // Once every translator has finished, no more documents will be queued for saving.
    await Task.WhenAll(translateDocumentTasks);
    savingChannel.Writer.Complete();
    await savingChannel.Reader.Completion;
}
As mentioned above, Channels were designed with high performance in mind. So we ran a series of tests to compare the throughput of Channels vs BlockingCollection vs DataFlow.
Channels vs BlockingCollection vs DataFlow
The first test measures the “maximum” throughput. The consumers just dequeue messages and don’t do any actual work. The intent is to compare the overhead added by each approach.
In this case, we had a channel with a capacity of 64 items and we measured the duration of processing for 100,000,000 items. To cover a wider range of scenarios, we ran tests with different combinations of numbers of producers and consumers, each in the range of 1-32.
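The article does not include the benchmark code, so the following is only a rough sketch of what such a throughput measurement could look like for the Channels case. The producer and consumer counts and the reduced total of 1,000,000 items are assumptions chosen to keep the run short; only the capacity of 64 comes from the description above. The code uses top-level statements (C# 9 or later).

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

const int capacity = 64;              // as in the described test
const int producerCount = 4;          // assumed for this sketch
const int consumerCount = 2;          // assumed for this sketch
const int itemsPerProducer = 250_000; // 1,000,000 items in total, far fewer than the real test

var channel = Channel.CreateBounded<int>(capacity);
long consumed = 0;
var stopwatch = Stopwatch.StartNew();

// Consumers only dequeue and count, so the measurement reflects channel overhead.
var consumers = Enumerable.Range(0, consumerCount).Select(_ => Task.Run(async () =>
{
    while (await channel.Reader.WaitToReadAsync())
    {
        while (channel.Reader.TryRead(out _))
        {
            Interlocked.Increment(ref consumed);
        }
    }
})).ToArray();

var producers = Enumerable.Range(0, producerCount).Select(_ => Task.Run(async () =>
{
    for (var i = 0; i < itemsPerProducer; i++)
    {
        await channel.Writer.WriteAsync(i);
    }
})).ToArray();

// Complete the channel only after every producer has finished writing.
await Task.WhenAll(producers);
channel.Writer.Complete();
await Task.WhenAll(consumers);
stopwatch.Stop();

Console.WriteLine($"Processed {consumed} items in {stopwatch.ElapsedMilliseconds} ms");
```

A single run like this is noisy; a proper comparison would repeat the measurement, discard warm-up runs and vary the producer and consumer counts across the 1-32 range.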
In 80% of scenarios, Channels shows better throughput than both BlockingCollection and DataFlow. It performs best when the number of producers is greater than the number of consumers, and worst when the number of consumers is greater than the number of producers.
The second set of tests covers a more realistic scenario. Each consumer dequeues a message and emulates 25ms of processing. We tested both IO bound and CPU bound cases. On this occasion, we had a channel with a capacity of 64 items and we measured the duration of processing for 32,768 items.
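How the 25ms of processing was emulated is not shown in the article. One plausible way to do it, sketched below under that assumption, is an awaited delay for the IO-bound case and a busy loop for the CPU-bound case. The helper names, the item count of 16 and the consumer count of 4 are all hypothetical; the code uses top-level statements (C# 9 or later).

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: IO-bound work releases the thread while it waits.
Task EmulateIoBoundWork(TimeSpan duration) => Task.Delay(duration);

// Hypothetical helper: CPU-bound work keeps the core busy for the whole duration.
void EmulateCpuBoundWork(TimeSpan duration)
{
    var stopwatch = Stopwatch.StartNew();
    while (stopwatch.Elapsed < duration)
    {
        Thread.SpinWait(100);
    }
}

async Task<int> RunAsync(int itemCount, int consumerCount, bool cpuBound)
{
    var workDuration = TimeSpan.FromMilliseconds(25);
    var channel = Channel.CreateBounded<int>(64);
    var processed = 0;

    var consumers = Enumerable.Range(0, consumerCount).Select(_ => Task.Run(async () =>
    {
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out _))
            {
                if (cpuBound) EmulateCpuBoundWork(workDuration);
                else await EmulateIoBoundWork(workDuration);
                Interlocked.Increment(ref processed);
            }
        }
    })).ToArray();

    for (var i = 0; i < itemCount; i++)
    {
        await channel.Writer.WriteAsync(i);
    }
    channel.Writer.Complete();
    await Task.WhenAll(consumers);
    return processed;
}

var ioProcessed = await RunAsync(itemCount: 16, consumerCount: 4, cpuBound: false);
var cpuProcessed = await RunAsync(itemCount: 16, consumerCount: 4, cpuBound: true);
Console.WriteLine($"IO-bound: {ioProcessed} items, CPU-bound: {cpuProcessed} items");
```

In the IO-bound variant the consumer threads are free during the delay, whereas the CPU-bound variant occupies a thread pool thread per consumer, which is why the two cases are worth measuring separately.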
This time the results do not differ as much. In most cases, Channels and DataFlow are on par. This can probably be explained by the fact that the emulated processing time is relatively high and the number of processed items is significantly lower than in the first test.
Conclusion:
The new Channels should be used to implement the producer-consumer pattern in asynchronous code, especially in performance-sensitive applications where it’s important to reduce the time spent in the GC.
It offers a clean API and great throughput. But in certain scenarios, the impact can be unexpected. In existing applications, especially synchronous ones, replacing BlockingCollection with Channels may not bring significant benefits. On the other hand, DataFlow is a great tool for building more complex processing pipelines. However, it’s a bit “heavy” for the simple producer-consumer case.
* All measurements were taken on a Windows 10 x64 PC with an Intel Core i7-7700 CPU @ 3.60GHz (4 cores) and 64 GB of RAM.
This article was technically reviewed by Yacoub Massad.
This article has been editorially reviewed by Suprotim Agarwal.
Alex Basiuk is a software engineer specializing in software & distributed systems development. He has over 15 years of experience delivering various technology solutions for the financial sector. Alex is passionate about software architecture, high-performance computing and functional programming.