A producer/consumer queue is a common requirement in threading. A queue is set up
to describe work items or data on which work is performed. When a task needs
to be executed, it is enqueued, allowing the caller to perform other actions.
One or more worker threads work in the background, dequeueing and executing queued
items.
One advantage of this model is that you have precise control over how many worker
threads execute at once. This allows you to limit consumption of CPU time and
other resources. If the tasks perform intensive disk I/O, you might have just
one worker thread to avoid starving the operating system. Another type of application
may have 100 worker threads. You can also dynamically add and remove workers
throughout the queue’s life. The CLR ThreadPool itself is a kind of producer/consumer
queue.
A producer/consumer queue typically holds items of data on which the same task is
performed. By making the item a delegate, you can write a very general-purpose
producer/consumer queue where each item can do anything.
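Here is a minimal sketch of that idea. It is not the article's code. The hypothetical ActionQueue class stores Action delegates in a BlockingCollection<T>, which is described later in this article, and runs them on a fixed number of worker tasks. The names ActionQueue and ActionQueueDemo are made up for illustration, and the sketch assumes a C# 6 or later compiler.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical general-purpose producer/consumer queue: each item is a delegate.
public sealed class ActionQueue : IDisposable
{
    private readonly BlockingCollection<Action> _items = new BlockingCollection<Action>();
    private readonly Task[] _workers;

    public ActionQueue(int workerCount)
    {
        _workers = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            // Each worker dequeues and runs delegates until the queue is completed and empty.
            _workers[i] = Task.Factory.StartNew(() =>
            {
                foreach (Action work in _items.GetConsumingEnumerable())
                    work();
            }, TaskCreationOptions.LongRunning);
        }
    }

    // Producer side: returns immediately so the caller can do other things.
    public void Enqueue(Action work) => _items.Add(work);

    // Stop accepting work, then wait for the workers to drain the queue.
    public void Dispose()
    {
        _items.CompleteAdding();
        Task.WaitAll(_workers);
        _items.Dispose();
    }
}

public static class ActionQueueDemo
{
    // Enqueue 100 increments across 4 workers and return the final count.
    public static int Run()
    {
        int counter = 0;
        using (var queue = new ActionQueue(4))
        {
            for (int i = 0; i < 100; i++)
                queue.Enqueue(() => Interlocked.Increment(ref counter));
        }   // Dispose waits until every queued delegate has run
        return counter;
    }
}
```

ActionQueueDemo.Run() returns 100, because Dispose waits for the workers to drain the queue. In this sketch, a delegate that throws would fault its worker. A sturdier queue would catch and report such exceptions, as the article's Consume method does.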
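.NET Framework 4.0 provides a set of new collections in the System.Collections.Concurrent
namespace. These are fully thread-safe and are designed as the Parallel Framework (PFX)
counterparts of the generic non-parallel collections, where such a counterpart exists:
- ConcurrentStack<T> (counterpart of Stack<T>)
- ConcurrentQueue<T> (counterpart of Queue<T>)
- ConcurrentBag<T> (no counterpart)
- BlockingCollection<T> (no counterpart)
- ConcurrentDictionary<TKey,TValue> (counterpart of Dictionary<TKey,TValue>)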
ConcurrentBag<T>, which has no non-parallel counterpart, is unique in that
it stores an unordered collection of objects with duplicates permitted. ConcurrentBag<T>
is suitable in situations where you don’t care which element you get when calling
TryTake. The benefit of ConcurrentBag<T> over a ConcurrentQueue<T>
or ConcurrentStack<T> is that the bag’s Add method suffers almost no contention when called by
many threads at once.
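The following sketch adds values from many threads at once with Parallel.For and then drains the bag with TryTake. It assumes C# 7 or later for tuples and out variables, and BagDemo is a hypothetical name. The sketch deliberately does not rely on the order in which TryTake returns items, because the bag does not guarantee one.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BagDemo
{
    // Many threads add to one bag at once; duplicates are allowed and order is not kept.
    public static (int count, int sum) Run()
    {
        var bag = new ConcurrentBag<int>();
        Parallel.For(0, 1000, i => bag.Add(i % 10));   // values 0..9, each added 100 times

        int count = 0, sum = 0;
        // TryTake removes some element (which one is unspecified) or returns false when empty.
        while (bag.TryTake(out int item))
        {
            count++;
            sum += item;
        }
        return (count, sum);
    }
}
```

Run() returns (1000, 4500). Each value from 0 through 9 is added 100 times, and every duplicate is kept.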
The IProducerConsumerCollection<T>
A producer/consumer collection is one for which the two primary use cases are:
- Adding an element (“producing”)
- Retrieving an element while removing it (“consuming”)
Typical examples are stacks and queues. Producer/consumer collections are
significant in parallel programming because they are conducive to efficient lock-free
implementations.
The IProducerConsumerCollection<T> interface represents a thread-safe producer/consumer
collection. The following classes implement this interface:
- ConcurrentStack<T>
- ConcurrentQueue<T>
- ConcurrentBag<T>
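All three classes implement the same interface, so you can write code against IProducerConsumerCollection<T> and pass it any of them. In this sketch, the hypothetical FillAndDrain method produces three items with TryAdd and then consumes them with TryTake until it returns false. The sketch assumes C# 7 for the out variable.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class ProducerConsumerDemo
{
    // Works with any IProducerConsumerCollection<T>: ConcurrentQueue, ConcurrentStack or ConcurrentBag.
    public static List<int> FillAndDrain(IProducerConsumerCollection<int> collection)
    {
        for (int i = 1; i <= 3; i++)
            collection.TryAdd(i);                 // "producing"

        var taken = new List<int>();
        // "consuming": TryTake removes an element, or returns false once the collection is empty
        while (collection.TryTake(out int item))
            taken.Add(item);
        return taken;
    }
}
```

Passing a ConcurrentQueue<int> returns 1, 2, 3 (FIFO). Passing a ConcurrentStack<int> returns 3, 2, 1 (LIFO). A ConcurrentBag<int> returns the same three values in an unspecified order.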
The BlockingCollection<T>
If you call TryTake on any of the producer/consumer collections mentioned and the collection is empty,
the method returns false. Sometimes it would be more useful to wait until an
element is available.
The designers of PFX encapsulated this functionality in a wrapper class called
BlockingCollection<T>. A blocking collection wraps any collection that
implements IProducerConsumerCollection<T> and lets you Take an element from the wrapped collection, blocking if no element is available.
A blocking collection also lets you limit the total size of the collection, blocking
the producer while the collection is full.
To use BlockingCollection<T>:
1. Instantiate the class, optionally specifying the IProducerConsumerCollection<T>
to wrap and the maximum size (bound) of the collection.
2. Call Add (blocking) or TryAdd (non-blocking) to add elements to the underlying collection.
3. Call Take (blocking) or TryTake (non-blocking) to remove (consume) elements from the underlying collection.
If you call the constructor without passing in a collection, the class automatically
instantiates a ConcurrentQueue<T>. The producing and consuming methods let you specify cancellation tokens and timeouts.
Add blocks while a bounded collection is full, and Take blocks while the collection is empty.
In those same situations, TryAdd and TryTake return false immediately, or wait up to the timeout you pass them.
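The following single-threaded sketch walks through the three steps and shows how the blocking and non-blocking methods differ. BoundedDemo is a hypothetical name, the 100 ms timeouts are arbitrary, and the sketch assumes C# 7 or later.

```csharp
using System.Collections.Concurrent;

public static class BoundedDemo
{
    public static string Run()
    {
        // Step 1: wrap a ConcurrentQueue<int> and bound the collection to 2 elements.
        using (var bc = new BlockingCollection<int>(new ConcurrentQueue<int>(), 2))
        {
            // Step 2: produce. A third Add would block; TryAdd gives up after its timeout.
            bc.Add(1);
            bc.Add(2);
            bool addedThird = bc.TryAdd(3, 100);      // full: returns false after ~100 ms

            // Step 3: consume. Take would block on an empty collection; TryTake gives up.
            int first = bc.Take();                    // FIFO, because the wrapped collection is a queue
            int second = bc.Take();
            bool tookThird = bc.TryTake(out _, 100);  // empty: returns false after ~100 ms

            return $"{addedThird} {first} {second} {tookThird}";
        }
    }
}
```

Run() returns "False 1 2 False". The third TryAdd times out because the bound of 2 has been reached. The final TryTake times out because the collection is empty.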
Another way to consume elements is to call GetConsumingEnumerable, which I use in the following example. It returns a sequence that yields elements as they become available and blocks while the collection is empty. Calling CompleteAdding ends
the sequence once the remaining elements have been consumed. It also prevents any further elements from being added.
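Here is a minimal sketch of GetConsumingEnumerable used together with CompleteAdding, kept separate from the article's forum example. One task consumes while the calling thread produces. ConsumingDemo is a hypothetical name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ConsumingDemo
{
    public static int Run()
    {
        using (var bc = new BlockingCollection<int>())
        {
            // Consumer: the foreach blocks while the collection is empty and ends
            // only after CompleteAdding has been called and every item is consumed.
            Task<int> consumer = Task.Factory.StartNew(() =>
            {
                int sum = 0;
                foreach (int n in bc.GetConsumingEnumerable())
                    sum += n;
                return sum;
            });

            // Producer
            for (int i = 1; i <= 10; i++)
                bc.Add(i);
            bc.CompleteAdding();        // no more items: lets the consumer's loop finish

            return consumer.Result;     // 1 + 2 + ... + 10
        }
    }

    // After CompleteAdding, further additions are rejected with InvalidOperationException.
    public static bool AddAfterCompleteThrows()
    {
        using (var bc = new BlockingCollection<int>())
        {
            bc.CompleteAdding();
            try { bc.Add(1); return false; }
            catch (InvalidOperationException) { return true; }
        }
    }
}
```

Run() returns 55. Without the CompleteAdding call, the consumer's foreach would keep waiting for more items, and consumer.Result would never return.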
The example code here borrows from the sample code in "C# 4.0 in a Nutshell" by the Albahari brothers (O'Reilly), and it uses the example model from
a previous article of mine. I've modified the code to accept a state object. Here the state object carries an MSDN forum "short name", and each work item uses it
to build a URL from an HTTP template and download
that forum's XML document:
```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        // forums.txt holds one forum short name per line
        string[] forums = File.ReadAllLines(Path.Combine(Environment.CurrentDirectory, "forums.txt"));
        Task[] tasks = new Task[forums.Length];

        // One consumer per forum; disposing the queue calls CompleteAdding so the consumers exit.
        using (ProducerConsumerQueue q = new ProducerConsumerQueue(tasks.Length, forums))
        {
            Stopwatch sw = Stopwatch.StartNew();
            for (int i = 0; i < tasks.Length; i++)
            {
                string state = forums[i];              // was an undeclared field (_state)
                tasks[i] = q.EnqueueTask(null, state); // null: no cancellation token
            }
            Task.WaitAll(tasks);
            sw.Stop();
            Console.WriteLine("Done in " + sw.Elapsed.TotalMilliseconds + " ms.");
        }
        Console.ReadKey();
    }
}

public class WorkItem
{
    public readonly TaskCompletionSource<object> TaskSource;
    public readonly CancellationToken? CancelToken;
    public readonly object State;

    public WorkItem(TaskCompletionSource<object> taskSource,
                    CancellationToken? cancelToken,
                    object state)
    {
        TaskSource = taskSource;
        CancelToken = cancelToken;
        State = state;
    }
}

public class ProducerConsumerQueue : IDisposable
{
    private const string ForumTemplate =
        "http://social.msdn.microsoft.com/Forums/en-US/{0}/threads?outputas=xml";

    // Unbounded; wraps a ConcurrentQueue<WorkItem> by default.
    private readonly BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>();

    public ProducerConsumerQueue(int workerCount, string[] states)
    {
        // Create and start a separate long-running Task for each consumer:
        for (int i = 0; i < workerCount; i++)
            Task.Factory.StartNew(Consume, states[i], TaskCreationOptions.LongRunning);
    }

    // Lets the consumers drain the queue and then exit.
    public void Dispose() { _taskQ.CompleteAdding(); }

    public Task EnqueueTask(object state)
    {
        return EnqueueTask(null, state);
    }

    public Task EnqueueTask(CancellationToken? cancelToken, object state)
    {
        var tcs = new TaskCompletionSource<object>();
        _taskQ.Add(new WorkItem(tcs, cancelToken, state));
        return tcs.Task; // completed later by a consumer
    }

    // The state passed at startup is not used by the consumer loop itself.
    void Consume(object state)
    {
        // Blocks while the queue is empty; ends after CompleteAdding once the queue is drained.
        foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable())
        {
            if (workItem.CancelToken.HasValue &&
                workItem.CancelToken.Value.IsCancellationRequested)
            {
                workItem.TaskSource.SetCanceled();
            }
            else
            {
                try
                {
                    DoSomeWork(workItem.State);
                    workItem.TaskSource.SetResult(workItem.State); // Indicate completion
                }
                catch (Exception ex)
                {
                    workItem.TaskSource.SetException(ex);
                }
            }
        }
    }

    void DoSomeWork(object state)
    {
        string forumShortName = (string)state;
        string url = string.Format(ForumTemplate, forumShortName);
        using (WebClient wc = new WebClient())
        {
            try
            {
                wc.DownloadString(url);
            }
            catch (WebException)
            {
                // We probably timed out here, so ignore it; other exceptions fault the task.
            }
        }
        Console.WriteLine("retrieved: " + forumShortName);
    }
}
```
In EnqueueTask, we enqueue a work item that carries the state object, an optional cancellation token (null in this example),
and a task completion source. The task completion source lets us later control the task that we return
to the caller.
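The hypothetical helper below shows what the task completion source gives us. It is not part of the article's code. It hands the caller a Task immediately and completes that task later from a separate thread, much as a consumer thread completes the task for each WorkItem. The sketch assumes C# 7 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TcsDemo
{
    // Hypothetical helper: the returned Task is completed by another thread.
    public static Task<object> RunOnOwnThread(Func<object> work)
    {
        var tcs = new TaskCompletionSource<object>();
        var thread = new Thread(() =>
        {
            try { tcs.SetResult(work()); }                  // success: task becomes RanToCompletion
            catch (Exception ex) { tcs.SetException(ex); }  // failure: task becomes Faulted
        });
        thread.IsBackground = true;
        thread.Start();
        return tcs.Task;   // the caller gets an ordinary Task it can wait on
    }
}
```

If you wait on a task whose delegate threw, the error is rethrown wrapped in an AggregateException. This is the same behavior you get from a task started with Task.Factory.StartNew.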
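In Consume, after dequeuing each work item, we first check whether its cancellation token has been signaled. If it has, we call SetCanceled on the task completion source.
If not, we run the DoSomeWork method and then call SetResult on the task completion source to indicate completion, or SetException if the work throws.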
Callers can now wait on the returned task, attach continuations to it, have exceptions propagate to
continuations and parent tasks, and so on. In other words, you get the advantages
of the Task model while implementing your own scheduler. The example above
does not take advantage of all these features, but it provides a decent base
that shows how they may be used.
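The sketch below mirrors the cancellation check in Consume. It then attaches a continuation to each resulting task to show how callers observe success, cancellation and failure. To keep it short, it runs the work inline rather than on a queue. ContinuationDemo and Complete are hypothetical names, and the sketch assumes C# 7 for the throw expression.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Same logic as Consume: a work item whose token is already cancelled
    // never runs, and its task is marked Canceled instead.
    static Task<object> Complete(CancellationToken? token, Func<object> work)
    {
        var tcs = new TaskCompletionSource<object>();
        if (token.HasValue && token.Value.IsCancellationRequested)
        {
            tcs.SetCanceled();
        }
        else
        {
            try { tcs.SetResult(work()); }
            catch (Exception ex) { tcs.SetException(ex); }
        }
        return tcs.Task;
    }

    public static string[] Run()
    {
        var cts = new CancellationTokenSource();
        cts.Cancel();

        Task<object>[] tasks =
        {
            Complete(null, () => "done"),
            Complete(cts.Token, () => "never runs"),
            Complete(null, () => throw new InvalidOperationException("boom")),
        };

        // A continuation runs whatever the outcome and can inspect the antecedent's exception.
        var reports = new Task<string>[tasks.Length];
        for (int i = 0; i < tasks.Length; i++)
            reports[i] = tasks[i].ContinueWith(prev => prev.IsFaulted
                ? "Faulted: " + prev.Exception.InnerException.Message  // unwrap the AggregateException
                : prev.Status.ToString());

        Task.WaitAll(reports);
        return Array.ConvertAll(reports, r => r.Result);
    }
}
```

Run() returns "RanToCompletion", "Canceled" and "Faulted: boom" for the three tasks, in that order.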
BlockingCollection<T> also provides static methods called AddToAny and TakeFromAny, which let you add an element to, or take an element from, any one of several blocking collections.
The first collection able to handle the request services it, and the method returns that collection's index in the array you passed.
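Here is a minimal sketch of both methods. It assumes C# 8 or later for using declarations and tuples, and AnyDemo is a hypothetical name. In each array, the first collection cannot service the request, so the second one does.

```csharp
using System.Collections.Concurrent;

public static class AnyDemo
{
    public static (int addIndex, int takeIndex, string taken) Run()
    {
        using var full = new BlockingCollection<string>(1);   // bounded to one element
        using var open = new BlockingCollection<string>();    // unbounded
        using var empty = new BlockingCollection<string>();
        full.Add("already here");                             // `full` is now at capacity

        // `full` cannot accept another element, so `open` (index 1) takes the add.
        int addIndex = BlockingCollection<string>.AddToAny(new[] { full, open }, "new item");

        // `empty` has nothing to give, so `open` (index 1) services the take.
        int takeIndex = BlockingCollection<string>.TakeFromAny(new[] { empty, open }, out string taken);

        return (addIndex, takeIndex, taken);
    }
}
```

Run() returns (1, 1, "new item"). Both methods block if no collection can service the request. TryAddToAny and TryTakeFromAny are the non-blocking variants.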
As in the example from the previous article, I've used Task.Factory.StartNew(Consume, states[i], TaskCreationOptions.LongRunning).
The LongRunning option is a hint to PFX that each consumer should run on its own dedicated
thread rather than on a thread-pool thread, no matter how many cores are available.
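A quick way to see the effect is to ask the task what kind of thread it runs on. This sketch relies on how the default task scheduler handles the hint. That behavior is an implementation detail rather than a documented guarantee. LongRunningDemo is a hypothetical name.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningDemo
{
    // With the default scheduler, a LongRunning task gets a dedicated thread,
    // so it does not report itself as a thread-pool thread.
    public static bool RunsOnThreadPool()
    {
        Task<bool> t = Task.Factory.StartNew(
            () => Thread.CurrentThread.IsThreadPoolThread,
            TaskCreationOptions.LongRunning);
        return t.Result;
    }
}
```

On the default scheduler, RunsOnThreadPool() returns false.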
Most developers I've worked with write single-threaded blocking code constructs to
perform multiple operations that are essentially the same. This is highly inefficient
and takes a long time to get a job done, because the paradigm is "WaitUntilIAmDoneBeforeICanDoItAgain", etc.
A few developers have mastered the ThreadPool and BackgroundWorker classes,
which is certainly a big improvement.
That extends the paradigm to "ICanDoThisOnMultipleThreadsAtTheSameTime".
But if you really want to be able to have your code "sing" on multiple
cores with highly efficient multithreaded code constructs, you need to take the
time to study and understand PFX. That extends the paradigm to "ICanDoThisOnMultipleThreadsAtTheSameTimeAndUseAllYourCPUCoresToo".
Does your PC or server have more than one core? I bet it does. Don't let those cores sit
idle while you have concurrent work to do. Learn
how to put them all to work!
You can download the complete Visual Studio 2010 solution here.