Road To AZ-204 - Developing Message-Based Solution

Intro

 
This article explains the main skills measured in this sub-topic of the AZ-204 certification. Azure Service Bus and Azure Queue Storage are the main components, and their fundamentals are explained here alongside a practical example.
 
This certification is very extensive and this article covers only the main topics, so make sure you know these components in depth before taking the exam. Another great tip is to take practice exams before the official one in order to validate your knowledge.
 

What is the Certification AZ-204 - Developing Solutions for Microsoft Azure?

 
The AZ-204 - Developing Solutions for Microsoft Azure certification measures your skills in designing, building, testing, and maintaining applications and services in the Microsoft Azure cloud environment. It covers, among others, the following components:
  • Azure Virtual Machines;
  • Docker;
  • Azure Containers;
  • Azure App Service Web Apps;
  • Azure Functions;
  • Cosmos DB;
  • Azure Storage;
  • Azure AD;
  • Azure Key Vault;
  • Azure Managed Identities;
  • Azure Redis Cache;
  • Azure Logic App;
  • Azure Event Grid;
  • Azure Event Hub;
  • Azure Notification Hub;
  • Azure Service Bus;
  • Azure Queue Storage.
Target Audience
 
Any IT professional willing to improve their knowledge of Microsoft Azure is encouraged to take this certification; it is a great way to measure your skills with trending technologies. Some groups of professionals, however, are likely to benefit the most from it:
  • Azure Developers, with at least 1 year of experience with Microsoft Azure;
  • Experienced Software Developers, looking for an Architect position in a hybrid environment;
  • Software Developers, working to move applications to the cloud environment.
Skills Measured
 
At the time of writing, the skills measured in the exam are split as follows:

Benefits of Getting Certified

 
The main benefit here is having a worldwide recognized certification that proves your knowledge of this topic. Among the intrinsic and extrinsic benefits, we have:
  • Higher growth potential, as certifications are a big plus;
  • Discounts and deals on Microsoft products and from partners, like PluralSight and UpWork;
  • MCP Newsletters, with trending technologies;
  • Higher exposure on LinkedIn, as recruiters usually search for specific certifications;
  • Higher salary, as you will be more valuable to your company;
  • The unique happiness of getting your result and seeing that you passed, knowing that all your efforts were worth it.

Main Skills Measured by This Topic

 
What is Azure Service Bus?
 
Service Bus is a fully managed enterprise message broker that supports message queuing and publish/subscribe to topics. Use Azure Service Bus to scale your applications with the power of asynchronous messages and built-in integration with Azure Services.
 
With Service Bus you can send single or batched messages, load-balance message consumption, subscribe to topics, use message sessions, and handle transactions, all in compliance with standards and protocols such as Advanced Message Queuing Protocol (AMQP) 1.0, Java Message Service (JMS) 2.0 for the Premium SKU, and JMS 1.1 for the Standard SKU.
 
The main Service Bus terms are as follows:
  • Namespace, works like a server and contains N queues and topics;
  • Queue, contains the messages;
  • Sender, sends the message;
  • Receiver, receives the message;
  • Topic, works like a queue but with multiple receivers;
  • Subscription, a named receiver of a topic that gets its own copy of each message;
  • Batch, a group of messages;
  • Safe-batch, validates whether each message can be included in the batch;
  • Session, enables FIFO and groups related messages in the queue;
  • Peek, returns a message without removing it from the queue;
  • Dead-letter queue, a queue for messages that could not be delivered or processed from their normal queue;
  • Peek & Lock, retrieves a message from the queue without removing it and locks it so that other receivers cannot receive it;
  • Receive & Delete, retrieves a message and deletes it from the queue;
  • Auto delete on idle, sets a time span after which the queue is deleted if it is not used;
  • Duplicate detection history, keeps track of the messages sent during a time window so that a message that was already sent is not accepted again.
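
To make the difference between Peek, Peek & Lock, Receive & Delete, and the dead-letter queue more concrete, here is a minimal in-memory sketch. It is a toy model written only for illustration and does not use the Azure SDK; the class ToyQueue and all of its members are hypothetical names, and the rule "dead-letter on the abandon that reaches the delivery limit" is a simplification of what the real broker does.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Toy in-memory model of Service Bus settlement semantics (not the Azure SDK).
public sealed class ToyQueue
{
    private readonly LinkedList<(string Body, int DeliveryCount)> _active =
        new LinkedList<(string Body, int DeliveryCount)>();
    private readonly Dictionary<Guid, (string Body, int DeliveryCount)> _locked =
        new Dictionary<Guid, (string Body, int DeliveryCount)>();
    private readonly int _maxDeliveryCount;

    public List<string> DeadLetter { get; } = new List<string>();
    public int ActiveCount => _active.Count;

    public ToyQueue(int maxDeliveryCount) => _maxDeliveryCount = maxDeliveryCount;

    public void Send(string body) => _active.AddLast((body, 0));

    // Peek: look at the first message without locking or removing it.
    public string? Peek() => _active.First?.Value.Body;

    // Receive & Delete: the message is gone as soon as it is handed out.
    public string? ReceiveAndDelete()
    {
        if (_active.First is null) return null;
        var body = _active.First.Value.Body;
        _active.RemoveFirst();
        return body;
    }

    // Peek & Lock: hand out the message under a lock token and count the delivery.
    public (Guid LockToken, string Body)? ReceivePeekLock()
    {
        if (_active.First is null) return null;
        var (body, count) = _active.First.Value;
        _active.RemoveFirst();
        var token = Guid.NewGuid();
        _locked[token] = (body, count + 1);
        return (token, body);
    }

    // Complete: settle the message, which removes it for good.
    public void Complete(Guid lockToken) => _locked.Remove(lockToken);

    // Abandon: release the lock; dead-letter once the delivery limit is reached.
    public void Abandon(Guid lockToken)
    {
        if (!_locked.Remove(lockToken, out var message)) return;
        if (message.DeliveryCount >= _maxDeliveryCount) DeadLetter.Add(message.Body);
        else _active.AddFirst(message);
    }
}
```

With new ToyQueue(maxDeliveryCount: 2), a message that is received and abandoned twice ends up in DeadLetter, while a message taken with ReceiveAndDelete can never come back, even if the receiver crashes right after reading it.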

What are Azure Queue Storage queues?

 
Azure Queue Storage queues are a simple message broker. They provide cloud message exchange between applications and services, with messages of up to 64 KB in size, over authenticated HTTP/HTTPS calls. They live inside an Azure Storage Account, and you access them with the same access keys and connection strings that you use for the other resources inside the Storage Account.
 
Azure Queue Storage offers 3 ways of handling messages on the queue. The first is to peek, which retrieves the message at the front of the queue without removing or locking it. The second is to receive, which retrieves the message at the front of the queue and locks it temporarily. The third is to delete the message after it has been processed.
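
The three operations can be sketched with a small in-memory model. This is only an illustration under simplifying assumptions and not the Azure.Storage.Queues SDK: ToyStorageQueue and its members are hypothetical names, the temporary lock is modeled as a visibility timeout, and delete takes only a pop receipt, whereas the real service asks for the message ID together with the pop receipt.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Toy model of Azure Queue Storage semantics (not the Azure.Storage.Queues SDK).
public sealed class ToyStorageQueue
{
    private sealed class Entry
    {
        public string Body = "";
        public DateTimeOffset VisibleAt;
        public string? PopReceipt;
    }

    private readonly List<Entry> _entries = new List<Entry>();
    private readonly Func<DateTimeOffset> _clock;

    // The clock is injected so that the timeout can be exercised without waiting.
    public ToyStorageQueue(Func<DateTimeOffset> clock) => _clock = clock;

    public void Send(string body) =>
        _entries.Add(new Entry { Body = body, VisibleAt = _clock() });

    private Entry? FirstVisible() =>
        _entries.FirstOrDefault(e => e.VisibleAt <= _clock());

    // Peek: read the front message; it stays visible to everyone.
    public string? Peek() => FirstVisible()?.Body;

    // Receive: read the front message and hide it for the visibility timeout.
    public (string Body, string PopReceipt)? Receive(TimeSpan visibilityTimeout)
    {
        var entry = FirstVisible();
        if (entry is null) return null;
        entry.VisibleAt = _clock() + visibilityTimeout;
        entry.PopReceipt = Guid.NewGuid().ToString();
        return (entry.Body, entry.PopReceipt);
    }

    // Delete: remove a processed message; only the latest pop receipt is accepted.
    public bool Delete(string popReceipt) =>
        _entries.RemoveAll(e => e.PopReceipt == popReceipt) > 0;
}
```

If the receiver does not delete the message before the timeout elapses, the message becomes visible again and the next Receive issues a new pop receipt, so a late Delete with the old receipt returns false.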
 

Service Bus vs Azure Queue Storage Queues

 
At a high level, Service Bus is a powerful and complete message broker recommended for medium- to high-complexity scenarios, while Azure Queue Storage queues are a basic message broker suited to low-complexity scenarios. The comparison table below lists the main message-broker functionalities and shows how Service Bus and Azure Queue Storage queues support them side by side.
 
 
Practical Examples
 
Azure Service Bus with .NET Core
 
Pre-Requisites
  • NuGet package Azure.Messaging.ServiceBus;
 

Creating Service Bus with Azure CLI

 
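Setting Variables
    $resourceGroup = "serviceBus-RG"
    $location = "westeurope"
    $serviceBusNamespace = "sampleservbusnamespace"
Creating the Service Bus Namespace
    # Create the resource group first if it does not exist yet
    az group create -n $resourceGroup -l $location
    # The Standard SKU is used because the Basic SKU does not support the topics and sessions used later in this example
    az servicebus namespace create -g $resourceGroup -n $serviceBusNamespace -l $location --sku Standard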
Result
 
 
 

Creating Service Bus Objects with .NET Core

 
Methods to get, create, update and delete a queue, a topic, and a subscription
    using System;
    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;
    using Azure.Messaging.ServiceBus.Administration;

    class ServiceBusObjects
    {
        // Creates a queue; pass requiresSession = true for a session-enabled queue.
        public static async Task<QueueProperties> CreateQueueAsync(string connectionString, string queueName, bool requiresSession = false)
        {
            var client = new ServiceBusAdministrationClient(connectionString);
            var options = new CreateQueueOptions(queueName)
            {
                DefaultMessageTimeToLive = TimeSpan.FromDays(2),
                DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(1),
                EnableBatchedOperations = true,
                DeadLetteringOnMessageExpiration = true,
                EnablePartitioning = false,
                ForwardDeadLetteredMessagesTo = null,
                ForwardTo = null,
                LockDuration = TimeSpan.FromSeconds(45),
                MaxDeliveryCount = 8,
                MaxSizeInMegabytes = 2048,
                UserMetadata = "some metadata"
            };
            options.RequiresSession = requiresSession;

            options.AuthorizationRules.Add(new SharedAccessAuthorizationRule(
                "allClaims",
                new[] { AccessRights.Manage, AccessRights.Send, AccessRights.Listen }));

            return await client.CreateQueueAsync(options);
        }

        // Returns the queue properties, or null when the queue does not exist.
        public static async Task<QueueProperties> GetQueueAsync(string connectionString, string queueName)
        {
            try
            {
                var client = new ServiceBusAdministrationClient(connectionString);
                return await client.GetQueueAsync(queueName);
            }
            catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
            {
                // Only "not found" is translated to null; other failures are rethrown.
                return null;
            }
        }

        public static async Task<QueueProperties> UpdateQueueAsync(string connectionString, QueueProperties queue)
        {
            var client = new ServiceBusAdministrationClient(connectionString);
            queue.UserMetadata = "other metadata";
            return await client.UpdateQueueAsync(queue);
        }

        public static async Task<Azure.Response> DeleteQueueAsync(string connectionString, string queueName)
        {
            var client = new ServiceBusAdministrationClient(connectionString);
            return await client.DeleteQueueAsync(queueName);
        }

        public static async Task<TopicProperties> CreateTopicAsync(string topicName, string connectionString)
        {
            var client = new ServiceBusAdministrationClient(connectionString);
            var topicOptions = new CreateTopicOptions(topicName)
            {
                DefaultMessageTimeToLive = TimeSpan.FromDays(2),
                DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(1),
                EnableBatchedOperations = true,
                EnablePartitioning = false,
                MaxSizeInMegabytes = 2048,
                UserMetadata = "some metadata"
            };

            topicOptions.AuthorizationRules.Add(new SharedAccessAuthorizationRule(
                "allClaims",
                new[] { AccessRights.Manage, AccessRights.Send, AccessRights.Listen }));

            return await client.CreateTopicAsync(topicOptions);
        }

        // Returns the topic properties, or null when the topic does not exist.
        public static async Task<TopicProperties> GetTopicAsync(string topicName, string connectionString)
        {
            try
            {
                var client = new ServiceBusAdministrationClient(connectionString);
                return await client.GetTopicAsync(topicName);
            }
            catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
            {
                return null;
            }
        }

        public static async Task<TopicProperties> UpdateTopicAsync(TopicProperties topic, string connectionString)
        {
            var client = new ServiceBusAdministrationClient(connectionString);
            topic.UserMetadata = "other metadata";
            return await client.UpdateTopicAsync(topic);
        }

        public static async Task<Azure.Response> DeleteTopicAsync(string topicName, string connectionString)
        {
            var client = new ServiceBusAdministrationClient(connectionString);
            return await client.DeleteTopicAsync(topicName);
        }

        public static async Task<SubscriptionProperties> CreateSubscriptionAsync(string topicName, string connectionString, string subscriptionName)
        {
            var client = new ServiceBusAdministrationClient(connectionString);

            var subscriptionOptions = new CreateSubscriptionOptions(topicName, subscriptionName)
            {
                DefaultMessageTimeToLive = TimeSpan.FromDays(2),
                EnableBatchedOperations = true,
                UserMetadata = "some metadata"
            };
            return await client.CreateSubscriptionAsync(subscriptionOptions);
        }

        // Returns the subscription properties, or null when it does not exist.
        public static async Task<SubscriptionProperties> GetSubscriptionAsync(string topicName, string connectionString, string subscriptionName)
        {
            try
            {
                var client = new ServiceBusAdministrationClient(connectionString);
                return await client.GetSubscriptionAsync(topicName, subscriptionName);
            }
            catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
            {
                return null;
            }
        }

        public static async Task<SubscriptionProperties> UpdateSubscriptionAsync(string connectionString, SubscriptionProperties subscription)
        {
            var client = new ServiceBusAdministrationClient(connectionString);
            subscription.UserMetadata = "other metadata";
            return await client.UpdateSubscriptionAsync(subscription);
        }

        public static async Task<Azure.Response> DeleteSubscriptionAsync(string topicName, string connectionString, string subscriptionName)
        {
            var client = new ServiceBusAdministrationClient(connectionString);
            return await client.DeleteSubscriptionAsync(topicName, subscriptionName);
        }
    }
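
The listing above sets DuplicateDetectionHistoryTimeWindow to one minute. As far as I know, that window only takes effect when RequiresDuplicateDetection is also set to true on the queue or topic, which the listing does not do, and duplicates are recognized by the message ID. The idea behind the window can be sketched with a small in-memory model; ToyDuplicateDetector is a hypothetical class written for this illustration, not part of the Azure SDK.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Toy model of duplicate detection (not the Azure SDK): the broker remembers
// message IDs for a fixed window and drops repeats that arrive inside it.
public sealed class ToyDuplicateDetector
{
    private readonly Dictionary<string, DateTimeOffset> _seen =
        new Dictionary<string, DateTimeOffset>();
    private readonly TimeSpan _window;

    public ToyDuplicateDetector(TimeSpan window) => _window = window;

    // Returns true when the message is accepted, false when it is dropped as a duplicate.
    public bool TryAccept(string messageId, DateTimeOffset now)
    {
        if (_seen.TryGetValue(messageId, out var firstSeen) && now - firstSeen < _window)
            return false;

        // First sighting, or the previous one has aged out of the window.
        _seen[messageId] = now;
        return true;
    }
}
```

With a one-minute window, sending the ID "order-42" twice within 30 seconds keeps only the first message, while sending it again after the window has passed is accepted as a new message.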
Creating the Objects
    private static readonly string ConnectionString = "Endpoint=sb://sampleservbusnamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=+SXXUY4kweUSphE1l78avtHWrp9NyDGKPiePOi/KWfw=";
    private static readonly string QueueName = "sampleQueue";
    private static readonly string SessionQueueName = "sampleSessionQueue";
    private static readonly string TopicName = "sampleTopic";
    private static readonly string SubscriptionName = "sampleSubscription";

    static async Task Main(string[] args)
    {
        Console.WriteLine("Hello World!");

        #region Creating Objects
        QueueProperties queue = await ServiceBusObjects.GetQueueAsync(ConnectionString, QueueName);
        if (queue == null)
            queue = await ServiceBusObjects.CreateQueueAsync(ConnectionString, QueueName);

        QueueProperties sessionQueue = await ServiceBusObjects.GetQueueAsync(ConnectionString, SessionQueueName);
        if (sessionQueue == null)
            sessionQueue = await ServiceBusObjects.CreateQueueAsync(ConnectionString, SessionQueueName, true);

        TopicProperties topic = await ServiceBusObjects.GetTopicAsync(TopicName, ConnectionString);
        if (topic == null)
            topic = await ServiceBusObjects.CreateTopicAsync(TopicName, ConnectionString);

        SubscriptionProperties subscription = await ServiceBusObjects.GetSubscriptionAsync(TopicName, ConnectionString, SubscriptionName);
        if (subscription == null)
            subscription = await ServiceBusObjects.CreateSubscriptionAsync(TopicName, ConnectionString, SubscriptionName);

        #endregion
    }
Result 
 
 
 
 

Consuming and Publishing to Service Bus with .NET Core

 
Creating the methods to publish to and consume from queues
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;

    public class SendAndReceiveMessage
    {
        public static async Task SendMessageAsync(string connectionString, string queueName, string messageText)
        {
            await using var client = new ServiceBusClient(connectionString);

            // create the sender
            ServiceBusSender sender = client.CreateSender(queueName);

            // create a message that we can send. UTF-8 encoding is used when providing a string.
            ServiceBusMessage message = new ServiceBusMessage(messageText);

            // send the message
            await sender.SendMessageAsync(message);
        }

        public static async Task SendSessionMessageAsync(string connectionString, string queueName, string messageText, string sessionId)
        {
            await using var client = new ServiceBusClient(connectionString);

            // create the sender
            ServiceBusSender sender = client.CreateSender(queueName);

            // create a message that belongs to the given session
            ServiceBusMessage message = new ServiceBusMessage(messageText)
            {
                SessionId = sessionId
            };

            // send the message
            await sender.SendMessageAsync(message);
        }

        public static async Task<long> SendScheduledMessageAsync(string connectionString, string queueName, string messageText, DateTimeOffset timeOffset)
        {
            await using var client = new ServiceBusClient(connectionString);

            // create the sender
            ServiceBusSender sender = client.CreateSender(queueName);

            // create a message that we can send. UTF-8 encoding is used when providing a string.
            ServiceBusMessage message = new ServiceBusMessage(messageText);

            // schedule the message and return its sequence number, which is needed to cancel it
            return await sender.ScheduleMessageAsync(message, timeOffset);
        }

        public static async Task CancelScheduledMessageAsync(string connectionString, string queueName, long messageSequenceNumber)
        {
            await using var client = new ServiceBusClient(connectionString);

            // create the sender
            ServiceBusSender sender = client.CreateSender(queueName);

            // cancel the scheduled message before it becomes available on the queue
            await sender.CancelScheduledMessageAsync(messageSequenceNumber);
        }

        public static async Task<long> DeferMessageAsync(string connectionString, string queueName)
        {
            await using var client = new ServiceBusClient(connectionString);

            // create a receiver that we can use to receive the message
            ServiceBusReceiver receiver = client.CreateReceiver(queueName);

            ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

            // defer the message, thereby preventing the message from being received again without using
            // the received deferred message API.
            await receiver.DeferMessageAsync(receivedMessage);

            return receivedMessage.SequenceNumber;
        }

        public static async Task<ServiceBusReceivedMessage> GetDeferredMessageAsync(string connectionString, string queueName, long messageSequenceNumber)
        {
            await using var client = new ServiceBusClient(connectionString);

            // create a receiver that we can use to receive the message
            ServiceBusReceiver receiver = client.CreateReceiver(queueName);

            // receive the deferred message by specifying the service set sequence number of the original
            // received message
            return await receiver.ReceiveDeferredMessageAsync(messageSequenceNumber);
        }

        public static async Task DeadLetterMessageAsync(string connectionString, string queueName)
        {
            await using var client = new ServiceBusClient(connectionString);

            // create a receiver that we can use to receive the message
            ServiceBusReceiver receiver = client.CreateReceiver(queueName);

            ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

            // dead-letter the message, thereby preventing the message from being received again without receiving from the dead letter queue.
            await receiver.DeadLetterMessageAsync(receivedMessage);
        }

        public static async Task<ServiceBusReceivedMessage> GetDeadLetterMessageAsync(string connectionString, string queueName)
        {
            await using var client = new ServiceBusClient(connectionString);

            // receive the dead lettered message with receiver scoped to the dead letter queue.
            ServiceBusReceiver receiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions
            {
                SubQueue = SubQueue.DeadLetter
            });

            // the received message is a different type as it contains some service set properties
            return await receiver.ReceiveMessageAsync();
        }

        public static async Task<ServiceBusReceivedMessage> GetMessageAsync(string connectionString, string queueName)
        {
            await using var client = new ServiceBusClient(connectionString);

            // create a receiver that we can use to receive the message
            ServiceBusReceiver receiver = client.CreateReceiver(queueName);

            // the received message is a different type as it contains some service set properties
            return await receiver.ReceiveMessageAsync();
        }

        public static async Task<ServiceBusReceivedMessage> GetMessageFromSessionAsync(string connectionString, string queueName, string sessionId)
        {
            await using var client = new ServiceBusClient(connectionString);

            // accept the requested session, or the next available one when no session id is given
            ServiceBusSessionReceiver receiver = null;
            if (string.IsNullOrEmpty(sessionId))
                receiver = await client.AcceptNextSessionAsync(queueName);
            else
                receiver = await client.AcceptSessionAsync(queueName, sessionId);

            // the received message is a different type as it contains some service set properties
            return await receiver.ReceiveMessageAsync();
        }

        public static async Task CompleteOrAbandonMessageAsync(string connectionString, string queueName, bool abandon)
        {
            await using var client = new ServiceBusClient(connectionString);

            // create a receiver that we can use to receive the message
            ServiceBusReceiver receiver = client.CreateReceiver(queueName);

            // the received message is a different type as it contains some service set properties
            ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

            Console.WriteLine($"Message received {receivedMessage.Body} to be abandoned {abandon}");

            if (!abandon) // complete the message, thereby deleting it from the service
                await receiver.CompleteMessageAsync(receivedMessage);
            else // abandon the message, thereby releasing the lock and allowing it to be received again by this or other receivers
                await receiver.AbandonMessageAsync(receivedMessage);
        }

        public static async Task SendMessageBatchAsync(string connectionString, string queueName, List<string> messageTexts)
        {
            await using var client = new ServiceBusClient(connectionString);

            // create the sender
            ServiceBusSender sender = client.CreateSender(queueName);

            // create one message per text. UTF-8 encoding is used when providing a string.
            IList<ServiceBusMessage> messages = new List<ServiceBusMessage>();

            messageTexts.ForEach(msg => messages.Add(new ServiceBusMessage(msg)));

            // send all the messages in a single call
            await sender.SendMessagesAsync(messages);
        }

        public static async Task SendMessageSafeBatchAsync(string connectionString, string queueName, List<string> messageTexts)
        {
            await using var client = new ServiceBusClient(connectionString);

            // create the sender
            ServiceBusSender sender = client.CreateSender(queueName);

            // queue up one message per text. UTF-8 encoding is used when providing a string.
            Queue<ServiceBusMessage> messages = new Queue<ServiceBusMessage>();

            messageTexts.ForEach(msg => messages.Enqueue(new ServiceBusMessage(msg)));

            while (messages.Count > 0)
            {
                // start a new batch
                using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();

                // add the first message to the batch
                if (messageBatch.TryAddMessage(messages.Peek()))
                {
                    // dequeue the message from the .NET queue once the message is added to the batch
                    messages.Dequeue();
                }
                else
                {
                    // if the first message can't fit, then it is too large for the batch
                    throw new Exception($"Message {messageTexts.Count - messages.Count} is too large and cannot be sent.");
                }

                // add as many messages as possible to the current batch
                while (messages.Count > 0 && messageBatch.TryAddMessage(messages.Peek()))
                {
                    // dequeue the message from the .NET queue as it has been added to the batch
                    messages.Dequeue();
                }

                // now, send the batch
                await sender.SendMessagesAsync(messageBatch);
            }
        }
    }
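
The loop in SendMessageSafeBatchAsync fills each batch until TryAddMessage refuses the next message, then sends the batch and starts a new one. The same greedy packing can be sketched without the SDK by reducing each message to its size in bytes. ToyBatcher, Pack, and the 100-byte limit used in the note below are hypothetical names and numbers chosen for this illustration; the real limit depends on the namespace tier.

```csharp
using System;
using System.Collections.Generic;

// Toy model of the safe-batch loop (not the Azure SDK): messages are reduced to
// their sizes in bytes and packed greedily into batches with a size limit.
public static class ToyBatcher
{
    public static List<List<int>> Pack(IEnumerable<int> messageSizes, int maxBatchBytes)
    {
        var pending = new Queue<int>(messageSizes);
        var batches = new List<List<int>>();

        while (pending.Count > 0)
        {
            var batch = new List<int>();
            var used = 0;

            // Same role as TryAddMessage: accept the message only if it still fits.
            while (pending.Count > 0 && used + pending.Peek() <= maxBatchBytes)
            {
                used += pending.Peek();
                batch.Add(pending.Dequeue());
            }

            // An empty batch means the next message cannot fit even on its own.
            if (batch.Count == 0)
                throw new InvalidOperationException(
                    $"A message of {pending.Peek()} bytes exceeds the {maxBatchBytes}-byte limit.");

            batches.Add(batch);
        }

        return batches;
    }
}
```

For sizes 40, 40, 40 and 100 with a 100-byte limit, Pack returns three batches: the first two messages together, then the third on its own, then the fourth on its own. A single 150-byte message makes it throw, which mirrors the "too large" branch of the listing above.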
Publishing and Consuming
    private static readonly string ConnectionString = "Endpoint=sb://sampleservbusnamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=+SXXUY4kweUSphE1l78avtHWrp9NyDGKPiePOi/KWfw=";
    private static readonly string QueueName = "sampleQueue";
    private static readonly string SessionQueueName = "sampleSessionQueue";
    private static readonly string TopicName = "sampleTopic";
    private static readonly string SubscriptionName = "sampleSubscription";

    static async Task Main(string[] args)
    {
        Console.WriteLine("Hello World!");

        #region Sending and Receiving Messages
        int count = 0;

        // sending messages
        await SendAndReceiveMessage.SendMessageAsync(ConnectionString, QueueName, $"Message {count++}");
        await SendAndReceiveMessage.SendMessageBatchAsync(ConnectionString, QueueName, new System.Collections.Generic.List<string> { $"Message {count++}", $"Message {count++}", $"Message {count++}", $"Message {count++}" });
        await SendAndReceiveMessage.SendMessageSafeBatchAsync(ConnectionString, QueueName, new System.Collections.Generic.List<string> { $"Message {count++}", $"Message {count++}", $"Message {count++}", $"Message {count++}" });

        // scheduling two messages and cancelling the first one
        long firstScheduledMessageNumber = await SendAndReceiveMessage.SendScheduledMessageAsync(ConnectionString, QueueName, $"Message {count++}", new DateTimeOffset(DateTime.Now.AddMinutes(10)));
        long secondScheduledMessageNumber = await SendAndReceiveMessage.SendScheduledMessageAsync(ConnectionString, QueueName, $"Message {count++}", new DateTimeOffset(DateTime.Now.AddMinutes(10)));
        await SendAndReceiveMessage.CancelScheduledMessageAsync(ConnectionString, QueueName, firstScheduledMessageNumber);

        // deferring, dead-lettering, receiving, completing, and abandoning messages
        long deferredMessageNumber = await SendAndReceiveMessage.DeferMessageAsync(ConnectionString, QueueName);
        Console.WriteLine((await SendAndReceiveMessage.GetDeferredMessageAsync(ConnectionString, QueueName, deferredMessageNumber)).Body);
        await SendAndReceiveMessage.DeadLetterMessageAsync(ConnectionString, QueueName);
        Console.WriteLine((await SendAndReceiveMessage.GetDeadLetterMessageAsync(ConnectionString, QueueName)).Body);
        Console.WriteLine((await SendAndReceiveMessage.GetMessageAsync(ConnectionString, QueueName)).Body);
        await SendAndReceiveMessage.CompleteOrAbandonMessageAsync(ConnectionString, QueueName, false);
        await SendAndReceiveMessage.CompleteOrAbandonMessageAsync(ConnectionString, QueueName, true);
        #endregion

        #region Sending and Receiving Session Messages
        await SendAndReceiveMessage.SendSessionMessageAsync(ConnectionString, SessionQueueName, $"Message {count++}", "session id 1");
        Console.WriteLine((await SendAndReceiveMessage.GetMessageFromSessionAsync(ConnectionString, SessionQueueName, "session id 1")).Body);
        #endregion
    }
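
The session calls above rely on the fact that messages sharing a session ID are kept together and delivered in the order they were sent, and that a receiver works on one session at a time. A minimal in-memory sketch of that grouping is shown here; ToySessionQueue and its members are hypothetical names used only for this illustration and are not part of the Azure SDK.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Linq;

// Toy model of message sessions (not the Azure SDK): messages that share a
// session ID form their own FIFO sub-queue, and a receiver takes one session.
public sealed class ToySessionQueue
{
    private readonly Dictionary<string, Queue<string>> _sessions =
        new Dictionary<string, Queue<string>>();
    private readonly List<string> _order = new List<string>(); // sessions in arrival order

    public void Send(string sessionId, string body)
    {
        if (!_sessions.TryGetValue(sessionId, out var queue))
        {
            queue = new Queue<string>();
            _sessions[sessionId] = queue;
            _order.Add(sessionId);
        }
        queue.Enqueue(body);
    }

    // Counterpart of accepting a specific session: drain it in send order.
    public IReadOnlyList<string> ReceiveAll(string sessionId)
    {
        if (!_sessions.Remove(sessionId, out var queue)) return new List<string>();
        _order.Remove(sessionId);
        return queue.ToList();
    }

    // Counterpart of accepting the next available session.
    public string? NextSessionId() => _order.FirstOrDefault();
}
```

If messages "1" and "2" are sent to session "a" with a message for session "b" in between, ReceiveAll("a") returns "1" then "2", and NextSessionId() then points to "b".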
Results
 
 
 
Using Service Bus Processor with .NET Core
 
Message Processor Method
  1. class Processor  
  2.    {  
  3.        public static async Task RunProcessor(string connectionString, string queueName, TimeSpan timeSpan)  
  4.        {  
            // ServiceBusClient implements IAsyncDisposable, so create it with "await using"
            await using var client = new ServiceBusClient(connectionString);

            // Options used to configure the processor
            var options = new ServiceBusProcessorOptions
            {
                // By default, the processor completes the message after the handler returns.
                // Set this to false to control settlement explicitly.
                AutoCompleteMessages = false,

                // Allow up to two messages to be handled concurrently
                MaxConcurrentCalls = 2
            };

            // Create a processor to process the messages
            ServiceBusProcessor processor = client.CreateProcessor(queueName, options);

            processor.ProcessMessageAsync += MessageHandler;
            processor.ProcessErrorAsync += ErrorHandler;

            await processor.StartProcessingAsync();

            // The handlers run in the background, so keep this method alive for the
            // requested time span; otherwise the sample would end immediately.
            DateTime endProcessing = DateTime.Now.Add(timeSpan);
            while (DateTime.Now < endProcessing)
                await Task.Delay(100);

            // Stop processing once the time span has elapsed
            await processor.StopProcessingAsync();
        }

        private static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            string body = args.Message.Body.ToString();
            Console.WriteLine($"Message received from processor: {body}");

            // Application logic can decide how to settle the message; here it is always completed.
            await args.CompleteMessageAsync(args.Message);
        }

        private static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            // The error source tells at what point in the processing the error occurred
            Console.WriteLine(args.ErrorSource);
            // The fully qualified namespace is available
            Console.WriteLine(args.FullyQualifiedNamespace);
            // As well as the entity path
            Console.WriteLine(args.EntityPath);
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }
    }
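The message handler above always completes the message, while its comment notes that application logic can decide how to settle it. As a minimal sketch, assuming a made-up rule (empty bodies are dead-lettered, failed messages are retried until an assumed limit of 3 deliveries), that decision can be isolated in a small pure function. `SettlementAction`, `SettlementPolicy`, and `MaxDeliveries` are hypothetical names introduced for illustration and are not part of Azure.Messaging.ServiceBus.

```csharp
using System;

// Hypothetical names for illustration; none of these types come from the Service Bus SDK.
public enum SettlementAction { Complete, Abandon, DeadLetter }

public static class SettlementPolicy
{
    // Assumed retry limit for this sketch.
    public const int MaxDeliveries = 3;

    public static SettlementAction Decide(string body, int deliveryCount, bool processedOk)
    {
        // A message with no usable payload can never succeed, so park it right away.
        if (string.IsNullOrWhiteSpace(body))
            return SettlementAction.DeadLetter;

        if (processedOk)
            return SettlementAction.Complete;

        // Processing failed: retry until the assumed limit is reached, then dead-letter.
        return deliveryCount >= MaxDeliveries
            ? SettlementAction.DeadLetter
            : SettlementAction.Abandon;
    }
}
```

Inside `MessageHandler`, the result could be mapped to `args.CompleteMessageAsync(args.Message)`, `args.AbandonMessageAsync(args.Message)`, or `args.DeadLetterMessageAsync(args.Message)`, with `args.Message.DeliveryCount` supplying the delivery count.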
Using Service Bus Session Processor with .NET Core
 
Session Processor Method
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

class SessionProcessor
{
    public static async Task RunSessionProcessor(string connectionString, string queueName, TimeSpan timeSpan)
    {
        // ServiceBusClient implements IAsyncDisposable, so create it with "await using"
        await using var client = new ServiceBusClient(connectionString);

        // Options used to configure the session processor
        var options = new ServiceBusSessionProcessorOptions
        {
            // By default, the processor completes the message after the handler returns.
            // Set this to false to control settlement explicitly.
            AutoCompleteMessages = false,

            // Allow up to five sessions to be processed at the same time
            MaxConcurrentSessions = 5,

            // By default, there is a single concurrent call per session.
            // Increasing it enables parallel processing within each session.
            MaxConcurrentCallsPerSession = 2
        };

        // Create a session processor to process the messages
        ServiceBusSessionProcessor processor = client.CreateSessionProcessor(queueName, options);

        processor.ProcessMessageAsync += MessageHandler;
        processor.ProcessErrorAsync += ErrorHandler;

        await processor.StartProcessingAsync();

        // The handlers run in the background, so keep this method alive for the
        // requested time span; otherwise the sample would end immediately.
        DateTime endProcessing = DateTime.Now.Add(timeSpan);
        while (DateTime.Now < endProcessing)
            await Task.Delay(100);

        // Stop processing once the time span has elapsed
        await processor.StopProcessingAsync();
    }

    private static async Task MessageHandler(ProcessSessionMessageEventArgs args)
    {
        string body = args.Message.Body.ToString();
        Console.WriteLine($"Message received from session processor: {body}");

        // Application logic can decide how to settle the message; here it is always completed.
        await args.CompleteMessageAsync(args.Message);

        // Arbitrary session state can also be stored.
        // The state belongs to the session, not to any particular message.
        await args.SetSessionStateAsync(new BinaryData("some state"));
    }

    private static Task ErrorHandler(ProcessErrorEventArgs args)
    {
        // The error source tells at what point in the processing the error occurred
        Console.WriteLine(args.ErrorSource);
        // The fully qualified namespace is available
        Console.WriteLine(args.FullyQualifiedNamespace);
        // As well as the entity path
        Console.WriteLine(args.EntityPath);
        Console.WriteLine(args.Exception.ToString());
        return Task.CompletedTask;
    }
}
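A session processor only receives messages that carry a session ID, and the queue itself must have sessions enabled. The following is a minimal sketch of the sending side under those assumptions. `SessionSender`, `CreateSessionMessages`, and `SendAsync` are hypothetical names introduced here; only `ServiceBusClient`, `ServiceBusSender`, and `ServiceBusMessage` come from the Azure.Messaging.ServiceBus package.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper for illustration; not part of the original sample.
public static class SessionSender
{
    // Builds one message per body, all tagged with the same session ID so that
    // a session processor handles them as one group.
    public static List<ServiceBusMessage> CreateSessionMessages(string sessionId, IEnumerable<string> bodies)
    {
        var messages = new List<ServiceBusMessage>();
        foreach (string body in bodies)
        {
            messages.Add(new ServiceBusMessage(body) { SessionId = sessionId });
        }
        return messages;
    }

    // Sends the messages to a queue that is assumed to have sessions enabled.
    public static async Task SendAsync(string connectionString, string queueName, string sessionId, IEnumerable<string> bodies)
    {
        await using var client = new ServiceBusClient(connectionString);
        ServiceBusSender sender = client.CreateSender(queueName);

        foreach (ServiceBusMessage message in CreateSessionMessages(sessionId, bodies))
        {
            await sender.SendMessageAsync(message);
        }
    }
}
```

Calling `SendAsync` with two or more distinct session IDs before starting `RunSessionProcessor` gives the processor several sessions to work on in parallel.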
Azure Queue Storage queues with .NET Core
 
Prerequisites
  • NuGet package Azure.Storage.Queues;
  • A previously created Azure Storage Account, here named "storageaccountnormal".
 
Creating Queue Storage objects with .NET Core
 
Methods
using System.Threading.Tasks;
using Azure.Storage.Queues;

public class StorageQueueObjects
{
    //-------------------------------------------------
    // Create the queue
    //-------------------------------------------------
    public static async Task<QueueClient> CreateQueue(string queueName, string connectionString)
    {
        // Instantiate a QueueClient, which is used to create and manipulate the queue
        QueueClient queueClient = new QueueClient(connectionString, queueName);

        // Create the queue if it does not exist yet
        await queueClient.CreateIfNotExistsAsync();

        return queueClient;
    }

    //-------------------------------------------------
    // Delete the queue
    //-------------------------------------------------
    public static async Task<Azure.Response<bool>> DeleteQueue(string queueName, string connectionString)
    {
        // Instantiate a QueueClient, which is used to manipulate the queue
        QueueClient queueClient = new QueueClient(connectionString, queueName);

        // Delete the queue if it exists
        return await queueClient.DeleteIfExistsAsync();
    }
}
Creating the Objects
using System;
using System.Threading.Tasks;
using Azure.Storage.Queues;

class Program
{
    // Replace <your-account-key> with the key of your storage account; never publish a real key.
    private static readonly string ConnectionString = "DefaultEndpointsProtocol=https;AccountName=storageaccountnormal;AccountKey=<your-account-key>;EndpointSuffix=core.windows.net";
    private static readonly string QueueName = "samplestoragequeue";

    static async Task Main(string[] args)
    {
        Console.WriteLine("Hello World!");

        QueueClient queue = await StorageQueueObjects.CreateQueue(QueueName, ConnectionString);
    }
}
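The sample keeps the connection string in a source-code field, which is convenient for a demo but easy to leak. As a minimal sketch of one alternative, the value could be read from an environment variable at startup. `QueueSettings` is a hypothetical helper, and the variable name `AZURE_STORAGE_CONNECTION_STRING` is an assumption chosen for this example.

```csharp
using System;

// Hypothetical helper for illustration; the variable name is an assumption.
public static class QueueSettings
{
    public const string ConnectionStringVariable = "AZURE_STORAGE_CONNECTION_STRING";

    public static string GetConnectionString()
    {
        string value = Environment.GetEnvironmentVariable(ConnectionStringVariable);

        // Fail early with a clear message instead of passing an empty string to QueueClient.
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                $"Set the {ConnectionStringVariable} environment variable before running the sample.");
        }

        return value;
    }
}
```

`Main` could then call `StorageQueueObjects.CreateQueue(QueueName, QueueSettings.GetConnectionString())` instead of passing the constant.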
Result
 
 
Receiving and Publishing to Azure Queue Storage with .NET Core
 
Methods
using System;
using System.Threading.Tasks;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;

public class SendAndReceiveMessage
{
    //-------------------------------------------------
    // Send a message to the queue
    //-------------------------------------------------
    public static async Task<Azure.Response<SendReceipt>> InsertMessage(QueueClient queueClient, string message)
    {
        return await queueClient.SendMessageAsync(message);
    }

    //-------------------------------------------------
    // Peek at a message in the queue without removing it
    //-------------------------------------------------
    public static async Task<Azure.Response<PeekedMessage[]>> PeekMessage(QueueClient queueClient)
    {
        return await queueClient.PeekMessagesAsync();
    }

    //-------------------------------------------------
    // Process and remove one message from the queue
    //-------------------------------------------------
    public static async Task<QueueMessage[]> DequeueMessage(QueueClient queueClient)
    {
        // Get the next message (a single message is requested by default)
        QueueMessage[] retrievedMessage = await queueClient.ReceiveMessagesAsync();

        // Delete the message, if the queue returned one
        if (retrievedMessage.Length > 0)
        {
            await queueClient.DeleteMessageAsync(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);
        }

        return retrievedMessage;
    }

    //-----------------------------------------------------
    // Process and remove multiple messages from the queue
    //-----------------------------------------------------
    public static async Task<QueueMessage[]> DequeueMessages(QueueClient queueClient)
    {
        // Receive up to 20 messages and hide them from other consumers for 5 minutes
        QueueMessage[] receivedMessages = await queueClient.ReceiveMessagesAsync(20, TimeSpan.FromMinutes(5));

        foreach (QueueMessage message in receivedMessages)
        {
            // Delete the message
            await queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
        }

        return receivedMessages;
    }
}
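`DequeueMessages` requests 20 messages in a single call. Queue Storage returns at most 32 messages per receive call, so emptying a larger queue takes a loop. The sketch below shows one way to do that; `QueueDrainer` and `DrainAsync` are hypothetical names, the one-minute visibility timeout is an arbitrary choice, and the code assumes a package version in which `QueueMessage` exposes `Body`, as the samples here do.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;

// Hypothetical helper for illustration; not part of the original sample.
public static class QueueDrainer
{
    // Upper limit that Queue Storage accepts for a single receive call.
    public const int MaxBatchSize = 32;

    public static async Task<List<string>> DrainAsync(QueueClient queueClient)
    {
        var bodies = new List<string>();

        while (true)
        {
            QueueMessage[] batch = await queueClient.ReceiveMessagesAsync(MaxBatchSize, TimeSpan.FromMinutes(1));

            // An empty batch means no visible messages are left right now.
            if (batch.Length == 0)
                break;

            foreach (QueueMessage message in batch)
            {
                bodies.Add(message.Body.ToString());
                await queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
            }
        }

        return bodies;
    }
}
```

Messages that another consumer has received but not yet deleted stay invisible until their visibility timeout expires, so an empty batch does not prove the queue is permanently empty.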
Receiving and Publishing Messages
using System;
using System.Threading.Tasks;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;

class Program
{
    // Replace <your-account-key> with the key of your storage account; never publish a real key.
    private static readonly string ConnectionString = "DefaultEndpointsProtocol=https;AccountName=storageaccountnormal;AccountKey=<your-account-key>;EndpointSuffix=core.windows.net";
    private static readonly string QueueName = "samplestoragequeue";

    static async Task Main(string[] args)
    {
        Console.WriteLine("Hello World!");

        QueueClient queue = await StorageQueueObjects.CreateQueue(QueueName, ConnectionString);

        // Insert 50 messages
        for (int i = 0; i < 50; i++)
        {
            await SendAndReceiveMessage.InsertMessage(queue, "Message Number " + i);
        }

        // Peek at the next message without removing it
        PeekedMessage[] peekedMessages = await SendAndReceiveMessage.PeekMessage(queue);
        foreach (PeekedMessage peeked in peekedMessages)
            Console.WriteLine("Message peeked: " + peeked.Body);

        // Dequeue a single message
        QueueMessage[] message = await SendAndReceiveMessage.DequeueMessage(queue);
        if (message.Length > 0)
            Console.WriteLine("Single Message dequeued: " + message[0].Body);

        // Dequeue a batch of messages
        QueueMessage[] messages = await SendAndReceiveMessage.DequeueMessages(queue);
        foreach (QueueMessage dequeued in messages)
            Console.WriteLine("Message dequeued: " + dequeued.Body);
    }
}
Result 
 
 
 
External References