The Producer Consumer Pattern in .NET (C#)

Posted by: Yacoub Massad , on 12/23/2017, in Category Patterns & Practices
Abstract: This article discusses the producer-consumer pattern in .NET, explains why we should use it, and demonstrates how to implement it with some examples.

Because machines now have multiple processing cores, parallel programming is becoming more important. A typical PC these days has four to eight cores, and servers have many more than that.

It makes sense to write software that uses multiple cores to enhance performance.


Consider the following code from an application that processes documents:

void ProcessDocuments()
{
    string[] documentIds = GetDocumentIdsToProcess();

    foreach(var id in documentIds)
    {
        Process(id);
    }
}

string[] GetDocumentIdsToProcess() => ...

void Process(string documentId)
{
    var document = ReadDocumentFromSourceStore(documentId);

    var translatedDocument = TranslateDocument(document, Language.English);

    SaveDocumentToDestinationStore(translatedDocument);
}

Document ReadDocumentFromSourceStore(string identifier) => ...

Document TranslateDocument(Document document, Language language) => ...

void SaveDocumentToDestinationStore(Document document) => ...

This code gets the ids of documents that need to be processed, obtains each document from the source store (e.g. database), translates it, and then saves it to the destination store.

The code right now will run under a single thread.

In C#, we can easily modify this code to use multiple cores using the Parallel class like this:

void ProcessDocumentsInParallel()
{
    string[] documentIds = GetDocumentIdsToProcess();

    Parallel.ForEach(
        documentIds,
        id => Process(id));
}

This is called Data Parallelism because we apply the same operations to each data item; in this particular case, to each document id.

Parallel.ForEach will parallelize the invocation of the Process method over the items in the documentIds array.

Of course, all of this depends on the environment!

For example, the more cores are available, the more documents can be processed in parallel.

Consider the following figure:

[Figure: simple-threads]

Assuming that currently eight threads are processing, we can imagine each of these threads taking one document id, calling ReadDocumentFromSourceStore, TranslateDocument, and then SaveDocumentToDestinationStore.

Once each thread is done with its document, it will grab another document id and process it. Parallel.ForEach will manage all of this.

Although this might seem a good idea, there might be some issues with this approach.

I will go through these issues, then introduce the producer-consumer pattern and show how it solves some of them.

Note: The behavior of Parallel.ForEach (and the .NET components used by it) is sophisticated. It automatically manages the degree of parallelism. For example, even if a machine has eight cores, the system might decide to run more than eight tasks in parallel if it sees that the currently executing tasks are blocked by I/O. To keep the discussion simple, I am going to assume that on a machine with eight cores, Parallel.ForEach will have exactly eight tasks running in parallel.
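If you would rather enforce an upper bound than leave the decision entirely to the scheduler, Parallel.ForEach accepts a ParallelOptions object whose MaxDegreeOfParallelism property caps the number of concurrent iterations. The following is only a minimal sketch: the document ids and the Thread.Sleep call standing in for Process are made up for illustration.

```csharp
// Runs as a C# 9+ top-level program.
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical ids standing in for GetDocumentIdsToProcess().
string[] documentIds = Enumerable.Range(1, 40).Select(i => "doc-" + i).ToArray();

int current = 0;      // iterations running right now
int maxObserved = 0;  // highest concurrency seen during the run
int processed = 0;
object gate = new object();

// Never run more than eight iterations at the same time.
var options = new ParallelOptions { MaxDegreeOfParallelism = 8 };

Parallel.ForEach(documentIds, options, id =>
{
    int now = Interlocked.Increment(ref current);
    lock (gate) { if (now > maxObserved) maxObserved = now; }

    Thread.Sleep(5); // stand-in for Process(id)

    Interlocked.Decrement(ref current);
    Interlocked.Increment(ref processed);
});

Console.WriteLine($"processed={processed}, max concurrency observed={maxObserved}");
```

The observed concurrency can be lower than eight on a machine with fewer cores, but it should never exceed the configured limit.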

Simple parallelism – Some Issues

Too much parallelism for some operations

In the previous example, we assumed that the system will use eight thread-pool threads to process the documents. This means that at any given instant, there could be eight threads invoking ReadDocumentFromSourceStore.

This might be problematic if ReadDocumentFromSourceStore is an I/O operation on a device that performs badly on parallel access.

For example, ReadDocumentFromSourceStore might be reading the document from a simple Hard Disk Drive (HDD). When reading from the HDD in parallel, the disk might need to read data from different locations. Moving from one location to another in HDDs is slow because the physical head in the disk needs to move.

On the other hand, if we are reading a single file at a time, it is more likely that the contents of the file (or large segments of the file) are in the same physical location on the disk and therefore the disk takes less time moving the head.

You can read more about hard disk performance here: https://en.wikipedia.org/wiki/Hard_disk_drive_performance_characteristics

To fix this issue, we can put a lock or a semaphore around the invocation of the ReadDocumentFromSourceStore method.

A lock allows only one thread to invoke this operation at a time, while a semaphore allows a predefined number of threads (specified in code) to invoke the method at a time.

The following example uses a semaphore to make sure that at any point of time, at most two threads will be invoking ReadDocumentFromSourceStore.

Semaphore semaphore = new Semaphore(2,2);

public void Process(string documentId)
{
    semaphore.WaitOne();

    Document document;

    try
    {
        document = ReadDocumentFromSourceStore(documentId);
    }
    finally
    {
        semaphore.Release();
    }

    var translatedDocument = TranslateDocument(document, Language.English);

    SaveDocumentToDestinationStore(translatedDocument);
}

Note: Limiting access to ReadDocumentFromSourceStore is a concern that can be extracted to its own decorator instead of being tangled with the processing code. In this article however, my focus is not on coding practices.
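One way such a decorator could look is sketched below, using delegates rather than interfaces to keep it short. The Throttle helper and the fake read function are assumptions made for this illustration, and the sketch uses SemaphoreSlim, a lighter in-process alternative to the Semaphore class used above.

```csharp
// Runs as a C# 9+ top-level program.
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: wraps any function so that at most
// maxConcurrency callers execute it at the same time.
static Func<TIn, TOut> Throttle<TIn, TOut>(Func<TIn, TOut> inner, int maxConcurrency)
{
    var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
    return input =>
    {
        semaphore.Wait();
        try { return inner(input); }
        finally { semaphore.Release(); }
    };
}

int inside = 0, maxInside = 0;
object gate = new object();

// Made-up stand-in for ReadDocumentFromSourceStore that records its own concurrency.
Func<string, string> read = id =>
{
    int now = Interlocked.Increment(ref inside);
    lock (gate) { maxInside = Math.Max(maxInside, now); }
    Thread.Sleep(5);
    Interlocked.Decrement(ref inside);
    return "content of " + id;
};

// The processing code only sees a Func<string, string>; the limit is invisible to it.
Func<string, string> throttledRead = Throttle(read, 2);

string[] ids = Enumerable.Range(1, 20).Select(i => "doc-" + i).ToArray();
string[] results = new string[ids.Length];
Parallel.For(0, ids.Length, i => results[i] = throttledRead(ids[i]));

Console.WriteLine($"max concurrent reads: {maxInside}");
```

With this shape, Process can go back to calling a plain read function, and the concurrency limit is decided where the function is composed.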

For more information about the Semaphore class, take a look at this reference: https://docs.microsoft.com/en-us/dotnet/api/system.threading.semaphore?view=netframework-4.7

For more information about locks in C#, take a look at this reference: https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/keywords/lock-statement

SaveDocumentToDestinationStore will probably also not be able to handle being called by eight threads in parallel. We can use a lock or a semaphore to also limit the number of threads that can access this method.
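As a rough sketch of the lock variant, the following code serializes all calls to a save method. The Save function and the in-memory list that plays the role of the destination store are hypothetical stand-ins.

```csharp
// Runs as a C# 9+ top-level program.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

object saveLock = new object();
var destinationStore = new List<string>(); // List<T> is not thread-safe on its own

// Made-up stand-in for SaveDocumentToDestinationStore.
void Save(string translatedDocument)
{
    lock (saveLock) // only one thread at a time gets past this point
    {
        destinationStore.Add(translatedDocument);
    }
}

// Many threads call Save, but the lock lets them in one by one.
Parallel.For(0, 1000, i => Save("translated-" + i));

Console.WriteLine(destinationStore.Count); // prints 1000
```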

Ever-changing execution times

When we use a lock or a semaphore, some threads will be blocked waiting for their turn to enter the protected section of the code, e.g. to call SaveDocumentToDestinationStore. In many cases, this makes a lot of sense.

Consider the following example:

[Figure: locking-save-method]

In this example, each of the three operations takes 10 milliseconds to complete. However, the SaveDocumentToDestinationStore method is protected with a lock to allow only one thread to call it (for reasons discussed in the previous section). Note that in this example, we assume that ReadDocumentFromSourceStore can be called from multiple threads without an issue.

Without the lock, each thread will process a document in 30 milliseconds (assuming all operations can be parallelized without an issue).

Now, with the lock, this is no longer true!

A thread would take 30 milliseconds plus the time it has to wait to obtain the lock (its turn to execute SaveDocumentToDestinationStore).

Instead of having the threads waiting and doing nothing, can we make them grab other document ids, read the documents and translate them?

The following figure illustrates this:

[Figure: add-to-queue-waiting]

In this example, seven threads read and translate documents.

Each one of these threads will take one document id, call ReadDocumentFromSourceStore and TranslateDocument, then put the result of the translation into a queue and go back to process (read and translate) another document, etc. A dedicated thread will take documents from the queue and call SaveDocumentToDestinationStore and then go fetch another document from the queue, etc.

Because there are seven threads working on the left of the queue, we can process about 7 threads * (1000 milliseconds / (10 milliseconds + 10 milliseconds)) = 350 documents per second. The single thread on the right of the queue will be processing 1000 milliseconds / 10 milliseconds = 100 documents per second. This means that the number of items in the queue will increase by 250 documents per second. After all documents have been read and translated, the number of documents in the queue will start to decrease by 100 documents per second, i.e., the rate at which we save documents.

So even if we partially process (read and translate) documents faster than 100 documents per second, these partially processed documents would still need to wait (in the queue) for their turn to be saved and we are still going to have an overall throughput of 100 documents per second.

In summary, we gain nothing by making the threads process other documents, instead of waiting.
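The arithmetic behind this conclusion can be written down as a few lines of code. This is only a back-of-the-envelope model under the same assumptions as above (10 milliseconds per operation, seven producer threads, one saving thread); the variable names are mine.

```csharp
// Runs as a C# 9+ top-level program.
using System;

const double readMs = 10, translateMs = 10, saveMs = 10;
const int producerThreads = 7;

// Seven threads, each spending 20 ms per document on reading and translating.
double producedPerSecond = producerThreads * (1000 / (readMs + translateMs)); // 350

// One thread, spending 10 ms per document on saving.
double savedPerSecond = 1000 / saveMs;                                         // 100

// The queue grows by the difference while producers are still running.
double queueGrowthPerSecond = producedPerSecond - savedPerSecond;              // 250

// A document only counts as processed once it is saved,
// so the slower stage determines the overall throughput.
double overallThroughput = Math.Min(producedPerSecond, savedPerSecond);        // 100

Console.WriteLine($"{producedPerSecond} {savedPerSecond} {queueGrowthPerSecond} {overallThroughput}");
```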

The conclusion we reached above is true because we assumed that the time it takes to save a document is constant (e.g. 10 milliseconds). However, in many cases, the time it takes to save a document changes frequently.

Consider the case where the document is saved to a disk on a remote machine. The connection to the remote machine might become better or worse from time to time and therefore the time it takes to save a document might change frequently.

To analyze the effect of this, let us assume that the SaveDocumentToDestinationStore method takes 10 milliseconds to complete half of the time and 2 milliseconds the other half (for example, as a result of low network load).

Let’s consider the case where we process documents for the duration of two seconds. In the 1st second, SaveDocumentToDestinationStore takes 10 milliseconds to complete. In the 2nd second, it takes 2 milliseconds.

Let’s first see what happens if there is no queue, i.e., when threads simply block to wait for their turn to call SaveDocumentToDestinationStore.

[Figure: threads-ever-changing-times]

In the 1st second, we process 100 documents per second (as fast as we can save documents) and therefore we process 100 documents.

In the 2nd second, saving the document is no longer the bottleneck. In this second, each document takes 22 milliseconds plus the average waiting time for the lock. I am going to ignore this waiting time and say that we process 8 threads * 1000 milliseconds / 22 milliseconds ~= 363 documents (had we not ignored the waiting time, this would be less).

So, in the interval of two seconds, we process about 463 documents.

Let’s now consider the case where we have a queue.

[Figure: first-second-queue]

In the 1st second, we complete 100 documents. However, 250 (350 – 100) additional documents would have been read and translated and would be sitting in the queue.

[Figure: second-queue]

In the 2nd second, the threads on the left can enqueue 350 additional documents. This means that a total of 600 documents are available for the saving thread to process. The thread on the right will process 500 documents in the 2nd second (1000 / 2).

This gives us a total of 600 documents processed in the whole interval of two seconds.

With this approach, we have processed at least 600 – 463 = 137 more documents in the interval of two seconds.
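The two scenarios can be replayed with a small model. This sketch makes the same simplifications as the text: lock waiting time is ignored, the queue is unbounded, and partial documents are rounded down. All names are hypothetical.

```csharp
// Runs as a C# 9+ top-level program.
using System;

const int workerThreads = 8, producerThreads = 7;
const double readMs = 10, translateMs = 10;
double[] saveMsPerSecond = { 10, 2 }; // save cost during the 1st and the 2nd second

double withoutQueue = 0, withQueue = 0, queued = 0;

foreach (double saveMs in saveMsPerSecond)
{
    // No queue: limited either by the save lock or by what eight threads can do end to end.
    double lockBound = 1000 / saveMs;
    double threadBound = workerThreads * 1000 / (readMs + translateMs + saveMs);
    withoutQueue += Math.Floor(Math.Min(lockBound, threadBound));

    // Queue: seven producers keep enqueuing; the saving thread drains as much as it can.
    queued += producerThreads * 1000 / (readMs + translateMs);
    double saved = Math.Min(queued, 1000 / saveMs);
    withQueue += saved;
    queued -= saved;
}

Console.WriteLine($"without queue: {withoutQueue}, with queue: {withQueue}, still queued: {queued}");
```

In this model, 100 translated documents are still waiting in the queue at the end of the two seconds; they are not counted in the 600 saved documents.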

The Producer-Consumer pattern

In the previous section, I showcased two ways to process documents in parallel.

Such processing has three distinct stages: reading a document, translating it, and saving it.

In the first way (the one without any queues), we have multiple threads and each will process a document through all three stages.

In the second way, each stage (or a group of stages) has dedicated threads running it.

In the last example, for instance, we had seven threads reading and translating documents, and a single thread saving documents.

This is basically the producer-consumer pattern.

In the example, we have seven producers that produce translated documents and we have a single consumer that consumes these documents by saving them.

In this section, I will talk about this pattern in more detail and show how it can be implemented in .NET.

Producer-Consumer pattern in detail

When we apply the producer-consumer pattern in its simplest form, we will have a single thread generating data and enqueuing them into some queue, and another thread dequeuing the data from the queue and processing them.

[Figure: simple-producer-consumer-pattern]

The queue should be thread-safe. It is possible that one thread is enqueuing an item, while another one is dequeuing another item.

Sometimes, the producer can produce data faster than the consumer can process them. In this case, it makes sense to have an upper bound on the number of items the queue can store. If the queue is full, the producer will have to stop producing more data until the queue becomes non-full again.
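To make the idea of an upper bound concrete, here is a small sketch that uses the BlockingCollection class from System.Collections.Concurrent with a capacity of two. It uses TryAdd, which reports a full queue by returning false, so that the example does not block; a producer calling Add would wait instead.

```csharp
// Runs as a C# 9+ top-level program.
using System;
using System.Collections.Concurrent;

// A queue that holds at most two items.
var queue = new BlockingCollection<int>(2);

queue.Add(1);
queue.Add(2);

// The queue is full: TryAdd returns false right away, whereas Add would block here.
bool addedWhileFull = queue.TryAdd(3);

int first = queue.Take();              // the consumer removes one item, freeing a slot
bool addedAfterTake = queue.TryAdd(3); // now there is room again

Console.WriteLine($"{addedWhileFull} {addedAfterTake} {first} {queue.Count}");
```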

Support for the Producer-Consumer pattern in the .NET framework

The easiest way to implement the producer-consumer pattern in .NET is to use the BlockingCollection class.

The following code shows how we can implement the document processing example using this class:

public void ProcessDocumentsUsingProducerConsumerPattern()
{
    string[] documentIds = GetDocumentIdsToProcess();

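    // Thread-safe source of document ids shared by all producers.
    BlockingCollection<string> inputQueue = CreateInputQueue(documentIds);

    // Bounded queue between the producers and the consumer.
    BlockingCollection<Document> queue = new BlockingCollection<Document>(500);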

    var consumer = Task.Run(() =>
    {
        foreach (var translatedDocument in queue.GetConsumingEnumerable())
        {
            SaveDocumentToDestinationStore(translatedDocument);
        }
    });

    var producers = Enumerable.Range(0, 7)
        .Select(_ => Task.Run(() =>
        {
            foreach (var documentId in inputQueue.GetConsumingEnumerable())
            {
                var document = ReadAndTranslateDocument(documentId);
                queue.Add(document);
            }
        }))
        .ToArray();

    Task.WaitAll(producers);

    queue.CompleteAdding();

    consumer.Wait();
}

private BlockingCollection<string> CreateInputQueue(string[] documentIds)
{
    var inputQueue = new BlockingCollection<string>();

    foreach (var id in documentIds)
        inputQueue.Add(id);

    inputQueue.CompleteAdding();

    return inputQueue;
}

In this example, we first call GetDocumentIdsToProcess to get the list of document ids that need to be processed. Then all the document ids are added into a new BlockingCollection object (in the inputQueue variable) by using the CreateInputQueue method. For now, ignore the fact that we use a BlockingCollection to store the document ids; we are just using it here as a thread-safe collection with a convenient API. I will explain this later.

We then create a BlockingCollection object specifying a maximum queue size of 500 items. This object will be used as the queue between the producers and the consumer. Having a limit on the queue size will cause a producer to block when it tries to add an item if the queue is full.

We then create a task (via Task.Run) to consume documents from the queue.

We use the GetConsumingEnumerable method to obtain an IEnumerable<Document> that can be used to loop through the produced documents as soon as they are available. If the queue is empty, the enumerator's MoveNext call blocks, which pauses the loop until a document arrives. In the loop body, we simply invoke SaveDocumentToDestinationStore to save the translated documents.

Next, we create seven tasks that will produce the translated documents.

Each producer will call GetConsumingEnumerable on the document ids BlockingCollection (in the inputQueue variable) to get some document ids to process. For each document id, a producer will read and translate the document and then add it to the queue via the Add method.

Please note that the BlockingCollection class handles multiple threads reading items from it correctly, i.e., no single item will be read by two threads.
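A quick way to convince yourself of this is the following sketch, in which four tasks drain the same collection and we then check that every item was seen exactly once. The integers are stand-ins for document ids.

```csharp
// Runs as a C# 9+ top-level program.
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

var input = new BlockingCollection<int>();
for (int i = 0; i < 1000; i++) input.Add(i);
input.CompleteAdding();

var seen = new ConcurrentBag<int>();

// Four consumers compete for the same items.
Task[] consumers = Enumerable.Range(0, 4)
    .Select(_ => Task.Run(() =>
    {
        foreach (int item in input.GetConsumingEnumerable())
            seen.Add(item);
    }))
    .ToArray();

Task.WaitAll(consumers);

int total = seen.Count;
int distinct = seen.Distinct().Count();

// Equal numbers mean that no item was handed to two consumers.
Console.WriteLine($"{total} items consumed, {distinct} distinct");
```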

In case you are wondering why we have a BlockingCollection for the input queue, it is simply convenient, as I explained just now. Had we stored the document ids in an array or a list, we would have had to manage how the data is given correctly to the producer threads ourselves.

Next, we wait for all producer tasks to complete via the Task.WaitAll method. Then, we invoke the CompleteAdding method on the BlockingCollection to mark the collection as completed.

When this happens, the consumer loop (the foreach loop) will complete once all items are processed. In more detail, once the queue is marked as complete and becomes empty, the enumeration returned from GetConsumingEnumerable terminates (the enumerator's MoveNext returns false). Without the call to CompleteAdding, MoveNext would simply block, waiting for new documents to be added to the queue.
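The effect of CompleteAdding can be observed in isolation with the small sketch below. It runs on a single thread, which is only possible because the collection is marked as complete before the loop starts.

```csharp
// Runs as a C# 9+ top-level program.
using System;
using System.Collections.Concurrent;

var queue = new BlockingCollection<string>();
queue.Add("a");
queue.Add("b");
queue.CompleteAdding();

bool addingCompleted = queue.IsAddingCompleted; // true: no more items may be added
bool completedBeforeDrain = queue.IsCompleted;  // false: two items are still queued

int consumed = 0;
foreach (string item in queue.GetConsumingEnumerable())
    consumed++; // the loop ends after "b" instead of blocking

bool completedAfterDrain = queue.IsCompleted;   // true: marked complete and empty

// Adding after CompleteAdding is rejected with an exception.
bool addRejected = false;
try { queue.Add("c"); }
catch (InvalidOperationException) { addRejected = true; }

Console.WriteLine($"{consumed} {completedBeforeDrain} {completedAfterDrain} {addRejected}");
```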

Finally, we wait for the consumer task to complete to make sure the ProcessDocumentsUsingProducerConsumerPattern method does not return before all documents are fully processed.

Please note that this is only one scenario where we can use the producer-consumer pattern.

In other scenarios, there may not be a fixed-sized set of documents that we need to process and therefore it does not make sense to wait for the producer or consumer to finish.

For example, we might want to have a never-ending process of reading new documents from an MSMQ queue and processing them.
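For such a long-running consumer, one option is to never call CompleteAdding and to stop the loop through a CancellationToken instead. The sketch below assumes that approach; the three hard-coded ids stand in for messages that would keep arriving from an external source.

```csharp
// Runs as a C# 9+ top-level program.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var queue = new BlockingCollection<string>(100);
var cts = new CancellationTokenSource();
int processed = 0;

var consumer = Task.Run(() =>
{
    try
    {
        // Blocks while the queue is empty; throws once the token is cancelled.
        foreach (string document in queue.GetConsumingEnumerable(cts.Token))
            Interlocked.Increment(ref processed); // stand-in for real processing
    }
    catch (OperationCanceledException)
    {
        // Expected shutdown path for a consumer that otherwise never ends.
    }
});

// Hypothetical source of work.
foreach (string id in new[] { "doc-1", "doc-2", "doc-3" })
    queue.Add(id);

// Wait until the consumer has caught up, then simulate application shutdown.
SpinWait.SpinUntil(() => Volatile.Read(ref processed) == 3);
cts.Cancel();
consumer.Wait();

Console.WriteLine($"processed {processed} documents before shutdown");
```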

The pipeline pattern

In our example, imagine that each stage of processing documents has its own degree of parallelism requirements and that its execution time changes frequently.

In this case, it makes sense to have two queues; one for read documents and one for translated documents.

The following figure illustrates this:

[Figure: task-pipeline]

Basically, the pipeline pattern is a variant of the producer-consumer pattern.

In this pattern, some consumers are also producers.

In this particular example, the translation process is both a consumer and a producer. It takes documents from the first queue, translates them, and then adds them to the second queue.

We can easily implement this pattern by using two BlockingCollection objects (and as explained in the previous example, we use another BlockingCollection to hold the input document ids for convenience):

public void ProcessDocumentsUsingPipelinePattern()
{
    string[] documentIds = GetDocumentIdsToProcess();

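    BlockingCollection<string> inputQueue = CreateInputQueue(documentIds);

    // Documents that have been read and are waiting to be translated.
    BlockingCollection<Document> queue1 = new BlockingCollection<Document>(500);

    // Translated documents waiting to be saved.
    BlockingCollection<Document> queue2 = new BlockingCollection<Document>(500);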

    var savingTask = Task.Run(() =>
    {
        foreach (var translatedDocument in queue2.GetConsumingEnumerable())
        {
            SaveDocumentToDestinationStore(translatedDocument);
        }
    });

    var translationTasks =
        Enumerable.Range(0, 7)
            .Select(_ =>
                Task.Run(() =>
                {
                    foreach (var readDocument in queue1.GetConsumingEnumerable())
                    {
                        var translatedDocument =
                            TranslateDocument(readDocument, Language.English);

                        queue2.Add(translatedDocument);
                    }
                }))
            .ToArray();

    var readingTasks =
        Enumerable.Range(0, 4)
            .Select(_ =>
                Task.Run(() =>
                {
                    foreach (var documentId in inputQueue.GetConsumingEnumerable())
                    {
                        var document = ReadDocumentFromSourceStore(documentId);

                        queue1.Add(document);
                    }
                }))
            .ToArray();

    Task.WaitAll(readingTasks);

    queue1.CompleteAdding();

    Task.WaitAll(translationTasks);

    queue2.CompleteAdding();

    savingTask.Wait();
}
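The reading and translation stages above have the same shape: a number of tasks take items from one collection, transform them, add them to the next collection, and the next collection is marked as complete when they finish. As a sketch of how that repetition could be factored out, the following self-contained program uses a hypothetical RunStage helper, with strings and trivial lambdas standing in for documents and for the real read, translate and save operations.

```csharp
// Runs as a C# 9+ top-level program.
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: starts `workers` tasks that move items from input to output
// through `transform`, then marks output as complete once all workers are done.
static async Task RunStage<TIn, TOut>(
    BlockingCollection<TIn> input, BlockingCollection<TOut> output,
    int workers, Func<TIn, TOut> transform)
{
    Task[] tasks = Enumerable.Range(0, workers)
        .Select(_ => Task.Run(() =>
        {
            foreach (TIn item in input.GetConsumingEnumerable())
                output.Add(transform(item));
        }))
        .ToArray();

    try { await Task.WhenAll(tasks); }
    finally { output.CompleteAdding(); } // let the next stage finish even if a worker failed
}

var ids = new BlockingCollection<string>();
for (int i = 1; i <= 100; i++) ids.Add("doc-" + i);
ids.CompleteAdding();

var read = new BlockingCollection<string>(500);
var translated = new BlockingCollection<string>(500);
var saved = new ConcurrentBag<string>();

Task reading = RunStage(ids, read, 4, id => "content of " + id);             // fake read
Task translating = RunStage(read, translated, 7, d => d.ToUpperInvariant()); // fake translation
Task saving = Task.Run(() =>
{
    foreach (string doc in translated.GetConsumingEnumerable())
        saved.Add(doc); // fake save
});

Task.WaitAll(reading, translating, saving);
Console.WriteLine(saved.Count); // prints 100
```

Calling CompleteAdding in a finally block is a design choice of this sketch: it keeps downstream stages from waiting forever when an upstream worker throws.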

A note about I/O operations and server applications

The read and save operations in the example I used in this article are I/O operations. If we synchronously invoke an I/O operation, e.g. by using File.ReadAllBytes, the calling thread will block while the operation is running.

When a thread blocks, it does not consume CPU cycles. And in desktop applications, having a few threads that block on I/O is not an issue.

In server applications however, e.g. WCF applications, the story is different!

If we have hundreds of concurrent requests whose processing requires I/O, it is much better not to block on I/O. The reason is that an I/O operation in progress does not require a thread, so instead of waiting, the thread can go and process another request and therefore enhance throughput.

In the examples I used in the article, all I/O operations are done synchronously and therefore they are not suitable for server applications that require high throughput.

It also makes sense not to block the current thread when we need to wait for a producer-consumer queue to become non-empty (on the consumer side) and non-full (on the producer side).

The TPL Dataflow Library can be used to implement the producer-consumer and pipeline patterns. It has good support for asynchronous operations and can be used in server scenarios.
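To give a first impression, here is a minimal sketch of the earlier producer-consumer example expressed with TPL Dataflow blocks. It assumes the System.Threading.Tasks.Dataflow NuGet package is referenced, and it uses Task.Delay and strings as stand-ins for the asynchronous read, translate and save operations.

```csharp
// Runs as a C# 9+ top-level program.
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

var saved = new ConcurrentBag<string>();

// Producer stage: up to seven concurrent transforms, at most 500 buffered items.
var readAndTranslate = new TransformBlock<string, string>(
    async id =>
    {
        await Task.Delay(1); // stand-in for an asynchronous read and translation
        return "translated " + id;
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 7, BoundedCapacity = 500 });

// Consumer stage: one save at a time (the default degree of parallelism is 1).
var save = new ActionBlock<string>(
    async doc =>
    {
        await Task.Delay(1); // stand-in for an asynchronous save
        saved.Add(doc);
    },
    new ExecutionDataflowBlockOptions { BoundedCapacity = 500 });

// Items flow from the first block to the second; completion flows along with them.
readAndTranslate.LinkTo(save, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 1; i <= 50; i++)
    await readAndTranslate.SendAsync("doc-" + i); // waits without blocking a thread when full

readAndTranslate.Complete();
await save.Completion;

Console.WriteLine(saved.Count); // prints 50
```

Here the blocks' own input buffers play the role of the queue, and no thread is blocked while a stage waits for items or for free capacity.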

The Dataflow pattern

The TPL Dataflow Library mentioned above also supports another variant of the producer-consumer pattern called the Dataflow pattern.

The Dataflow pattern is different from the pipeline pattern in that the flow of data does not have to be linear. For example, you can have documents go to different queues based on some condition. Or you can send a document to two queues to have them translated to both English and Spanish.
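As a small taste of such non-linear flow, the sketch below routes each document to one of two targets based on a condition, using the predicate overload of LinkTo. It again assumes the System.Threading.Tasks.Dataflow NuGet package; the length-based rule and the sample strings are invented for the example.

```csharp
// Runs as a C# 9+ top-level program.
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

var shortDocs = new ConcurrentBag<string>();
var longDocs = new ConcurrentBag<string>();

var source = new BufferBlock<string>();
var handleShort = new ActionBlock<string>(d => shortDocs.Add(d));
var handleLong = new ActionBlock<string>(d => longDocs.Add(d));

var link = new DataflowLinkOptions { PropagateCompletion = true };

// Each document is offered to the linked targets in order
// and goes to the first one whose predicate accepts it.
source.LinkTo(handleShort, link, d => d.Length <= 10);
source.LinkTo(handleLong, link, d => d.Length > 10);

foreach (string doc in new[] { "short", "a much longer document", "tiny", "another long document" })
    source.Post(doc);

source.Complete();
await Task.WhenAll(handleShort.Completion, handleLong.Completion);

Console.WriteLine($"short: {shortDocs.Count}, long: {longDocs.Count}");
```

The two predicates together cover every document; an item that no linked target accepts would stay in the source block and hold up the items behind it.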

In an upcoming article, I am going to discuss this pattern in more detail. I will also show examples of how to better handle I/O operations in server applications.

Conclusion:

In this article, I have discussed the producer-consumer pattern and one variant of this pattern: the pipeline pattern.

I discussed the reasons why one would use these patterns instead of simple parallelism. One reason to use these patterns is that they give us more control over the degree of parallelism that each stage of processing has. They also enhance throughput when the execution time of some operations changes frequently.

I also provided examples of how to implement these patterns using the BlockingCollection class in .NET.

The article was technically reviewed by Damir Arh.


Author
Yacoub Massad is a software developer who works mainly with Microsoft technologies. Currently, he works at Zeva International where he uses C#, .NET, and other technologies to create eDiscovery solutions. He is interested in learning and writing about software design principles that aim at creating maintainable software. You can view his blog posts at criticalsoftwareblog.com

