C# 4.0 Concurrent Collections

The System.Collections.Concurrent namespace contains the following concurrent collection types: BlockingCollection, ConcurrentBag, ConcurrentDictionary, ConcurrentQueue, and ConcurrentStack. With the exception of ConcurrentDictionary, these support the producer-consumer paradigm, in which usually one thread adds elements and a different thread removes them.
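As a minimal sketch of that pattern, the snippet below runs one producer task and one consumer task over a BlockingCollection. The class and method names (ProducerConsumerSketch, ProduceAndSum) are made up for illustration and are not part of the sample solution.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ProducerConsumerSketch
{
    // Hypothetical helper: one task produces 1..count, another task sums what it consumes.
    public static int ProduceAndSum(int count)
    {
        using (var queue = new BlockingCollection<int>())
        {
            Task producer = Task.Factory.StartNew(() =>
            {
                for (int i = 1; i <= count; i++)
                    queue.Add(i);
                queue.CompleteAdding(); // tell the consumer that no more items will arrive
            });

            Task<int> consumer = Task.Factory.StartNew(() =>
            {
                int sum = 0;
                // Blocks while the collection is empty; the loop ends once adding
                // is complete and every item has been taken.
                foreach (int item in queue.GetConsumingEnumerable())
                    sum += item;
                return sum;
            });

            Task.WaitAll(producer, consumer);
            return consumer.Result;
        }
    }

    public static void Main()
    {
        Console.WriteLine(ProduceAndSum(100)); // prints 5050
    }
}
```

Calling CompleteAdding is what lets the consumer's foreach loop terminate; without it the consumer would block forever on an empty collection.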

In .NET Framework 4.0, producer-consumer collections implement the IProducerConsumerCollection interface:

public interface IProducerConsumerCollection<T> : IEnumerable<T>, ICollection, IEnumerable

where T specifies the type of elements in the collection.
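To illustrate, here is a small sketch that works against the interface rather than a concrete type, using its TryAdd and TryTake methods. The helper name AddThreeTakeOne is an assumption for this example only.

```csharp
using System;
using System.Collections.Concurrent;

public static class InterfaceSketch
{
    // Hypothetical helper: adds 1, 2, 3 and then removes one element,
    // using only members of IProducerConsumerCollection<T>.
    public static int AddThreeTakeOne(IProducerConsumerCollection<int> collection)
    {
        collection.TryAdd(1);
        collection.TryAdd(2);
        collection.TryAdd(3);

        int item;
        if (!collection.TryTake(out item))
            throw new InvalidOperationException("Collection was unexpectedly empty.");
        return item;
    }

    public static void Main()
    {
        // The same code yields different elements depending on the collection's ordering.
        Console.WriteLine(AddThreeTakeOne(new ConcurrentQueue<int>())); // prints 1 (first in, first out)
        Console.WriteLine(AddThreeTakeOne(new ConcurrentStack<int>())); // prints 3 (last in, first out)
    }
}
```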

Mixed producer-consumer collections, which are optimized for adding and removing elements on the same thread, also implement the IProducerConsumerCollection interface.

ConcurrentBag is an example: it maintains a local queue for each thread that accesses the collection. Each thread adds and removes elements by using its own private local queue. Operations on the local queue are generally lock-free.
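A rough sketch of that mixed usage follows: every worker both adds to and takes from the same ConcurrentBag. The names BagSketch and AddThenTake are invented for this example, and the sketch only checks that no items are lost; it does not measure how often a thread is served from its own local queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class BagSketch
{
    // Hypothetical helper: each worker adds itemsPerWorker items and then tries
    // to take the same number back. Returns how many takes succeeded and reports
    // what is left in the bag.
    public static int AddThenTake(int workers, int itemsPerWorker, out int remaining)
    {
        var bag = new ConcurrentBag<int>();
        int taken = 0;

        Parallel.For(0, workers, w =>
        {
            for (int i = 0; i < itemsPerWorker; i++)
                bag.Add(w * itemsPerWorker + i);

            int item;
            for (int i = 0; i < itemsPerWorker; i++)
            {
                if (bag.TryTake(out item))
                    Interlocked.Increment(ref taken);
            }
        });

        remaining = bag.Count;
        return taken;
    }

    public static void Main()
    {
        int remaining;
        int taken = AddThenTake(4, 1000, out remaining);
        // Every added item is either taken or still in the bag.
        Console.WriteLine(taken + remaining); // prints 4000
    }
}
```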

These collection classes use a variety of techniques to avoid heavyweight locks, including lightweight synchronization primitives such as the SpinLock and SpinWait structures.

SpinLock:

When there is contention, SpinLock does not block; instead, it spins in user mode. This avoids waiting on a kernel-mode lock, which typically costs an expensive context switch. Ideally, by the time the spin ends, the contention has cleared and execution continues without that context switch. SpinLock is best suited to scenarios involving frequent contention with short waits.
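The following sketch guards a very short critical section (a counter increment) with a SpinLock. The class and method names are hypothetical; the Enter(ref bool) and Exit calls follow the usual try/finally pattern for this structure.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SpinLockSketch
{
    // SpinLock is a struct, so it lives in a non-readonly field and is never copied.
    // Passing false disables thread-owner tracking, which is only a debugging aid.
    private static SpinLock _lock = new SpinLock(false);
    private static int _counter;

    // Hypothetical helper: increments a shared counter from several workers.
    public static int CountWithSpinLock(int workers, int incrementsPerWorker)
    {
        _counter = 0;
        Parallel.For(0, workers, w =>
        {
            for (int i = 0; i < incrementsPerWorker; i++)
            {
                bool lockTaken = false;
                try
                {
                    _lock.Enter(ref lockTaken); // spins in user mode if contended
                    _counter++;                 // keep the critical section short
                }
                finally
                {
                    if (lockTaken) _lock.Exit();
                }
            }
        });
        return _counter;
    }

    public static void Main()
    {
        Console.WriteLine(CountWithSpinLock(4, 100000)); // prints 400000
    }
}
```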

SpinWait:

The SpinWait structure is also an alternative to kernel-mode locking. SpinWait can spin for a short period when there is contention and then attempt to reacquire the resource. If the acquisition is successful, the overall cost is less than using a monitor or semaphore.
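Here is a minimal sketch of that spin-and-retry loop. The "resource" is just an integer flag, and the names SpinWaitSketch and AcquireAfterShortHold are assumptions made for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SpinWaitSketch
{
    private static int _inUse; // 0 = free, 1 = held

    // Hypothetical helper: another task holds the flag briefly while
    // the caller spins and retries until it can acquire the flag.
    public static bool AcquireAfterShortHold(int holdMilliseconds)
    {
        _inUse = 1; // pretend another thread currently holds the resource
        Task holder = Task.Factory.StartNew(() =>
        {
            Thread.Sleep(holdMilliseconds);
            Interlocked.Exchange(ref _inUse, 0); // release the resource
        });

        var spinner = new SpinWait();
        // Try to flip the flag from 0 to 1; on failure, spin briefly and retry.
        while (Interlocked.CompareExchange(ref _inUse, 1, 0) != 0)
            spinner.SpinOnce();

        holder.Wait();
        return _inUse == 1; // the caller now holds the resource
    }

    public static void Main()
    {
        Console.WriteLine(AcquireAfterShortHold(5)); // prints True
    }
}
```

SpinOnce starts with busy spinning and begins yielding the processor after a number of iterations, so the loop does not monopolize a core if the wait turns out to be longer than expected.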

Two-Phase Synchronization:

SpinWait structures are well suited to implementing the two-phase synchronization model. The first phase consists of trying to acquire a shared resource. If it is not available, you wait for a short period in user mode and then attempt to acquire the resource again. If that also fails, you enter the second phase and block on a kernel-level lock. When the kernel lock is signaled, execution continues.
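The two phases can be sketched roughly as follows. TwoPhaseGate and CountWithGate are hypothetical names, and Monitor.Wait and Monitor.Pulse stand in for the blocking second phase; this is one possible arrangement under those assumptions, not how the framework's own primitives are implemented.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class TwoPhaseGate
{
    private int _held;                             // 0 = free, 1 = held
    private readonly object _sync = new object();  // used only by the blocking phase

    public void Enter()
    {
        // Phase 1: try to acquire, spinning in user mode for a short while.
        var spinner = new SpinWait();
        do
        {
            if (Interlocked.CompareExchange(ref _held, 1, 0) == 0) return;
            spinner.SpinOnce();
        } while (!spinner.NextSpinWillYield);

        // Phase 2: block until a releasing thread signals, then retry.
        lock (_sync)
        {
            while (Interlocked.CompareExchange(ref _held, 1, 0) != 0)
                Monitor.Wait(_sync);
        }
    }

    public void Exit()
    {
        lock (_sync)
        {
            Interlocked.Exchange(ref _held, 0);
            Monitor.Pulse(_sync); // wake one blocked waiter, if any
        }
    }
}

public static class TwoPhaseSketch
{
    // Hypothetical helper: increments a counter from several workers through the gate.
    public static int CountWithGate(int workers, int incrementsPerWorker)
    {
        var gate = new TwoPhaseGate();
        int counter = 0;
        Parallel.For(0, workers, w =>
        {
            for (int i = 0; i < incrementsPerWorker; i++)
            {
                gate.Enter();
                try { counter++; }
                finally { gate.Exit(); }
            }
        });
        return counter;
    }

    public static void Main()
    {
        Console.WriteLine(CountWithGate(4, 50000)); // prints 200000
    }
}
```

Because the release always happens under the same lock that waiters use for checking and waiting, a waiter cannot miss the signal between its failed attempt and its call to Monitor.Wait.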

Whereas generic collections require user-defined external locks for synchronization, concurrent collections are implicitly thread safe. The internal implementation of thread safety in concurrent collections is generally more efficient than generic collections guarded by external locks, which often require locking the entire collection. Also, generic collections do not expose properties to assist with thread synchronization. Concurrent collection classes address these shortcomings and are both type safe and thread safe.

Because external locks are not required, the code required for adding and removing elements in a parallel scenario is much simpler for concurrent collections. Concurrent collections are available for you to use anywhere, even if there is only one processor on the box.
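As a small side-by-side sketch, the two methods below fill a collection from parallel workers, first with a generic List guarded by an external lock and then with a ConcurrentQueue. The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class LockingComparison
{
    // Generic collection: every access must go through a lock the caller supplies.
    public static int FillListWithLock(int count)
    {
        var list = new List<int>();
        object sync = new object();
        Parallel.For(0, count, i =>
        {
            lock (sync) // locks the entire collection for each Add
            {
                list.Add(i);
            }
        });
        return list.Count;
    }

    // Concurrent collection: thread safety is built in, so no external lock is needed.
    public static int FillConcurrentQueue(int count)
    {
        var queue = new ConcurrentQueue<int>();
        Parallel.For(0, count, i => queue.Enqueue(i));
        return queue.Count;
    }

    public static void Main()
    {
        Console.WriteLine(FillListWithLock(10000));    // prints 10000
        Console.WriteLine(FillConcurrentQueue(10000)); // prints 10000
    }
}
```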

In the following example, I've implemented a producer-consumer queue with a BlockingCollection. The producer enqueues the short names of MSDN forum groups, and the consumers process each name by retrieving the corresponding XML document from MSDN and saving it to the file system:

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace BlockingCollectionSample
{
    public class ProducerConsumerQueue : IDisposable
    {
        // {0} is replaced with the forum's short name.
        private const string ForumTemplate = "http://social.msdn.microsoft.com/Forums/en-US/{0}/threads?outputas=xml";

        private readonly BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>();

        public ProducerConsumerQueue(int workerCount, string[] states)
        {
            // Create and start a separate long-running Task for each consumer.
            // workerCount must not exceed states.Length; Consume ignores its state argument.
            for (int i = 0; i < workerCount; i++)
                Task.Factory.StartNew(Consume, states[i], TaskCreationOptions.LongRunning);
        }

        // Signals the consumers that no more work will be added, so their loops can end.
        public void Dispose() { _taskQ.CompleteAdding(); }

        public Task EnqueueTask(object state)
        {
            return EnqueueTask(null, state);
        }

        public Task EnqueueTask(CancellationToken? cancelToken, object state)
        {
            // The returned Task completes when a consumer has processed the work item.
            var tcs = new TaskCompletionSource<object>();
            _taskQ.Add(new WorkItem(tcs, cancelToken, state));
            return tcs.Task;
        }

        void Consume(object state)
        {
            // Blocks while the queue is empty; exits after CompleteAdding once the queue is drained.
            foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable())
            {
                if (workItem.CancelToken.HasValue &&
                    workItem.CancelToken.Value.IsCancellationRequested)
                {
                    workItem.TaskSource.SetCanceled();
                }
                else
                {
                    try
                    {
                        DoSomeWork(workItem.State);
                        workItem.TaskSource.SetResult(workItem.State);   // Indicate completion
                    }
                    catch (Exception ex)
                    {
                        workItem.TaskSource.SetException(ex);
                    }
                }
            }
        }

        void DoSomeWork(object state)
        {
            string forumShortName = (string)state;
            string url = string.Format(ForumTemplate, forumShortName);
            using (WebClient wc = new WebClient())
            {
                try
                {
                    string forumXml = wc.DownloadString(url);
                    File.WriteAllText(Path.Combine(Environment.CurrentDirectory, forumShortName + ".xml"), forumXml);
                    Console.WriteLine("retrieved: " + forumShortName);
                }
                catch (Exception ex)
                {
                    // We probably timed out here; report it and move on.
                    Console.WriteLine("failed: " + forumShortName + " (" + ex.Message + ")");
                }
            }
        }
    }

    // Minimal work-item type, inferred from how ProducerConsumerQueue uses it.
    class WorkItem
    {
        public readonly TaskCompletionSource<object> TaskSource;
        public readonly CancellationToken? CancelToken;
        public readonly object State;

        public WorkItem(TaskCompletionSource<object> taskSource, CancellationToken? cancelToken, object state)
        {
            TaskSource = taskSource;
            CancelToken = cancelToken;
            State = state;
        }
    }
}


The program that drives this in the Console app is as follows:

using System;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks;

namespace BlockingCollectionSample
{
    class Program
    {
        static void Main(string[] args)
        {
            // forums.txt holds one forum short name per line.
            string[] forums = File.ReadAllLines(Path.Combine(Environment.CurrentDirectory, "forums.txt"));
            Task[] tasks = new Task[forums.Length];

            // One consumer per forum; disposing the queue tells the consumers to finish.
            using (ProducerConsumerQueue q = new ProducerConsumerQueue(tasks.Length, forums))
            {
                Stopwatch stopwatch = Stopwatch.StartNew();
                for (int i = 0; i < tasks.Length; i++)
                {
                    tasks[i] = q.EnqueueTask(null, forums[i]);
                }
                Task.WaitAll(tasks);
                stopwatch.Stop();
                Console.WriteLine("Done in " + stopwatch.ElapsedMilliseconds + " ms.");
            }

            Console.ReadKey();
        }
    }
}

You can download the sample Visual Studio 2010 solution here.

By Peter Bromberg