Synchronized custom queue

By Ehtesham Blogs | C# Language Sep 19, 2012
This post demonstrates use of synchronization on thread with extended custom queue.

Many times we come across situations were need to continuously execute a block of code

For instance reading data from network port (socket program), or from RFID where we create secondary thread for continuously reading and another thread for same data manipulation

Let me demonstrate with a basic read from keyboard (on primary thread) and write (on secondary thread) in console application with queue

using System.Threading;
namespace ConsoleApplication2
{
    class Program
    {
        static Queue<string> q = new Queue<string>();
        static Thread ThreadObj;
        static void ReadMethod()
        {
            while (true)
            {
                if (q.Count > 0)
                {

                              //here we would manipulate data on secondary thread
                    string str = q.Dequeue();
                             Console.WriteLine(string.Format("Dequeued item {0}", str));

                }
            }
        }
        static void Main(string[] args)
        {
            ThreadObj = new Thread(new ThreadStart(ReadMethod));
            ThreadObj.Start();
            loc:

                        //here read data on primary thread,for instance from network port
              string input = Console.ReadLine();
                        q.Enqueue(input);
              goto loc;

        }
    }
}


Where read method executes while loop infinite times, with a check is made if queue has string item if yes write it and continue with next iteration

obviously we need to avoid while loop to continue execution until there is an enqueue happen on queue, to do that consider the below example

class Program
{
    static Queue<string> q = new Queue<string>();
    static Thread ThreadObj;
    static object LockObj = new object();
    static void ReadMethod()
    {
        while (true)
        {
            if (q.Count == 0)
                lock (LockObj)
                {
                    //hold a lock on LockObj and wait for a signal if queue is empty
                    Monitor.Wait(LockObj);
                }
            string str = q.Dequeue();
            Console.WriteLine(string.Format("Dequeued item {0}", str));
        }
    }
    static void Main(string[] args)
    {
        ThreadObj = new Thread(new ThreadStart(ReadMethod));
        ThreadObj.Start();
    loc:
        string input = Console.ReadLine();
        q.Enqueue(input);
        lock (LockObj)
        {
            //signal to continue on LockObj
            Monitor.Pulse(LockObj);
        }
        goto loc;
    }
}

From the above code ReadMethod while loop is blocked and put on wait until queue is Enqueued, ReadMethod uses Monitor.Wait(LockObj) which blocks the current thread execution on static object LockObj until receives a signal using Monitor.Pulse(LockObj).

Note : Object synchronization Monitor.Pulse and Monitor.Wait must be called from an synchronized block of code hence we use lock keyword(similarly you can user Monitor.Enter, Monitor.Exit methods or Synchronization attribute to synchronize between threads).

Let us put the above code in ready made form by extending .Net library class Queue

Take 1

sealed class AsyncQ<T> : Queue<T>
{
    #region Members
    //Delegate points to Readmethod
    Action ActnMethod;
    //Object to put a lock on
    object LockerObj = new object();
    //secondary thread
    Thread ThreadObj;
    #endregion
    #region Constructors
    /// <summary>
    /// Intialize object
    /// </summary>
    /// <param name="actmethodsignature">Method to execute on Enqueue</param>
    public AsyncQ(Action actmethodsignature)
    {
        this.ActnMethod = actmethodsignature;
        ThreadObj = new Thread(new ThreadStart(this.RunDequeue));
        ThreadObj.Start();
    }
    #endregion
    #region User Defined Methods
    /// <summary>
    ///  Adds an object to the end of the System.Collections.Generic.Queue<T>.
    /// </summary>
    /// <param name="item">Item</param>
    internal new void Enqueue(T item)
    {
        base.Enqueue(item);
        lock (LockerObj)
            //signal to continue on LockObj
            Monitor.Pulse(LockerObj);
    }
    /// <summary>
    ///  Removes and returns the object at the beginning of System.Collections.Generic.Queue<T>.
    /// </summary>
    /// <returns></returns>
    internal new T Dequeue()
    {
        return base.Dequeue();
    }
    /// <summary>
    /// runs on a secondary thread
    /// </summary>
    void RunDequeue()
    {
        while (true)
        {
            if (this.Count == 0)
            {
                lock (LockerObj)
                    //wait for a signal if queue is empty
                    Monitor.Wait(LockerObj);
            }
            //calls ReadMethod
            ActnMethod();
        }
    }
    #endregion
}

class Program
{
    static AsyncQ<string> q = new AsyncQ<string>(Readmethod);
    static void Readmethod()
    {
        string str = q.Dequeue();
        Console.WriteLine(string.Format("Dequeued item {0}", str));
    }
    static void Main(string[] args)
    {
    loc:
        string input = Console.ReadLine();
        q.Enqueue(input);
        goto loc;
    }
}

Take 2 with Asynchronous Callback

sealed class AsyncCallBackQ<T> : Queue<T>
{
    #region Members
    //callback delegate for ReadMethod
    AsyncCallback TargetMethod;
    #endregion
    //delegate to call base.Enqueue
    delegate void EnqueueDeleg(T item);
    #region Constructors
    /// <summary>
    /// initialize object
    /// </summary>
    /// <param name="targetmethod">callback method</param>
    public AsyncCallBackQ(AsyncCallback targetmethod)
    {
        this.TargetMethod = targetmethod;
    }
    #endregion
    #region User Defined Methods
    /// <summary>
    ///  Adds an object to the end of the System.Collections.Generic.Queue<T>.
    /// </summary>
    /// <param name="item">Item</param>
    internal new void Enqueue(T item)
    {
        EnqueueDeleg enqdel = new AsyncCallBackQ<T>.EnqueueDeleg(base.Enqueue);
        //Add item asynchronously and call back read method on finish(Creates secondary thread implicitly)
        IAsyncResult Iresult = enqdel.BeginInvoke(item, this.TargetMethod, null);
        enqdel.EndInvoke(Iresult);
    }
    /// <summary>
    ///  Removes and returns the object at the beginning of the System.Collections.Generic.Queue<T>.
    /// </summary>
    /// <returns></returns>
    internal new T Dequeue()
    {
        return base.Dequeue();
    }
    #endregion
} 

class Program
{
    static AsyncCallBackQ<string> q = new AsyncCallBackQ<string>(Readmethod);
    static object Lockobj=new object();
    static void Readmethod(IAsyncResult result)
    {
      lock (Lockobj)
      {
        string str = q.Dequeue();
        Console.WriteLine(string.Format("Dequeued item {0}", str));
      }
    }
    static void Main(string[] args)
    {
    loc:
        string input = Console.ReadLine();
        q.Enqueue(input);
        goto loc;
    }
}

COMMENT USING

PREMIUM SPONSORS

Hire Mobile & Web Developer on demand. 100% satisfaction. Try for 1 week or Money Back. Local and remote developers available all over USA.

Latest Blogs