Today’s global market requires that businesses and industries be agile enough to respond to a constant flow of changing data. These workflows that are designed to process such streams of data are frequently large, and sometimes infinite or unknown in size.
Often, the data requires complex processing, making it a challenge to meet high throughput demands and potentially immense computational loads. To cope with these requirements, the key is to use parallelism to exploit system resources and multiple cores.
When designing a reactive application, it’s fundamental to build and treat the system components as units of work in order to take full advantage of asynchronous execution. This design aims to exploit asynchronous message passing for the interactions and communication within the reactive components. In fact, in reactive systems, each component should embrace an asynchronous non-blocking development model for non-blocking I/O.
What is a Reactive application?
Reactive programming is a set of design principles used in asynchronous programming to create cohesive systems that respond to commands and requests in a timely manner.
Reactive programming is a way of thinking about systems architecture and design in a distributed environment where implementation techniques, tooling, and design patterns are components of a larger whole—a system. For more information check the online Reactive Manifesto (www.reactivemanifesto.org).
These units react to messages, which are propagated by other components in the chain of processing.
The reactive programming model emphasizes a push-based model for applications, rather than a pull-based model. This push-based strategy ensures that the individual components are easy to test and link, and, most importantly, easy to understand (for further information check this online documentation http://introtorx.com).
The .NET Task Parallel Library DataFlow (TPL DataFlow) helps to tackle the complexity of developing modern systems with an API that builds on Task-based Asynchronous Pattern (TAP). The TPL DataFlow fully supports asynchronous processing, in combination with a powerful compositionality semantic and a better configuration mechanism, than the TPL.
The TPL DataFlow library eases concurrent processing and implements tailored asynchronous parallel workflow and batch queuing. Furthermore, it facilitates the implementation of sophisticated patterns based on combining multiple components that talk to each other by passing messages.
Download the entire source code of this article (Github).
The power of TPL DataFlow
Let’s say you’re building a sophisticated producer-consumer pattern that must support multiple producers and/or multiple consumers in parallel, or perhaps it has to support workflows that can scale the different steps of the process independently.
One solution is to exploit Microsoft TPL DataFlow.
Since the release of .NET 4.5, Microsoft introduced TPL Dataflow as part of the tool set for writing concurrent applications. The TPL Dataflow library is designed with the higher-level constructs necessary to tackle easy parallel problems, while providing a simple to use and powerful framework for building asynchronous data processing pipelines.
The TPL DataFlow isn’t distributed as part of the .NET 4.5 framework, so to access its API and classes, you need to import the official Microsoft NuGet Package (install-Package Microsoft.Tpl.DataFlow).
The TPL DataFlow offers a rich array of components (also called blocks) for composing data-flow and pipeline infrastructures based on the in-process message passing semantic. This dataflow model promotes actor-based programming (https://en.wikipedia.org/wiki/Actor_model) by providing in-process message passing for coarse-grained dataflow and pipelining tasks.
The TPL DataFlow uses the task scheduler (TaskScheduler) of the TPL to efficiently manage the underlying threads and to support the TAP model (async/await) for optimized resource utilization. The TPL DataFlow library increases the robustness of highly concurrent applications and achieves better performance, compared to the sequential version of the code, for parallelizing CPU and I/O intensive operations which have high throughput and low latency.
Figure 1: Workflow composed by multiple steps. Each operation can be treated as an independent computation.
The concept behind the TPL DataFlow library is to ease the creation of multiple patterns, such as:
- batch-processing pipelines
- parallel stream processing
- data buffering, or joining and
- processing in batch data from one or more sources.
Each of these patterns can be used as a standalone, or may be composed with other patterns, enabling developers to easily express complex dataflows.
Designed to compose: TPL DataFlow blocks
Imagine you’re implementing a complex workflow process composed of many different steps, such as a stock analysis pipeline.
It’s ideal to split the computation in blocks, developing each block independently and then gluing them together. Making these blocks reusable and interchangeable enhances their convenience. This composable design would simplify the implementation of complex and convoluted systems.
Compositionality is the main strength of TPL DataFlow, because a set of independent containers, known as blocks, is designed to be combined. These blocks can be a chain of different tasks that constitute a parallel workflow, and are easily swapped, reordered, reused, or even removed.
The TPL DataFlow emphasizes a component’s architectural approach to ease the restructure of the design. These dataflow components are useful when you have multiple operations that must communicate with one another asynchronously or when you want to process data as it becomes available.
Here’s a high-level view of how the TPL Dataflow blocks operate:
1. Each block receives and buffers data from one or more sources, including other blocks, in the form of messages. When a message is received, the block reacts by applying its behavior to the input, which then can be transformed and/or used to perform side effects.
Figure 2: The TPL DataFlow embraces the concepts of reusable components. In this figure, each step of the workflow acts as reusable components. The TPL DataFlow brings few core primitives that allow you to express computations based on DataFlow graphs.
2. The output from the component (block) is then passed to the next linked block, and to the next one, if any, and so on, until a pipeline has completed to execute all the steps of the workflow.
The TPL DataFlow excels at providing a set of configurable properties by which it’s possible, with small changes, to control the level of parallelism, the buffer size of the mailbox, and enable support for cancellation.
There are three main types of DataFlow blocks:
§ Source—Operates as producer of data. It can also be read from.
§ Target—Acts as a consumer, which receives the data and can be written to.
§ Propagator—Acts as both a Source and a Target block.
The TPL DataFlow provides a set of blocks that inherit from one or more of these main DataFlow block types, each with a different purpose. It’s impossible to cover all the blocks in one article. Thus, in the following sections we focus on the most common and versatile ones to adopt in general pipeline composition applications.
Note: TPL Dataflow’s most commonly used blocks are the standard BufferBlock, ActionBlock, and TransformBlock. Each is based on a delegate, which can be in the form of an anonymous function that defines the work to compute. I recommend that you keep these anonymous methods short, simple to follow, and easy to maintain.
Using the BufferBlock<TInput> as a FIFO buffer
The TPL DataFlow BufferBlock<T> acts as a buffer for data. Such data is stored in a first in, first out (FIFO) order. In general, the BufferBlock is a great tool for enabling and implementing asynchronous Producer/Consumer patterns, where the internal message queue can be written to by multiple sources, or read from multiple targets.
Figure 3: The TPL DataFlow BufferBlock has an internal buffer where the messages are queued, waiting to be pushed to the output, usually another TPL Dataflow block . The Input and Output are the same types, and this block doesn’t apply any transformation on the data.
A multiple-producer/single-consumer pattern
The producer-consumer pattern is one of the most widely used patterns in parallel programming. Developers use it to isolate work to be executed from the processing of that work.
In a typical producer/consumer pattern, at least two separated threads run concurrently: one thread produces and pushes the data to process into a queue, and another thread verifies the presence of the new incoming piece of data and processes it. The queue that holds the tasks is shared among these threads, which requires care for accessing tasks safely.
The TPL DataFlow is a great tool for implementing this pattern, because it has intrinsic support for multiple readers and multiple writers concurrently, and it encourages a pipeline pattern of programming, with producers sending messages to decoupled consumers
Figure 4: Multiple-producers/one-consumer pattern using the TPL DataFlow BufferBlock, which can manage and throttle the pressure of multiple producers.
In the case of a multi-producers/single-consumer pattern, it is important to enforce a restriction between the numbers of items generated and the number of items consumed. This constraint aims to balance the work between the producers when the consumer cannot handle the load. This technique is called throttling.
Throttling protects the program from running out of memory if the producers are faster than the consumer. Fortunately, the TPL DataFlow has built-in support for throttling, which is achieved by setting the maximum size of the buffer through the property BoundedCapacity, part of the DataFlowBlockOptions.
In the following listing, this property ensures that there will never be more than 10 items in the BufferBlock queue. Furthermore, in combination with enforcing the limit of the buffer size, it is important to use the function SendAsync, which waits intelligently and without blocking for the buffer to have available space to place a new item.
Listing 1: Asynchronous multi producer/consumer based on the TPL DataFlow BufferBlock
BufferBlock<int> buffer = new BufferBlock<int>(
// Set the BoundedCapacity to be able to manage
// and throttle the pressure from multiple producers
new DataFlowBlockOptions { BoundedCapacity = 10 });
async Task Produce(IEnumerable<int> values)
{
foreach (var value in values)
// Send the message to buffer block asynchronously.
// The SendAsync method helps to throttle the messages sent
await buffer.SendAsync(value);;
}
async Task MultipleProducers(params IEnumerable<int>[] producers)
{
// Running multiple producers in parallel waiting
// all to terminate before notify the buffer block to complete
await Task.WhenAll(
from values in producers select Produce(values).ToArray())
.ContinueWith(_ => buffer.Complete());
}
async Task Consumer(Action<int> process)
{
// Safeguard the buffer block from receiving a message
// only if there are any items available in the queue
while (await buffer.OutputAvailableAsync())
process(await buffer.ReceiveAsync());
}
async Task Run() {
IEnumerable<int> range = Enumerable.Range(0, 100);
await Task.WhenAll(MultipleProducers(range, range, range),
Consumer(n => Console.WriteLine($"value {n} - ThreadId
{Thread.CurrentThread.ManagedThreadId}")));
}
By default, the TPL DataFlow blocks have the value DataFlowBlockOptions.Unbounded set to -1, which means that the queue is unbounded (unlimited) to the number of messages. However, you can reset this value to a specific capacity that limits the number of messages the block may be queueing.
When the queue reaches maximum capacity, any additional incoming messages will be postponed for later processing making the producer await before further work. Likely, making the producer slowdown (or await) is not a problem because the messages are sent asynchronously.
Transforming data with the TransformBlock<TInput, TOutput>
The TPL Dataflow TransformBlock<TInput,TOutput> acts like a mapping function, which applies a projection function to an input value and provides a correlated output.
The transformation function is passed as an argument in the form of a delegate Func<TInput,TOutput>, which is generally expressed as a lambda expression. This block’s default behavior is to process one message at a time, maintaining strict FIFO ordering.
Figure 5: The TPL DataFlow TransformBlock has an internal buffer for both the input and output values; this type of block has the same buffer capabilities of the BufferBlock. The purpose of this block is to apply a transformation function on the data; the Input and Output are likely different types.
Note that TransformBlock<TInput,TOutput> performs as the BufferBlock<TOutput>, which buffers both the input and output values. The underlying delegate can run synchronously or asynchronously.
The asynchronous version takes a delegate with signature Func<TInput,Task<TOutput>> whose purpose is to run the underlying function asynchronously. The block treats the processing of that element completed when the returned Task completes. This listing shows how to use the TransformBlock type.
Listing 2: Downloading images using the TPL Dataflow TransformBlock
var fetchImageFlag = new TransformBlock<string, (string, byte[])>(
async urlImage => {
using (var webClient = new WebClient()) {
byte[] data = await webClient.DownloadDataTaskAsync(urlImage);
Console.WriteLine($”The image {Path.GetFileName(urlImage)} has a size
of {data.Length} bytes”);
return (urlImage, data);
}
});
List<string> urlFlags = new List<string>{
"Italy#/media/File:Flag_of_Italy.svg",
"Spain#/media/File:Flag_of_Spain.svg",
"United_States#/media/File:Flag_of_the_United_States.svg"
};
foreach (var urlFlag in urlFlags)
fetchImageFlag.Post($"https://en.wikipedia.org/wiki/{urlFlag}");
In Listing 2, the TransformBlock<string,(string, byte[])> fetchImageFlag block fetches the flag image into a tuple of string and byte array format (urlImage, data). In this case, the output isn’t consumed anywhere, so the code isn’t too useful. You need another block to process the outcome in a meaningful way.
Completing the work with ActionBlock<TInput >
The TPL Dataflow ActionBlock executes a given callback for any item sent to it. You can think of this block logically as a buffer for data combined with a task for processing that data. ActionBlock<TInput> is a target block that calls a delegate when it receives data, similar to foreach loop.
Figure 6: The TPL DataFlow ActionBlock has an internal buffer for input messages that are queued if the Task is busy processing another message. This type of block has the same buffer capabilities of the BufferBlock. The purpose of this block is to apply an action that completes the workflow without output and that likely produces side effects. In general, because the ActionBlock doesn’t have an output, it cannot compose to a following block, so it’s used to terminate the workflow.
ActionBlock<TInput> is usually the last step in a TPL Dataflow pipeline; in fact, it doesn’t produce any output. This design prevents the ActionBlock from being combined with further blocks, making it the perfect candidate to terminate the workflow process. For this reason, the ActionBlock is likely to produce side effects as a final step to complete the pipeline processing.
Listing 3 shows the TransformBlock from Listing 2 pushing its outputs to the ActionBlock to persist the flag images in the local file system.
Listing 3: Persisting data using the TPL Dataflow ActionBlock
var saveData = new ActionBlock<(string, byte[])>(async data => {
(string urlImage, byte[] image) = data;
string filePath = urlImage.Substring(urlImage.IndexOf("File:") + 5);
await File.WriteAllBytesAsync(filePath, image);
});
// Links the output from the TransformBlock to the ActionBlock
fetchImageFlag.LinkTo(saveData);
NOTE: the code in this article runs on .NET Core 2.0 (or higher). The method File.WriteAllBytesAsync, in the previus code listing, is part of the .NET Core.
The argument passed into the constructor during the instantiation of the ActionBlock block can be either a delegate Action<TInput> or Func<TInput,Task>. The latter performs the internal action (behavior) asynchronously for each message input (received). Note that the ActionBlock has an internal buffer for the incoming data to be processed, which works exactly like the BufferBlock.
It’s important to remember that the ActionBlock saveData is linked to the previous TransformBlock fetchImageFlag using the LinkTo extension method. In this way, the output produced by the TransformBlock is pushed to the ActionBlock as soon as available.
Linking DataFlow blocks
The TPL Dataflow blocks can be linked with the help of the LinkTo extension method.
Linking DataFlow blocks is a powerful technique for automatically transmitting the result of each computation to the connected blocks in a message-passing manner. The key component for building sophisticated pipelines in a declarative manner is to use connecting blocks.
Buffering and Parallelizing message processing
By default, the TPL Dataflow block processes only one message at a time, while buffering the other incoming messages until the previous one completes. Each block is independent of others, so one block can process one item while another block processes a different item.
However, when constructing the block, you can change this behavior by setting the MaxDegreeOfParallelism property in the DataFlowBlockOptions to a value higher than 1 (this property is used later in the article in a program example). You can use the TPL Dataflow to speed up the computations by specifying the number of messages that can be processed in parallel. The internals of the class handle the rest, including the ordering of the data sequence.
Reactive Extensions (Rx) and TPL DataFlow meshes
The TPL Dataflow and .NET Reactive Extensions (Rx) have important similarities, despite having independent characteristics and strengths. In fact, these libraries complement each other, making them easy to integrate.
The TPL Dataflow is closer to an agent-based programming model, focused on providing building blocks for message passing, which simplifies the implementation of parallel CPU- and I/O-intensive applications with high throughput and low latency, while also providing developers explicit control over how data is buffered.
An agent is not an actor
On the surface, there are some similarities between agents and actors, which sometimes cause people to use these terms interchangeably. But, the main difference is that agents are in a process, while actors may be running on another process. In fact, the reference to an agent is a pointer to a specific instance, whereas an actor reference is determined through location transparency. Location transparency is the use of names to identify network resources, rather than their actual location, which means that the actor may be running in the same process, or on another process, or possibly on a remote machine.
Agent-based concurrency is inspired by the actor model, but, its construction is much simpler than the actor model. Actor systems have built-in sophisticated tools for distribution support, which include supervision to manage exceptions and potentially self-heal the system, routing to customize the work distribution and more.
Rx is keener to the functional paradigm, providing a vast set of operators that predominantly focus on coordination and composition of event streams with a LINQ-based API.
The TPL Dataflow has built-in support for integrating with Rx, which allows it to expose the source DataFlow blocks as both observables and observers (https://docs.microsoft.com/en-us/dotnet/standard/events/observer-design-pattern).
The AsObservable extension method transforms any TPL Dataflow Propagator block into an observable sequence, which allows the output of the DataFlow chain to flow efficiently into an arbitrary set of Reactive fluent extension methods for further processing. Specifically, the AsObservable extension method constructs an IObservable<T> for an ISourceBlock<T>.
NOTE TPL Dataflow can also act as an observer. The AsObserver extension method creates an IObserver<T> for an ITargetBlock<T>, where the OnNext calls for the observer result in the data being sent to the target. The OnError calls result in the exception faulting the target, and the OnCompleted calls will result in Complete called on the target.
Let’s see in the following example how to integrate Rx and TPL Dataflow.
A parallel workflow to compress, encrypt a large stream
In this section, you’ll build a complete asynchronous and parallelized workflow to demonstrate the power of the TPL Dataflow library.
Asynchrony vs. parallelism
Parallelism is primarily about application performance, and it facilitates CPU-intensive work on multiple threads at one time, taking advantage of modern multicore computer architectures. Asynchrony is a subset of concurrency, which focuses on I/O-bound rather than CPU-bound operations. Asynchronous programming addresses the issue of latency (Latency is anything that takes long time to run).
This example uses a combination of TPL Dataflow blocks and Reactive Extensions work as a parallel pipeline. The purpose of this example is to analyze and architect a real case application. It then evaluates the challenges encountered during the development of the program, and examines how TPL Dataflow can be introduced in the design of the application.
The goal of this application is to leverage the TPL Dataflow library to efficiently spread the work out across multiple CPU cores to maximize the speed of computation and overall scalability, especially when the blocks that compose a workflow, process the messages at different rates and in parallel. This is particularly useful when you need to process a large stream of bytes that could generate hundreds, or even thousands, of chunks of data.
Case Study: The problem of processing a large stream of data
Let’s say that you need to compress a large file to make it easier to persist or transmit over the network, or that a file’s content must be encrypted to protect that information.
Often, both compression and encryption must be applied.
These operations can take a long time to complete if the full file is processed all at once. Furthermore, it’s challenging to move a file, or stream data, across the network, and the complexity increases with the size of the file, due to external factors, such as latency or unpredictable bandwidth.
In addition, if the file is transferred in one transaction, and something goes wrong, then the operation tries to resend the entire file, which can be time and resource consuming.
In the following sections, you’ll tackle this problem step-by-step.
In .NET, it isn’t easy to compress a file larger than 4 GB, due to the framework limitation on the size of data to compress. Due to the maximum addressable size for a 32-bit pointer, if you create an array over 4 GB, an OutOfMemory exception is thrown.
Starting with .NET 4.5 and for 64-bit platforms, the option gcAllowVeryLargeObjects is available to enable arrays greater than 2 GB. This option allows 64-bit applications to have a multi-dimensional array with size UInt32.MaxValue (4,294,967,295) elements. Technically, you can apply the standard GZip compression that’s used to compress streams of bytes to data larger than 4 GB; but the GZip distribution doesn’t support this by default. The related .NET GZipStream class inheritably has a 4 GB limitation.
How can you compress and encrypt a large file without being constrained by the 4 GB limit imposed by the framework classes?
A practical solution involves using a chunking routine to chop the stream of data.
Chopping the stream of data makes it easier to compress and/or encrypt each block individually and ultimately write the block content to an output stream. The chunking technique splits the data, generally into chunks of the same size, applies the appropriate transformation to each chunk (compression before encryption), and glues the chunks together in the correct order.
It’s vital and good practice to guarantee the correct order of the chunks upon reassembly at the end of workflow. Due to the intensive I/O asynchronous operations (compress and encrypt), the packages might not arrive in the correct sequence, especially if the data is transferred across the network. You must verify order during reassembly.
Figure 7: The Transform blocks process the messages in parallel. The result is sent to the next block when the operation completes. The aggregate agent’s purpose is to maintain the integrity of the order of the messages, similar to the AsOrdered PLINQ extension method.
The opportunity for parallelism fits naturally in this design, because the chunks of the data can be processed independently.
Encryption and compression: order matters
It might seem that because the compression and encryption operations are independent of one another, it makes no difference in which order they’re applied to a file. This isn’t true. The order in which the operations of compression and encryption are applied, is vital. Encryption has the effect of turning input data into high-entropy data (Information entropy is defined as the average amount of information produced by a stochastic source of data. See https://en.wikipedia.org/wiki/Entropy_(information_theory), which is a measure of the unpredictability of information content. Therefore, the encrypted data appears like a random array of bytes, which makes finding common patterns less probable. Conversely, compression algorithms work best when there are several similar patterns in the data, which can be expressed with fewer bytes.
When data must be both compressed and encrypted, you achieve the best results by first compressing and then encrypting the data. In this way, the compression algorithm can find similar patterns to shrink, and consequently the encryption algorithm produces the chunks of data having almost the same size of the compressed ones. Furthermore, if the order of the operations is compression and then encryption, not only should the output be a smaller file, but the encryption will most likely take less time because it’ll operate on less data.
Listing 4 shows the full implementation of the parallel compression–encryption workflow. Note that in the source code, you can find the reverse workflow to decrypt and decompress the data, as well as use asynchronous helper functions for compressing and encrypting bytes array.
Note: the full source code of this article is available for downloading here.
The function CompressAndEncrypt takes as an argument the source and destination streams to process, the chunkSize argument defines the size in which the data is split (the default is 1 MB if no value is provided), and CancellationTokenSource stops the DataFlow execution at any point. If no CancellationTokenSource is provided, a new token is defined and propagated through the DataFlow operations.
The core of the function consists of three TPL Dataflow building blocks, in combination with a Reactive Extensions operator (Scan) that completes the workflow. The inputBuffer is a BufferBlock type that, as the name implies, buffers the incoming chunks of bytes read from the source stream, and holds these items to pass them to the next block in the flow, which is the linked TransformBlock compressor.
Listing 4: Parallel stream compression and encryption using TPL Dataflow
async Task CompressAndEncrypt(
Stream streamSource, Stream streamDestination,
long chunkSize = 1048576, CancellationTokenSource cts = null)
{
cts = cts ?? new CancellationTokenSource();
// Sets the BoundedCapacity value to throttle the messages
var compressorOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 20,
CancellationToken = cts.Token
};
var inputBuffer = new BufferBlock<CompressingDetails>(
new DataflowBlockOptions
{
CancellationToken = cts.Token,
BoundedCapacity = 20
});
var compressor = new TransformBlock<CompressingDetails,
CompressedDetails>(async details => {
// Compresses asynchronously the data
// (the method is provided in the source code)
var compressedData = await IOUtils.Compress(details.Bytes);
// Converts the current data structure into the CompressionDetails
// message type that is sent to the next block
return details.ToCompressedDetails(compressedData);
}, compressorOptions);
var encryptor = new TransformBlock<CompressedDetails, EncryptDetails>(
async details => {
// Combines the data and metadata into a byte array pattern
// that will be deconstructed and parsed during the reverse
// operation decrypt–decompress
byte[] data = IOUtils.CombineByteArrays(details.CompressedDataSize,
details.ChunkSize, details.Bytes);
// Encrypts asynchronously the data
// (the method is provided in the source code)
var encryptedData = await IOUtils.Encrypt(data);
return details.ToEncryptDetails(encryptedData);
}, compressorOptions);
// Enables Rx integration with TPL Dataflow
encryptor.AsObservable()
.Scan((new Dictionary<int, EncryptDetails>(), 0),
(state, msg) =>
// Runs Rx Scan operation asynchronously
Observable.FromAsync(async () => {
(Dictionary<int, EncryptDetails> details, int lastIndexProc) = state;
details.Add(msg.Sequence, msg);
// Guarantee to process any chunk of data available that are
// sequentially in correct order and succession
while (details.ContainsKey(lastIndexProc + 1))
{
msg = details[lastIndexProc + 1];
// Persists asynchronously the data; the stream could be replaced
// with a network stream to send the data across the wire.
await streamDestination.WriteAsync(msg.EncryptedDataSize, 0,
msg.EncryptedDataSize.Length);
await streamDestination.WriteAsync(msg.Bytes, 0, msg.Bytes.Length);
lastIndexProc = msg.Sequence;
// the chunk of data that is processed is removed
// from the local state, keeping track of the items to perform
details.Remove(lastIndexProc);
}
return (details, lastIndexProc);
})
.SingleAsync())
// Rx subscribes to TaskPoolScheduler
.SubscribeOn(TaskPoolScheduler.Default).Subscribe();
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
// Links the DataFlow blocks to compose workflow
inputBuffer.LinkTo(compressor, linkOptions);
compressor.LinkTo(encryptor, linkOptions);
long sourceLength = streamSource.Length;
byte[] size = BitConverter.GetBytes(sourceLength);
// The total size of the file stream is persisted as the first chunk of data;
// in this way, the decompression algorithm knows how to retrieve
// the information and how long to run.
await streamDestination.WriteAsync(size, 0, size.Length);
// Determines the chunk size to partition data
chunkSize = Math.Min(chunkSize, sourceLength);
int indexSequence = 0;
// Reads the source stream into chunks until the end of the stream
while (sourceLength > 0)
{
byte[] data = new byte[chunkSize];
int readCount = await streamSource.ReadAsync(data, 0, data.Length);
byte[] bytes = new byte[readCount];
Buffer.BlockCopy(data, 0, bytes, 0, readCount);
var compressingDetails = new CompressingDetails
{
Bytes = bytes,
ChunkSize = BitConverter.GetBytes(readCount),
Sequence = ++indexSequence
};
await inputBuffer.SendAsync(compressingDetails);
sourceLength -= readCount;
if (sourceLength < chunkSize)
chunkSize = sourceLength;
// Checks the current source stream position
// after each read operation to decide when to complete the operation
if (sourceLength == 0)
inputBuffer.Complete();
}
await encryptor.Completion;
await streamDestination.FlushAsync();
}
The bytes read from the stream are sent to the buffer block by using the SendAsync method:
var compressingDetails = new CompressingDetails {
Bytes = bytes,
ChunkSize = BitConverter.GetBytes(chunkSize),
Sequence = ++indexSequence
};
await buffer.SendAsync(compressingDetails);
Each chunk of bytes read from the streamSource is wrapped into the CompressingDetails object , which contains additional information of byte array size.
The monotonic value is later used in the sequence of chunks generated to preserve the order. A monotonic value is a function between ordered sets that preserves or reverses the given value, and the value always either decreases or increases. The order of the block is important both for a correct compression–encryption operation and for correct decryption and decompression into the original shape.
In general, if the purpose of a TPL Dataflow block is purely to forward items from one block to several others, then you don’t need the BufferBlock. But in the case of reading a large or continuous stream of data, this block is useful for taming the backpressure generated from the massive amount of data given to the process by setting an appropriate BoundedCapacity.
In this example, the BoundedCapacity is restricted to a capacity of 20 items. When there are 20 items in this block, it will stop accepting new items until one of the existing items passes to the next block. Because the DataFlow source of data originated from asynchronous I/O operations, there’s a risk of potentially large amounts of data to process. It’s recommended that you limit the internal buffering to throttle the data by setting the BoundedCapacity property in the options defined when constructing the BufferBlock.
The next two operation types are compression transformation and encryption transformation.
During the first phase (compression), the TransformBlock applies the compression to the chunk of bytes and enriches the message received CompressingDetails with the relative data information, which includes the compressed byte array and its size. This information persists as part of the output stream which is needed later during the decompression.
The second phase (encryption) enciphers the chunk of compressed byte array and creates a sequence of bytes resulting from the composition of three arrays: CompressedDataSize, ChunkSize, and data array. This structure instructs the decompression and decryption algorithms to target the right portion of bytes to revert from the stream.
NOTE: Keep in mind that when there are multiple TPL Dataflow blocks, certain Tasks may be idle while the others are executing, so you have to tune the block’s execution option to avoid potential starvation. Details regarding this optimization are explained in the coming section.
Ensuring the order integrity of a stream of messages
The TPL Dataflow documentation guarantees that the TransformBlock will propagate the messages in the same order in which they arrived.
Internally, the TransformBlock uses a reordering buffer to fix any out-of-order issues that might arise from processing multiple messages concurrently. Unfortunately, due to the high number of asynchronous and I/O intensive operations running in parallel, keeping the integrity of the message order doesn’t apply to your case. This is why you implemented the additional sequential ordering preservation using monotonically values.
If you decide to send or stream the data over the network, then the guarantee of delivering the packages in the correct sequence is lost, due to variables such as the unpredictable bandwidth and unreliable network connection. In addition, because these packages of data are processed (compressed and encrypted) in parallel and sent asynchronously through the network independently, the order by which these chucks of data are sent won’t necessarily match the order of arrived to destination.
To safeguard the order integrity when processing chunks of data, the final step in the workflow is leveraging Reactive Extensions with the Observable.Scan operator. This operator behaves as a multiplexer by reassembling the items and persists them in the local file system, maintaining the correct sequence. The order of the sequence is kept in a property of the EncryptDetails data structure, which is passed into the Scan operation. The particularity of this component is the presence of an internal state that keeps track of the messages received and their order.
NOTE: To complete successfully the workflow in the previous example, we need keep the order of which the chunks are processed to be able to re-compose them correctly. For this reason, we are using Reactive Extensions, with the function Scan, to maintain the state of the exact order of the messages. This would not be possible using only the TPL Dataflow blocks, because are stateless per nature.
The multiplexer pattern
The multiplexer is a pattern generally used in combination with a Producer/Consumer design. It allows the consumer, which in the previous example is the last stage of the pipeline, to receive the chunks of data in the correct sequential order. The chunks of data don’t need to be sorted or reordered. Instead, the fact that each producer (TPL DataFlow block) queue is locally ordered allows the multiplexer to look for the next value (message) in the sequence. The multiplexer waits for a message from a producer DataFlow block. When a chunk of data arrives, the multiplexer looks to see if the chunk’s sequence number is the next in the expected sequence. If it is, the multiplexer persists the data to the local file system. If the chunk of data isn’t the one expected next in the sequence, the multiplexer holds the value in an internal buffer (in the previous example is used a collection of Dictionary<int,EncryptDetails>) and repeats the analysis operation for the next message received. This algorithm allows the multiplexer to put together the inputs from the incoming producer message in a way that ensures sequential order without sorting the values.
The accuracy for the whole computation requires preservation of the order of the source sequence to ensure that the order is consistent at merge time.
The TPL Dataflow blocks and Rx observable streams can be completed successfully or with errors, and the AsObservable method will translate the block completion into the completion of the observable stream. But if the block faults with an exception, that exception will be wrapped in an AggregateException when it is passed to the observable stream. This is similar to how linked blocks propagate their faults.
NOTE In the case of sending the chunks of data over the network, the same strategy of writing data to the local file system is applicable by having a the Scan Rx function working as part of the receiver on the other side of the wire.
The state of the Observable.Scan operator is preserved using a tuple. The first item of the tuple is a collection Dictionary<int, EncryptDetails>, where the key represents the sequence value of the original order by which the data was sent. The second item, lastIndexProc, is the index of the last item processed, which prevents reprocessing the same chunks of data more than once.
The body of Observable.Scan runs the while loop that uses this value lastIndexProc and makes sure that the processing of the chunks of data starts from the last item unprocessed. The loop continues to iterate until the order of the items is continuous, otherwise it breaks out from the loop and waits for the next message, which might complete the missing gap in the sequence.
Linking, propagating, and completing
The TPL Dataflow blocks in the compress-encrypt workflow are linked using the LinkTo extension method, which by default propagates only data (messages). But if the workflow is linear, as in this example, it’s good practice to share information among the blocks through an automatic notification, such as when the work is terminated or eventual errors occur. This behavior is achieved by constructing the LinkTo method with a DataFlowLinkOptions optional argument and the PropagateCompletion property set to true. Here’s the code from the previous example with this option built-in (in bold).
var linkOptions = new DataFlowLinkOptions { PropagateCompletion = true };
inputBuffer.LinkTo(compressor, linkOptions);
compressor.LinkTo(encryptor, linkOptions);
The PropagateCompletion optional property informs the DataFlow block to automatically propagate its results and exceptions to the next stage when it completes. This is accomplished by calling the Complete method to trigger the complete notification upon reaching the end of the stream:
if (sourceLength < chunkSize)
chunkSize = sourceLength;
if (sourceLength == 0)
buffer.Complete();
..and then all the DataFlow blocks are announced in cascade as a chain that the process is completed:
await encryptor..Completion;
Ultimately, you can run the code as follows:
using (var streamSource = new FileStream(sourceFile, FileMode.OpenOrCreate,
FileAccess.Read, FileShare.None, useAsync: true))
using (var streamDestination = new FileStream(destinationFile,
FileMode.Create, FileAccess.Write, FileShare.None, useAsync: true))
await CompressAndEncrypt(streamSource, streamDestination);
Note that the code runs on .NET Core 2.0 (or higher).
The following table shows the benchmarks for compressing and encrypting different file sizes, including the inverted operation of decrypting and decompressing. The benchmark result is the average of each operation run three times.
In conclusion
§ A system written using TPL DataFlow benefits from a multicore system because all the blocks that compose a workflow can run in parallel.
§ The TPL Dataflow enables effective techniques for running embarrassingly parallel problems, where many independent computations can be executed in parallel in an evident way.
§ The TPL Dataflow has built-in support for throttling and asynchrony, improving both the I/O-bound and CPU-bound operations. In particular, it provides the ability to build responsive client applications while still getting the benefits of massively parallel processing.
§ The TPL Dataflow can be used to parallelize the workflow to compress and encrypt a large stream of data by processing blocks at different rates.
§ The combination and integration with Rx and TPL Dataflow simplifies the implementation of parallel CPU and I/O-intensive applications, while also providing developers explicit control over how data is buffered.
Resources
For more information about the DataFlow library, see the online MSDN documentation.
Download the entire source code of this article (Github)
This article was partially reviewed by Yacoub Massad.
This article has been editorially reviewed by Suprotim Agarwal.
C# and .NET have been around for a very long time, but their constant growth means there’s always more to learn.
We at DotNetCurry are very excited to announce The Absolutely Awesome Book on C# and .NET. This is a 500 pages concise technical eBook available in PDF, ePub (iPad), and Mobi (Kindle).
Organized around concepts, this Book aims to provide a concise, yet solid foundation in C# and .NET, covering C# 6.0, C# 7.0 and .NET Core, with chapters on the latest .NET Core 3.0, .NET Standard and C# 8.0 (final release) too. Use these concepts to deepen your existing knowledge of C# and .NET, to have a solid grasp of the latest in C# and .NET OR to crack your next .NET Interview.
Click here to Explore the Table of Contents or Download Sample Chapters!
Was this article worth reading? Share it with fellow developers too. Thanks!
Riccardo is an information systems and technology professional and architect specializing in software & systems development. He has over 20 years’ experience delivering cost-effective technology solutions in the competitive business environment. Riccardo is passionate about integrating advanced technology tools to increase internal efficiency, enhance work productivity, and reduce operating costs. He is a Microsoft Most Valuable Professional (MVP) who is active in the .NET, functional programming, and F# communities. Riccardo believes in multi-paradigm programming to maximize the power of code and is the author of
Functional Concurrency in .NET; which, features how to develop highly-scalable systems in F# & C#.