LINQ executed in Parallel (PLINQ)

Parallel LINQ (or PLINQ, as it is called) offers all the benefits of LINQ. In addition, it enables parallel execution of LINQ queries to take advantage of the multiple processors of the host machine. This article gives an overview of PLINQ and its features using code examples.

LINQ was introduced with .NET Framework 3.5. It provides a unified model and query constructs to query any IEnumerable or IEnumerable<T> in a type-safe manner. It allows us to use similar constructs to query in-memory collections (LINQ to Objects), XML (LINQ to XML) or a database (LINQ to SQL/ADO.NET Entity Framework). LINQ offers deferred execution, i.e. queries don’t get executed until they are enumerated.

With .NET Framework 4.0, a parallel implementation of LINQ has been introduced, called Parallel LINQ or PLINQ. It provides all the benefits of LINQ and, on top of them, enables parallel execution. This means that PLINQ queries attempt to make full use of all available processors of the system. The good part is that, in most cases, the programmer doesn’t have to worry about the internals of doing that (trust me, those can be intricate).

We all know that using multiple processors/cores helps, but a lot of complexity comes with it. First of all, we need answers to a number of questions such as:

• How many threads should be created? This itself depends on a number of dynamic factors like the number of available cores/CPUs, the load on the machine, etc.
• How should the source collection be partitioned so that the threads can operate independently?
• How and when should the results from multiple threads be merged back?

If we use PLINQ, programmers needn’t worry about any of this. But take that statement with a pinch of salt: not all queries benefit from parallelism, and there are considerations to weigh before deciding to use PLINQ for a query.

ParallelEnumerable – the heart of PLINQ:
Just as Enumerable/Queryable are to LINQ, the ParallelEnumerable class is to PLINQ. This class provides the extension methods that expose most of PLINQ’s functionality, including implementations of all the standard LINQ query operators. In addition, it implements two very special methods, AsParallel and AsSequential, which are used to opt in to or out of PLINQ.

The Opt-in model:
One can enter PLINQ by calling the AsParallel method on an IEnumerable (or IEnumerable<T>) to indicate that the rest of the query should be parallelized (if possible). This method (or one of its overloads) returns an instance of ParallelQuery/ParallelQuery<T>, which represents a parallel sequence.

Here is an example of a parallel query:

Directory.GetFiles(_existingDir, "*.jpg").AsParallel()
    .ForAll(file =>
    {
        var bitMap = new Bitmap(file);
        bitMap.RotateFlip(RotateFlipType.Rotate180FlipNone);
        bitMap.Save(Path.Combine(_newDir, Path.GetFileName(file)));
    });

In the code sample above, the GetFiles method returns an array of strings (string[]) containing the names of the files that match the "*.jpg" pattern. Calling the extension method AsParallel on the string[] returns an instance of ParallelQuery<string>, which indicates that the rest of the query should execute in parallel. The ForAll extension method is defined by the ParallelEnumerable class; it takes an instance of the Action<T> delegate and executes it in parallel for each element in the source collection. Here it is used to rotate each image by a fixed angle and save it back to disk. File I/O is an expensive operation. Simply dump some images into the directory and compare the execution time of this code against code that does the same thing sequentially (say, in a foreach loop). You'd see the difference even on a desktop machine (if it has multiple cores).
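To try that comparison yourself, here is a minimal sketch that times both versions with a Stopwatch (from System.Diagnostics). It assumes the same _existingDir and _newDir fields as above and that System.Drawing is referenced:

var sw = Stopwatch.StartNew();

// Sequential version: process the images one by one on the calling thread
foreach (var file in Directory.GetFiles(_existingDir, "*.jpg"))
{
    var bitMap = new Bitmap(file);
    bitMap.RotateFlip(RotateFlipType.Rotate180FlipNone);
    bitMap.Save(Path.Combine(_newDir, Path.GetFileName(file)));
}
Console.WriteLine("Sequential: {0} ms", sw.ElapsedMilliseconds);

sw.Restart();

// Parallel version: the same work spread across the available cores
Directory.GetFiles(_existingDir, "*.jpg").AsParallel()
    .ForAll(file =>
    {
        var bitMap = new Bitmap(file);
        bitMap.RotateFlip(RotateFlipType.Rotate180FlipNone);
        bitMap.Save(Path.Combine(_newDir, Path.GetFileName(file)));
    });
Console.WriteLine("Parallel:   {0} ms", sw.ElapsedMilliseconds);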

Similarly, to opt out of parallel execution, invoke AsSequential, which gives back an IEnumerable<T>. These two methods can be used to switch between parallel and sequential modes within the same query. Here is a code sample that combines both:

var query = GetOrders().AsParallel().OrderBy(o => o.OrderId)
    .Select(ord => new
    {
        Id = ord.OrderId,
        Date = ord.OrderDate,
        Shipped = ord.ShippedDate
    }).AsSequential().Take(5);

Here the call to AsSequential is required so that the final Take operation preserves the ordering established by the earlier OrderBy clause of the query.

While PLINQ does a lot of things on its own, there are ways to tweak or alter its default behavior. One such case is its execution mode.

Execution Mode:

Merely calling AsParallel doesn’t always ensure that a query will be executed using multiple cores/processors. The PLINQ engine analyzes a query and uses its own judgment to decide whether parallelization will actually help it. If it decides otherwise, it executes the query sequentially. For example, in the previous sample, if instead of file operations we had some simple number manipulation (say, just calculating square roots), the engine might execute that query sequentially. To alter this default behavior and force parallel execution, one can use the WithExecutionMode<T> extension method. It takes an instance of the ParallelExecutionMode enumeration, which has two values: Default (the default behavior just explained) and ForceParallelism (which forces the PLINQ engine to parallelize the query even if that means using high-overhead algorithms).

Example:
Directory.GetFiles(_existingDir, "*.jpg").AsParallel()
    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    .ForAll(file =>
    {
        var bitMap = new Bitmap(file);
        bitMap.RotateFlip(RotateFlipType.Rotate180FlipNone);
        bitMap.Save(Path.Combine(_newDir, Path.GetFileName(file)));
    });

Degree of Parallelism:
When left on its own, the PLINQ engine uses all of the processors on the host computer, up to a maximum of 64. One can cap the number of processors used for a PLINQ query with the WithDegreeOfParallelism<T> extension method. This can be essential when one wants to make sure that other processes running on the computer receive a certain amount of CPU time. The following code limits the number of processors to 2 (even if more are available).

Directory.GetFiles(_existingDir, "*.jpg").AsParallel()
    .WithDegreeOfParallelism(2)
    .ForAll(file =>
    {
        var bitMap = new Bitmap(file);
        bitMap.RotateFlip(RotateFlipType.Rotate180FlipNone);
        bitMap.Save(Path.Combine(_newDir, Path.GetFileName(file)));
    });

Order Preservation:
In some queries, a query operator must preserve the ordering of the source sequence. In such cases the query still executes in parallel, but the results are buffered and sorted to maintain order. Since order preservation requires extra work, PLINQ by default does not preserve the order of the source sequence. It can, however, be instructed to do so using the AsOrdered<T> extension method, which indicates that the rest of the query should preserve ordering.
Here is a code sample that forces ordering:

var customers = GetCustomers().AsParallel().AsOrdered()
    .Select(c => string.Format("Id: {0}, Name: {1}", c.CustomerId, c.CustomerName))
    .Reverse();

If we know that a particular query operator does not require order preservation, we can switch back using the AsUnordered<T> extension method (both methods reside in the ParallelEnumerable class).
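For illustration, here is a minimal sketch (reusing the GetCustomers method from the previous sample) where ordering is preserved only for the part of the query that needs it and then relaxed again with AsUnordered:

var names = GetCustomers().AsParallel().AsOrdered()
    .Take(100)          // Take respects the source order because of AsOrdered
    .AsUnordered()      // ordering no longer matters for the remaining operators
    .Select(c => string.Format("Id: {0}, Name: {1}", c.CustomerId, c.CustomerName));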

Merge Options:
When a PLINQ query executes in parallel, the source sequence is partitioned and each partition is assigned to a separate worker thread so that those parts can be processed concurrently. However, if the results are to be consumed on a single thread (e.g. in a foreach loop), the results from each worker thread must be merged back into one sequence. The kind of merge performed depends on the operators used in the query. If ordering has to be preserved, the results from all worker threads are buffered; the thread consuming such a fully buffered query may have to wait a considerable amount of time before seeing the first result. For an operator like ForAll, on the other hand, results aren’t buffered by default and worker threads yield their results immediately. Other operators are partially buffered (i.e. they yield their results in batches). A query that is partially buffered or not buffered at all can improve perceived performance from the consuming thread’s point of view.
One can influence the merge behavior using the WithMergeOptions<T> extension method, which takes an instance of the ParallelMergeOptions enumeration. There are three merge options:

• NotBuffered – Results from worker threads are made available to the consuming thread immediately, providing streamed output to the consumer. While this can improve perceived performance for the consumer, the total execution time might still be longer than with the other merge options.
• AutoBuffered – This is the default. Results from worker threads are accumulated in an output buffer (whose size is chosen by the system) and yielded to the consuming thread periodically.
• FullyBuffered – Output from all worker threads is buffered before the first result is yielded to the consumer. This takes longer to produce the first element, but might still be faster than the other options when total execution time is considered.

Here is an example to set the merge option for a query to NotBuffered:

var numbers = Enumerable.Range(1, 1000).AsParallel()
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Where(n => n % 2 == 0)
    .Select(n => DoWork(n));

Here is the implementation of the DoWork method, which uses Thread.SpinWait to simulate some work for each item:

private static string DoWork(int i)
{
    Thread.SpinWait(2000000);
    return string.Format("{0} **************************", i);
}

One can alter the merge option in the above query and note, for each option, the difference in time before the first element is printed to the console.
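One rough way to observe this is to time how long the consuming loop waits for its first item. Here is a small sketch (Stopwatch comes from System.Diagnostics) that consumes the numbers query defined above:

var sw = Stopwatch.StartNew();
bool first = true;

foreach (var s in numbers)
{
    if (first)
    {
        // With NotBuffered the first item typically arrives much sooner
        // than with FullyBuffered, at the cost of total throughput.
        Console.WriteLine("First result after {0} ms", sw.ElapsedMilliseconds);
        first = false;
    }
    Console.WriteLine(s);
}
Console.WriteLine("All results after {0} ms", sw.ElapsedMilliseconds);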

ForAll Operator:
Just like LINQ, PLINQ executes a query only when one loops over the results (e.g. using foreach) or calls a method like ToList or ToArray (or any other method that forces execution). One can use a foreach loop to iterate through the results of a PLINQ query, but foreach itself doesn’t execute in parallel; it therefore requires the outputs from the different worker threads to be merged back onto the thread on which the loop executes. Use foreach when the final ordering of the results must be preserved or the results have to be processed sequentially (e.g. printing them to console output). When order preservation is not required, one can instead use the ForAll<T> extension method provided by the ParallelEnumerable class to parallelize the processing of the results. ForAll<T> skips the final merge step that a foreach loop requires.

Here is a code example:

var concurrentBag = new ConcurrentBag<int>();   // requires System.Collections.Concurrent
var nums = Enumerable.Range(10, 10000);
var query = from num in nums.AsParallel()
            where num % 10 == 0
            select num;

query.ForAll(e => concurrentBag.Add(Compute(e)));

Here, the ForAll method adds the results of the query to a ConcurrentBag<T>, a collection optimized for concurrent additions from multiple threads when items don’t need to be removed.
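The Compute method isn’t shown above; it simply stands for whatever per-item work the query needs to do. A hypothetical placeholder, similar in spirit to the earlier DoWork method, might look like this:

private static int Compute(int i)
{
    Thread.SpinWait(200000);   // simulate some CPU-bound work per item
    return i * i;
}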

Cancellation:
PLINQ supports the cancellation framework introduced in .NET Framework 4.0. To allow a PLINQ query to be cancelled from another thread, use the WithCancellation<T> extension method on the query, passing a cancellation token as its argument. When that token is used to cancel the query, PLINQ notices it, stops processing on all threads and throws an OperationCanceledException.
Here is a sample program that illustrates cancellation of a PLINQ query with a cancellation token:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ParallelProgramming.PLINQ
{
    class CancellationPlinqProgram
    {
        static void Main(string[] args)
        {
            ExecuteCancellationQuery();
        }

        private static void ExecuteCancellationQuery()
        {
            var numbers = Enumerable.Range(1, 10000000).ToArray();
            var cts = new CancellationTokenSource();

            Task.Factory.StartNew(() => CancelQuery(cts));
            int[] result = null;

            try
            {
                result = numbers.AsParallel().WithCancellation(cts.Token)
                    .Where(n => n % 3 == 0)
                    .OrderByDescending(n => n)
                    .ToArray();
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Query has been cancelled");
            }

            if (result != null)
            {
                Console.WriteLine("\n----Result----");

                foreach (var n in result)
                    Console.WriteLine(n);
            }
        }

        private static void CancelQuery(CancellationTokenSource cts)
        {
            Console.WriteLine("Press c to cancel");
            if (Console.ReadKey().KeyChar == 'c')
                cts.Cancel();
        }
    }
}

In the ExecuteCancellationQuery method, a cancellation token source is created and passed to a separate method, CancelQuery, which is started in a separate asynchronous task (this allows cancellation to be initiated from a different thread). The CancelQuery method simply waits for the user to enter the character ‘c’ at the console and then invokes cancellation on the token source. This causes the PLINQ query (a simple one that just filters and orders a list of integers) to throw an OperationCanceledException, which is handled in the main thread.

Exception handling with PLINQ:
PLINQ can potentially execute a query on multiple threads, which can throw different exceptions simultaneously, and the code that handles those exceptions may reside on a different thread from the ones where they are raised. PLINQ wraps all such exceptions from the worker threads into a single instance of the AggregateException class and marshals it to the calling thread. This allows the calling thread to have a single exception block to trap the AggregateException; inside the block, one can then iterate through all the wrapped exceptions and handle them separately. There is one caveat, though: if the query throws only a single OperationCanceledException and no other exception, the PLINQ framework doesn’t roll it up into an AggregateException. So it is good practice to have a separate catch block for OperationCanceledException in addition to the one for AggregateException when dealing with exceptions from a PLINQ query.
Here is a sample code that shows a catch block for an AggregateException:

try
{
    // Some code that runs a PLINQ query
}
catch (AggregateException ae)
{
    ae.Handle(e =>
    {
        if (e is ArgumentException)
        {
            Console.WriteLine("Argument ex: " + e.Message);
        }
        else
            Console.WriteLine("Other ex: " + e.Message);

        return true;
    });
}

The AggregateException class provides a Handle method that takes a Func<Exception, bool>, which is executed for each exception wrapped by the AggregateException instance. If one handles an exception and does not want it to propagate further, one should return true from the delegate. In this example, the handler block is interested only in handling exceptions of type ArgumentException.
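For instance, placing a query like the following inside the try block above would exercise the ArgumentException branch of the handler. This is only a sketch; the exception is thrown deliberately just to show how failures from worker threads surface on the consuming thread:

// Deliberately throw from the worker threads for some elements;
// PLINQ wraps these into one AggregateException on the consuming thread.
var results = Enumerable.Range(0, 100).AsParallel()
    .Select(n =>
    {
        if (n % 10 == 0)
            throw new ArgumentException("Bad value: " + n);
        return n * 2;
    })
    .ToArray();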

By Indranil Chatterjee