Asynchronous Producer Consumer Pattern in .NET (C#)

Posted by: Alex Basiuk , on 9/9/2019, in Category .NET Standard & .NET Core
Views: 3148
Abstract: Microsoft created in a new API in .NET Core some time back which is effectively a recommended option for producer consumer now.This article introduces this new .NET API for implementing asynchronous version of the producer-consumer pattern.

When it comes to multi-threading, one of the first patterns that emerges is the producer-consumer pattern.

The Producer-Consumer pattern is great for buffered asynchronous communication which is a great way of separating work that needs to be done, from the execution of that work.

Since .NET 4.0, the standard way to implement the pattern is to use the BlockingCollection class which wraps the IProducerConsumerCollection interface. It’s been in the wild for a long time and was covered by Yacoub Massad earlier in his article.

But it has a major issue: it doesn’t support the async/await paradigm.

Async Await .NET Core

Async Await in Producer Consumer Pattern

It’s possible to create an async/await version by using DataFlow blocks, but it’s less elegant and significantly less performant.

The new API was developed as a part of .NET Core which focuses a lot on performance improvements. It was published as a separate NuGet package System.Threading.Channels. It has an intuitive interface and was designed to be used in asynchronous code. However, unlike BlockingCollection, it has its own storage mechanism which is a FIFO queue and it’s not possible to customise it.

Are you a .NET/C# developer looking for a resource covering New Technologies, in-depth Tutorials and Best Practices?

Well, you are in luck! We at DotNetCurry release a digital magazine once every two months aimed at Developers, Architects and Technical Managers and cover ASP.NET Core, C#, Patterns, .NET Core, ASP.NET MVC, Azure, DevOps, ALM, TypeScript, Angular, React, and much more. Subscribe to this magazine for FREE and receive all previous, current and upcoming editions, right in your Inbox. No Gimmicks. No Spam Policy.

Click here to Download the Magazines For Free

Channel Class

The API entry point is the Channel class which exposes a few static factory methods that can create two categories of channels:

Bounded channels

public static Channel CreateBounded(int capacity);
public static Channel CreateBounded(BoundedChannelOptions options)

Users can choose channel capacity and tailor how writing the operation handles a situation when the channel is full. The following options are available:

1) Wait until space is available (default option).

2) Drop the oldest item in the channel.

3) Drop the newest item in the channel.

4) Drop the item being written.

Unbounded channels

public static Channel CreateUnbounded();
public static Channel CreateUnbounded(UnboundedChannelOptions options)

It should be used cautiously as it can lead to OutOfMemoryException if consumers are slower than producers.

Each Channel exposes two properties: Writer and Reader for producer and consumer concerns respectively.

The writer accepts new items until the channel is completed with the TryComplete method. New items can be written synchronously with the TryWrite method or asynchronously with the WriteAsync method. The WaitToWriteAsync method allows to await until the channel is available for writing and returns a Boolean which indicates whether the channel is completed.

The reader mimics writer’s methods and exposes the TryRead method for synchronous reading and the ReadAsync method for asynchronous reading. There is also the WaitToReadAsync method which allows to await until new items are available and indicates whether the channel is completed. The reader also exposes the Completion property. It returns a task that completes when no more data will ever be available to be read from the channel (when the TryComplete method is called on the respective writer and the remaining messages are processed).

The following code snippet demonstrates how to use the library.

In order to make it easier to compare it to the BlockingCollection approach, I deliberately created an async version of ProcessDocumentsUsingProducerConsumerPattern example described by Yacoub Massad in his post.

public static async Task ProcessDocumentsUsingProducerConsumerPattern()
{
    const int boundedQueueSize = 500;
    var savingChannel = Channel.CreateBounded(boundedQueueSize);
    var translationChannel = Channel.CreateUnbounded();

    var saveDocumentTask = Task.Run(async () =>
    {
        while (await savingChannel.Reader.WaitToReadAsync())
        {
            while (savingChannel.Reader.TryRead(out var document))
            {
                Debug.WriteLine($"Saving document: {document}");
                await SaveDocumentToDestinationStore(document);
            }
        }
    });

    var translateDocumentTasks = Enumerable
        .Range(0, 7) // 7 translation tasks
        .Select(_ => Task.Run(async () =>
        {
            while (await translationChannel.Reader.WaitToReadAsync())
            {
                while (translationChannel.Reader.TryRead(out var documentId))
                {
                    Debug.WriteLine($"Reading and translating document {documentId}");
                    var document = await ReadAndTranslateDocument(documentId);
                    await savingChannel.Writer.WriteAsync(document);
                }
            }
        }))
        .ToArray();

    var allItemsAreWrittenIntoTranslationChannel = GetDocumentIdsToProcess()
                                                    .Select(id => translationChannel.Writer.TryWrite(id))
                                                    .All(_ => _);
    Debug.Assert(allItemsAreWrittenIntoTranslationChannel); // All items should be written successfully into the unbounded channel.translationChannel.Writer.Complete();

    await Task.WhenAll(translateDocumentTasks);
    savingChannel.Writer.Complete();

    await savingChannel.Reader.Completion;
}

As mentioned above, Channels were designed with high performance in mind. So we ran a series of tests to compare the throughput of Channels vs BlockingCollection vs DataFlow.

Channels vs BlockingCollection vs DataFlow

The first test measures the “maximum” throughput. The consumers just dequeue messages and don’t do any actual work. The intent is to compare the overhead added by each approach.

In this case, we had a channel with a capacity of 64 items and we measured the duration of processing for 100,000,000 items. To cover a wider range of scenarios, we ran tests with different combinations of numbers of producers and consumers, each in the range of 1-32.

channels-blockingcollection-dataflow

In 80% of scenarios Channels shows better throughput than both BlockingCollection and DataFlow. It demonstrates the best results when the number of producers is greater than the number of consumers. And the worst results when the number of consumers is greater than the number of producers.

The second set of tests covers a more realistic scenario. Each consumer dequeues a message and emulates 25ms of processing. We tested both IO bound and CPU bound cases. On this occasion, we had a channel with a capacity of 64 items and we measured the duration of processing for 32,768 items.

io-bound-cpu-bound

This time the results are not so significantly different. In most cases, Channels and DataFlow are at par. It can probably be explained by the fact that the emulated processing time is relatively high and the number of processed items is significantly lower than in the first test.

Conclusion:

The new Channels should be used to implement the producer-consumer pattern in asynchronous code, especially in performance-sensitive applications where it’s important to reduce the time spent in the GC.

It offers clean API and great throughput. But in certain scenarios, the impact can be unexpected. In existing applications, especially synchronous ones, it may not give significant benefits if BlockingCollection is replaced with Channels. On the other hand, DataFlow is a great tool for building more complex processing pipelines. However, it’s a bit “heavy” for the simple producer-consumer case.

* All measurements were taken on a Windows 10 x64 PC with CPU Intel Core i7-7700@3.60GHz (4 cores) and RAM 64Gb

This article was technically reviewed by Yacoub Massad.

This article has been editorially reviewed by Suprotim Agarwal.

Absolutely Awesome Book on C# and .NET

C# and .NET have been around for a very long time, but their constant growth means there’s always more to learn.

We at DotNetCurry are very excited to announce the The Absolutely Awesome Book on C# and .NET. This is a 500 pages concise technical eBook available in PDF, ePub (iPad), and Mobi (Kindle).

Organized around concepts, this eBook aims to provide a concise, yet solid foundation in C# and .NET, covering C# 6.0, C# 7.0 and .NET Core, with chapters on .NET Standard and the upcoming C# 8.0 too. Use these concepts to deepen your existing knowledge of C# and .NET, to have a solid grasp of the latest in C# and .NET OR to crack your next .NET Interview.

Click here to Explore the Table of Contents or Download Sample Chapters!

What Others Are Reading!
Was this article worth reading? Share it with fellow developers too. Thanks!
Share on LinkedIn
Share on Google+

Author
Alex Basiuk is a software engineer specializing in software & distributed systems development. He has over 15 years of experience delivering various technology solutions for the financial sector. Alex is passionate about software architecture, high-performance computing and functional programming.


Page copy protected against web site content infringement 	by Copyscape




Feedback - Leave us some adulation, criticism and everything in between!

Categories

JOIN OUR COMMUNITY

POPULAR ARTICLES

C# .NET BOOK

C# Book for Building Concepts and Interviews

Tags

JQUERY COOKBOOK

jQuery CookBook