Microsoft Azure Service Bus-Topic

Introduction

 
In our previous article, I covered an overview of the service bus, especially on queue & topics. Finally, we saw how the queue works from the coding perspective.
In case you have not read about Service Bus Queues, I highly recommend you to go through the article by referring to here.
 
In case you are not using Azure, I would recommend you go for a free trial using the link here.
 
You can find source code here.
 

Configuring Topic in Azure Portal

 
Let's jump directly into how to create a service bus-topic from the Azure portal.
 
Microsoft Azure Service Bus-Topic
I'm preferring to stay with the default settings while creating the topic i.e. partitioning and duplicate detection are set to disable.
 
Microsoft Azure Service Bus-Topic
 
Microsoft Azure Service Bus-Topic
 
Here, I'm creating two subscriptions, one with Order subscription and another one with Product subscription. In both subscriptions, we have solicited service bus to have the maximum deliveries of messages to 5, however, the remaining settings are set to default. In future articles, we'll cover sessions, dead lettering, etc.
 
Microsoft Azure Service Bus-Topic
 
I have created two shared access policies, one for send and another one for listening (i.e. we'll use send policy in the publisher, whereas the listening policy for consumers (products and orders consumers).
 

Coding

 
I have created 4 projects for this article:
  • ASP.NET Core web api-> Producer
  • 2 Worker Service projects-> 1 for products consumer and another one for orders consumer.
  • .NET core class library-> Common classes
Add a Microsoft.Azure.ServiceBus NuGet package in the producer and consumer (products and orders) projects
 
Firstly, let's tackle the Producer functionality:
  1. public interface IMessagePublisher {  
  2.     Task PublisherAsync < T > (T request);  
  3. }  
  4. public class MessagePublisher: IMessagePublisher {  
  5.     private readonly ITopicClient topicClient;  
  6.     public MessagePublisher(ITopicClient topicClient) {  
  7.         this.topicClient = topicClient;  
  8.     }  
  9.     public async Task PublisherAsync < T > (T request) {  
  10.         var message = new Message {  
  11.             MessageId = Guid.NewGuid().ToString(),  
  12.                 Body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(request))  
  13.         };  
  14.         message.UserProperties.Add("MessageType"typeof(T).Name);  
  15.         await topicClient.SendAsync(message);  
  16.     }  
  17. }  
Message Publisher class is pretty much similar to what we have implemented in the previous article. Instead of IQueueClient, we have to use ITopicClient for topic; moreover, I have added MessageId and custom property(MessageType) to the message header. Later, we will discuss about why I added custom property in detail.
  1. [Route("api/[controller]")]  
  2. [ApiController]  
  3. public class TopicController: ControllerBase {  
  4.     private readonly IMessagePublisher messagePublisher;  
  5.     public TopicController(IMessagePublisher messagePublisher) {  
  6.         this.messagePublisher = messagePublisher;  
  7.     }  
  8.     // POST api/values  
  9.     [HttpPost(template: "product")]  
  10.     public async Task SendToProduct([FromBody] Product product) {  
  11.             await messagePublisher.PublisherAsync(product);  
  12.         }  
  13.         [HttpPost(template: "order")]  
  14.     public async Task SentToOrder([FromBody] Order order) {  
  15.         await messagePublisher.PublisherAsync(order);  
  16.     }  
  17. }  
I have created a pretty straightforward controller with an HTTP verb as post for both product and order.
  1. public void ConfigureServices(IServiceCollection services) {  
  2.     services.AddControllers();  
  3.     services.AddSingleton < ITopicClient > (serviceProvider => new TopicClient(connectionString: Configuration.GetValue < string > ("servicebus:connectionstring"), entityPath: Configuration.GetValue < string > ("serviceBus:topicname")));  
  4.     services.AddSingleton < IMessagePublisher, MessagePublisher > ();  
  5. }  
In the startup class, I have created a dependency injection for ITopicClient and IMessagePublisher.
  1. {  
  2.     "servicebus": {  
  3.         "connectionstring""<ConnectionString Here>",  
  4.         "topicname""<Topic name here>"  
  5.     },  
  6.     "Logging": {  
  7.         "LogLevel": {  
  8.             "Default""Information",  
  9.             "Microsoft""Warning",  
  10.             "Microsoft.Hosting.Lifetime""Information"  
  11.         }  
  12.     },  
  13.     "AllowedHosts""*"  
  14. }  
Here are the domain classes defined in a common project.
  1. public class Order {  
  2.     public int Id {  
  3.         get;  
  4.         set;  
  5.     }  
  6.     public string Name {  
  7.         get;  
  8.         set;  
  9.     }  
  10.     public int Quantity {  
  11.         get;  
  12.         set;  
  13.     }  
  14. }  
  15. public class Product {  
  16.     public int Id {  
  17.         get;  
  18.         set;  
  19.     }  
  20.     public string Name {  
  21.         get;  
  22.         set;  
  23.     }  
  24.     public int Price {  
  25.         get;  
  26.         set;  
  27.     }  
  28.     public string ProductStatus {  
  29.         get;  
  30.         set;  
  31.     }  
  32. }  
Now run the publisher with either products or orders. In my case, I'm running with products:
 
Microsoft Azure Service Bus-Topic
 
Now verify the message count in the subscription:
 
Microsoft Azure Service Bus-Topic
The good part is that we are getting the message count as 1 but wait we have the message count as 1 for orders too. Weird, we are sending the message for products but we have received a message for orders as well. Any guess as to why?
 
The answer is because we don't have any filters for the topic.
 
Let's understand what a filter is in topic. Subscribers have to define which message they want from the topic is called a filter. Each of the newly created topic subscriptions has an initial default subscription rule. If in case, you don't explicitly specify a filter condition for the rule, the applied filter is 1=1 that enables all messages to be selected into the subscription.
 
There are 2 types of filters in topics:
  • SQL Filters- SQL filters hold SQL like conditional expressions against system and user properties(custom properties). Here is the answer, why we used custom property in the message.
  • Correlational Filters- Correlation filters are used to match against one or more message systems or user properties.
In these articles, we'll be focused on both filters.
 
Microsoft Azure Service Bus-Topic
 
By default, the default filter is applied i.e. 1=1 or it will accept all the incoming messages.
 
Let's add a new filter:
 
Microsoft Azure Service Bus-Topic
 
For products, I have used the SQL filter. In order to receive a message from the topic, I have set MessageType='Product'
 
Microsoft Azure Service Bus-Topic
 
For orders, I have used a Correlation filter. I have specified the custom properties to receive a message from the topic.
 
Now run the postman and see the result. Now I'm running for products:
 
Microsoft Azure Service Bus-Topic
Here we go, the message has been received for product subscription. The publisher seems to be working fine.
 
Now, let's move our focus on both the consumers. First, let's begin with products consumer.
  1. public static IHostBuilder CreateHostBuilder(string[] args) =>  
  2. Host.CreateDefaultBuilder(args)  
  3. .ConfigureServices((hostContext, services) =>  
  4. {  
  5.    services.AddHostedService<Worker>();  
  6.    services.AddSingleton<ISubscriptionClient>(serviceProvider => new SubscriptionClient(  
  7.    connectionString: "<ConnectionString Here",  
  8.    topicPath: "<Topic Name here>", subscriptionName: "ProductSubscription"));  
  9. });  
We have created dependency injection for ISubscriptionClient by passing connection string, topic name and subscription name.
 
Note
You have to use listen key of the shared access policy for the connection string.
  1. public class Worker: BackgroundService {  
  2.         private readonly ILogger < Worker > _logger;  
  3.         private readonly ISubscriptionClient subscriptionClient;  
  4.         public Worker(ILogger < Worker > logger, ISubscriptionClient subscriptionClient) {  
  5.             _logger = logger;  
  6.             this.subscriptionClient = subscriptionClient;  
  7.         }  
  8.         protected override async Task ExecuteAsync(CancellationToken stoppingToken) {  
  9.             subscriptionClient.RegisterMessageHandler((message, token) => {  
  10.                 _logger.LogInformation($ "message id:{message.MessageId}");  
  11.                 _logger.LogInformation($ "message body:{Encoding.UTF8.GetString( message.Body)}");  
  12.                 var product = JsonConvert.DeserializeObject < Common.Product > (Encoding.UTF8.GetString(message.Body));  
  13.                 //Perform operation here ex: DB operation etc  
  14.                 return subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);  
  15.             }, new MessageHandlerOptions(ex => {  
  16.                 _logger.LogError(ex.Exception, ex.Exception.Message);  
  17.                 return Task.FromException(ex.Exception);  
  18.             }) {  
  19.                 AutoComplete = false,  
  20.                     MaxConcurrentCalls = 1  
  21.             });  
  22.         }  
You need to register the message handler in order to receive messages from the topic. In the above snippet, we are converting the message body from bytes to product object. Don't forget to specify AutoComplete=false other you will end up with an exception.
 
Similarly, we have to create for Order subscription as well. There are absolutely no change in terms of code other than specifying subscription name in the program class.
  1. public static IHostBuilder CreateHostBuilder(string[] args) =>  
  2. Host.CreateDefaultBuilder(args)  
  3. .ConfigureServices((hostContext, services) =>  
  4. {  
  5.    services.AddHostedService<Worker>();  
  6.    services.AddSingleton<ISubscriptionClient>(serviceProvider => new SubscriptionClient(  
  7.    connectionString: "<ConnectionString Here",  
  8.    topicPath: "<Topic Name here>", subscriptionName: "ProductSubscription"));  
  9. });  
Finally, we have managed to put all the code changes in place. Run the application and verify for both the products and orders.
 
I hope you liked the article. In case you find the article interesting, then kindly like and share it.