The Dataflow Pattern in .NET

Posted by: Yacoub Massad, on 1/18/2018, in Category Patterns & Practices
Abstract: This article discusses the dataflow pattern, a variant of the producer-consumer pattern, along with examples of applying this pattern in .NET. Finally, you will also be introduced to ProceduralDataflow, a new library I created to help write clean dataflows.

In a previous article, The Consumer-Producer pattern in .NET, I talked about the producer-consumer pattern, the pipeline pattern, and briefly touched upon the dataflow pattern.

In this article, I am going to talk about the dataflow pattern in more detail, explain the Dataflow API in .NET, and introduce ProceduralDataflow, a new library I created that allows us to write dataflows in a procedural way.

Note: This article is a continuation of the previous one. I recommend reading that article before reading this one.


Producer-consumer pattern - a quick recap

In the simplest implementation of the Producer-Consumer pattern, we have a single thread producing some data and putting it in a queue, and another thread consuming the data from the queue.


Figure 1: The producer-consumer pattern
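
As a refresher, here is a minimal sketch of this simplest setup using the BlockingCollection class (the values and names here are illustrative, not taken from the article's example code):

var queue = new BlockingCollection<int>(boundedCapacity: 10);

var producerTask = Task.Run(() =>
{
    for (int i = 0; i < 100; i++)
        queue.Add(i); // blocks when the queue is full

    queue.CompleteAdding(); // signals that no more items will be added
});

var consumerTask = Task.Run(() =>
{
    // GetConsumingEnumerable blocks when the queue is empty and ends
    // once CompleteAdding has been called and the queue has drained.
    foreach (var item in queue.GetConsumingEnumerable())
        Console.WriteLine(item);
});

Task.WaitAll(producerTask, consumerTask);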

The Pipeline pattern is a variant of the producer-consumer pattern.

This pattern allows the consumer to also be a producer of data. This data will be put in a second queue, and another consumer will consume it.


Figure 2: The pipeline pattern

In the example above, we have a pipeline that does three stages of processing.

In the first stage, some data is produced, in the second stage this data is consumed and some other data is produced. This other data will be consumed in the third stage.

In the previous article, I gave an example where, in the first stage, we read documents from some store (e.g. a database), in the second stage we translate them, and in the third stage, we store the translated documents into some destination store. In this example, the first queue stores documents read from the store, while the second queue stores translated documents.

The Dataflow pattern

In the pipeline pattern, there is no restriction on the number of stages we can have. However, the flow has to be linear.

The dataflow pattern removes any such restriction!

The flow can branch based on some condition. Or it can branch unconditionally so that an item is sent to two processing nodes, instead of one.


Figure 3: A branch in a dataflow

In this figure, in the first stage, a document is read. Then, based on the language of the document, it is enqueued either in the queue for Spanish documents or in the queue for German documents.

A dedicated consumer thread reads Spanish documents from the first queue and translates them. Meanwhile, another dedicated consumer thread reads German documents from the other queue and translates them.

These two consumers are also producers, since they add the translated documents to the translated documents queue. Yet another consumer thread reads the translated documents and writes them to the store (e.g. a database).

Before continuing past this example, let’s first discuss why we need this branching, e.g. why don’t we simply use the pipeline pattern as in the following figure?


Figure 4: Using a pipeline instead of a dataflow

In this figure, we have a simple pipeline. In the first stage, we read a document and put it in a single queue regardless of the language in which it is written. In the second stage, we determine the document language and decide whether to invoke some code to translate the document from Spanish or some other code to translate from German.

Notice how the degree of parallelism for this stage is two.

In both the examples in figures 3 and 4, we translate two documents at the same time.

So what benefits does the dataflow approach give us?

In the dataflow approach, at most one Spanish document will be processed at any given time. Also, at most one German document will be processed at a time.

On the other hand, in the pipeline approach, it is possible that two threads will be processing two Spanish documents in parallel (or two German documents).

So, the dataflow approach gives us better control over the degree of parallelism for each operation.

Note: refer to the previous article for reasons why you might want to control the degree of parallelism for each operation.

Also, this approach gives us more control over the queue sizes. For example, consider the scenario where the Spanish documents in the source store are very large and the German documents are small. We might decide to have at most ten untranslated documents in the Spanish documents queue, and at most 100 untranslated documents in the queue for German documents.

In the previous example, the branching was conditional. That is, a document either went to the Spanish documents queue or the German documents queue.

Another type of branching is unconditional branching. In this type of branching, the item will be sent both ways unconditionally.

For example, assume that all documents we read from the source store are written in English and we want to translate each document to both Spanish and German. In this case, each document will be sent to two queues.
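
To illustrate, here is a minimal BlockingCollection-based sketch of such an unconditional branch (toSpanishQueue and toGermanQueue are hypothetical queues, not part of the example code in this article):

foreach (var documentId in inputQueue.GetConsumingEnumerable())
{
    var document = ReadDocumentFromSourceStore(documentId);

    // Each document goes both ways; each Add blocks if its queue is full.
    toSpanishQueue.Add(document);
    toGermanQueue.Add(document);
}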

Next, let’s look at how we can implement the first example in .NET.

Implementation

In this section, we will look at different implementation options.

The BlockingCollection class

As in the previous article, I am going to first use the BlockingCollection class to implement the Dataflow example.

public void ProcessDocumentsUsingDataflowPattern()
{
    string[] documentIds = GetDocumentIdsToProcess();

    BlockingCollection<string> inputQueue = CreateInputQueue(documentIds);

    BlockingCollection<Document> spanishDocumentsQueue = new BlockingCollection<Document>(10);

    BlockingCollection<Document> germanDocumentsQueue = new BlockingCollection<Document>(100);

    BlockingCollection<Document> translatedDocumentsQueue = new BlockingCollection<Document>(100);


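    // Stage 1: read each document and route it to the queue for its language.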
    var readingTask =
        Task.Run(() =>
        {
            foreach (var documentId in inputQueue.GetConsumingEnumerable())
            {
                var document = ReadDocumentFromSourceStore(documentId);

                var language = GetDocumentLanguage(document);

                if (language == Language.Spanish)
                    spanishDocumentsQueue.Add(document);
                else if (language == Language.German)
                    germanDocumentsQueue.Add(document);
            }
        });
            
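    // Stage 2a: translate Spanish documents and pass them on.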
    var spanishTranslationTask =
        Task.Run(() =>
        {
            foreach (var readDocument in spanishDocumentsQueue.GetConsumingEnumerable())
            {
                var translatedDocument =
                    TranslateSpanishDocument(readDocument, Language.English);

                translatedDocumentsQueue.Add(translatedDocument);
            }
        });

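    // Stage 2b: translate German documents and pass them on.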
    var germanTranslationTask =
        Task.Run(() =>
        {
            foreach (var readDocument in germanDocumentsQueue.GetConsumingEnumerable())
            {
                var translatedDocument =
                    TranslateGermanDocument(readDocument, Language.English);

                translatedDocumentsQueue.Add(translatedDocument);
            }
        });

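    // Stage 3: save translated documents to the destination store.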
    var savingTask = Task.Run(() =>
    {
        foreach (var translatedDocument in translatedDocumentsQueue.GetConsumingEnumerable())
        {
            SaveDocumentToDestinationStore(translatedDocument);
        }
    });

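    // Shut down the dataflow stage by stage: complete each queue once its producers are done.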
    readingTask.Wait();

    spanishDocumentsQueue.CompleteAdding();

    germanDocumentsQueue.CompleteAdding();

    spanishTranslationTask.Wait();

    germanTranslationTask.Wait();

    translatedDocumentsQueue.CompleteAdding();

    savingTask.Wait();
}

This example is very similar to the examples I provided in the previous article. We simply create three BlockingCollection objects for the three queues (plus one for the input queue), and then create four tasks.

The first one reads documents from the store and puts them in the appropriate queue based on their language. This is where the branching occurs.

The next two tasks take documents from the Spanish and German document queues, translate them, and then put them in the translated documents queue.

The fourth task takes documents from the translated documents queue and saves them to the destination store.

What if the German document translation method, i.e., TranslateGermanDocument, were an asynchronous I/O-bound method? That is, what if it had the following signature?

public async Task<Document> TranslateGermanDocumentAsync(
    Document document,
    Language language)

In this case, it would be better if we don’t tie up a thread while we wait for this asynchronous method to complete.

A note about asynchronous operations:

Consider, for example, that the Spanish document translation happens on the same machine as our application. This means that this operation is a CPU-intensive operation and therefore requires a thread.

On the other hand, consider that German document translation is done via a commercial web service call. This means that this operation is an I/O-bound operation and therefore does not require a thread.

Threads are expensive resources in server applications that are required to handle a large number of client requests at the same time. Using an asynchronous method means that we can save such threads.


When we create the German documents translation task, we can use an overload of Task.Run that accepts an asynchronous action, i.e., Func<Task> like this:

var germanTranslationTask =
    Task.Run(async () =>
    {
        foreach (var readDocument in germanDocumentsQueue.GetConsumingEnumerable())
        {
            var translatedDocument =
                await TranslateGermanDocumentAsync(readDocument, Language.English);

            translatedDocumentsQueue.Add(translatedDocument);
        }
    });

This way, we don’t tie up a thread while we wait for the translation method to complete. However, we still block the current thread in these two places:

  • The implicit call to IEnumerator<T>.MoveNext() on the enumerator obtained from germanDocumentsQueue.GetConsumingEnumerable() when the German documents queue is currently empty. That is, when we wait for the empty queue to become non-empty.
  • The call to translatedDocumentsQueue.Add when the queue is full. That is, when we wait for the full queue to become non-full.

Although we are using an asynchronous action as a parameter for the Task.Run method, these two calls will still be done synchronously. In scenarios where we have an operation with a high degree of parallelism, such synchronous waiting will cause us to tie up many threads unnecessarily, which will affect the scalability of the system.

We can solve these problems by using a class similar to the BlockingCollection class, but one that supports asynchronously taking items from, and adding items to, the queue. That is, a class that allows us to asynchronously wait for the queue to become non-empty or non-full.

One such class can be found in the AsyncEx library and is called AsyncProducerConsumerQueue. We can replace the BlockingCollection class with this class in the previous example to spare threads while waiting for the queues to become non-empty or non-full. I am not including the full details here, as I leave this as an exercise for the reader.
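
As a starting point, here is a minimal sketch of what the German translation task could look like, assuming the queues have been changed to AsyncProducerConsumerQueue<Document> instances and using the OutputAvailableAsync, DequeueAsync, and EnqueueAsync members of that class:

var germanTranslationTask =
    Task.Run(async () =>
    {
        // OutputAvailableAsync completes with false once CompleteAdding has been
        // called and the queue has drained; no thread is blocked while waiting
        // for the queue to become non-empty.
        while (await germanDocumentsQueue.OutputAvailableAsync())
        {
            var readDocument = await germanDocumentsQueue.DequeueAsync();

            var translatedDocument =
                await TranslateGermanDocumentAsync(readDocument, Language.English);

            // EnqueueAsync waits asynchronously while the destination queue is full.
            await translatedDocumentsQueue.EnqueueAsync(translatedDocument);
        }
    });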

Please note that it also makes sense to wait asynchronously for the queues even with synchronous operations (e.g. CPU-bound document translation).

The TPL Dataflow API

Next, I am going to show you how to implement the same example using the TPL Dataflow API.

Although not shipped as part of the .NET Framework itself, the TPL Dataflow library (available as the System.Threading.Tasks.Dataflow NuGet package) is a library from Microsoft created specifically to help us build dataflows. The library provides a set of blocks, each having specific features. Here are some example blocks:

  • The TransformBlock block processes data it receives to produce other data. It has an input buffer and an output buffer. Such buffers are similar to the queues I have been discussing since the previous article.
  • The ActionBlock block processes data it receives but does not produce any data as a result of that processing. It has an input buffer.
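
To make this concrete, here is a tiny self-contained sketch (with illustrative values) that links a TransformBlock to an ActionBlock:

var square = new TransformBlock<int, int>(x => x * x);

var print = new ActionBlock<int>(x => Console.WriteLine(x));

// Data produced by the transform block flows to the action block, and the
// completion signal propagates along the same link.
square.LinkTo(print, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 1; i <= 5; i++)
    square.Post(i);

square.Complete();

print.Completion.Wait(); // returns once every posted item has been processed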

Take a look at the TPL Dataflow documentation for more information about the different block types provided by the TPL Dataflow library.

In this article, I am going to show how to implement the same example I used before using the TPL Dataflow library. I will also talk about different features of the API, as I explain the example.

Consider this code:

public void ProcessDocumentsUsingTplDataflow()
{
    var readBlock =
        new TransformBlock<string, Document>(
            x => ReadDocumentFromSourceStore(x),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }); //1

    var spanishDocumentTranslationBlock =
        new TransformBlock<Document, Document>(
            x => TranslateSpanishDocument(x, Language.English),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 10}); //2

    var germanDocumentTranslationBlock =
        new TransformBlock<Document, Document>(
            x => TranslateGermanDocumentAsync(x, Language.English),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 100 }); //3

    var saveDocumentsBlock =
        new ActionBlock<Document>(
            x => SaveDocumentToDestinationStore(x),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 100 }); //4

    readBlock.LinkTo(
        spanishDocumentTranslationBlock,
        new DataflowLinkOptions {  PropagateCompletion = true},
        x => GetDocumentLanguage(x) == Language.Spanish); //5

    readBlock.LinkTo(
        germanDocumentTranslationBlock,
        new DataflowLinkOptions { PropagateCompletion = true },
        x => GetDocumentLanguage(x) == Language.German); //6

    spanishDocumentTranslationBlock.LinkTo(
        saveDocumentsBlock); //7

    germanDocumentTranslationBlock.LinkTo(
        saveDocumentsBlock); //8

    string[] documentIds = GetDocumentIdsToProcess(); //9

    foreach (var id in documentIds)
        readBlock.Post(id); //10

    Task.WhenAll(
            spanishDocumentTranslationBlock.Completion,
            germanDocumentTranslationBlock.Completion)
        .ContinueWith(_ => saveDocumentsBlock.Complete()); //11

    readBlock.Complete(); //12

    saveDocumentsBlock.Completion.Wait(); //13
}

This method contains 13 statements, duly marked in the comments. I will explain them now.

In the 1st statement, I create the block that will read documents from the store. This block is a TransformBlock<string, Document> which means that this block consumes strings and produces Document objects.

In this case, the string is the document id. The first parameter in the constructor is a delegate to the function that will transform the string into a document. We give it a lambda expression that invokes the ReadDocumentFromSourceStore method. The second parameter allows us to configure more options for this block. I have set the MaxDegreeOfParallelism property to 1. This setting isn’t really needed because the default value is 1, but I have nevertheless set it just to show you how you can control the degree of parallelism for this block.
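
For example, if we wanted the read block to process up to two documents at a time with a bounded input buffer, we could pass options like the following (the values here are illustrative only):

var options = new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 2, // up to two concurrent reads
    BoundedCapacity = 10        // at most ten buffered items
};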

In the 2nd and 3rd statements, I create two TransformBlock<Document, Document> objects. The first one translates Spanish documents and the second one, German documents. Notice how the constructor of TransformBlock allows me to specify a synchronous transform function in the case of Spanish documents, and an asynchronous one, for German documents.

Note that when I create these two TransformBlock objects, I set the bounded capacity to 10 and 100 for the Spanish and German blocks respectively. As I mentioned before, the TransformBlock block has an input buffer and an output buffer. The bounded capacity setting allows us to control the sizes of such buffers.

In the 4th statement, I create an ActionBlock<Document> block that will save the documents to the destination store. Here we need an ActionBlock because the SaveDocumentToDestinationStore method does not return anything. This block is the last block in the dataflow.

I still haven’t connected any of these blocks. Now, the connecting (or linking, as it is called in the TPL Dataflow library) begins.

In the 5th statement, I use the LinkTo method to link the document reading block with the Spanish documents translation block. This basically means that data coming out of the read block will be sent to the Spanish documents translation block.

There are two things to note here.

The first one is that I pass a DataflowLinkOptions object with PropagateCompletion set to true. I will explain this soon.

The second one is that I gave the LinkTo method a predicate that will be used to filter the data items sent from the source block to the destination block. In this particular case, I only want Spanish documents to be sent to the Spanish documents translation block.

In the 6th statement, I do the same thing for German documents.

In the 7th and 8th statements, I link the Spanish and German translation blocks, respectively, to the save-document block.

Two things to note here.

First, we are linking these two translation blocks to the same target block. Second, here we don’t set PropagateCompletion to true. I will explain why soon.

In the 9th statement, I simply get the list of document ids to process.

In the 10th statement, I loop through this list and call the Post method on the document reading block. This will cause the processing to actually start. Each item posted to the read block will flow through the blocks. This of course depends on how we have linked the blocks together and the settings we have specified for the different blocks.

Now let’s talk about the concept of Completion a little bit.

In all of my code examples (in this and the previous article), at the end of processing, I invoked some code to wait for processing to complete. This is needed in many scenarios because we need to make sure that all the data has been processed before doing something else, or simply to inform the user that all the data has been processed.

In many cases, this meant that I invoked the Wait method on some Task object before leaving the method.

The TPL Dataflow library provides the Completion concept to solve this problem.

In the 13th statement, I invoke the getter of the saveDocumentsBlock.Completion property. This will give us a Task object that completes when the block completes. I then invoke the Wait method on this Task object to block until this task completes. Basically, I want to wait for all data to be completely processed.

How can the saveDocumentsBlock block “complete”? Even if it has processed every item it received, how does it know that there are no more items to come?

The Dataflow blocks support the concept of propagating a completion signal.

In the 12th statement, I call the Complete method on the readBlock object. This tells the block that we are done posting new items to it. Not only that: if it is configured to do so, it will send (or propagate) a completion signal to the blocks it is linked to once it has processed all the items it received.

What I would like to have is the completion signal propagate through all the blocks until it reaches the final block, i.e., the save-document block. This way, once the final block receives the completion signal and completes processing all items, the Task returned from saveDocumentsBlock.Completion will complete.

One small problem is that we have a branch. The completion signal can reach the save-document block in two ways (take a look at figure 3). The first way is via the Spanish document translation block. And the second one via the German document translation block.

If we allow the completion signal to be propagated automatically, this might cause the save-document block to complete (and stop receiving new items) before all documents are processed.

For example, this can happen if all Spanish documents are processed while there are still some German documents that are not processed yet. The Spanish document translation block would propagate the completion signal to the save-document block and cause the processing to stop.

This is why I did not set up completion signal propagation from the translation blocks to the save-document block in the 7th and 8th statements.

To fix the problem, in the 11th statement I use standard TPL code to tell the system to invoke the Complete method on the save-document block when both the Spanish and German document translation blocks have completed (i.e., when they have received the completion signal from the read-document block AND have processed all the documents they received).

For another example that uses the TPL Dataflow, consider the Building an Image Resizer using .NET Parallel Dataflow Library in .NET 4.5 article here on DotNetCurry.

The problem with flow clarity

In both the implementation that uses the BlockingCollection class and the implementation that uses the TPL Dataflow API, the flow logic is tangled with API code. To explain this further, let me show you how our code would look if we used simple data parallelism to process our documents:

public void ProcessDocumentsUsingParallelForEach()
{
    string[] documentIds = GetDocumentIdsToProcess();

    Parallel.ForEach(documentIds, documentId =>
    {
        var document = ReadDocumentFromSourceStore(documentId);

        var language = GetDocumentLanguage(document);

        Document translatedDocument;

        if (language == Language.Spanish)
            translatedDocument = TranslateSpanishDocument(document, Language.English);
        else // if (language == Language.German)
            translatedDocument = TranslateGermanDocument(document, Language.English);

        SaveDocumentToDestinationStore(translatedDocument);
    });
}

This code looks better, right?

The flow logic is very clear. The body of the ForEach loop shows us the steps needed to process a single document. We first read the document. Then we determine its language. Then we branch based on the language: we call TranslateSpanishDocument for Spanish documents and TranslateGermanDocument for German documents. Finally, we invoke SaveDocumentToDestinationStore to save the translated document.

Now go back to the dataflow implementations in this article and try to find these pieces of logic. They are all over the place.

In the BlockingCollection based example, reading the document and determining its language is in one place. Translating Spanish and German documents is in two other places. And saving documents is in yet another place. Between all of these pieces of code, we have infrastructure code unrelated to the logic of the flow.

In the TPL Dataflow API based example, things are even worse. The branching logic, for example, is split between two different LinkTo calls.

Can we fix this?

Can we get the clarity of simple data-parallelism and yet get all the benefits the producer-consumer based patterns provide?

I think we can, and I created a library called ProceduralDataflow to demonstrate this.

The ProceduralDataflow library

Let’s look first at the code and then I will explain it to you.

public void ProcessDocumentsUsingProceduralDataflow()
{
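    // Create one dataflow block per stage; each block has its own queue size and degree of parallelism.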
    var readDocumentsBlock =
        ProcDataflowBlock.StartDefault(
            maximumNumberOfActionsInQueue: 100,
            maximumDegreeOfParallelism: 1);

    var spanishDocumentsTranslationBlock =
        ProcDataflowBlock.StartDefault(
            maximumNumberOfActionsInQueue: 10,
            maximumDegreeOfParallelism: 1);

    var germanDocumentsTranslationBlock = 
        AsyncProcDataflowBlock.StartDefault(
            maximumNumberOfActionsInQueue: 100,
            maximumDegreeOfParallelism: 1);

    var saveDocumentsBlock =
        ProcDataflowBlock.StartDefault(
            maximumNumberOfActionsInQueue: 100,
            maximumDegreeOfParallelism: 1);

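    // Local wrapper functions: each runs the corresponding operation on its block.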
    DfTask DfReadDocumentFromSourceStore(
        string documentId) =>
        readDocumentsBlock.Run(
            () => ReadDocumentFromSourceStore(documentId));

    DfTask DfTranslateSpanishDocument(
        Document document,
        Language destinationLanguage) =>
        spanishDocumentsTranslationBlock.Run(
            () => TranslateSpanishDocument(document, destinationLanguage));

    DfTask DfTranslateGermanDocument(
        Document document,
        Language destinationLanguage) =>
        germanDocumentsTranslationBlock.Run(
            () => TranslateGermanDocumentAsync(document, destinationLanguage));

    DfTask DfSaveDocumentToDestinationStore(
        Document document) =>
        saveDocumentsBlock.Run(
            () => SaveDocumentToDestinationStore(document));

    async Task ProcessDocument(string documentId)
    {
        var document = await DfReadDocumentFromSourceStore(documentId);

        var language = GetDocumentLanguage(document);

        Document translatedDocument;

        if (language == Language.Spanish)
            translatedDocument =
                await DfTranslateSpanishDocument(document, Language.English);
        else // if (language == Language.German)
            translatedDocument =
                await DfTranslateGermanDocument(document, Language.English);

        await DfSaveDocumentToDestinationStore(translatedDocument);
    }

    string[] documentIds = GetDocumentIdsToProcess();

    var processingTask =
        EnumerableProcessor.ProcessEnumerable(
            documentIds,
            documentId => ProcessDocument(documentId),
            maximumNumberOfNotCompletedTasks: 100);

    processingTask.Wait();
}

I start by creating four dataflow blocks.

The ProcDataflowBlock and AsyncProcDataflowBlock classes come from the ProceduralDataflow library. The difference between these two classes is that the ProcDataflowBlock class is used for CPU-bound operations and the AsyncProcDataflowBlock class is used for asynchronous operations, e.g. I/O bound operations.

This is very similar to the way we created the TPL Dataflow blocks.

Each block has an input queue. I specify the queue size and the degree of parallelism for each block. Note, however, that here I don’t specify the code that each block will execute.

Next, I create four local functions: DfReadDocumentFromSourceStore, DfTranslateSpanishDocument, DfTranslateGermanDocument, and DfSaveDocumentToDestinationStore. These functions use the blocks created above to execute the corresponding methods.

Note: Local functions are a new feature in C# 7. For a primer on C# 7, read the “C# 7 – What’s New” tutorial at www.dotnetcurry.com/csharp/1286/csharp-7-new-expected-features.

For example, the DfReadDocumentFromSourceStore method uses the readDocumentsBlock to run the ReadDocumentFromSourceStore method. It does so by invoking the Run method on the block. This returns a DfTask. The DfTask class is a class from the ProceduralDataflow library and is similar to the Task class in .NET. There are differences, but I am not going to delve into the details of how the ProceduralDataflow library works in this article.

The next part is the most interesting part: The ProcessDocument local function. The body of this function looks very similar to the ForEach loop in the ProcessDocumentsUsingParallelForEach method from before.

It shows very clearly the steps each document will go through. The flow logic is expressed in a very clear way.

In this function, we call the Df* version of the methods. These methods return DfTask or DfTask<TResult>. The code uses the await keyword to asynchronously wait for these methods to complete.

Please note that since these methods return DfTask and not Task, the await keyword behaves differently. Again, I am not going to delve into details because in this article I want to focus on how to use ProceduralDataflow, and not how it works internally.

Here is how you can think about this method (the ProcessDocument local function): it describes the stages of processing a document in a procedural way, i.e., it uses standard procedural code like calling methods, using the if statement, etc. Each of the four blocks will execute only one part of this method.

This is the power of async/await. It allows parts of a single method to be executed as if they are totally separate.

Don’t let the procedural logic trick you!

Under the hood, it works like a dataflow. Processing of multiple documents by different blocks is done in parallel, the degree of parallelism for each block is respected, and there are queues to allow slow consumers to slow down fast producers.

Note that the ProcessDocument local function takes a document id and returns a Task. This Task represents the whole process for a single document.

Note: I used local functions in this example for convenience. They could just as well be normal methods.

The code that follows calls the GetDocumentIdsToProcess method to get the document ids, and then calls a method from ProceduralDataflow called EnumerableProcessor.ProcessEnumerable to process all the documents.

There is nothing special about this method. It enumerates the passed enumerable, and calls the supplied delegate for each item (a call to ProcessDocument in this example). The only purpose of this method is to manage the processing of items so that we don’t enqueue all data at once in the queue if the number of items is huge. In this particular example, I am limiting the number of uncompleted tasks to 100.

This method also returns a Task that completes when all the tasks are completed.
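
To make the idea concrete, here is a hedged sketch of how such a throttling method could be implemented using a SemaphoreSlim. This is only my illustration of the concept, not the library's actual implementation:

static async Task ProcessEnumerable<T>(
    IEnumerable<T> items,
    Func<T, Task> processItem,
    int maximumNumberOfNotCompletedTasks)
{
    // The semaphore limits how many tasks can be in flight at any given time.
    var semaphore = new SemaphoreSlim(maximumNumberOfNotCompletedTasks);

    var tasks = new List<Task>();

    foreach (var item in items)
    {
        await semaphore.WaitAsync(); // wait while the limit is reached

        tasks.Add(ProcessAndRelease(item));
    }

    await Task.WhenAll(tasks); // completes when all items have been processed

    async Task ProcessAndRelease(T item)
    {
        try { await processItem(item); }
        finally { semaphore.Release(); }
    }
}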

Now, consider the following code:

async Task ProcessDocument(string documentId)
{
    var document = await DfReadDocumentFromSourceStore(documentId);

    var language = GetDocumentLanguage(document);

    Document translatedDocument;

    try
    {
        if (language == Language.Spanish)
            translatedDocument =
                await DfTranslateSpanishDocument(document, Language.English);
        else // if (language == Language.German)
            translatedDocument =
                await DfTranslateGermanDocument(document, Language.English);
    }
    catch (Exception ex)
    {
        await DfStoreDocumentInFaultedDocumentsStore(documentId, document, ex);
        return;
    }

    await DfSaveDocumentToDestinationStore(translatedDocument);
}

This is a modified version of the ProcessDocument local function. It adds a try/catch block around the calls to the translation methods. If an exception occurs in translation, a method named DfStoreDocumentInFaultedDocumentsStore will be called to store such a document in a special database.

This method also has a corresponding block with its own degree of parallelism and queue size. Note that the try block surrounds the two calls to translate documents (Spanish and German translation methods). If any of them fail, the document will be sent to this special database.

Also note that here in the catch block, we have access to the documentId and the document variables. To appreciate the value of such an approach while implementing a Dataflow, try implementing this example using the BlockingCollection class or the TPL Dataflow API.

Other features in the ProceduralDataflow library

The ProceduralDataflow library has more features.

1. It supports deadlock-free loops.

What if the flow of processing has a loop? For example, the flow goes from block one to block two and then back to block one. This might cause a deadlock.

Consider the following figure:


Figure 5: A loop in a Dataflow

In this figure, after Block 2 processes an item, the result of processing might go back to the queue of Block 1 or it might go to the final block queue based on some condition.

Now, assume that all queues are full. Without the loop, this is not a problem, because eventually the final block queue will become non-full as a result of the final block processing some item.

With the loop, however, Block 1 might be waiting for Block 2’s queue to become non-full and Block 2 might be waiting for Block 1’s queue to become non-full.

This causes a deadlock.

ProceduralDataflow fixes this problem by making each block have two queues.

The first queue is for data that has not come to the block through a loop, and the second queue is for data that has come to the block through a loop.

What is special about the second queue is that adding items to such a queue does not cause the producer (the block that is adding the data to the queue) to block, which prevents a deadlock. In other words, this second queue is not bounded.

Also, when getting the next item to process, a block favors items from the second queue over items from the first queue. The details of how ProceduralDataflow detects whether a data item has come through a loop are outside the scope of this article.

2. It supports unconditional branching and joining:

ProceduralDataflow allows you to process a data item with two or more blocks in parallel. Once they are complete, you can join the data from these blocks and process it as a single unit.

For example, you can translate a single English document to both Spanish and German (each via its own block), then asynchronously wait for both operations to complete, and then join the two translated documents as a single data item to create a zip file containing these two documents in a separate block. All this can be done procedurally.

Here is a code example:

async Task ProcessDocument(string documentId)
{
    var englishDocument = await DfReadDocumentFromSourceStore(documentId);

    DfTask<Document> toSpanishTask =
        DfTranslateEnglishDocumentToSpanish(englishDocument);

    DfTask<Document> toGermanTask =
        DfTranslateEnglishDocumentToGerman(englishDocument);

    SpanishAndGermanDocuments documents =
        await DfTask.WhenAll(
            toSpanishTask,
            toGermanTask,
            (spanish, german) =>
                new SpanishAndGermanDocuments(spanish, german));

    await DfCompressAndSaveDocuments(documents);
}

In this example, after we read an English document, we invoke DfTranslateEnglishDocumentToSpanish and DfTranslateEnglishDocumentToGerman without awaiting the returned tasks. This means that we don’t wait for translation to Spanish to complete before we start translating to German.

Note that each of these translations will be done using a different block.

Then, we use the DfTask.WhenAll method to asynchronously wait for both translations to complete. Both translated documents are then given to another block to compress and save them together.

3. It supports sub-procedures: if your flow logic is large, you can split it into multiple DfTask- or DfTask<T>-returning methods.

4. No threads are used unnecessarily: ProceduralDataflow does not cause any threads to block while waiting for queues to become non-empty or non-full. Also, no threads are used in asynchronous blocks.

How to get the ProceduralDataflow library

The ProceduralDataflow library is open source and you can find it in the following GitHub repository: https://github.com/ymassad/ProceduralDataflow

I have also published the library on NuGet under the name YMassad.ProceduralDataflow. It is currently in prerelease, so if you want to use it, make sure you check the “Include prerelease” checkbox in the NuGet Package Manager inside Visual Studio.

I encourage the reader to try this library out. I would appreciate any feedback or suggestions.

Conclusion:

In this article, I have introduced the Dataflow pattern, a variant of the Producer-Consumer pattern. Unlike the Pipeline pattern, which allows only a linear flow of data between blocks, the Dataflow pattern allows the flow to be non-linear. For example, it allows conditional and unconditional branching of data.

I have shown examples of how to implement this pattern using the BlockingCollection class and the TPL DataFlow API and talked about how such implementations can make the flow logic less readable.

I have also introduced a new library called ProceduralDataflow and showed with an example how it can help us implement the Dataflow pattern while keeping the flow logic clear and readable.

Download the entire source code of this article (Github)

This article was technically reviewed by Damir Arh.

This article has been editorially reviewed by Suprotim Agarwal.



Author
Yacoub Massad is a software developer and works mainly on Microsoft technologies. Currently, he works at Zeva International where he uses C#, .NET, and other technologies to create eDiscovery solutions. He is interested in learning and writing about software design principles that aim at creating maintainable software. You can view his blog posts at criticalsoftwareblog.com. He is also the creator of DIVEX(https://divex.dev), a dependency injection tool that allows you to compose objects and functions in C# in a way that makes your code more maintainable. Recently he started a YouTube channel about Roslyn, the .NET compiler.


