Working With Topic And Subscription - Azure Service Bus

Introduction

 
Azure Service Bus is a fully managed, multi-tenant cloud messaging service and an enterprise integration message broker. It is used to decouple applications and services from each other. Service Bus topics and subscriptions support a publish/subscribe messaging model. A topic can have multiple independent subscriptions, and each subscription receives a copy of every message sent to the topic. In this model, the components of a distributed application communicate with each other through topics; the topic acts as an intermediary.
 
In a Service Bus queue, each message is processed by a single consumer, so a queue provides one-to-one (point-to-point) communication. Topics and subscriptions provide one-to-many communication using the publish/subscribe model. When a message is sent to a topic, it becomes available to each subscription, which handles it independently. We can also define filter rules on a per-subscription basis to restrict which messages a subscription receives.
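
To make the difference from a queue concrete, here is a minimal in-memory sketch of the fan-out behaviour described above. It does not use Azure Service Bus at all: the InMemoryTopic class and its Subscribe, Publish, and Count methods are hypothetical names invented for this illustration, and the filter is a plain predicate rather than a real Service Bus rule.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a topic; not part of any Azure SDK.
public class InMemoryTopic
{
    private readonly Dictionary<string, Func<string, bool>> filters =
        new Dictionary<string, Func<string, bool>>();
    private readonly Dictionary<string, Queue<string>> subscriptions =
        new Dictionary<string, Queue<string>>();

    // Register a subscription; a null filter accepts every message.
    public void Subscribe(string name, Func<string, bool> filter = null)
    {
        filters[name] = filter ?? (_ => true);
        subscriptions[name] = new Queue<string>();
    }

    // Copy the message into every subscription whose filter matches.
    public void Publish(string message)
    {
        foreach (var pair in filters)
        {
            if (pair.Value(message))
            {
                subscriptions[pair.Key].Enqueue(message);
            }
        }
    }

    // Number of messages currently waiting in the given subscription.
    public int Count(string name) => subscriptions[name].Count;
}

public static class Program
{
    public static void Main()
    {
        var topic = new InMemoryTopic();
        topic.Subscribe("audit");                                  // receives everything
        topic.Subscribe("billing", m => m.StartsWith("invoice"));  // filtered

        topic.Publish("invoice:1");
        topic.Publish("order:7");

        // Prints "audit=2, billing=1"
        Console.WriteLine($"audit={topic.Count("audit")}, billing={topic.Count("billing")}");
    }
}
```

Each subscription keeps its own copy of a matching message, which is why the "audit" subscription sees both messages while "billing" sees only the invoice. A queue, by contrast, would hand each message to exactly one consumer.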
 
A namespace is the scoping container for all messaging components, so topics and subscriptions are also created under a namespace. In my previous article, I explained what Azure Service Bus is and how to create a namespace. Note that topics and subscriptions are only available when we choose the Standard or Premium pricing tier while creating the namespace.

 
Create a topic using the Azure portal


To create a topic, click the "Topic" button. We can view all the topics under the namespace in the "Entities >> topics" tab. On the "Create new topic" screen, there are several fields to fill in, such as name, max topic size, message time to live, and other options.
 
Under a topic, we can create one or more subscriptions. To create a subscription, select the topic and click "Subscriptions", either from the left menu or from the toolbar.
 
On the subscription creation screen, there are several fields to fill in, such as name, message time to live, lock duration, max delivery count, and other options. We can create as many subscriptions as we need.
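
The same topic and subscription can also be created from code instead of the portal. The following is a minimal sketch, assuming the ManagementClient class from the Microsoft.Azure.ServiceBus.Management namespace of the same NuGet package used later in this article. The size, time-to-live, lock duration, and delivery count values are illustrative assumptions, not recommendations, and CreateEntitiesAsync is a helper name chosen for this example.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Management;

class Program
{
    static void Main(string[] args)
    {
        CreateEntitiesAsync().GetAwaiter().GetResult();
    }

    static async Task CreateEntitiesAsync()
    {
        // Placeholders: replace with your own values.
        string connectionStringServiceBus = "<<connection String Service Bus>>";
        string topicName = "<<Topic Name>>";
        string subscriptionName = "<<Subscription Name>>";

        var managementClient = new ManagementClient(connectionStringServiceBus);

        // Create the topic only if it does not exist yet.
        if (!await managementClient.TopicExistsAsync(topicName))
        {
            await managementClient.CreateTopicAsync(new TopicDescription(topicName)
            {
                MaxSizeInMB = 1024,                               // assumed value
                DefaultMessageTimeToLive = TimeSpan.FromDays(14)  // assumed value
            });
        }

        // Create the subscription under the topic if it does not exist yet.
        if (!await managementClient.SubscriptionExistsAsync(topicName, subscriptionName))
        {
            await managementClient.CreateSubscriptionAsync(
                new SubscriptionDescription(topicName, subscriptionName)
                {
                    LockDuration = TimeSpan.FromSeconds(30),  // assumed value
                    MaxDeliveryCount = 10                     // assumed value
                });
        }

        await managementClient.CloseAsync();
    }
}
```

The connection string used here needs manage rights on the namespace; a send-only or listen-only key is not enough to create entities.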
 
In the following example, I have created a sender application that sends messages to the topic and one receiver application that receives the messages from a subscription and processes them. Both are .NET Core console applications.
 

Code to send a message to the topic

 
At the start of the code, I create an instance of TopicClient using the connection string and topic name. TopicClient is available in the Microsoft.Azure.ServiceBus NuGet package, which can be installed using the NuGet Package Manager.
 
Using the SendAsync method of the topic client, we can send the message to the topic. In the following example, I have sent multiple messages (currently 5) to the topic.
    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;

    class Program
    {
        static ITopicClient topicClient;

        static void Main(string[] args)
        {
            string connectionStringServiceBus = "<<connection String Service Bus>>";
            string topicName = "<< Topic Name >>";

            SendMessage(connectionStringServiceBus, topicName).GetAwaiter().GetResult();
        }

        static async Task SendMessage(string connectionStringServiceBus, string topicName)
        {
            const int numberOfMessages = 5;
            topicClient = new TopicClient(connectionStringServiceBus, topicName);

            Console.WriteLine("======================================================");
            Console.WriteLine("Press any key to exit after sending all the messages.");
            Console.WriteLine("======================================================");

            // Send the messages to the topic
            await SendMessagesToTopicAsync(numberOfMessages);

            Console.ReadKey();

            // Release the connection held by the topic client
            await topicClient.CloseAsync();
        }

        static async Task SendMessagesToTopicAsync(int numberOfMessages)
        {
            try
            {
                for (var i = 0; i < numberOfMessages; i++)
                {
                    // Build the message that is sent to the topic
                    string messageBody = $"GNR-MSG dataid:{i}";
                    var message = new Message(Encoding.UTF8.GetBytes(messageBody));

                    Console.WriteLine($"Sending message to topic: {messageBody}");

                    // Send the message to the topic
                    await topicClient.SendAsync(message);
                }
            }
            catch (Exception exception)
            {
                Console.WriteLine($"Exception: {exception.Message}");
            }
        }
    }
When we run the sender application and check the Azure portal (select the topic under the namespace, then go to the Overview window), all the subscriptions are listed at the bottom of the window, and the "Active Message Count" field shows the number of messages waiting in each subscription.
 

Receive messages from the subscription

 
As in the sender application, the receiver needs the connection string and topic name, plus the subscription name. I create an instance of SubscriptionClient using these three values; the receiver does not need a TopicClient, because it only reads from the subscription. To read messages from the subscription, we register a message handler using the RegisterMessageHandler method of the subscription client. RegisterMessageHandler takes two parameters: an asynchronous function that processes each received message, and a MessageHandlerOptions instance that configures the message pump. Because AutoComplete is set to false, the handler calls the CompleteAsync method of the subscription client, which completes the message and removes it from the subscription. The receiver code follows.
    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;

    class Program
    {
        static ISubscriptionClient subscriptionClient;

        static void Main(string[] args)
        {
            string connectionStringServiceBus = "<<connection String Service Bus>>";
            string topicName = "<<Topic Name>>";
            string subscriptionName = "<<Subscription Name>>";

            subscriptionClient = new SubscriptionClient(connectionStringServiceBus, topicName, subscriptionName);

            Console.WriteLine("======================================================");
            Console.WriteLine("Press any key to exit after receiving all the messages.");
            Console.WriteLine("======================================================");

            RegisterMessageHandlerAndReceiveMessages();

            Console.ReadKey();

            // Stop the message pump and release the connection
            subscriptionClient.CloseAsync().GetAwaiter().GetResult();
        }

        static void RegisterMessageHandlerAndReceiveMessages()
        {
            // Configure exception handling and the number of messages processed concurrently
            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                MaxConcurrentCalls = 1,
                // The handler completes each message explicitly
                AutoComplete = false
            };

            // Register the function that will process messages
            subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
        }

        static async Task ProcessMessagesAsync(Message message, CancellationToken token)
        {
            // Process the message
            Console.WriteLine($"Received message: Sequence Number:{message.SystemProperties.SequenceNumber} \t Body:{Encoding.UTF8.GetString(message.Body)}");

            // Complete the message so that it is removed from the subscription
            await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
        }

        // Use this handler to look at the exceptions received on the message pump
        static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
        {
            Console.WriteLine($"Exception:: {exceptionReceivedEventArgs.Exception}.");
            return Task.CompletedTask;
        }
    }
Output

The receiver prints one "Received message" line, containing the sequence number and the body, for each message it reads from the subscription.
 

Summary


Topics and subscriptions provide one-to-many communication using a publish/subscribe messaging model. This is useful for scaling to a large number of recipients, because each message sent to the topic is available to every subscription registered with the topic. We can also set filter rules per subscription to restrict the messages processed by that subscription.
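
As a rough illustration of such a filter rule, the sketch below replaces a subscription's default rule with a SQL filter, using the RemoveRuleAsync and AddRuleAsync methods of SubscriptionClient. The rule name "HighPriorityOnly", the "priority" user property, and the ConfigureFilterAsync helper are assumptions made up for this example and are not part of the sample above.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

class Program
{
    static void Main(string[] args)
    {
        ConfigureFilterAsync().GetAwaiter().GetResult();
    }

    static async Task ConfigureFilterAsync()
    {
        // Placeholders: replace with your own values.
        var subscriptionClient = new SubscriptionClient(
            "<<connection String Service Bus>>", "<<Topic Name>>", "<<Subscription Name>>");

        // A new subscription starts with a default rule that matches every message.
        // Remove it so that only the custom rule below applies.
        await subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName);

        // "priority" is a hypothetical user property chosen for this example.
        await subscriptionClient.AddRuleAsync(
            "HighPriorityOnly", new SqlFilter("priority = 'high'"));

        await subscriptionClient.CloseAsync();
    }
}
```

For the filter to match, the sender would have to set the same property on each message, for example message.UserProperties["priority"] = "high" before calling SendAsync. Note that removing the default rule fails if it has already been removed, so this sketch is meant to be run once against a freshly created subscription.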
 
You can view and download the source code from GitHub.