In .NET Framework 4.0, producer-consumer collections implement the IProducerConsumerCollection<T>
interface:
    public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection, IEnumerable
where T specifies the type of elements in the collection.
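To make the interface concrete, here is a minimal sketch that works with a collection only through IProducerConsumerCollection<T>, using its TryAdd and TryTake methods. The class and method names (ProducerConsumerInterfaceDemo, AddThenDrain) are my own, chosen for illustration; ConcurrentQueue<T> and ConcurrentStack<T> are two framework classes that implement the interface.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class ProducerConsumerInterfaceDemo
{
    // Adds 1..count through the interface, then drains the collection.
    // The order of the returned items depends on the concrete collection.
    public static int[] AddThenDrain(IProducerConsumerCollection<int> collection, int count)
    {
        for (int i = 1; i <= count; i++)
            collection.TryAdd(i);

        var taken = new List<int>();
        int item;
        while (collection.TryTake(out item))
            taken.Add(item);
        return taken.ToArray();
    }

    public static void Main()
    {
        // A queue hands items back first-in, first-out: 1,2,3
        Console.WriteLine(string.Join(",", AddThenDrain(new ConcurrentQueue<int>(), 3)));
        // A stack hands items back last-in, first-out: 3,2,1
        Console.WriteLine(string.Join(",", AddThenDrain(new ConcurrentStack<int>(), 3)));
    }
}
```

The same calling code works against either collection; only the ordering differs.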
Mixed producer-consumer collections, which are optimized for adding and removing
elements on the same thread, also implement the IProducerConsumerCollection<T>
interface.
ConcurrentBag<T> is an example: it maintains a local queue for each thread that accesses
the collection. Each thread adds and removes elements by using its private local
queue. Operations on the local queue are lock-free.
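As a rough illustration of the mixed producer-consumer pattern, the sketch below has every worker add an item to a ConcurrentBag<T> and then try to take one back, with no external lock. The names ConcurrentBagDemo and AddAndTake are hypothetical, and the sketch only checks that no items are lost; it does not measure performance.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrentBagDemo
{
    // Each iteration adds one item and then tries to take one.
    // Returns (items taken) + (items left in the bag), which should equal iterations.
    public static int AddAndTake(int iterations)
    {
        var bag = new ConcurrentBag<int>();
        int taken = 0;

        Parallel.For(0, iterations, i =>
        {
            bag.Add(i);

            int item;
            // TryTake can fail if another thread has already removed the item.
            if (bag.TryTake(out item))
                Interlocked.Increment(ref taken);
        });

        return taken + bag.Count;
    }

    public static void Main()
    {
        Console.WriteLine("accounted for: " + AddAndTake(10000));
    }
}
```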
These collection classes use a variety of techniques to keep concurrent operations
lock-free where possible, including lightweight synchronization primitives such as
the SpinLock and SpinWait structures.
SpinLock:
When there is contention, SpinLock does not block; it spins in user mode. This avoids
blocking on a kernel-mode lock, which causes an expensive context switch. After the
spin, ideally the contention has gone and execution can continue without that
context switch. SpinLock is ideal in scenarios involving frequent contention with
short waits.
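A minimal sketch of SpinLock guarding a very short critical section follows. The names SpinLockDemo and CountWithSpinLock are mine; the Enter(ref bool) and Exit() calls inside try/finally follow the usage pattern documented for the structure.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SpinLockDemo
{
    // Increments a shared counter from many threads, protected by a SpinLock.
    public static int CountWithSpinLock(int iterations)
    {
        // SpinLock is a struct, so it must not be copied or stored in a readonly field.
        var spinLock = new SpinLock(false); // false: no thread-owner tracking
        int counter = 0;

        Parallel.For(0, iterations, i =>
        {
            bool lockTaken = false;
            try
            {
                spinLock.Enter(ref lockTaken); // spins in user mode under contention
                counter++;                     // keep the protected work very short
            }
            finally
            {
                if (lockTaken) spinLock.Exit();
            }
        });

        return counter;
    }

    public static void Main()
    {
        Console.WriteLine(CountWithSpinLock(100000));
    }
}
```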
SpinWait:
The SpinWait structure is also an alternative to kernel-mode locking. SpinWait can
spin for a short period when there is contention, and then attempt to reacquire
the resource. If the acquisition is successful, the overall cost is less than
using a monitor or semaphore.
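The spin-then-retry idea can be sketched with SpinWait.SpinOnce wrapped around an Interlocked.CompareExchange attempt. This is a hand-rolled illustration under my own naming (SpinWaitDemo, IncrementWithSpinWait, and an integer flag standing in for the resource), not how the framework collections are implemented internally.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SpinWaitDemo
{
    // Uses an int flag as the "resource": 0 = free, 1 = held.
    public static int IncrementWithSpinWait(int iterations)
    {
        int held = 0;
        int counter = 0;

        Parallel.For(0, iterations, i =>
        {
            var spinner = new SpinWait();

            // Try to acquire; on failure spin briefly and try again.
            while (Interlocked.CompareExchange(ref held, 1, 0) != 0)
                spinner.SpinOnce();

            counter++;                        // we own the resource here
            Interlocked.Exchange(ref held, 0); // release it
        });

        return counter;
    }

    public static void Main()
    {
        Console.WriteLine(IncrementWithSpinWait(100000));
    }
}
```

SpinOnce starts yielding the thread after a number of iterations, so the loop does not monopolize a core when the wait turns out to be long.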
Two-Phase Synchronization:
SpinWait structures are ideal for implementing the two-phase synchronization model.
The first phase consists of trying to acquire a shared resource. If it is not
available, you wait for a short period in user mode, then attempt to acquire
the resource a second time. If this fails, you enter the second phase: you synchronize
on a kernel-level lock and block. At some point, the kernel lock is signaled
and execution continues.
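The two phases described above can be sketched as a small gate type. Everything here is illustrative: TwoPhaseGate, Signal and Wait are hypothetical names, and I am using SpinWait.NextSpinWillYield as the cut-off between the user-mode phase and the kernel phase, which is one reasonable choice rather than the only one.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class TwoPhaseGate : IDisposable
{
    private int _signalled; // 0 = not signalled, 1 = signalled
    private readonly ManualResetEvent _kernelEvent = new ManualResetEvent(false);

    public void Signal()
    {
        _kernelEvent.Set();                      // wake any thread blocked in phase two
        Interlocked.Exchange(ref _signalled, 1); // let spinning threads see the signal
    }

    // Returns true if the wait was satisfied in user mode,
    // false if the caller had to block on the kernel event.
    public bool Wait()
    {
        if (Thread.VolatileRead(ref _signalled) == 1) return true;

        // Phase one: spin in user mode for a short period and re-check.
        var spinner = new SpinWait();
        while (!spinner.NextSpinWillYield)
        {
            spinner.SpinOnce();
            if (Thread.VolatileRead(ref _signalled) == 1) return true;
        }

        // Phase two: give up spinning and block on the kernel object.
        _kernelEvent.WaitOne();
        return false;
    }

    public void Dispose() { _kernelEvent.Dispose(); }
}

public static class TwoPhaseDemo
{
    public static void Main()
    {
        using (var gate = new TwoPhaseGate())
        {
            Task.Factory.StartNew(() => { Thread.Sleep(100); gate.Signal(); });
            bool stayedInUserMode = gate.Wait();
            Console.WriteLine(stayedInUserMode
                ? "satisfied while spinning"
                : "blocked on the kernel event");
        }
    }
}
```

In practice you would probably reach for ManualResetEventSlim, which, as far as I know, already spins briefly before falling back to a kernel wait.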
Whereas generic collections require user-defined external locks for synchronization,
concurrent collections are implicitly thread safe. The internal implementation
of thread safety in concurrent collections is generally more efficient than generic
collections with external locks, which often require locking the entire collection.
Also, generic collections do not expose properties to assist with thread synchronization.
Concurrent collection classes address these shortcomings and are both type safe and
thread safe.
Because external locks are not required, the code required for adding and removing
elements in a parallel scenario is much simpler for concurrent collections. Concurrent
collections are available for you to use anywhere, even if there is only one
processor on the box.
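To show the difference in the calling code, here is a small side-by-side sketch: a List<T> guarded by an external lock, and a ConcurrentQueue<T> with no lock at all. The names LockVersusConcurrentDemo, FillListWithLock and FillConcurrentQueue are mine, and the sketch only compares correctness and code shape, not speed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LockVersusConcurrentDemo
{
    // Generic collection: every access must go through a user-defined lock.
    public static int FillListWithLock(int count)
    {
        var list = new List<int>();
        var gate = new object();

        Parallel.For(0, count, i =>
        {
            lock (gate) // locks the whole collection for each Add
            {
                list.Add(i);
            }
        });

        return list.Count;
    }

    // Concurrent collection: thread safety is built in, so no external lock.
    public static int FillConcurrentQueue(int count)
    {
        var queue = new ConcurrentQueue<int>();
        Parallel.For(0, count, i => queue.Enqueue(i));
        return queue.Count;
    }

    public static void Main()
    {
        Console.WriteLine(FillListWithLock(10000));
        Console.WriteLine(FillConcurrentQueue(10000));
    }
}
```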
In the following example, I've implemented a producer-consumer queue with a BlockingCollection.
The producer enqueues the short names of MSDN forum groups; each consumer takes a
name from the queue, retrieves that forum's XML document from MSDN, and saves it
to the file system:
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace BlockingCollectionSample
{
    public class ProducerConsumerQueue : IDisposable
    {
        private const string ForumTemplate = "http://social.msdn.microsoft.com/Forums/en-US/{0}/threads?outputas=xml";

        private readonly BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>();

        public ProducerConsumerQueue(int workerCount, string[] states)
        {
            // Create and start a separate long-running Task for each consumer.
            // Note: states must contain at least workerCount entries.
            for (int i = 0; i < workerCount; i++)
                Task.Factory.StartNew(Consume, states[i], TaskCreationOptions.LongRunning);
        }

        // Signals the consumers to finish once the queue has been drained.
        public void Dispose() { _taskQ.CompleteAdding(); }

        public Task EnqueueTask(object state)
        {
            return EnqueueTask(null, state);
        }

        public Task EnqueueTask(CancellationToken? cancelToken, object state)
        {
            var tcs = new TaskCompletionSource<object>();
            _taskQ.Add(new WorkItem(tcs, cancelToken, state));
            return tcs.Task;
        }

        // The per-consumer state argument is required by StartNew but is not used here.
        void Consume(object state)
        {
            // GetConsumingEnumerable blocks until an item is available and ends
            // once CompleteAdding has been called and the queue is empty.
            foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable())
            {
                if (workItem.CancelToken.HasValue &&
                    workItem.CancelToken.Value.IsCancellationRequested)
                {
                    workItem.TaskSource.SetCanceled();
                }
                else
                {
                    try
                    {
                        DoSomeWork(workItem.State);
                        workItem.TaskSource.SetResult(workItem.State); // Indicate completion
                    }
                    catch (Exception ex)
                    {
                        workItem.TaskSource.SetException(ex);
                    }
                }
            }
        }

        void DoSomeWork(object state)
        {
            string forumShortName = (string)state;
            string url = string.Format(ForumTemplate, forumShortName);

            using (WebClient wc = new WebClient())
            {
                try
                {
                    string forumXml = wc.DownloadString(url);
                    File.WriteAllText(Path.Combine(Environment.CurrentDirectory, forumShortName + ".xml"), forumXml);
                    Console.WriteLine("retrieved: " + forumShortName);
                }
                catch (Exception ex)
                {
                    // We probably timed out here, so skip this forum and carry on.
                    Console.WriteLine("failed: " + forumShortName + " (" + ex.Message + ")");
                }
            }
        }
    }

    // WorkItem is not shown in the original listing; this minimal version is
    // reconstructed from how it is used above.
    public class WorkItem
    {
        public TaskCompletionSource<object> TaskSource { get; private set; }
        public CancellationToken? CancelToken { get; private set; }
        public object State { get; private set; }

        public WorkItem(TaskCompletionSource<object> taskSource, CancellationToken? cancelToken, object state)
        {
            TaskSource = taskSource;
            CancelToken = cancelToken;
            State = state;
        }
    }
}
The program that drives this in the Console app is as follows:
using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;

namespace BlockingCollectionSample
{
    class Program
    {
        static void Main(string[] args)
        {
            // forums.txt holds one forum short name per line.
            string[] forums = File.ReadAllLines(Path.Combine(Environment.CurrentDirectory, "forums.txt"));
            Task[] tasks = new Task[forums.Length];

            // One consumer per forum; disposing the queue tells the consumers to stop.
            using (ProducerConsumerQueue q = new ProducerConsumerQueue(tasks.Length, forums))
            {
                Stopwatch stopwatch = Stopwatch.StartNew();

                for (int i = 0; i < tasks.Length; i++)
                    tasks[i] = q.EnqueueTask(null, forums[i]);

                Task.WaitAll(tasks);
                stopwatch.Stop();

                Console.WriteLine("Done in " + stopwatch.ElapsedMilliseconds + " ms.");
            }

            Console.ReadKey();
        }
    }
}
You can download the sample Visual Studio 2010 solution here.