Producer/Consumer Queue and BlockingCollection in C# 4.0

Shows how to use the new BlockingCollection class in C# 4.0

A producer/consumer queue is a common requirement in threading. A queue is set up to describe work items or data on which work is performed. When a task needs to be executed, it is enqueued, allowing the caller to perform other actions. One or more worker threads work in the background, dequeueing and executing queued items.

One advantage of this model is that you have precise control over how many worker threads execute at once. This allows you to limit consumption of CPU time and other resources. If the tasks perform intensive disk I/O you might have just one worker thread to avoid starving the operating system. Another type of application may have 100 worker threads. You can also dynamically add and remove workers throughout the queue’s life. The CLR ThreadPool itself is a kind of producer/consumer queue.

A producer/consumer queue typically holds items of data on which the same task is performed. By making the item a delegate, you can write a very general-purpose producer/consumer queue where each item can do anything.
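As a rough sketch of the delegate idea, the queue below stores Action delegates, so each item can do something different. It relies on BlockingCollection<T>, which is covered later in this article. The ActionQueue class and its members are hypothetical names made up for this illustration, not part of any library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: a queue whose items are delegates, drained by one worker.
public sealed class ActionQueue : IDisposable
{
    private readonly BlockingCollection<Action> _items = new BlockingCollection<Action>();
    private readonly Task _worker;

    public ActionQueue()
    {
        // A single consumer executes the items in the order they were added.
        _worker = Task.Factory.StartNew(Run, TaskCreationOptions.LongRunning);
    }

    public void Enqueue(Action work)
    {
        _items.Add(work);
    }

    private void Run()
    {
        foreach (Action work in _items.GetConsumingEnumerable())
            work();
    }

    // Stop accepting work, then wait until the queued items have run.
    public void Dispose()
    {
        _items.CompleteAdding();
        _worker.Wait();
    }
}

public static class ActionQueueDemo
{
    public static void Main()
    {
        using (var queue = new ActionQueue())
        {
            queue.Enqueue(() => Console.WriteLine("first item"));
            queue.Enqueue(() => Console.WriteLine("second item: " + (2 + 3)));
        }
        // Dispose has returned here, so both items have already run.
    }
}
```

With a single worker the items run one after another. Starting more workers would trade that ordering for throughput.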

.NET Framework 4.0 provides a set of new collections in the System.Collections.Concurrent namespace. These are fully thread-safe and are designed as the Parallel Extensions (PFX) counterparts of the ordinary generic collections:

ConcurrentStack<T>
ConcurrentQueue<T>
ConcurrentBag<T>
BlockingCollection<T>
ConcurrentDictionary<TKey,TValue>

ConcurrentBag<T>, which has no non-parallel counterpart, stores an unordered collection of objects with duplicates permitted. It is suitable when you don’t care which element you get when calling TryTake. The benefit of ConcurrentBag<T> over a concurrent queue or stack is that the bag’s Add method has almost no contention when called by many threads at once.
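A minimal sketch of that usage pattern: many threads add to the bag at once, and the bag is then drained without caring about order. The BagDemo class and its method name are made up for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BagDemo
{
    // Adds the numbers 0..count-1 from parallel workers, then drains the bag.
    public static int AddThenDrain(int count)
    {
        var bag = new ConcurrentBag<int>();
        Parallel.For(0, count, i => bag.Add(i));

        int taken = 0;
        int item;
        // TryTake returns false once the bag is empty; the order is unspecified.
        while (bag.TryTake(out item))
            taken++;
        return taken;
    }

    public static void Main()
    {
        Console.WriteLine(AddThenDrain(1000)); // prints 1000
    }
}
```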


The IProducerConsumerCollection<T>


A producer/consumer collection is one for which the two primary use cases are:
- Adding an element (“producing”)
- Retrieving an element while removing it (“consuming”)

The typical examples would be Stacks and Queues. Producer/consumer collections are significant in parallel programming because they are conducive to efficient lock-free implementations.

The IProducerConsumerCollection<T> interface represents a thread-safe producer/consumer collection. The following classes implement this interface:

ConcurrentStack<T>
ConcurrentQueue<T>
ConcurrentBag<T>
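Because the interface is shared, the same code can produce to and consume from any of these classes. The sketch below (ProducerConsumerDemo is a hypothetical name) adds three numbers through the interface and takes one back; which one comes out depends on the collection behind it.

```csharp
using System;
using System.Collections.Concurrent;

public static class ProducerConsumerDemo
{
    // Adds 1, 2, 3 and then takes a single element back out.
    public static int AddThreeTakeOne(IProducerConsumerCollection<int> collection)
    {
        collection.TryAdd(1);
        collection.TryAdd(2);
        collection.TryAdd(3);

        int item;
        if (!collection.TryTake(out item))
            throw new InvalidOperationException("collection was unexpectedly empty");
        return item;
    }

    public static void Main()
    {
        Console.WriteLine(AddThreeTakeOne(new ConcurrentQueue<int>())); // prints 1 (FIFO)
        Console.WriteLine(AddThreeTakeOne(new ConcurrentStack<int>())); // prints 3 (LIFO)
    }
}
```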


The BlockingCollection<T>


If you call TryTake on any of the producer/consumer collections mentioned and the collection is empty, the method returns false. Sometimes it would be more useful to wait until an element is available.

The designers of PFX encapsulated this functionality into a wrapper class called BlockingCollection<T>. A blocking collection wraps any collection that implements IProducerConsumerCollection<T> and lets you Take an element from the wrapped collection — blocking if no element is available.

A blocking collection also lets you limit the total size of the collection, blocking the producer while the collection is full.
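The blocking behavior of Take can be seen in a small sketch like this one, where the consumer is started before anything has been produced. The class and method names are made up, and the 100 ms delay only simulates a slow producer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class BlockingTakeDemo
{
    public static int TakeWaitsForProducer()
    {
        using (var collection = new BlockingCollection<int>())
        {
            // The consumer is started while the collection is still empty,
            // so Take will normally have to wait for the Add below.
            Task<int> consumer = Task.Factory.StartNew(() => collection.Take());

            Thread.Sleep(100);   // simulate a producer that is slow to produce
            collection.Add(42);  // releases the waiting consumer

            return consumer.Result;
        }
    }

    public static void Main()
    {
        Console.WriteLine(TakeWaitsForProducer()); // prints 42
    }
}
```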

To use BlockingCollection<T>:
1. Instantiate the class, optionally specifying the IProducerConsumerCollection<T> to wrap and the maximum size (bound) of the collection.
2. Call Add (blocking) or TryAdd (non-blocking) to add elements to the underlying collection.
3. Call Take or TryTake to remove (consume) elements from the underlying collection.

If you call the constructor without passing in a collection, the class automatically instantiates a ConcurrentQueue<T>. The producing and consuming methods let you specify cancellation tokens and timeouts. Add blocks while a bounded collection is full, and Take blocks while the collection is empty. TryAdd and TryTake return false immediately in those situations unless you pass a timeout, in which case they wait for up to that long.
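The steps above might look like this in practice. This is only a sketch: the BoundedDemo name, the bound of two and the 50 ms timeout are arbitrary choices for illustration.

```csharp
using System;
using System.Collections.Concurrent;

public static class BoundedDemo
{
    // Returns: TryAdd result when full, first element taken, timed TryTake result when empty.
    public static Tuple<bool, string, bool> Run()
    {
        // Wrap a ConcurrentStack<T> and cap the collection at two elements.
        using (var collection = new BlockingCollection<string>(new ConcurrentStack<string>(), 2))
        {
            collection.Add("a");
            collection.Add("b");

            // The collection is full: TryAdd gives up immediately instead of blocking.
            bool added = collection.TryAdd("c");

            // Take pops from the wrapped stack, so the newest element comes out first.
            string first = collection.Take();

            string item;
            collection.TryTake(out item); // removes "a", leaving the collection empty

            // Empty now: wait for up to 50 ms, then return false.
            bool taken = collection.TryTake(out item, TimeSpan.FromMilliseconds(50));

            return Tuple.Create(added, first, taken);
        }
    }

    public static void Main()
    {
        Tuple<bool, string, bool> result = Run();
        Console.WriteLine(result.Item1); // prints False
        Console.WriteLine(result.Item2); // prints b
        Console.WriteLine(result.Item3); // prints False
    }
}
```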

Another way to consume elements is to call GetConsumingEnumerable, which I use in the following example. This returns a sequence that yields elements as they become available. You can force the sequence to end by calling CompleteAdding: once the remaining elements have been consumed, the sequence completes. This method also prevents further elements from being enqueued.
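Before the full example, here is a minimal sketch of GetConsumingEnumerable and CompleteAdding working together. The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ConsumingEnumerableDemo
{
    // Produces 1..count on the calling thread and sums the values on a consumer task.
    public static int SumAll(int count)
    {
        using (var collection = new BlockingCollection<int>())
        {
            Task<int> consumer = Task.Factory.StartNew(() =>
            {
                int sum = 0;
                // Waits while the collection is empty; the loop ends once
                // CompleteAdding has been called and the collection is drained.
                foreach (int item in collection.GetConsumingEnumerable())
                    sum += item;
                return sum;
            });

            for (int i = 1; i <= count; i++)
                collection.Add(i);
            collection.CompleteAdding(); // signal that no more items will arrive

            return consumer.Result;
        }
    }

    public static void Main()
    {
        Console.WriteLine(SumAll(10)); // prints 55
    }
}
```

Without the call to CompleteAdding, the consumer's foreach loop would wait forever for more items.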

The example code here borrows from the sample code in "C# 4.0 in a Nutshell" (book review) by the Albahari brothers, published by O'Reilly, and uses the example model from my previous article. I've modified the code to pass a state object instead of a delegate. The state is an MSDN forums "short name", which is used to download the XML document for each forum from a URL template:

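using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        // forums.txt holds one forum "short name" per line.
        string[] forums = File.ReadAllLines(Path.Combine(Environment.CurrentDirectory, "forums.txt"));
        Task[] tasks = new Task[forums.Length];

        // One worker per forum; disposing the queue lets the workers finish.
        using (ProducerConsumerQueue q = new ProducerConsumerQueue(tasks.Length, forums))
        {
            Stopwatch timer = Stopwatch.StartNew();
            for (int i = 0; i < tasks.Length; i++)
            {
                // No cancellation token; the forum name is the work item's state.
                tasks[i] = q.EnqueueTask(null, forums[i]);
            }
            Task.WaitAll(tasks);
            Console.WriteLine("Done in " + timer.ElapsedMilliseconds + " ms.");
        }

        Console.ReadKey();
    }
}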

public class WorkItem
{
    public readonly TaskCompletionSource<object> TaskSource;
    public readonly CancellationToken? CancelToken;
    public readonly object State;

    public WorkItem(TaskCompletionSource<object> taskSource,
                    CancellationToken? cancelToken,
                    object state)
    {
        TaskSource = taskSource;
        CancelToken = cancelToken;
        State = state;
    }
}

public class ProducerConsumerQueue : IDisposable
{
    private const string ForumTemplate = "http://social.msdn.microsoft.com/Forums/en-US/{0}/threads?outputas=xml";
    private readonly BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>();

    public ProducerConsumerQueue(int workerCount, string[] states)
    {
        // Create and start a separate Task for each consumer:
        for (int i = 0; i < workerCount; i++)
            Task.Factory.StartNew(Consume, states[i], TaskCreationOptions.LongRunning);
    }

    // Ends the consuming loops once the queue has been drained.
    public void Dispose() { _taskQ.CompleteAdding(); }

    public Task EnqueueTask(object state)
    {
        return EnqueueTask(null, state);
    }

    public Task EnqueueTask(CancellationToken? cancelToken, object state)
    {
        var tcs = new TaskCompletionSource<object>();
        _taskQ.Add(new WorkItem(tcs, cancelToken, state));

        return tcs.Task;
    }

    // The state passed to the worker is not used; each work item carries its own.
    void Consume(object state)
    {
        foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable())
        {
            if (workItem.CancelToken.HasValue &&
                workItem.CancelToken.Value.IsCancellationRequested)
            {
                workItem.TaskSource.SetCanceled();
            }
            else
            {
                try
                {
                    DoSomeWork(workItem.State);
                    workItem.TaskSource.SetResult(workItem.State); // Indicate completion
                }
                catch (Exception ex)
                {
                    workItem.TaskSource.SetException(ex);
                }
            }
        }
    }

    void DoSomeWork(object state)
    {
        string forumShortName = (string)state;
        string url = string.Format(ForumTemplate, forumShortName);
        using (WebClient wc = new WebClient())
        {
            try
            {
                wc.DownloadString(url);
                Console.WriteLine("retrieved: " + forumShortName);
            }
            catch (WebException)
            {
                // We probably timed out here, so skip this forum.
                Console.WriteLine("failed: " + forumShortName);
            }
        }
    }
}


In EnqueueTask, we enqueue a work item that encapsulates the state object, an optional cancellation token (null in this case) and a task completion source, which lets us later control the task that we return to the caller.

In Consume, after dequeuing each work item, we first check whether it has been canceled. If not, we run the DoSomeWork method and then call SetResult on the task completion source to indicate its completion. If DoSomeWork throws, the exception is passed to the task through SetException.

We can now wait on the task, perform continuations on it, have exceptions propagate to continuations on parent tasks, and so on. In other words, you get the advantages of the Task model while implementing your own scheduler. While the above example does not take advantage of all these features, it provides a decent base that shows how they may be used.
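To illustrate what a caller can do with the task returned by EnqueueTask, here is a stand-alone sketch of a continuation attached to a TaskCompletionSource task. It does not use the queue class above; the CompletionSourceDemo name and the "some-forum" value are placeholders for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletionSourceDemo
{
    public static string Run()
    {
        var tcs = new TaskCompletionSource<object>();

        // The caller only sees tcs.Task and can attach a continuation to it.
        Task<string> continuation = tcs.Task.ContinueWith(
            t => "finished: " + t.Result);

        // Elsewhere (in the article's code, inside Consume) a worker completes the task.
        Task.Factory.StartNew(() => tcs.SetResult("some-forum"));

        // Blocks until the continuation has run.
        return continuation.Result;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints finished: some-forum
    }
}
```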

BlockingCollection also provides static methods called AddToAny and TakeFromAny, which let you add or take an element while specifying several blocking collections. The action is then honored by the first collection able to service the request.

As in the example from the previous article, I've used Task.Factory.StartNew(Consume, states[i], TaskCreationOptions.LongRunning). The LongRunning option hints to PFX that each Task should get its own dedicated thread rather than a shared thread-pool thread, no matter how many cores are available.
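TakeFromAny, mentioned above, can be sketched as follows. The class name and the two collections are made up for illustration; only the second collection holds an item, so it is the one that services the request.

```csharp
using System;
using System.Collections.Concurrent;

public static class TakeFromAnyDemo
{
    public static Tuple<int, string> Run()
    {
        var first = new BlockingCollection<string>();
        var second = new BlockingCollection<string>();
        var both = new[] { first, second };

        second.Add("only item");

        string item;
        // Returns the array index of the collection that supplied the item.
        int index = BlockingCollection<string>.TakeFromAny(both, out item);
        return Tuple.Create(index, item);
    }

    public static void Main()
    {
        Tuple<int, string> result = Run();
        Console.WriteLine(result.Item1); // prints 1
        Console.WriteLine(result.Item2); // prints only item
    }
}
```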

Most developers I've worked with write single-threaded, blocking code to perform multiple operations that are essentially the same. This is inefficient and takes a long time to get a job done, because the paradigm is "WaitUntilIAmDoneBeforeICanDoItAgain".

A few developers have mastered using the ThreadPool and BackgroundWorker classes - certainly a big improvement.
That extends the paradigm to "ICanDoThisOnMultipleThreadsAtTheSameTime".

But if you really want to be able to have your code "sing" on multiple cores with highly efficient multithreaded code constructs, you need to take the time to study and understand PFX. That extends the paradigm to "ICanDoThisOnMultipleThreadsAtTheSameTimeAndUseAllYourCPUCoresToo".

Does your PC or server have more than one core? I bet it does. Don't let those cores sit around with nothing to do. Better to learn how to put them all to work!

You can download the complete Visual Studio 2010 solution here.

By Peter Bromberg