Everything You Need to Know About Azure Service Bus Brokered Messaging: Part 3

Before reading this article, I highly recommend reading the previous part.



If you have stayed with us through the journey, you know by now that there is a lot of information about Azure Service Bus Brokered Messaging. Though we have covered the bulk of the groundwork, there is still quite a bit to know about the Service Bus. In this third and final part, we will be looking at some of the advanced features and patterns for brokered messaging as well as security and best practices. You can easily get up to speed with areas already covered in Part 1 and Part 2.

We have built a foundation on which we can now easily leverage the power of Service Bus Queues as well as Topics and Subscriptions to distribute information between disconnected systems. In addition, we have seen how we can further control information flow based on Subscription rules as well as transform the data appropriately. But what about efficiency? Transactions? Faulty message handling? These and many other common scenarios can be handled by features the service provides. Here are the areas we’re going to cover:

  • Batch Processing
  • Prefetching
  • Transactions
  • Dead-Lettering
  • Auto-Forwarding
  • Security
  • Best Practices

Batch Processing

With Service Bus Queues and Topics we have only looked at processing messages on an individual basis. However, it’s quite common to process multiple messages at a time to achieve a higher level of operational efficiency. You would think this would have a financial impact, but according to Microsoft’s documentation, batching does not affect billable transactions.

It is also important to note that batching is only available through the Service Bus Messaging Protocol (SBMP). Before you start worrying that HTTP was not mentioned, understand that when you use the Service Bus client library you are using SBMP by default, unless you explicitly set the protocol to HTTP.

In most cases, batching is perceived as strictly a producer or consumer operation. But in Azure Service Bus, batching can refer to additional internal Service Bus operations. So let’s look at all these different scenarios in turn.

Client Side

Sending Messages in Batches

Reminder: in order to focus on the topic at hand, exception handling has been removed. Be aware of how .NET handles exceptions when it comes to Tasks.

As you can expect, there isn’t much to the act of sending a batch of messages:

    // Build the messages that will travel together in one batch.
    List<BrokeredMessage> messages = new List<BrokeredMessage>();
    BrokeredMessage taylorSwift = new BrokeredMessage("Taylor Swift Request");
    BrokeredMessage justinBieber = new BrokeredMessage("Justin Bieber Request");
    BrokeredMessage britneySpears = new BrokeredMessage("Britney Spears Request");

    messages.Add(taylorSwift);
    messages.Add(justinBieber);
    messages.Add(britneySpears);

    MessageSender messageSender = messagingFactory.CreateMessageSender(_topicName);

    // Send all three messages in a single call.
    await messageSender.SendBatchAsync(messages);

However, a batch of messages is also bound by the maximum size limit of a single message (see Message Size under Best Practices). In addition, the client library batches send and complete operations internally: operations issued within the BatchFlushInterval of the MessagingFactory, which defaults to 20ms, are transmitted together. If adding a message would push such an internal batch over the size limit, that message is cut from the batch and placed as the first message of the next one. The interval can be tuned when the situation calls for it by setting this property when creating the MessagingFactory (see Best Practices for scenarios to fine tune).

This will affect all clients that are created from this MessagingFactory:

    MessagingFactorySettings messagingFactorySettings = new MessagingFactorySettings
    {
        // Hold send/complete operations for up to 100ms so they can be transmitted together.
        NetMessagingTransportSettings = { BatchFlushInterval = TimeSpan.FromMilliseconds(100) }
    };

    MessagingFactory messagingFactory = MessagingFactory.Create(uriAddress, messagingFactorySettings);

Now, any client (MessageSender, MessageReceiver, QueueClient, TopicClient, SubscriptionClient) created by this MessagingFactory will be influenced by this setting. You might be reading that list of clients and thinking, “Wait, aren’t we talking about sending?” That’s correct, but the batch flush interval affects both Send and Complete operations, which makes this a good time to look at receiving.
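Before moving on, the size limit mentioned earlier can be made concrete with a small sketch. This is not the SDK’s implementation, just a model of the idea under my own assumptions: BatchSplitter, Split and maxBatchBytes are hypothetical names, and message sizes are passed in as plain byte counts rather than measured from real BrokeredMessage instances.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the Service Bus SDK): groups message sizes
// into batches whose combined size stays within a limit.
public static class BatchSplitter
{
    public static List<List<int>> Split(IEnumerable<int> messageSizes, int maxBatchBytes)
    {
        var batches = new List<List<int>>();
        var current = new List<int>();
        int currentBytes = 0;

        foreach (int size in messageSizes)
        {
            if (size > maxBatchBytes)
                throw new ArgumentException("A single message exceeds the batch limit.");

            // The message that would overflow the current batch is held back
            // and becomes the first message of the next batch.
            if (current.Count > 0 && currentBytes + size > maxBatchBytes)
            {
                batches.Add(current);
                current = new List<int>();
                currentBytes = 0;
            }

            current.Add(size);
            currentBytes += size;
        }

        if (current.Count > 0) batches.Add(current);
        return batches;
    }
}
```

For example, three 100 byte messages against a 256 byte limit end up as one batch of two messages followed by one batch of one.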

Receiving Messages in Batches

The process of receiving messages in batches isn’t complex. We can receive messages and process them as we did in our previous examples, except in batches.

    MessageReceiver messageReceiver = await _messagingFactory.CreateMessageReceiverAsync(queueName);
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            // Pull up to batchCount messages in a single round trip.
            IEnumerable<BrokeredMessage> messages = await messageReceiver.ReceiveBatchAsync(batchCount).ConfigureAwait(false);
            if (messages != null)
            {
                foreach (var brokeredMessage in messages)
                {
                    // Renew the lock when it is about to expire.
                    // The 5 second margin is an assumption; tune it to your processing time.
                    if (brokeredMessage.LockedUntilUtc <= DateTime.UtcNow.AddSeconds(5))
                    {
                        await brokeredMessage.RenewLockAsync();
                    }

                    await ProcessAndReleaseMessageAsync(brokeredMessage);
                }
            }

            await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
        throw;
    }

    private async Task ProcessAndReleaseMessageAsync(BrokeredMessage message)
    {
        // Default to Abandon so a failure releases the message for redelivery.
        MessageProcessingAction action = MessageProcessingAction.Abandon;

        try
        {
            //Process Message

            action = MessageProcessingAction.Complete;
        }
        catch (Exception ex)
        {
            //log error
        }
        finally
        {
            //if something fails update with abandon
            //C# 6.0 allows await calls in a finally block
            await UpdateMessageState(message, action);
        }
    }

    private async Task UpdateMessageState(BrokeredMessage message, MessageProcessingAction action)
    {
        switch (action)
        {
            case MessageProcessingAction.Complete:
                await message.CompleteAsync();
                break;
            case MessageProcessingAction.Abandon:
                await message.AbandonAsync();
                break;
            case MessageProcessingAction.Deadletter:
                await message.DeadLetterAsync();
                break;
            default:
                await message.AbandonAsync();
                break;
        }
    }

When batching incoming messages, be aware that each message is still subject to its individual lock duration when retrieved in PeekLock mode, so we need to ensure that the number of messages we pull back can be processed within that duration. If not, messages at the end of the batch can have their locks expire before they are processed.
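One rough way to pick a batch count that respects the lock duration is to divide the usable part of the lock window by the time a single message takes to process. The sketch below is my own arithmetic, not anything the SDK provides: BatchSizing, MaxSafeBatchCount and safetyPercent are hypothetical names, and the per-message processing time is something you would have to measure yourself.

```csharp
using System;

// Hypothetical helper: estimates how many messages can be processed
// sequentially before the lock taken at receive time expires.
public static class BatchSizing
{
    public static int MaxSafeBatchCount(TimeSpan lockDuration, TimeSpan perMessageProcessing, int safetyPercent = 80)
    {
        if (perMessageProcessing <= TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(perMessageProcessing));
        if (safetyPercent <= 0 || safetyPercent > 100)
            throw new ArgumentOutOfRangeException(nameof(safetyPercent));

        // Only plan to use part of the lock window, leaving headroom for slow messages.
        long usableTicks = lockDuration.Ticks / 100 * safetyPercent;

        // Always allow at least one message per batch.
        return (int)Math.Max(1L, usableTicks / perMessageProcessing.Ticks);
    }
}
```

With a 60 second lock, 2 seconds of processing per message and the 80 percent headroom used here, this suggests a batchCount of 24. Treat the result as a starting point rather than a guarantee, since processing times vary.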

Controlling Azure Store Batch Processing

Up until now, all our batch processing has occurred on the client side. However, when the Service Bus service receives messages it has the capability to batch those messages when sending to the internal store. This only applies to Sending and Completion operations. The ability to have the Service Bus batch its message persistence to the internal store only requires setting the EnableBatchedOperations flag on the Queue, Topic or Subscription.
    TopicDescription topicDescription = new TopicDescription(topicPath)
    {
        EnableBatchedOperations = true
    };
This can be provided either when the topic is created (see Creating Topics) or even provided as an update to an existing Topic. Again, this is just an example of providing for a Topic but is also an available property for Queues and Subscriptions.

Batching is one of those areas that generally comes up when talking about improving application efficiency. Another Service Bus feature that can make a major contribution in that area is Prefetching, which is closely related to the batching we have been discussing. I like to think of it as a more proactive form of batching.

Prefetching

With the exception of our batch example, all our examples of fetching messages from a Queue or Subscription have been per call and incur the cost of a round trip to Azure for each request. Imagine instead that a request for a single message also preloads additional messages into a local cache on your client, and that all subsequent requests pull from that cache until it runs out. This is roughly what you are setting up when enabling Prefetching.

Not only will you see an immediate improvement in efficiency when processing available messages because you are receiving them from the local cache, but there are also no interface changes to make in how you receive messages. The only change occurs when you create your receiver client (QueueClient, SubscriptionClient or MessageReceiver), where you set the PrefetchCount property. This can also be set at the MessagingFactory level and cascade down to all clients that are created by the factory.
    MessageReceiver messageReceiver = await _messagingFactory.CreateMessageReceiverAsync(queueName);
    messageReceiver.PrefetchCount = 200;

The PrefetchCount is not an arbitrary number but should be calculated from Microsoft’s recommended formula (as a starting point): 20 times the number of messages that a single receiver can process per second. So if a receiver can process 5 messages/sec, the PrefetchCount is 20 × 5 = 100. If setting this at the MessagingFactory level, the number should be multiplied by the number of clients being created by the factory. Therefore, it might be easier to set it at the client level if the number of clients is unknown.
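The formula is simple enough to capture in a couple of lines. The helper below is only a sketch of the arithmetic described above; PrefetchSizing, ForReceiver and ForFactory are names I made up for illustration, and the resulting number would still be assigned to the PrefetchCount property yourself.

```csharp
using System;

// Hypothetical helper that applies the "20 x messages per second" starting point.
public static class PrefetchSizing
{
    private const int Multiplier = 20;

    // Starting PrefetchCount for a single receiver.
    public static int ForReceiver(int messagesPerSecond)
    {
        if (messagesPerSecond < 0)
            throw new ArgumentOutOfRangeException(nameof(messagesPerSecond));
        return checked(Multiplier * messagesPerSecond);
    }

    // Starting PrefetchCount when set once at the factory level for several receivers.
    public static int ForFactory(int messagesPerSecond, int receiverCount)
    {
        if (receiverCount < 1)
            throw new ArgumentOutOfRangeException(nameof(receiverCount));
        return checked(ForReceiver(messagesPerSecond) * receiverCount);
    }
}
```

A receiver handling 5 messages/sec gets 100, and a factory creating three such receivers gets 300.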

Something to be very aware of: there is a default lock duration of 60 seconds per message at the server, which can be extended up to 5 minutes. If a message is not processed before its lock expires, it will be made available on the server for another client to process. However, the client that has cached that message will not know this; it will still receive the message (from its local cache) and will get an exception if it attempts to complete the message after the lock has expired.

Example of receiving time in milliseconds for 3 messages without Prefetching:

    Time taken to get message: 1126 in ms.
    Time taken to get message: 79 in ms.
    Time taken to get message: 157 in ms.

But after setting the PrefetchCount:

    Time taken to get message: 750 in ms.
    Time taken to get message: 0 in ms.
    Time taken to get message: 0 in ms.

I can’t account for why the overhead is higher for the first message of the first batch where there is no Prefetch occurring. However, when Prefetch is enabled it will have a higher overhead for the first message because of the increased message size. But obviously, we can see the drop in all subsequent messages that are received. Now imagine this over a large number of messages.
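To see why the later receives drop to 0 ms, here is a toy model of a prefetching receiver that simply counts round trips. It is an illustration under my own assumptions, not how the SDK’s cache is actually implemented: PrefetchingReceiverModel, TryReceive and RoundTrips are hypothetical names, and the "remote" queue is just an in-memory collection.

```csharp
using System;
using System.Collections.Generic;

// Toy model (not the SDK): counts how many simulated round trips are needed
// to drain a remote queue with and without a local prefetch cache.
public sealed class PrefetchingReceiverModel
{
    private readonly Queue<string> _remote;
    private readonly Queue<string> _localCache = new Queue<string>();
    private readonly int _fetchSize;

    public int RoundTrips { get; private set; }

    public PrefetchingReceiverModel(IEnumerable<string> remoteMessages, int prefetchCount)
    {
        _remote = new Queue<string>(remoteMessages);
        // A prefetch count of 0 still fetches the one message that was asked for.
        _fetchSize = Math.Max(1, prefetchCount);
    }

    public bool TryReceive(out string message)
    {
        if (_localCache.Count == 0)
        {
            if (_remote.Count == 0)
            {
                message = string.Empty;
                return false;
            }

            // One simulated round trip fills the local cache.
            RoundTrips++;
            for (int i = 0; i < _fetchSize && _remote.Count > 0; i++)
            {
                _localCache.Enqueue(_remote.Dequeue());
            }
        }

        // Served from the local cache, no round trip involved.
        message = _localCache.Dequeue();
        return true;
    }
}
```

Draining six messages with a prefetch count of 0 costs six round trips in this model, while a prefetch count of 3 costs two. The real savings depend on network latency and message size, as the timings above suggest.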

Transactions

We can wrap our Service Bus operations in a transaction to ensure an all-or-none outcome when completing a number of operations. Take, for example, the need to ensure that either all messages are sent or none are. If we were using a correlation pattern by setting the CorrelationId property of a message, in conjunction with a CorrelationFilter for a subscription, we might want to ensure that all the related messages are delivered or none.

As in the case where you had a number of messages:

    MessagingFactory factory = MessagingFactory.CreateFromConnectionString(AccountInfo.ConnectionString);
    MessageSender messageSender = factory.CreateMessageSender(_topicName);

    // All three messages share a CorrelationId so they can be matched by a CorrelationFilter.
    BrokeredMessage justinBieber = new BrokeredMessage() { CorrelationId = "Music Awards" };
    justinBieber.Properties.Add("RequestedSinger", "Justin Bieber");

    BrokeredMessage taylorSwift = new BrokeredMessage() { CorrelationId = "Music Awards" };
    taylorSwift.Properties.Add("RequestedSinger", "Taylor Swift");

    BrokeredMessage britneySpears = new BrokeredMessage() { CorrelationId = "Music Awards" };
    britneySpears.Properties.Add("RequestedSinger", "Britney Spears");

    List<BrokeredMessage> messages = new List<BrokeredMessage>() { justinBieber, taylorSwift, britneySpears };

And you wanted them all to send or none:

    using (var transaction = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        foreach (BrokeredMessage message in messages)
        {
            // Await each send so the scope is not completed before the sends finish.
            await messageSender.SendAsync(message);
        }

        transaction.Complete();
    }

If an exception does occur, none of the messages will be published to the Topic. Take note that the TransactionScope takes a TransactionScopeAsyncFlowOption enum value. This allows the ambient transaction to flow across thread continuations.
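If you want to convince yourself of what that flow option does, the sketch below uses only System.Transactions and makes no Service Bus calls at all. AmbientFlowDemo and AmbientSurvivesAwaitAsync are hypothetical names; the idea is simply to compare the ambient transaction before and after an await.

```csharp
using System;
using System.Threading.Tasks;
using System.Transactions;

// Minimal demonstration, independent of Service Bus, that the ambient
// transaction is still present after an await when async flow is enabled.
public static class AmbientFlowDemo
{
    public static async Task<bool> AmbientSurvivesAwaitAsync()
    {
        using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
        {
            string before = Transaction.Current.TransactionInformation.LocalIdentifier;

            // The continuation may resume on a different thread.
            await Task.Delay(10).ConfigureAwait(false);

            Transaction after = Transaction.Current;
            bool same = after != null
                && after.TransactionInformation.LocalIdentifier == before;

            scope.Complete();
            return same;
        }
    }
}
```

Without TransactionScopeAsyncFlowOption.Enabled, the ambient transaction is tied to the thread that created the scope, which is why the option matters as soon as you await inside the using block.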

Dead-Lettering

When a message is malformed, its time-to-live has expired, a rule filter evaluation exception occurs, or there is any other reason why a brokered message might not get processed off a Queue or Subscription, we might want to capture that message for further analysis. Service Bus Queues and Subscriptions provide a dead-letter queue for just this purpose. However, not every dead-lettering condition is enabled by default.

There is actually a lot that can be discussed regarding patterns used for dead-lettering messages. Generally, these patterns fall into two groups: either the Service Bus service dead-letters a message based on conditions, or a consumer application explicitly moves a message to the dead-letter queue.

We could beat this topic into the ground with all the nuances, but let’s keep it simple. There are just a few basic points you need to understand to take advantage of the dead-lettering capabilities of Service Bus.

Implicit Service Dead-Lettering

In order for implicit dead-lettering to be carried out by the Service Bus, dead-lettering must be enabled on the Queue or Subscription. This can be done at creation or through an update of one of these entities:

    SubscriptionDescription subscription = new SubscriptionDescription(topicPath, subscriptionName)
    {
        DefaultMessageTimeToLive = TimeSpan.FromSeconds(30),
        LockDuration = TimeSpan.FromSeconds(90),
        EnableDeadLetteringOnMessageExpiration = true,
        EnableDeadLetteringOnFilterEvaluationExceptions = true
    };

Note: Shown is a SubscriptionDescription, but EnableDeadLetteringOnMessageExpiration also applies to a QueueDescription.

You’ll see here that there is actually more than one dead-letter related property that can be set. EnableDeadLetteringOnMessageExpiration allows the Service Bus service to move messages that have expired based on their TimeToLive property, while EnableDeadLetteringOnFilterEvaluationExceptions allows the capture of messages when a Subscription’s rule condition throws an exception on the evaluation of a message. There is also one other time that the service will implicitly move a message to the dead-letter queue: when the number of times a message has been delivered reaches the MaxDeliveryCount property of a Queue or Subscription.

Explicit Consumer Dead-Lettering

In addition to the service moving messages to a dead-letter queue, a consumer application can decide for itself that a message should be moved there. This is easily accomplished by calling the DeadLetter/DeadLetterAsync method of a message. You have actually already seen this when we were receiving and processing messages in a number of earlier examples:

    await message.DeadLetterAsync();

    // or provide a reason and description:
    await message.DeadLetterAsync("Popularity", "Number of requests for singer has not reached minimum");

Retrieving Dead Letter Messages

Finally, when we want to retrieve messages that have been moved to the dead-letter queue, we’re in luck, because this does not change from how we have created receivers in the past. The only difference is that we need to acquire the path to the dead-letter queue and provide that as the path for the creation method:

    string deadLetterPath = SubscriptionClient.FormatDeadLetterPath(topicPath, subscriptionName);
    MessageReceiver messageReceiver = await _messagingFactory.CreateMessageReceiverAsync(deadLetterPath);

Take note that we are calling the static FormatDeadLetterPath method off of SubscriptionClient.

Acquiring a MessageReceiver should look familiar and from here we can acquire any messages in the Dead-Letter queue.
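It can help to know what that path actually looks like, for instance when you spot it in logs or tooling. To my understanding the SDK helpers produce paths of the shape shown below; the DeadLetterPaths class here is a hypothetical stand-in written for illustration, and in real code you should keep calling FormatDeadLetterPath rather than building the string yourself.

```csharp
// Hypothetical stand-in that mirrors the shape of the dead-letter paths
// returned by the SDK's FormatDeadLetterPath helpers (to my understanding).
public static class DeadLetterPaths
{
    private const string Suffix = "$DeadLetterQueue";

    // Dead-letter path of a queue: <queue>/$DeadLetterQueue
    public static string ForQueue(string queuePath)
    {
        return queuePath + "/" + Suffix;
    }

    // Dead-letter path of a subscription: <topic>/Subscriptions/<subscription>/$DeadLetterQueue
    public static string ForSubscription(string topicPath, string subscriptionName)
    {
        return topicPath + "/Subscriptions/" + subscriptionName + "/" + Suffix;
    }
}
```

The takeaway is that the dead-letter queue is addressed as a sub-queue of the entity it belongs to, which is why an ordinary MessageReceiver can read from it.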

Auto-Forwarding

Throughput and scaling are two very important areas, and they happen to be the problems the Auto-Forwarding feature is poised to answer. This is one of the most powerful features and one that can easily make the most impact on a high volume system with the least amount of effort.

Implementing Auto-Forwarding is very simple: you set the ForwardTo property of a queue or subscription. It does exactly what it sounds like and automatically forwards the messages it receives on to the destination. However, it’s in leveraging Auto-Forwarding in your architecture that the power of this feature shows. Let’s examine this a little closer to understand how.

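    // The forwarding destination must exist first.
    // destinationTopicPath is a placeholder name for the topic that receives the forwarded messages.
    TopicDescription destinationTopic = new TopicDescription(destinationTopicPath);
    await _namespaceManager.CreateTopicAsync(destinationTopic);

    // The subscription on the source topic forwards everything it receives to the destination.
    SubscriptionDescription subscription = new SubscriptionDescription(topicPath, subscriptionName)
    {
        ForwardTo = destinationTopic.Path
    };

    await _namespaceManager.CreateSubscriptionAsync(subscription, ruleDescription);
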
The one key point to take away from the above example is that the destination must exist before either you update a subscription with a defined ForwardTo field or create a new one with the field defined.

First, you need to understand that a topic’s throughput is limited to how many messages it can process at any given time. Naturally, if you can disperse the processing of messages, you can increase the throughput, and that is exactly what Auto-Forwarding implicitly allows. Let’s look at an example.

Imagine you’re a freight company and you manage the locations of your trucks nationwide. You have broken your country into 5 regions and, based on a truck’s current geographic coordinates, you route information about the truck to a regional distribution center subscription. In turn, this regional subscription would Auto-Forward the message to a topic that has n local distribution center subscriptions.



With our example, we could have hundreds or thousands of local distribution centers. Currently a topic’s subscription limit is 2,000. Therefore, instead of requiring a single topic to process messages against upwards of 2,000 subscriptions, significantly reducing throughput, we can disperse that workload across 5 subscriptions that in turn forward to topics, each responsible for a smaller subset of local distribution center subscriptions. Not only will we achieve better throughput because we have reduced the number of subscriptions a single topic has to process against, but we are also no longer capped at 2,000 subscriptions overall.

This also demonstrates the scale-out capabilities that Auto-Forwarding provides. If a new local or regional distribution center comes online, it is fairly straightforward to add that subscription to the proper topic and update or add rules to account for the new distribution center.
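To picture the resulting topology, the sketch below groups local distribution centers under one regional topic each and checks that no regional topic exceeds the subscription limit quoted above. It is a planning model only and makes no Service Bus calls; ForwardingPlan, Build and the "region-" naming scheme are assumptions of mine.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Planning model only: maps each regional topic to the local distribution
// center subscriptions that would hang off it.
public static class ForwardingPlan
{
    // Subscription limit per topic as quoted in the article.
    public const int MaxSubscriptionsPerTopic = 2000;

    public static Dictionary<string, List<string>> Build(IEnumerable<(string Region, string Center)> centers)
    {
        // One regional topic per region; the "region-" prefix is a made-up convention.
        var plan = centers
            .GroupBy(c => c.Region)
            .ToDictionary(g => "region-" + g.Key, g => g.Select(c => c.Center).ToList());

        foreach (var entry in plan)
        {
            if (entry.Value.Count > MaxSubscriptionsPerTopic)
                throw new InvalidOperationException(entry.Key + " exceeds the subscription limit.");
        }

        return plan;
    }
}
```

Each key in the result would be the ForwardTo target of one regional subscription on the main topic, and each value lists the subscriptions to create on that regional topic.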

Security

In Part 1 we had a quick taste of some of the security mechanisms that Service Bus provides out-of-the-box in the form of a TokenProvider. The TokenProvider is the governing piece that allows us to implement the form of security controls that we want, whether that be Shared Access Signatures, which we have talked about at length for other services, or the Access Control Service backed by Identity Providers (Windows Live ID, Google, Yahoo, etc.) or Federation services such as AD FS 2.0.

However, that being said, you can read Microsoft’s view on Service Bus security and how they have recently (Sept. 2014) changed it to heavily favor Shared Access Signatures (SAS). Therefore, we will also scope this security discussion around Shared Access Signatures as well. At the end of this portion, I’ll make some additional information available for those interested in other security controls.

If you have been following my previous topics on Azure Services such as Blob and Table Storage, just to name a few, you will be somewhat familiar with the concept of Shared Access Signatures. If you are not, MSDN’s documentation on the subject is worth reading. A Shared Access Signature (SAS) provides a granular level of access control to specific resources. It allows you to specify what access rights a consumer has for a specific resource for a given amount of time. As in other Azure services, the Shared Access Signature is a string made up of the details of the access along with a hash that the Azure service uses to authenticate the Shared Access Signature being passed to it.

Instead of repetitiously defining exactly what rights a signature grants to a consumer of the Shared Access Signature, we can use predefined policies that are stored on Azure. These policies act as templates for providing privileges to a specific Service Bus resource, and they allow us to easily revoke access by removing individual stored policies. The resources that we can provide access to sit at two levels within the Service Bus: the Namespace and Entities.

The Namespace resource is the same one we defined in Part 1, while the entities are resources such as Topics and Queues. Since programmatically creating stored policies at the Service Bus Namespace level requires working with Certificates uploaded through the Azure Portal, I am focusing mainly on creating Shared Access Policies for entities. Uploading a certificate and creating Namespace Shared Access Policies programmatically is not covered here.

That said, you can easily create a Namespace level Shared Access Policy through the portal by going to the Configure section under the Service Bus Namespace you created.



It is here in the portal that you can create Namespace level policies that provide any combination of Manage, Send, or Listen privileges.

Creating Stored Policies

Following the Principle of Least Privilege, let’s look at creating and saving stored Policies at the entity level. Then we can look at consuming those policies to create a Shared Access Signature which can be used or provided to a consumer. When it comes to creating a policy there isn’t much to it.

    TopicDescription topicDescription = await _operations.CreateOrRetrieveTopicAsync(topicPath);

    // Add a stored policy that only grants the right to send.
    topicDescription.Authorization.Add(new SharedAccessAuthorizationRule("SendSingerRequests",
        SharedAccessAuthorizationRule.GenerateRandomKey(), new[] { AccessRights.Send }));

    await _namespaceManager.UpdateTopicAsync(topicDescription);

Again, CreateOrRetrieveTopicAsync is just a helper method to either pass back an existing TopicDescription or create it if it doesn’t exist. From there we are simply adding a new AuthorizationRule as a SharedAccessAuthorizationRule to the Authorization property of the TopicDescription. This SharedAccessAuthorizationRule is created by providing the name of the policy, a generated key and an array of AccessRights enumerations.

In this case we are creating a Stored Policy with the name “SendSingerRequests” with the right to send on the topic which we can see if we navigate to the Topic under the Namespace for the Service Bus in the Azure portal:



Now that we have created a stored policy for a Topic, we can easily generate a Shared Access Signature which can be provided to a consumer for authentication.

Generating Shared Access Signatures from Policies

Generating a Shared Access signature just requires the service knowing the endpoint URI, policy name and either the primary or secondary key along with a timespan of how long the Shared Access Signature is valid.

    Uri serviceUri = ServiceBusEnvironment.CreateServiceUri("sb", "shopit", string.Empty);
    string generatedSaS = SharedAccessSignatureTokenProvider.GetSharedAccessSignature(
        policyName,
        policyKey,
        serviceUri.ToString().Trim('/'),
        TimeSpan.FromHours(8));

A lot of this should already look very familiar from Part 1, where we showed various ways to create the NamespaceManager. From here we can provide the Shared Access Signature string to a consumer to use in creating their TokenProvider.
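If you are curious what is inside that string, the sketch below builds a token by hand following the layout Microsoft documents for Service Bus Shared Access Signatures: the URL-encoded resource URI (sr), an HMAC-SHA256 signature over the resource and expiry (sig), the expiry in Unix seconds (se) and the policy name (skn). It is not the SDK’s implementation, and SasTokenSketch and Create are hypothetical names; in practice you would keep using GetSharedAccessSignature.

```csharp
using System;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;

// Hand-rolled illustration of the documented SAS token layout.
// Not the SDK's implementation; prefer the SDK helper in real code.
public static class SasTokenSketch
{
    public static string Create(string resourceUri, string policyName, string policyKey, DateTimeOffset expiresAt)
    {
        string encodedUri = Uri.EscapeDataString(resourceUri);
        string expiry = expiresAt.ToUnixTimeSeconds().ToString(CultureInfo.InvariantCulture);

        // The signature covers the encoded resource and the expiry, separated by a newline.
        string stringToSign = encodedUri + "\n" + expiry;

        using (var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(policyKey)))
        {
            string signature = Convert.ToBase64String(
                hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));

            return string.Format(
                CultureInfo.InvariantCulture,
                "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}",
                encodedUri,
                Uri.EscapeDataString(signature),
                expiry,
                policyName);
        }
    }
}
```

Seeing the parts laid out like this makes it clear why a signature cannot be extended after the fact: the expiry is part of what is signed, so changing it invalidates the hash.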

Consuming a Shared Access Signature

With the generated Shared Access Signature in hand, we can provide it to a consumer that wants to send messages to our Topic. They consume it when they create a TokenProvider:

    TokenProvider tokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(generatedSaS);
    MessagingFactory factory = MessagingFactory.Create("sb://<yourNamespace>.servicebus.windows.net", tokenProvider);
    MessageSender messageSender = factory.CreateMessageSender(_topicName);

From here they can send messages to the topic for as long as the Shared Access Signature remains valid, based on the lifetime set when it was generated above (8 hours in our example).

Additional Resources

As I mentioned, there are other access control options available, Identity Providers and Federation services to name a couple. Below are some additional resources for more information on those extended areas of access control:
  1. http://blogs.msdn.com/b/jimoneil/archive/2012/04/24/fun-with-the-service-bus-part-1.aspx
  2. http://azure.microsoft.com/en-us/documentation/articles/active-directory-dotnet-how-to-use-access-control/
  3. http://blogs.msdn.com/b/servicebus/archive/2014/09/03/change-to-azure-service-bus-portal-default-authentication-mechanism-for-service-bus-namespaces-now-sas.aspx
  4. http://www.developerfusion.com/article/121561/integrating-active-directory-into-azure/

Best Practices

Protocols

There are a number of protocols that the Service Bus can operate under. By default the SDK utilizes the Service Bus Messaging Protocol (SBMP), which provides the best performance and features. Unless your operations or your consumers’ operations are limited to HTTP/S, use SBMP.

Factories, Clients and Abstractions

I haven’t mentioned this before, but there is a significant amount of overhead when creating factories such as MessagingFactory and clients such as QueueClient, TopicClient and SubscriptionClient, as well as their MessageSender and MessageReceiver abstractions. Therefore, when creating these resources, attempt to fully utilize them without needlessly recreating them. An example of what to avoid is recreating a MessageSender for every message that you need to send.

Furthermore, understand that when you terminate a factory such as a MessagingFactory, it also closes all the entities it was responsible for creating.
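One common way to avoid recreating an expensive object is to create it lazily once and share it. The sketch below shows the pattern with a stand-in type so it runs without the SDK: ExpensiveSender and SenderHolder are hypothetical names, and in a real application the factory delegate would create your MessagingFactory or MessageSender instead.

```csharp
using System;
using System.Threading;

// Stand-in for something costly to create, such as a MessageSender.
public sealed class ExpensiveSender
{
    private static int _created;

    // Number of instances constructed so far, to make reuse observable.
    public static int CreatedCount => Volatile.Read(ref _created);

    public ExpensiveSender()
    {
        Interlocked.Increment(ref _created);
    }

    public void Send(string body)
    {
        // A real sender would transmit the message here.
    }
}

// Holds a single shared instance, created on first use in a thread-safe way.
public static class SenderHolder
{
    private static readonly Lazy<ExpensiveSender> Shared =
        new Lazy<ExpensiveSender>(() => new ExpensiveSender(), LazyThreadSafetyMode.ExecutionAndPublication);

    public static ExpensiveSender Instance => Shared.Value;
}
```

Every caller that goes through SenderHolder.Instance gets the same object, so sending a thousand messages still constructs the sender only once. Remember the point above about closing: whoever owns the shared instance is also responsible for closing it when the application shuts down.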

Abstractions

Unless you absolutely need to use the direct clients such as QueueClient, TopicClient and SubscriptionClient, use their abstractions, MessageSender and MessageReceiver. That way you don’t need to worry about whether you are sending to a queue or a topic, or receiving from a queue or a subscription.

Operations

I mentioned it at the beginning of this series, but it is worth repeating: take advantage of the asynchronous operations provided by the SDK whenever possible. There is a significant performance improvement when using the provided asynchronous methods.

Message Size

Finally, the default maximum message size is 256 KB total, which includes both the body and the properties (system and custom), with a maximum property size of 64 KB. While conceivably you can increase this size, proceed with caution. If there is any possibility that a current client or a future requirement will need HTTP as the protocol, this can have a negative impact. Service Bus messages are designed for efficiency and to be compatible with HTTP header size specifications.
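A quick pre-flight check against those two limits might look like the sketch below. It only does the arithmetic on sizes you supply; MessageSizeCheck and Fits are hypothetical names, and how you measure the serialized body and property sizes is left to you.

```csharp
// Hypothetical pre-flight check against the limits quoted above.
public static class MessageSizeCheck
{
    public const int MaxMessageBytes = 256 * 1024;  // body plus properties
    public const int MaxPropertyBytes = 64 * 1024;  // properties alone

    public static bool Fits(int bodyBytes, int propertyBytes)
    {
        if (bodyBytes < 0 || propertyBytes < 0) return false;

        // Properties have their own cap and also count toward the total.
        return propertyBytes <= MaxPropertyBytes
            && (long)bodyBytes + propertyBytes <= MaxMessageBytes;
    }
}
```

A 200 KB body with 32 KB of properties passes, while the same body with 70 KB of properties fails on the property cap even though the total would otherwise be the deciding factor.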

Batching

It’s not quite 1 or 0, true or false, black or white when it comes to batching and being efficient. Batching is only available for send and completion operations, but here are some performance considerations.

Low Throughput - Disable batching by setting the batch flush interval to 0.

High Throughput - Set the BatchFlushInterval to 50ms

Multiple Senders/Receivers - When using multiple senders/receivers, set the BatchFlushInterval to 100ms.

To adjust the flush interval for batch processing, you set the BatchFlushInterval property on the NetMessagingTransportSettings of the MessagingFactorySettings object that is then passed into the creation of a MessagingFactory (see Batch Processing).
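The three recommendations above can be kept in one place so the choice is explicit in code. This is only a sketch that returns the interval values listed in this section; ThroughputProfile and FlushIntervals are names I made up, and the returned TimeSpan would still be assigned to BatchFlushInterval as in the Batch Processing listing.

```csharp
using System;

// Hypothetical names: the scenarios listed in this section.
public enum ThroughputProfile
{
    Low,
    High,
    MultipleSendersOrReceivers
}

public static class FlushIntervals
{
    // Maps each scenario to the batch flush interval suggested above.
    public static TimeSpan For(ThroughputProfile profile)
    {
        switch (profile)
        {
            case ThroughputProfile.Low:
                return TimeSpan.Zero; // an interval of 0 disables batching
            case ThroughputProfile.High:
                return TimeSpan.FromMilliseconds(50);
            case ThroughputProfile.MultipleSendersOrReceivers:
                return TimeSpan.FromMilliseconds(100);
            default:
                throw new ArgumentOutOfRangeException(nameof(profile));
        }
    }
}
```

Treat these values as starting points and measure against your own workload before settling on one.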

Conclusion

This has been one long series on a very large Azure service. I wish I could say I have covered every corner of it, but the truth is that there is so much to talk about that it would take more than a three-part series to do that. However, be assured that you have been shown the vast majority of the features of Azure Service Bus. There are closely related topics, such as the services built on top of Azure Service Bus like Event Hubs and Notification Hubs, as well as Service Bus patterns such as request-response, that weren’t covered. But hopefully the information you have garnered from this three-part series has taught you more than just the basics of using Azure Service Bus.