Reader Level:
ARTICLE

Handling the Queuing of Messages in a Multithreaded Program

Posted by Mike Gold Articles | Multithreading in C# September 06, 2004
In the financial world you have to deal with messages being spewed at you in large quantities at a rapid rate. For example stock quotes, market data, and orders come flying at you through some sort of wire and you as a programmer have to handle them in a way that doesn’t overwhelm you or the machine.
  • 0
  • 0
  • 30022

Figure 1 - Sending Messages to the Queue

In the financial world you have to deal with messages being spewed at you in large quantities at a rapid rate. For example stock quotes, market data, and orders come flying at you through some sort of wire and you as a programmer have to handle them in a way that doesn't overwhelm you or the machine.  I suppose a real-world analogy of this message traffic might be as if someone came by your cubicle every second and dumped a wastepaper basket on your desk, and then your boss shouts across the room, "Arrange those papers in some way that I can read them!"  In this situation we turn to a method of queuing up the messages and handling them when the program is good and ready.  Also, since message handling is only a small portion of our program requirements, we shove the queue into a separate thread, so it doesn't hog the processing of the rest of the program (such as processing of user input). 

To handle the queuing confusion, I've designed a singleton class called MessageOrderWriter that allows the programmer to write their message to the class and forget about it.  The MessageOrderWriter processes all messages in the queue in a separate thread from the thread asking for the processing.  (Of course, the user should keep in mind that if they pass messages faster than the queue can process them, the queue will grow rather large.  It is problems like these that the financial industry is tackling everyday.)   

Below is the singleton class of the MessageOrderWriter shown in a UML Diagram:

Figure 2 - UML Diagram of the MessageOrderWriter reverse engineered using WithClass  for .NET

There are three steps to processing the message in the queue 

  1. Spawn a thread that sits around in an infinite while loop and waits for the queue to populate
  2. Write a Message to the Queue
  3. Dequeue the message in the while loop, and process the message

Although this process seems pretty simple upon first glance, it actually becomes a little bit more complicated when you introduce multithreading.  First of all we want a queue that is thread-safe.  For this, we turn to the Synchronized queue.  Microsoft allows you to create a wrapped synchronized queue from your existing queue using the following calls: 

_orderMessageQueue = new Queue();
_syncOrderMessageQueue = Queue.Synchronized(_orderMessageQueue);
 

Internally, Microsoft locks the queue when you enqueue and dequeue, however, you can actually do your own manual locking, which we will do in this article. Now that we have a synchronized queue, we need to process it.  First we will spawn the separate thread that will process the queue: 

public void Start()
{
// Open a file for writing
FileStream fs = new FileStream("myfile.data", FileMode.Create, FileAccess.Write);
// create a binary file to write the messages to
_streamWriter = new BinaryWriter(fs);
// start a new thread for writing messages
_queueWritingThread = new Thread(new ThreadStart(ProcessQueue));
// make it a background thread and start it
_queueWritingThread.IsBackground = true; _queueWritingThread.Start();
}
 

Next we will process the queue by looping and dequeuing the messages as we loop.  When we have exhausted the queue, we will sit and wait until we have more items in the queue.  It is important, that as we are dequeuing the queue, we lock the queue so the queue count doesn't change when we check it.  It's also important that we process the slow operation of writing the message (SerializeMessage) outside the lock, because locking down the file writing would hinder performance.  When we do not have any messages to process in the queue, we simply block the thread until a new message appears in the queue with a ManualResetEvent

void ProcessQueue()
{
IMessage msg;
// infinitely loop and dequeue messages
while (true)
{
msg =
null;
// Lock the queue so that other threads cannot
// enter while we are checking the queue count and
// dequeuing the message
lock (_syncOrderMessageQueue.SyncRoot)
{
// if we have messages in the queue
// lock the thread, and dequeue the next message
if (_syncOrderMessageQueue.Count > 0)
{
msg = (IMessage)_syncOrderMessageQueue.Dequeue();
}
else
{
// Tell the ManualResetEvent to block the thread
// When the WaitOne method is called on the
// ManualResetEvent since there are no messages
// in the queue.
// The thread will become unblocked, when the
// ManualResetEvent is signalled to Set.
_waitForObject.Reset();
}
}
if (msg != null)
{
// serialize the message to the file outside the lock
SerializeMessage(msg);
}
else
{
// queue is empty, so block thread until Set event is called
_waitForObject.WaitOne();
}
}
// end while
}

Adding messages to the queue is performed outside the thread since it is not a CPU intensive operation.  We lock when we put messages on the queue so it doesn't interfere with the other queue threads. The message is enqueued and the manual reset event calls Set to release it from its blocking behavior because there is now at least one message on the queue that we can dequeue. 

public void WriteMessage(IMessage msg, string subject)
{
lock (_syncOrderMessageQueue.SyncRoot)
{
_syncOrderMessageQueue.Enqueue(msg);
// We now have a message in the queue.
// signal the manual reset event to unblock the thread
_waitForObject.Set();
}
}
 

When we are finished using our queue thread, we can call a StopWriting to abort the thread.  Also in this method, we can flush and close the file we are writing.  Of course a better technique would be to wait until the queue is empty and then abort the thread.  However, in the case when it will never be empty (because messages keep coming anyway), and you just want to finish writing, this is an acceptable solution. 

public void StopWriting ()
{
_streamWriter.Close();
try
{
// stop the queueing thread
_queueWritingThread.Abort();
}
catch (Exception ex)
{
// thread aborted;
Console.WriteLine("Finished Recording");
}
}
 

Conclusion  

Although thread management is a complex topic,  threading issues are much alleviated with the help of .NET constructs such as locks, semaphores and mutexes.  Remember to lock collection operations that requiring adding, removing, or checking if the collection is empty to avoid race conditions.  Avoid locks when you can if they surround code that is CPU intensive.  Anyway I hope you manage to thread your way through multiprocess programming and unlock the secrets of coding with C# and .NET.

COMMENT USING

Trending up