A .NET Blocking Queue Class

by Daniel Schwieg

Blocking Queue

Microsoft provides a Queue class in the .NET System.Collections namespace. Unfortunately, it is not thread safe, and calling Dequeue() on an empty queue throws an InvalidOperationException. To cope with these limitations in a multi-threaded environment, the programmer must write code like this:

Queue q = new Queue();

void ProducerThread()
{
    while (!done)
    {
        MyData d = GetData();
        lock (q.SyncRoot)
        {
            q.Enqueue(d);
        }
        Thread.Sleep(100);
    }
}

void ConsumerThread()
{
    while (!done)
    {
        lock (q.SyncRoot)
        {
            while (q.Count > 0)
            {
                MyData d = (MyData)q.Dequeue();
                process(d);
            }
        }
        Thread.Sleep(100);
    }
}



In this code snippet we have one producer thread and one or more consumer threads. The producer thread gets a data object, acquires exclusive access to the queue, enqueues the object, and then sleeps for 100 milliseconds. The consumer thread loops, acquiring exclusive access to the queue and checking whether there are any objects to dequeue. If there are, it dequeues and processes them. It then sleeps for 100 milliseconds and tries again.

The downside is that if a lot of time passes between enqueued objects, the consumer thread keeps waking every 100 milliseconds and spends CPU time just checking whether there is anything to do. It would be more efficient if the consumer thread were blocked from executing until there was an object in the queue.

This is the purpose of a blocking queue. With this type of queue, the thread that calls the Dequeue method is blocked until there is an object in the queue.

To implement a blocking queue, we first extend the queue object in the collections namespace. We override the Enqueue and Dequeue methods to provide the blocking functionality. We also override the Clear method. This method also changes the state of the queue, so we need to create a thread safe version.

To provide the blocking functionality, we use the Monitor class. The Monitor.Wait method releases the lock on the object passed to it and blocks the calling thread until it is notified and has reacquired that lock. The Monitor.Pulse method notifies one waiting thread, which reacquires the lock once the pulsing thread has released it and then resumes execution.
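The Wait/Pulse handshake described above can be sketched in isolation. This is only a minimal illustration: the class WaitPulseDemo and its members gate, ready and Message are hypothetical names, not part of the queue presented in this article.

```csharp
using System;
using System.Threading;

// Hypothetical demo class; every name here is illustrative only.
public static class WaitPulseDemo
{
    private static readonly object gate = new object();
    private static bool ready;       // the condition the waiter is waiting for
    public static string Message;    // set by the waiter once it resumes

    public static void Run()
    {
        ready = false;
        Message = null;

        Thread waiter = new Thread(() =>
        {
            lock (gate)
            {
                // Wait releases the lock on gate while blocked and reacquires
                // it before returning, so the condition is tested again.
                while (!ready)
                {
                    Monitor.Wait(gate);
                }
                Message = "resumed";
            }
        });
        waiter.Start();

        lock (gate)
        {
            ready = true;
            // Pulse must be called while holding the lock. The waiter cannot
            // resume until this lock block has been exited.
            Monitor.Pulse(gate);
        }

        waiter.Join();
        Console.WriteLine(Message);  // prints "resumed"
    }

    public static void Main()
    {
        Run();
    }
}
```

Because the waiter tests the ready flag before calling Wait, the sketch behaves the same whether the waiter or the main thread takes the lock first.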

In the Dequeue method, we first acquire the lock on the queue to prevent any other thread from modifying it. We then check to see if there is anything in the queue. A while loop is used here in the case that there is more than one consumer thread. In that scenario, it is possible that while this thread is waiting, the producer thread enqueues an object and wakes this thread to consume it. Before this thread can reacquire the lock, another thread gets it, and consumes the object. When this thread gets the lock, the queue is empty again so a check is made on the count before the Dequeue method is called. If it is zero, the thread calls the Monitor.Wait method to go back to sleep until there is something in the queue. If there is something in the queue, we call the base class Dequeue method and return the object to the caller.

In the Enqueue method, we simply acquire the lock, call the base class Enqueue method and wake up any thread waiting for an object in the queue by calling the Monitor.Pulse method.

using System;
using System.Collections;
using System.Threading;

namespace System.Collections
{
    /// <summary>
    /// Same as Queue except Dequeue function blocks until there is
    /// an object to return.
    /// Note: This class does not need to be synchronized
    /// </summary>
    public class BlockingQueue : Queue
    {
        /// <summary>
        /// Remove all objects from the Queue.
        /// </summary>
        public override void Clear()
        {
            lock (base.SyncRoot)
            {
                base.Clear();
            }
        }

        /// <summary>
        /// Removes and returns the object at the beginning of the Queue.
        /// </summary>
        /// <returns>Object in queue.</returns>
        public override object Dequeue()
        {
            lock (base.SyncRoot)
            {
                while (base.Count == 0)
                {
                    Monitor.Wait(base.SyncRoot);
                }
                return base.Dequeue();
            }
        }

        /// <summary>
        /// Adds an object to the end of the Queue.
        /// </summary>
        /// <param name="obj">Object to put in queue</param>
        public override void Enqueue(object obj)
        {
            lock (base.SyncRoot)
            {
                base.Enqueue(obj);
                Monitor.Pulse(base.SyncRoot);
            }
        }
    }
}

We can enhance this queue by providing versions of the Dequeue function that accept timeout values like so:

/// <summary>
/// Removes and returns the object at the beginning of the Queue.
/// </summary>
/// <returns>Object in queue.</returns>
public override object Dequeue()
{
    return Dequeue(Timeout.Infinite);
}

/// <summary>
/// Removes and returns the object at the beginning of the Queue.
/// </summary>
/// <param name="timeout">time to wait before returning</param>
/// <returns>Object in queue.</returns>
public object Dequeue(TimeSpan timeout)
{
    // TotalMilliseconds is the whole interval; the Milliseconds property
    // is only its 0-999 millisecond component.
    return Dequeue((int)timeout.TotalMilliseconds);
}

/// <summary>
/// Removes and returns the object at the beginning of the Queue.
/// </summary>
/// <param name="timeout">time to wait before returning (in milliseconds)</param>
/// <returns>Object in queue.</returns>
public object Dequeue(int timeout)
{
    lock (base.SyncRoot)
    {
        while (base.Count == 0)
        {
            // Wait returns false if the timeout elapses before a Pulse arrives.
            if (!Monitor.Wait(base.SyncRoot, timeout))
                throw new InvalidOperationException("Timeout");
        }
        return base.Dequeue();
    }
}
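One detail is worth checking when converting a TimeSpan into the integer millisecond timeout that Monitor.Wait expects: the TimeSpan.Milliseconds property returns only the millisecond component of the interval (0 to 999), while TotalMilliseconds returns the whole interval. The sketch below shows the difference. TimeSpanDemo and ToTimeoutMilliseconds are hypothetical names used only for illustration.

```csharp
using System;

// Hypothetical demo class; the names are illustrative only.
public static class TimeSpanDemo
{
    // Converts a TimeSpan to whole milliseconds, as an int timeout expects.
    public static int ToTimeoutMilliseconds(TimeSpan timeout)
    {
        return (int)timeout.TotalMilliseconds;
    }

    public static void Main()
    {
        TimeSpan twoSeconds = TimeSpan.FromSeconds(2);

        // Only the millisecond component of 00:00:02.000.
        Console.WriteLine(twoSeconds.Milliseconds);             // prints 0

        // The whole interval expressed in milliseconds.
        Console.WriteLine(ToTimeoutMilliseconds(twoSeconds));   // prints 2000
    }
}
```

A two-second timeout passed through the Milliseconds property would therefore become a timeout of zero.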

Now our queue code looks like this:

BlockingQueue q = new BlockingQueue();

void ProducerThread()
{
    while (!done)
    {
        MyData d = GetData();
        q.Enqueue(d);
        Thread.Sleep(100);
    }
}

void ConsumerThread()
{
    while (!done)
    {
        MyData d = (MyData)q.Dequeue();
        process(d);
    }
}

The consumer thread only executes when there is something in the queue to process, and doesn’t waste CPU time polling when there is nothing to do.

To round out our blocking queue implementation, we should provide a constructor matching each constructor of the Queue class. We should also add the ability to close the queue when we want to terminate our application. Closing the queue wakes up all waiting threads and allows them to clean up before exiting.

The full implementation of the blocking queue is as follows:

using System;
using System.Collections;
using System.Threading;

namespace System.Collections
{
    /// <summary>
    /// Same as Queue except Dequeue function blocks until there is an object to return.
    /// Note: This class does not need to be synchronized
    /// </summary>
    public class BlockingQueue : Queue
    {
        private bool open;

        /// <summary>
        /// Create new BlockingQueue.
        /// </summary>
        /// <param name="col">The System.Collections.ICollection to copy elements from</param>
        public BlockingQueue(ICollection col) : base(col)
        {
            open = true;
        }

        /// <summary>
        /// Create new BlockingQueue.
        /// </summary>
        /// <param name="capacity">The initial number of elements that the queue can contain</param>
        /// <param name="growFactor">The factor by which the capacity of the queue is expanded</param>
        public BlockingQueue(int capacity, float growFactor) : base(capacity, growFactor)
        {
            open = true;
        }

        /// <summary>
        /// Create new BlockingQueue.
        /// </summary>
        /// <param name="capacity">The initial number of elements that the queue can contain</param>
        public BlockingQueue(int capacity) : base(capacity)
        {
            open = true;
        }

        /// <summary>
        /// Create new BlockingQueue.
        /// </summary>
        public BlockingQueue() : base()
        {
            open = true;
        }

        /// <summary>
        /// BlockingQueue Destructor (Close queue, resume any waiting thread).
        /// </summary>
        ~BlockingQueue()
        {
            Close();
        }

        /// <summary>
        /// Remove all objects from the Queue.
        /// </summary>
        public override void Clear()
        {
            lock (base.SyncRoot)
            {
                base.Clear();
            }
        }

        /// <summary>
        /// Remove all objects from the Queue, resume all dequeue threads.
        /// </summary>
        public void Close()
        {
            lock (base.SyncRoot)
            {
                open = false;
                base.Clear();
                Monitor.PulseAll(base.SyncRoot);    // resume any waiting threads
            }
        }

        /// <summary>
        /// Removes and returns the object at the beginning of the Queue.
        /// </summary>
        /// <returns>Object in queue.</returns>
        public override object Dequeue()
        {
            return Dequeue(Timeout.Infinite);
        }

        /// <summary>
        /// Removes and returns the object at the beginning of the Queue.
        /// </summary>
        /// <param name="timeout">time to wait before returning</param>
        /// <returns>Object in queue.</returns>
        public object Dequeue(TimeSpan timeout)
        {
            // TotalMilliseconds is the whole interval; the Milliseconds property
            // is only its 0-999 millisecond component.
            return Dequeue((int)timeout.TotalMilliseconds);
        }

        /// <summary>
        /// Removes and returns the object at the beginning of the Queue.
        /// </summary>
        /// <param name="timeout">time to wait before returning (in milliseconds)</param>
        /// <returns>Object in queue.</returns>
        public object Dequeue(int timeout)
        {
            lock (base.SyncRoot)
            {
                while (open && (base.Count == 0))
                {
                    // Wait returns false if the timeout elapses before a Pulse arrives.
                    if (!Monitor.Wait(base.SyncRoot, timeout))
                        throw new InvalidOperationException("Timeout");
                }
                if (open)
                    return base.Dequeue();
                else
                    throw new InvalidOperationException("Queue Closed");
            }
        }

        /// <summary>
        /// Adds an object to the end of the Queue.
        /// </summary>
        /// <param name="obj">Object to put in queue</param>
        public override void Enqueue(object obj)
        {
            lock (base.SyncRoot)
            {
                base.Enqueue(obj);
                Monitor.Pulse(base.SyncRoot);
            }
        }

        /// <summary>
        /// Open Queue.
        /// </summary>
        public void Open()
        {
            lock (base.SyncRoot)
            {
                open = true;
            }
        }

        /// <summary>
        /// Gets flag indicating if queue has been closed.
        /// </summary>
        public bool Closed
        {
            get
            {
                return !open;
            }
        }
    }
}
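As a rough illustration of the shutdown behavior, the sketch below shows a consumer that is blocked in Dequeue being released by Close. To keep the example compilable on its own, it uses MiniBlockingQueue, a hypothetical trimmed-down stand-in that reproduces only the Enqueue, Dequeue and Close logic of the class above. ShutdownDemo and its members are likewise illustrative names, and CountdownEvent is used here only to make the demo deterministic.

```csharp
using System;
using System.Collections;
using System.Threading;

// Hypothetical trimmed-down stand-in for the article's BlockingQueue.
public class MiniBlockingQueue : Queue
{
    private bool open = true;

    public override void Enqueue(object obj)
    {
        lock (SyncRoot)
        {
            base.Enqueue(obj);
            Monitor.Pulse(SyncRoot);
        }
    }

    public override object Dequeue()
    {
        lock (SyncRoot)
        {
            while (open && Count == 0)
            {
                Monitor.Wait(SyncRoot);
            }
            if (!open)
            {
                throw new InvalidOperationException("Queue Closed");
            }
            return base.Dequeue();
        }
    }

    public void Close()
    {
        lock (SyncRoot)
        {
            open = false;
            base.Clear();
            Monitor.PulseAll(SyncRoot);   // wake every blocked consumer
        }
    }
}

public static class ShutdownDemo
{
    public static int Processed;
    public static bool ConsumerSawClose;

    public static void Run()
    {
        Processed = 0;
        ConsumerSawClose = false;
        MiniBlockingQueue q = new MiniBlockingQueue();
        CountdownEvent allProcessed = new CountdownEvent(3);

        Thread consumer = new Thread(() =>
        {
            try
            {
                while (true)
                {
                    q.Dequeue();              // blocks until an item arrives
                    Processed++;
                    allProcessed.Signal();
                }
            }
            catch (InvalidOperationException)
            {
                // Thrown by Dequeue once the queue has been closed.
                ConsumerSawClose = true;
            }
        });
        consumer.Start();

        for (int i = 0; i < 3; i++)
        {
            q.Enqueue(i);
        }
        allProcessed.Wait();   // let the consumer drain the queue first
        q.Close();             // releases the consumer blocked in Dequeue
        consumer.Join();

        Console.WriteLine("{0} {1}", Processed, ConsumerSawClose);  // prints "3 True"
    }

    public static void Main()
    {
        Run();
    }
}
```

Note that Close also clears the queue, so any items still queued when it is called are discarded. The demo waits for the consumer to finish its three items before closing for that reason.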

 

Download the Visual Studio 2003 Solution that accompanies this article
