Parallel Processing Messages Based On SQL Server

A few weeks ago my boss gave me a task to improve our mail sending system. At that moment we had a system which would send email without parallel, single message after message. We don't send a lot of emails per day, but we also have irregular behavior. For example, the system was idle for two hours and after that it needed to send 100 emails. After consideration I have created a simple, message-based system based on SQL Server. It should work as if  queue persisted in SQL Server single table. The email system isn't so big, so we didn't want to use RabbitMQ or MSMQ. In SQL Server we can create reports and audit data practically out of the box. Today I will present to you a smaller and not so complete version, but it's good enought to explain the assumptions. So le's switch to code.

Assumptions
  • system should be based on SQL Server and single table .
  • many parts of system can insert data to table (it represents sending emails) at the same time.
  • system should process parallel data from one table - send many emails at the same time.
  • if sending message has error, the system shouldn't mark (remove) message from table.
1. Database

The database code is very simple - all stuff is in one table.
  1. CREATE TABLE [dbo].[messages](    
  2.     [id] [int] IDENTITY(1,1) NOT NULL,    
  3.     [inserted] [datetime2](7) NOT NULL,    
  4.     [messageType] [nvarchar](512) NOT NULL,    
  5.     [messageBody] [nvarchar](maxNOT NULL,    
  6.  CONSTRAINT [PK_messages] PRIMARY KEY CLUSTERED     
  7. (    
  8.     [id] ASC    
  9. )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ONON [PRIMARY]    
  10. ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]    
  11.     
  12. GO    
  13.     
  14. ALTER TABLE [dbo].[messages] ADD  CONSTRAINT [DF_messages_inserted]  DEFAULT (sysdatetime()) FOR [inserted]    
  15. GO   
  • id - is unique message identifier - autoincrement,
  • inserted - represent date, when message was inserted
  • messageType - represent full C# type name (namespace + class name),
  • messageBody - has serialized message.
2. C# Abstractions: I'd like to start programming from defining my interfaces. So let's do this.
  1. public interface IMessageProcessor<T>    
  2. {    
  3.   void Process(T message);    
  4. }    
  5.     
  6. public interface IMessageProcessorEngine    
  7. {    
  8.   void ProcessAllMessages(int maxDegreeOfParallelism);    
  9. }    
  10.     
  11. public interface IMessageRepository : IDisposable    
  12. {    
  13.   void BeginTransaction();    
  14.   void CommitTransaction();    
  15.   void RollbackTransaction();    
  16.   DbMessageModel GetOldestMessage();    
  17.   void RemoveMessage(DbMessageModel message);    
IMessageProcessor: Implementation of this class will have logic for processing message -> in my work example, it send emails by SMTP.
  • IMessageEngine - has logic to manage tasks.
  • IMessageRepository - it responsible of manage a single message (row) in [dbo].[messages] sql table.
3. C# Implementation: Let's create class which represents the object structure of [dbo].[message] table row. 
  1. public class DbMessageModel    
  2. {    
  3.   public int Id { getset; }    
  4.   public DateTime Inserted { getset; }    
  5.   public string MessageType { getset; }    
  6.   public string MessageBody { getset; }    
  7. }  
Next let's implements IMessageRepository,
  1. public class MessageRepository : IMessageRepository    
  2.   {    
  3.     private SqlConnection _connection;    
  4.     private SqlTransaction _transaction;    
  5.     
  6.     public void BeginTransaction()    
  7.     {    
  8.       _connection =    
  9.         new SqlConnection(ConfigurationManager.ConnectionStrings["ParallelMessageProcessingDb"].ConnectionString);    
  10.       _connection.Open();    
  11.       _transaction = _connection.BeginTransaction();    
  12.     }    
  13.     
  14.     public void CommitTransaction()    
  15.     {    
  16.       _transaction.Commit();    
  17.     }    
  18.     
  19.     public void RollbackTransaction()    
  20.     {    
  21.       _transaction.Rollback();    
  22.     }    
  23.     
  24.     public DbMessageModel GetOldestMessage()    
  25.     {    
  26.       const string queryText = @"SELECT TOP (1)    
  27.                                           id,    
  28.                                           inserted,    
  29.                                           messageType,    
  30.                                           messageBody    
  31.                                       FROM dbo.messages WITH (ROWLOCK, READPAST, UPDLOCK, INDEX (PK_messages))    
  32.                                       ORDER BY id";    
  33.     
  34.       using (var command = new SqlCommand(queryText, _connection, _transaction))    
  35.       {    
  36.         var dataTable = new DataTable();    
  37.         dataTable.Load(command.ExecuteReader());    
  38.     
  39.         if (dataTable.Rows.Count == 0)    
  40.           return null;    
  41.     
  42.         return new DbMessageModel    
  43.         {    
  44.           Id = (int) dataTable.Rows[0]["id"],    
  45.           Inserted = (DateTime) dataTable.Rows[0]["inserted"],    
  46.           MessageType = (string) dataTable.Rows[0]["messageType"],    
  47.           MessageBody = (string) dataTable.Rows[0]["messageBody"]    
  48.         };    
  49.       }    
  50.     }    
  51.     
  52.     public void RemoveMessage(DbMessageModel message)    
  53.     {    
  54.       const string queryText = @"DELETE top(1) FROM dbo.messages WITH (ROWLOCK) WHERE id = @id";    
  55.     
  56.       using (var command = new SqlCommand(queryText, _connection, _transaction))    
  57.       {    
  58.         var idParameter = new SqlParameter("id", SqlDbType.Int);    
  59.         idParameter.Value = message.Id;    
  60.     
  61.         command.Parameters.Add(idParameter);    
  62.         command.ExecuteNonQuery();    
  63.       }    
  64.     }    
  65.     
  66.     public void Dispose()    
  67.     {    
  68.       _transaction.Dispose();    
  69.       _connection.Dispose();    
  70.     }    
  71.   }    
I will not explain the manage transaction, select and delete statements - I think it's quite easy. The first strange thing which  you may not know are SQL query hints.
  • ROWLOCK: it suggests SQL Server to lock single row in transaction, not whole table.

  • READPAST: tells SQL Server to get only not locked rows (not edited by another transaction etc).

  • UPDLOCK: it increases a SQL Server lock to update, so if another part of system executes thd same query, the result will not be contained to that row (readpast is responsible for that).

  • INDEX: In that scenario INDEX hint is not necessary, SQL Server will use it by default (primary key clustered index). But if you want order your rows by different column, You must create a specified index for that. For example, if in current structure of table i will order data by "inserted" column without index on that, the SQL Server need to scan whole table and automatically it will expand ROWLOCK to table lock. As i mentioned before, the ROWLOCK is only suggestion to SQL Server, but it can execute query with it own, when it need it.
The MessageProcessorEngine is responsible for managing parallel and executing specified message processor. 
  1. public class MessageProcessorEnigine : IMessageProcessorEngine    
  2.   {    
  3.     private readonly Func<Type, object> _mmessageHandlerFactory;    
  4.     
  5.     public MessageProcessorEnigine(Func<Type, object> mmessageHandlerFactory)    
  6.     {    
  7.       _mmessageHandlerFactory = mmessageHandlerFactory;    
  8.     }    
  9.     
  10.     public void ProcessAllMessages(int maxDegreeOfParallelism)    
  11.     {    
  12.       var tasks = new List<Task<bool>>();    
  13.     
  14.       while (true)    
  15.       {    
  16.         while (tasks.Count < maxDegreeOfParallelism)    
  17.         {    
  18.           Task<bool> task = Task.Factory.StartNew(() => ProcessOldestMessage());    
  19.           tasks.Add(task);    
  20.         }    
  21.     
  22.         int finishedTaskIndex = Task.WaitAny(tasks.ToArray());    
  23.         var taskResult = tasks[finishedTaskIndex].Result;    
  24.     
  25.         if (!taskResult)    
  26.         {    
  27.           Task.WaitAll(tasks.ToArray());    
  28.           break;    
  29.         }    
  30.     
  31.         tasks.RemoveAt(finishedTaskIndex);    
  32.       }    
  33.     }    
  34.     
  35.     private bool ProcessOldestMessage()    
  36.     {    
  37.       using (IMessageRepository messageRepository = new MessageRepository())    
  38.       {    
  39.         messageRepository.BeginTransaction();    
  40.         var oldestMessage = messageRepository.GetOldestMessage();    
  41.     
  42.         if (oldestMessage == null)    
  43.         {    
  44.           messageRepository.RollbackTransaction();    
  45.           return false;    
  46.         }    
  47.     
  48.         try    
  49.         {    
  50.           var messageType = Type.GetType(oldestMessage.MessageType);    
  51.           object message = null;    
  52.     
  53.           var xmlSerializer = new XmlSerializer(messageType);    
  54.           using (var stringReader = new StringReader(oldestMessage.MessageBody))    
  55.             message = xmlSerializer.Deserialize(stringReader);    
  56.              
  57.           var messageProcessorType = typeof(IMessageProcessor<>).MakeGenericType(messageType);    
  58.           dynamic messageProcessor = _mmessageHandlerFactory(messageProcessorType);    
  59.     
  60.           messageProcessor.Process((dynamic)message);    
  61.           messageRepository.RemoveMessage(oldestMessage);    
  62.           messageRepository.CommitTransaction();    
  63.         }    
  64.         catch (Exception ex)    
  65.         {    
  66.           //handling error    
  67.           messageRepository.RollbackTransaction();    
  68.         }    
  69.     
  70.         return true;    
  71.       }    
  72.     }    
  73.   }   
So, here's how it works. The engine starts a few tasks. If any one of the executing tasks return NULL from repository - it mean that is nothing to do, and the enginge will wait for finish existing tasks, otherwise it will peek next row (message) from database and process it on spearate task.

Processing single message contains: message deserialization, creating instance of specified message processor and execution of process message.
Let's write some classes that, show us how it works.
  1. namespace ParallelMessageProcessing.Messages    
  2. {    
  3.   public class SendEmailMessage    
  4.   {    
  5.     public string Recipient { getset; }    
  6.     public string Subject { getset; }    
  7.     public string Body { getset; }    
  8.   }    
  9. }    
  10.     
  11. namespace ParallelMessageProcessing.Messages    
  12. {    
  13.   public class SendEmailMessageProcessor : IMessageProcessor<SendEmailMessage>    
  14.   {    
  15.     public void Process(SendEmailMessage message)    
  16.     {    
  17.       Console.WriteLine("Sending message to: {0} with subject: {1}", message.Recipient, message.Subject);    
  18.       Task.Delay(TimeSpan.FromSeconds(2)).Wait();    
  19.     }    
  20.   }    
  21. }  
Message processor is simulating a two second sending email process.

Let's fill the  database with some data.
  1. DECLARE @counter int = 0;    
  2.     
  3. WHILE(@counter < 30)    
  4. BEGIN    
  5.     
  6. INSERT INTO dbo.messages (messageType, messageBody)    
  7.     VALUES ('ParallelMessageProcessing.Messages.SendEmailMessage''<SendEmailMessage><Recipient>[email protected]</Recipient><Subject>' + CAST(@counter AS NVARCHAR(5)) + '</Subject></SendEmailMessage>')    
  8.     
  9. SET @counter = @counter + 1    
  10. END   
The last step is to write Program.cs,
  1. internal class Program    
  2. {    
  3.   private static void Main(string[] args)    
  4.   {    
  5.     Func<Type, object> ioc = (Type type) => new SendEmailMessageProcessor();    
  6.     IMessageProcessorEngine messageEngine = new MessageProcessorEnigine(ioc);    
  7.     messageEngine.ProcessAllMessages(3);    
  8.     
  9.     Console.ReadKey();    
  10.   }    
  11. }  
Now run the app.



As You can see application processed parallel all inserted messages (from 0 do 29).

4. Conculsion

In that simple application I will show you the basis of how you can use SQL Server locks to create parallel processing system that works like queue. In your production code you should focus on error handling (in example if some will going wrong, the another task will process same message and it will over and over again). Probably you should do something like processing counter, if for example after three tries it is still going wrong - move that message to another error table.

Also, you must know that whilethe  engine is processingthe message the SQL Server keeps opening transaction - so, you shouldn't use a huge number of tasks - in our system 10 works well.

Also when something is going really wrong (for example your server is powered off), it may be a situation when the system processed the message and it will not be removed from the message table (Processor finished its job but transaction was not committed). You should add additional logic for checking that, but it will be a really rare situation.

The simple engine is a good starting point for extend (You can add your own Dependency Injection container, the message serialization will be moved out of Engine etc.)
 
Read more articles on SQL Server


Similar Articles