Azure Service Bus - Working With Queue In A Real World Scenario

This article explains a real-world business requirement and how Azure Service Bus Queue helps to fulfill the requirement.

Business Requirement

One of the utility bill payments company wants to build an application, which will do the mobile recharge for end users. The application should support all the operators which are available in the market. The flow is, the user sends SMS in predefined format and application should parse the message and do the recharge accordingly.

Now, the biggest challenge is the application gets a huge number of recharge requests during the peak hours and is not able to process each request, so the company is looking for something like where all the requests are parked when they come and process one by one.

Azure Service Bus

The company came to know about the Azure Service Bus provided by Azure, so he started exploring Azure Service Bus.

Azure Service Bus supports cloud-based message-oriented middleware technologies like Queue, Topic, and Relay.

Azure Service Bus 

Here in order to meet the business requirement, we will use the queue mechanism where all the requests will be added in the queue and another side queue listener will process the message by reading from the queue.

Queue

Azure Service Bus 

The sender sends the message, adds it into the queue and the receiver processes the message in a FIFO (First In First Out) manner. The benefit of using queue is, it creates decoupling between sender and receiver, means sender and receiver should not be available at the same time while sending the message as messages are stored in a queue. Another benefit is load leveling, which means sender sends messages at a different rate and the receiver can process messages at a different rate, as both are independent in sending and processing.

Now let's see the step by step implementation of the below to design a solution:

  1. Create Service Bus Namespace
  2. Create Queue
  3. Add a message in the queue
  4. Read Message from the queue
  5. Scheduled Message
  6. Receive Mode
  7. Abandoned Async
  8. Dead-letter Queue
Create Service Bus Namespace

Service bus namespace is a container for all messaging components. One namespace contains multiple queues and topics.

  • Log in to the Azure portal via portal.azure.com
  • Click on 'Create a resource' from the left navigation pane, click on 'Integration' and then 'Service Bus'.

    Azure Service Bus
  • Enter the proper name for a namespace
  • Select pricing tier, for demo purpose we have selected Basic
  • Subscription and Resource group, keep it as it is
  • Select your desired location and click on 'Create' button

    Azure Service Bus

Once you create a namespace, you can explore the same from the dashboard where you can find all the details which you have provided while creating a namespace.

Azure Service Bus 

Credentials

Click on 'Shared access policies' from the left panel, click on 'RootManageSharedAccessKey' to explore keys and connection strings which will be used further to connect.

Azure Service Bus
 
Create Queue
  • Click on 'Mobile Recharge' namespace on Dashboard
  • Click on 'Queues' on the left pane and click on '+Queue' to create new Queue.

    Azure Service Bus
  • Enter queue name and click on 'Create' button keeping the rest of the input as it is.

    Azure Service Bus
  • Click on 'Queues' and you will get a list of created queues. You can also find our recently created queue; i.e. recharge.

    Azure Service Bus
Add Message in the QUEUE
  • Create a console application in Visual Studio
  • Set variables, one for connection string which you can copy from Shared access policies section and another is queue name; i.e. recharge
  • In order to do the recharge, we need a mobile number, amount and operator from user
  • Concatenate preceding three values separated with a star(*) 
  • Add Microsoft.Azure.ServiceBus from a NuGet package manager
  • Create queue client using connection string and queue name
  • Convert string message to Azure Service Bus message
  • Using queue client, call SendAsync method to add a message in the queue.
  • Call CloseAsync to close opened connection in finally block.
    1. class Program  
    2.     {  
    3.         static QueueClient queueClient;  
    4.         static void Main(string[] args)  
    5.         {  
    6.             string sbConnectionString = "Endpoint=sb://mobilerecharge.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=KVb9ubc9XaV0dT/1dMj/JGvVvUZ64U21IBI=";  
    7.             string sbQueueName = "Recharge";  
    8.    
    9.             string messageBody=string.Empty;  
    10.             try  
    11.             {  
    12.                 Console.WriteLine("-------------------------------------------------------");  
    13.                 Console.WriteLine("Mobile Recharge");  
    14.                 Console.WriteLine("-------------------------------------------------------");  
    15.                 Console.WriteLine("Operators");  
    16.                 Console.WriteLine("1. Vodafone");  
    17.                 Console.WriteLine("2. Airtel");  
    18.                 Console.WriteLine("3. JIO");  
    19.                 Console.WriteLine("-------------------------------------------------------");  
    20.    
    21.                 Console.WriteLine("Operator:");  
    22.                 string mobileOperator = Console.ReadLine();  
    23.                 Console.WriteLine("Amount:");  
    24.                 string amount = Console.ReadLine();  
    25.                 Console.WriteLine("Mobile:");  
    26.                 string mobile = Console.ReadLine();  
    27.    
    28.                 Console.WriteLine("-------------------------------------------------------");  
    29.    
    30.                 switch (mobileOperator)  
    31.                 {  
    32.                     case "1":  
    33.                         mobileOperator = "Vodafone";  
    34.                         break;  
    35.                     case "2":  
    36.                         mobileOperator = "Airtel";  
    37.                         break;  
    38.                     case "3":  
    39.                         mobileOperator = "JIO";  
    40.                         break;  
    41.                     default:  
    42.                         break;  
    43.                 }  
    44.    
    45.                 messageBody = mobileOperator + "*" + mobile + "*" + amount;  
    46.                 queueClient = new QueueClient(sbConnectionString, sbQueueName);  
    47.                   
    48.                 var message = new Message(Encoding.UTF8.GetBytes(messageBody));  
    49.                 Console.WriteLine($"Message Added in Queue: {messageBody}");  
    50.                 queueClient.SendAsync(message);  
    51.    
    52.                   
    53.             }  
    54.             catch (Exception ex)  
    55.             {  
    56.                 Console.WriteLine(ex.Message);  
    57.             }  
    58.             finally  
    59.             {  
    60.                 Console.ReadKey();  
    61.                 queueClient.CloseAsync();  
    62.             }  
    63.         }  
    64.     }  
  • Run the application and provide the required inputs
  • You can see the message 'Message Added in Queue…'

    Azure Service Bus
  • To check the added message in the portal, explore all the queues
  • Click on the queue in which we have added the message i.e. 'Recharge'
  • We can see 'Active message count' as 1

    Azure Service Bus
Read Message from QUEUE
  • Create a console application in the Visual Studio
  • Set variables, one for connection string which you can copy from Shared access policies section and another is queue name; i.e. recharge
  • Create queue client using connection string and queue name
  • Using queue client, call RegisterMessageHandler which is used to receive messages continuously from the entity. Registers a message handler and begins a new thread to receive messages. This handler has waited every time a new message is received by the receiver.
  • Inside ReceiveMessageAsync, call CompleteAsync which completes a message using its lock token and deletes the message from the queue.
    1. class Program  
    2.     {  
    3.         static QueueClient queueClient;  
    4.         static void Main(string[] args)  
    5.         {  
    6.             string sbConnectionString = "Endpoint=sb://mobilerecharge.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=KVb9ubc9XaV0dT/1dMj/JGvVvUZ64U21IBI=";  
    7.             string sbQueueName = "Recharge";  
    8.    
    9.             try  
    10.             {  
    11.                 queueClient = new QueueClient(sbConnectionString, sbQueueName);  
    12.    
    13.                 var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)  
    14.                 {  
    15.                     MaxConcurrentCalls = 1,  
    16.                     AutoComplete = false  
    17.                 };  
    18.                 queueClient.RegisterMessageHandler(ReceiveMessagesAsync, messageHandlerOptions);  
    19.             }  
    20.             catch (Exception ex)  
    21.             {  
    22.                 Console.WriteLine(ex.Message);  
    23.             }  
    24.             finally  
    25.             {  
    26.                 Console.ReadKey();  
    27.                 queueClient.CloseAsync();  
    28.             }  
    29.         }  
    30.    
    31.         static async Task ReceiveMessagesAsync(Message message, CancellationToken token)  
    32.         {  
    33.             Console.WriteLine($"Received message: {Encoding.UTF8.GetString(message.Body)}");  
    34.    
    35.             await queueClient.CompleteAsync(message.SystemProperties.LockToken);  
    36.         }  
    37.    
    38.         static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)  
    39.         {  
    40.             Console.WriteLine(exceptionReceivedEventArgs.Exception);  
    41.             return Task.CompletedTask;  
    42.         }  
    43.     }   
  • Run the application and you can see the previously added message in the queue.

    Azure Service Bus
  • Check the queue once again in the portal and you can see the 'Active message count' as zero(0) as it was deleted by CompleteAsync. 

    Azure Service Bus
  • Now run both the applications simultaneously and provide the required inputs
  • You can observe that the message will be read by the second application immediately.

    Azure Service Bus
Scheduled Message

Now let's tweak the requirement, where we want to do recharge after a specific time, not now. So for this, we have scheduled message facility where you can schedule a message for a specific time. In the demo, we will set it after 5 minutes.

In order to implement this, we need to comment SendSync method and call ScheduleMessageAsync which expects a message and schedule time.

Modify your code as below,

  1. DateTimeOffset scheduleTime = DateTime.UtcNow.AddMinutes(5);  
  2. queueClient.ScheduleMessageAsync(message,scheduleTime);  
  • Run your application once again
  • Provide required inputs

    Azure Service Bus

We can observe in the portal that the message is not added in the queue as 'Active message count' is zero (0) only.

Azure Service Bus

 

Now wait for 5 minutes and verify once again in a portal. We can see that it's added as 5 minutes have passed.

Azure Service Bus

 

Run the read application and you can get the scheduled message.

Azure Service Bus
Receive Mode

In the next step after receiving the message, you need to process that message; i.e. need to do a recharge. Now assume that while processing the message some exception occurs, then we may lose the request completely. For this situation, we have a Receive Mode facility.

You can specify two different modes as Receive Mode:

  • PeekLock
  • Receive and Delete

PeekLock

In this mode, the message won't be deleted until you call CompleteAsync method, so while processing if any exception occurs, we don't lose the message.

  • Run both the applications simultaneously in debug mode.
  • Provide the required input, and read application shows the added message.

    Azure Service Bus

Let's check the message in the portal, ideally, it was read by read application, so it should not be there in the portal but as Receive Mode is Peeklock, the message will be there.

Azure Service Bus

 

Execute CompleAsync method after processing the message. 

Azure Service Bus

We can see in the portal that the message is not available as we have already called CompleteAsync. 

Azure Service Bus 

ReceiveAndDelete

In Receive and Delete mode, as soon as the message is read, it will be deleted from the queue.

  • Run both applications simultaneously in debug mode.
  • Provide the required input and read application shows the added message.

    Azure Service Bus
  • Don't execute CompleAsync method, just hold the debug point there only.

    Azure Service Bus
  • Now, verify the message in the portal, ideally, CompleAsync is not called, so the message should be there but as we set ReceiveAndDelete mode, the message is deleted as soon as it was read.

    Azure Service Bus
AbandonAsync

Now in your application, whenever an exception occurs during processing you want to reprocess the message again. For this, we have the facility of AbandonAsync.

AbandonAsync abandons a Message using a lock token, this will make the message available again for processing.

To verify this we need to throw an exception explicitly from ReceiveMessageAsync and from the catch block call AbandonAsync method.

  1. static async Task ReceiveMessagesAsync(Message message, CancellationToken token)  
  2.         {  
  3.             try  
  4.             {  
  5.                 Console.WriteLine($"Received message: {Encoding.UTF8.GetString(message.Body)}");  
  6.    
  7.                 int i = 0;  
  8.                 i=i / Convert.ToInt32(message);  
  9.    
  10.                 await queueClient.CompleteAsync(message.SystemProperties.LockToken);  
  11.             }  
  12.             catch(Exception ex)  
  13.             {  
  14.                await queueClient.AbandonAsync(message.SystemProperties.LockToken);  
  15.             }  
  16.         }  

Run the application once again and from the read application we can observe that the same message was read 10 times, this means whenever an exception occurs it was available once again to process.

Azure Service Bus 

You can configure the reprocess count from properties by setting Maximum Delivery Count. Previously it was read 10 times because, by default, Maximum Delivery Count is set to 10.

Azure Service Bus 

Once the 10-time process is over, it is no longer available in the queue, and it will be added in the Dead-letter queue. We can verify the same thing in the portal. 

Azure Service Bus 

Dead-letter Queue

Now, in order to read the messages from the Dead-letter queue for analyzing purposes, the same application is used, you just need to change the queue name as below:

  1. string sbQueueName = "Recharge/$DeadLetterQueue";  
Run the application and it will read all messages available in Dead-letter queue.
 
Azure Service Bus 

We don't find any messages in Dead-letter queue now.

Azure Service Bus 

In the next article, we will discuss Azure Service Bus - Topic.

Note
You can download the complete sample code from here.