Road To AZ-204 - Developing Message-Based Solutions

Introduction

This article explains the main skills measured in this sub-topic of the AZ-204 certification. It covers the fundamentals of Azure Service Bus and Azure Queue Storage, the two main components of this topic, alongside practical examples.

This certification is very extensive and this article covers only the main topics, so make sure you know these components in depth before taking the exam. Another good tip is to take practice exams before the official one in order to validate your knowledge.

What is the Certification AZ-204 - Developing Solutions for Microsoft Azure?

The AZ-204 - Developing Solutions for Microsoft Azure certification measures your skills in designing, building, testing, and maintaining applications and services in the Microsoft Azure cloud environment. It covers, among others, the following components:

  • Azure Virtual Machines
  • Docker
  • Azure Containers
  • Azure App Service Web Apps
  • Azure Functions
  • Azure Cosmos DB
  • Azure Storage
  • Azure AD
  • Azure Key Vault
  • Azure Managed Identities
  • Azure Cache for Redis
  • Azure Logic Apps
  • Azure Event Grid
  • Azure Event Hubs
  • Azure Notification Hubs
  • Azure Service Bus
  • Azure Queue Storage

Target Audience

Any IT professional who wants to improve their knowledge of Microsoft Azure is encouraged to take this certification; it is a great way to measure your skills with trending technologies. Some groups of professionals, however, are better placed to take full advantage of it:

  • Azure Developers, with at least 1 year of experience with Microsoft Azure;
  • Experienced Software Developers, looking for an Architect position in a hybrid environment;
  • Software Developers, working to move applications to the cloud environment.

Skills Measured

At the time of writing, the skills measured in the exam are split as follows.

Benefits of Getting Certified

The main benefit is holding a certification recognized worldwide that proves your knowledge of this topic. Among the intrinsic and extrinsic benefits, we have:

  • Higher growth potential, as certifications are a big plus.
  • Discounts and deals on Microsoft products and with partners, like Pluralsight and Upwork.
  • MCP newsletters, covering trending technologies.
  • Higher exposure on LinkedIn, as recruiters usually search for specific certifications.
  • Higher salary, as you will be more valuable to your company.
  • The unique happiness of receiving the result and seeing that you passed, knowing that all your efforts were worth it.

Main Skills Measured by This Topic
 

What is Azure Service Bus?

Service Bus is a fully managed enterprise message broker that supports message queuing and publishing/subscribing to topics. Use Azure Service Bus to scale your applications with the power of asynchronous messages and built-in integration with Azure Services.

With Service Bus you can send single or batched messages, load-balance message consumption across receivers, subscribe to topics, and use message sessions and transactions. It complies with standards and protocols such as Advanced Message Queuing Protocol (AMQP) 1.0 and Java Message Service (JMS): JMS 2.0 on the Premium SKU and JMS 1.1 on the Standard SKU.

The main Service Bus terms are as follows.

  • Namespace: works like a server and contains N queues and topics.
  • Queue: contains the messages.
  • Sender: sends the messages.
  • Receiver: receives the messages.
  • Topic: works like a queue but with multiple receivers.
  • Subscription: a receiver in a topic; each subscription gets its own copy of the messages sent to the topic.
  • Batch: a group of messages.
  • Safe batch: validates whether each message can be included in the batch.
  • Session: enables FIFO and groups related messages in the queue.
  • Peek: returns a message without removing it from the queue.
  • Dead-letter queue: a queue for messages that could not be delivered or processed from their normal queue.
  • Peek & Lock: retrieves a message from the queue without removing it and locks it so that other receivers cannot receive it.
  • Receive & Delete: retrieves a message and deletes it from the queue.
  • Auto-delete on idle: sets a time span after which the queue is deleted if it is not used.
  • Duplicate detection history: the time window in which Service Bus remembers the IDs of sent messages, so that a message whose ID was already seen is discarded.
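
The difference between Peek, Peek & Lock, and Receive & Delete can be sketched with the Azure.Messaging.ServiceBus package roughly as follows. This is a minimal sketch rather than a definitive implementation: the class and method names are hypothetical, and it assumes that the queue already exists and holds a few messages.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper, for illustration only
public static class ReceiveModesSketch
{
    public static async Task ShowReceiveModesAsync(string connectionString, string queueName)
    {
        await using var client = new ServiceBusClient(connectionString);

        // Receivers use the Peek & Lock mode unless told otherwise
        await using ServiceBusReceiver receiver = client.CreateReceiver(queueName);

        // Peek: read the next message without locking or removing it
        ServiceBusReceivedMessage peeked = await receiver.PeekMessageAsync();
        Console.WriteLine($"Peeked: {peeked?.Body}");

        // Peek & Lock: the message is locked for this receiver and is only
        // removed from the queue once it is completed
        ServiceBusReceivedMessage locked = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));
        if (locked != null)
        {
            Console.WriteLine($"Locked: {locked.Body}");
            await receiver.CompleteMessageAsync(locked);
        }

        // Receive & Delete: the message is removed as soon as it is received,
        // so it cannot be completed, abandoned, or recovered after a failure
        await using ServiceBusReceiver deletingReceiver = client.CreateReceiver(
            queueName,
            new ServiceBusReceiverOptions { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete });

        ServiceBusReceivedMessage deleted = await deletingReceiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));
        Console.WriteLine($"Received and deleted: {deleted?.Body}");
    }
}
```

Peek & Lock is usually the safer choice when losing a message is not acceptable, because an unprocessed message becomes available again once its lock expires.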

What are Azure Queue Storage queues?

Azure Queue Storage is a simple message broker. It provides cloud message exchange between applications and services, with messages of up to 64 KB in size, over authenticated HTTP/HTTPS. Queues live inside an Azure Storage account, and you access them with the same access keys and connection strings that you use for the other resources in that storage account.

Azure Queue Storage offers three ways of handling messages on the queue. The first is to peek at a message, which retrieves it from the front of the queue without removing or locking it. The second is to receive a message from the front of the queue, which locks it temporarily: it stays invisible to other consumers until its visibility timeout expires. The third is to delete a message after it has been processed.
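
The three operations, together with the 64 KB limit, can be sketched with the Azure.Storage.Queues package roughly as follows. This is a sketch under stated assumptions: the class name, the method names, and the 30-second visibility timeout are hypothetical choices, and the size check only counts the UTF-8 bytes of the text, ignoring any extra size added if the client is configured to Base64-encode messages.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;

// Hypothetical helper, for illustration only
public static class QueueStorageSketch
{
    // 64 KB message size limit mentioned above
    private const int MaxMessageBytes = 64 * 1024;

    // Rough client-side check before sending (counts UTF-8 bytes only)
    public static bool FitsInQueueMessage(string text) =>
        Encoding.UTF8.GetByteCount(text) <= MaxMessageBytes;

    public static async Task ProcessOneAsync(QueueClient queueClient)
    {
        // 1) Peek: look at the front of the queue without locking or removing
        PeekedMessage[] peeked = await queueClient.PeekMessagesAsync(1);
        if (peeked.Length > 0)
            Console.WriteLine($"Peeked: {peeked[0].Body}");

        // 2) Receive: the message stays in the queue but is invisible to
        //    other consumers for 30 seconds (assumed value)
        QueueMessage[] received = await queueClient.ReceiveMessagesAsync(1, TimeSpan.FromSeconds(30));
        if (received.Length == 0)
            return; // nothing visible in the queue

        Console.WriteLine($"Processing: {received[0].Body}");

        // 3) Delete: remove the message once it has been processed; if this
        //    call is skipped, the message reappears after the timeout
        await queueClient.DeleteMessageAsync(received[0].MessageId, received[0].PopReceipt);
    }
}
```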

Service Bus vs Azure Queue Storage Queues

Comparing them superficially, we could say that Service Bus is a very powerful and complete message broker recommended for scenarios of medium to high complexity, while Azure Queue Storage is a basic message broker suited to low-complexity scenarios. The comparison table below shows the main message-broker functionalities and how Service Bus and Azure Queue Storage queues fit them, side by side.

[Image: comparison table, Service Bus vs. Storage Queues]

Practical Examples

Azure Service Bus with .NET Core

Prerequisites

NuGet package Azure.Messaging.ServiceBus.

You can find the following project on GitHub.

Creating Service Bus with Azure CLI

Setting Variables

$resourceGroup = "serviceBus-RG"
$location = "westeurope"
$serviceBusNamespace = "sampleservbusnamespace"

Creating the Service Bus Namespace

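# Create the resource group first if it does not exist yet
az group create -n $resourceGroup -l $location

# Topics, subscriptions, and sessions (all used below) need at least the Standard tier; Basic supports queues only
az servicebus namespace create -g $resourceGroup -n $serviceBusNamespace -l $location --sku Standard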

Result

[Image: Namespace]

Creating Service Bus Objects with .NET Core

Methods to get, create, update, and delete a queue, a topic, and a subscription.

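using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

class ServiceBusObjects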
{
    public static async Task<QueueProperties> CreateQueueAsync(string connectionString, string queueName, bool requiresSession = false)
    {
        var client = new ServiceBusAdministrationClient(connectionString);
        var options = new CreateQueueOptions(queueName)
        {
            DefaultMessageTimeToLive = TimeSpan.FromDays(2),
            DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(1),
            EnableBatchedOperations = true,
            DeadLetteringOnMessageExpiration = true,
            EnablePartitioning = false,
            ForwardDeadLetteredMessagesTo = null,
            ForwardTo = null,
            LockDuration = TimeSpan.FromSeconds(45),
            MaxDeliveryCount = 8,
            MaxSizeInMegabytes = 2048,
            UserMetadata = "some metadata"
        };
        options.RequiresSession = requiresSession;

        options.AuthorizationRules.Add(new SharedAccessAuthorizationRule(
            "allClaims",
            new[] { AccessRights.Manage, AccessRights.Send, AccessRights.Listen }));

        return await client.CreateQueueAsync(options);
    }

    public static async Task<QueueProperties> GetQueueAsync(string connectionString, string queueName)
    {
        try
        {
            var client = new ServiceBusAdministrationClient(connectionString);
            return await client.GetQueueAsync(queueName);
        }
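        catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
        {
            // the queue does not exist: return null so the caller can create it
            return null;
        }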
    }

    public static async Task<QueueProperties> UpdateQueueAsync(string connectionString, QueueProperties queue)
    {
        var client = new ServiceBusAdministrationClient(connectionString);
        queue.UserMetadata = "other metadata";
        return await client.UpdateQueueAsync(queue);
    }

    public static async Task<Azure.Response> DeleteQueueAsync(string connectionString, string queueName)
    {
        var client = new ServiceBusAdministrationClient(connectionString);
        return await client.DeleteQueueAsync(queueName);
    }

    public static async Task<TopicProperties> CreateTopicAsync(string topicName, string connectionString)
    {
        var client = new ServiceBusAdministrationClient(connectionString);
        var topicOptions = new CreateTopicOptions(topicName)
        {
            DefaultMessageTimeToLive = TimeSpan.FromDays(2),
            DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(1),
            EnableBatchedOperations = true,
            EnablePartitioning = false,
            MaxSizeInMegabytes = 2048,
            UserMetadata = "some metadata"
        };

        topicOptions.AuthorizationRules.Add(new SharedAccessAuthorizationRule(
            "allClaims",
            new[] { AccessRights.Manage, AccessRights.Send, AccessRights.Listen }));

        return await client.CreateTopicAsync(topicOptions);
    }

    public static async Task<TopicProperties> GetTopicAsync(string topicName, string connectionString)
    {
        try
        {
            var client = new ServiceBusAdministrationClient(connectionString);
            return await client.GetTopicAsync(topicName);
        }
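        catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
        {
            // the topic does not exist: return null so the caller can create it
            return null;
        }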
    }

    public static async Task<TopicProperties> UpdateTopicAsync(TopicProperties topic, string connectionString)
    {
        var client = new ServiceBusAdministrationClient(connectionString);
        topic.UserMetadata = "other metadata";
        return await client.UpdateTopicAsync(topic);
    }

    public static async Task<Azure.Response> DeleteTopicAsync(string topicName, string connectionString)
    {
        var client = new ServiceBusAdministrationClient(connectionString);
        return await client.DeleteTopicAsync(topicName);
    }

    public static async Task<SubscriptionProperties> CreateSubscriptionAsync(string topicName, string connectionString, string subscriptionName)
    {
        var client = new ServiceBusAdministrationClient(connectionString);

        var subscriptionOptions = new CreateSubscriptionOptions(topicName, subscriptionName)
        {
            DefaultMessageTimeToLive = TimeSpan.FromDays(2),
            EnableBatchedOperations = true,
            UserMetadata = "some metadata"
        };
        return await client.CreateSubscriptionAsync(subscriptionOptions);
    }

    public static async Task<SubscriptionProperties> GetSubscriptionAsync(string topicName, string connectionString, string subscriptionName)
    {
        try
        {
            var client = new ServiceBusAdministrationClient(connectionString);
            return await client.GetSubscriptionAsync(topicName, subscriptionName);
        }
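        catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
        {
            // the subscription does not exist: return null so the caller can create it
            return null;
        }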
    }

    public static async Task<SubscriptionProperties> UpdateSubscriptionAsync(string connectionString, SubscriptionProperties subscription)
    {
        var client = new ServiceBusAdministrationClient(connectionString);
        subscription.UserMetadata = "other metadata";
        return await client.UpdateSubscriptionAsync(subscription);
    }

    public static async Task<Azure.Response> DeleteSubscriptionAsync(string topicName, string connectionString, string subscriptionName)
    {
        var client = new ServiceBusAdministrationClient(connectionString);
        return await client.DeleteSubscriptionAsync(topicName, subscriptionName);
    }
}
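
Note that the queue and topic options above set DuplicateDetectionHistoryTimeWindow, but the time window only has an effect when duplicate detection itself is enabled on the entity. A minimal sketch of enabling it and sending two messages with the same ID could look like this. The class name, method name, message text, and message ID are hypothetical, and the sketch assumes a Standard or Premium namespace and a queue name that is not in use yet.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical helper, for illustration only
public static class DuplicateDetectionSketch
{
    public static async Task RunAsync(string connectionString, string queueName)
    {
        var adminClient = new ServiceBusAdministrationClient(connectionString);

        // Duplicate detection has to be switched on when the queue is created
        await adminClient.CreateQueueAsync(new CreateQueueOptions(queueName)
        {
            RequiresDuplicateDetection = true,
            DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(1)
        });

        await using var client = new ServiceBusClient(connectionString);
        ServiceBusSender sender = client.CreateSender(queueName);

        // Both sends succeed, but the second message carries a MessageId that
        // was already seen inside the time window, so the service drops it
        await sender.SendMessageAsync(new ServiceBusMessage("Order 42") { MessageId = "order-42" });
        await sender.SendMessageAsync(new ServiceBusMessage("Order 42") { MessageId = "order-42" });
    }
}
```

Duplicates are identified by MessageId, so the sender has to assign an ID that stays the same across retries of the same logical message.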

Creating the Objects

private static readonly string ConnectionString = "Endpoint=sb://sampleservbusnamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=+SXXUY4kweUSphE1l78avtHWrp9NyDGKPiePOi/KWfw=";
private static readonly string QueueName = "sampleQueue";
private static readonly string SessionQueueName = "sampleSessionQueue";
private static readonly string TopicName = "sampleTopic";
private static readonly string SubscriptionName = "sampleSubscription";

static async Task Main(string[] args)
{
    Console.WriteLine("Hello World!");

    #region Creating Objects
    QueueProperties queue = await ServiceBusObjects.GetQueueAsync(ConnectionString, QueueName);
    if (queue == null)
        queue = await ServiceBusObjects.CreateQueueAsync(ConnectionString, QueueName);

    QueueProperties sessionQueue = await ServiceBusObjects.GetQueueAsync(ConnectionString, SessionQueueName);
    if (sessionQueue == null)
        sessionQueue = await ServiceBusObjects.CreateQueueAsync(ConnectionString, SessionQueueName, true);

    TopicProperties topic = await ServiceBusObjects.GetTopicAsync(TopicName, ConnectionString);
    if (topic == null)
        topic = await ServiceBusObjects.CreateTopicAsync(TopicName, ConnectionString);

    SubscriptionProperties subscription = await ServiceBusObjects.GetSubscriptionAsync(TopicName, ConnectionString, SubscriptionName);
    if (subscription == null)
        subscription = await ServiceBusObjects.CreateSubscriptionAsync(TopicName, ConnectionString, SubscriptionName);
    #endregion
}
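
As an alternative to the get-then-create pattern above, the administration client also exposes existence checks. The following is a minimal sketch, assuming default entity options are acceptable; the class and method names are hypothetical.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus.Administration;

// Hypothetical helper, for illustration only
public static class EnsureEntitiesSketch
{
    public static async Task EnsureQueueAsync(string connectionString, string queueName)
    {
        var client = new ServiceBusAdministrationClient(connectionString);

        // Create the queue with default options only when it is missing
        if (!(await client.QueueExistsAsync(queueName)).Value)
            await client.CreateQueueAsync(queueName);
    }

    public static async Task EnsureTopicAndSubscriptionAsync(
        string connectionString, string topicName, string subscriptionName)
    {
        var client = new ServiceBusAdministrationClient(connectionString);

        // A subscription lives inside a topic, so the topic comes first
        if (!(await client.TopicExistsAsync(topicName)).Value)
            await client.CreateTopicAsync(topicName);

        if (!(await client.SubscriptionExistsAsync(topicName, subscriptionName)).Value)
            await client.CreateSubscriptionAsync(topicName, subscriptionName);
    }
}
```

This avoids using exceptions for control flow, although another process could still create the entity between the check and the create call.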

Result

[Image: Settings]

[Image: Sampletopic]

Consuming and Publishing to Service Bus with .NET Core

Creating the methods to publish to and consume from queues.

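using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public class SendAndReceiveMessage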
{
    public static async Task SendMessageAsync(string connectionString, string queueName, string messageText)
    {
        await using var client = new ServiceBusClient(connectionString);

        // create the sender
        ServiceBusSender sender = client.CreateSender(queueName);

        // create a message that we can send. UTF-8 encoding is used when providing a string.
        ServiceBusMessage message = new ServiceBusMessage(messageText);

        // send the message
        await sender.SendMessageAsync(message);
    }

    public static async Task SendSessionMessageAsync(string connectionString, string queueName, string messageText, string sessionId)
    {
        await using var client = new ServiceBusClient(connectionString);

        // create the sender
        ServiceBusSender sender = client.CreateSender(queueName);

        // create a message that we can send. UTF-8 encoding is used when providing a string.
        ServiceBusMessage message = new ServiceBusMessage(messageText)
        {
            SessionId = sessionId
        };

        // send the message
        await sender.SendMessageAsync(message);
    }

    public static async Task<long> SendScheduledMessageAsync(string connectionString, string queueName, string messageText, DateTimeOffset timeOffset)
    {
        await using var client = new ServiceBusClient(connectionString);

        // create the sender
        ServiceBusSender sender = client.CreateSender(queueName);

        // create a message that we can send. UTF-8 encoding is used when providing a string.
        ServiceBusMessage message = new ServiceBusMessage(messageText);

        // send the message
        return await sender.ScheduleMessageAsync(message, timeOffset);
    }

    public static async Task CancelScheduledMessageAsync(string connectionString, string queueName, long messageSequenceNumber)
    {
        await using var client = new ServiceBusClient(connectionString);

        // create the sender
        ServiceBusSender sender = client.CreateSender(queueName);

        await sender.CancelScheduledMessageAsync(messageSequenceNumber);
    }

    public static async Task<long> DeferMessageAsync(string connectionString, string queueName)
    {
        await using var client = new ServiceBusClient(connectionString);

        // create a receiver that we can use to receive the message
        ServiceBusReceiver receiver = client.CreateReceiver(queueName);

        ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

        // defer the message, thereby preventing the message from being received again without using
        // the received deferred message API.
        await receiver.DeferMessageAsync(receivedMessage);

        return receivedMessage.SequenceNumber;
    }

    public static async Task<ServiceBusReceivedMessage> GetDeferredMessageAsync(string connectionString, string queueName, long messageSequenceNumber)
    {
        await using var client = new ServiceBusClient(connectionString);

        // create a receiver that we can use to receive the message
        ServiceBusReceiver receiver = client.CreateReceiver(queueName);

        // receive the deferred message by specifying the service set sequence number of the original
        // received message
        return await receiver.ReceiveDeferredMessageAsync(messageSequenceNumber);
    }

    public static async Task DeadLetterMessageAsync(string connectionString, string queueName)
    {
        await using var client = new ServiceBusClient(connectionString);

        // create a receiver that we can use to receive the message
        ServiceBusReceiver receiver = client.CreateReceiver(queueName);

        ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

        // dead-letter the message, thereby preventing the message from being received again without receiving from the dead letter queue.
        await receiver.DeadLetterMessageAsync(receivedMessage);
    }

    public static async Task<ServiceBusReceivedMessage> GetDeadLetterMessageAsync(string connectionString, string queueName)
    {
        await using var client = new ServiceBusClient(connectionString);

        // receive the dead lettered message with receiver scoped to the dead letter queue.
        ServiceBusReceiver receiver = client.CreateReceiver(queueName, new ServiceBusReceiverOptions
        {
            SubQueue = SubQueue.DeadLetter
        });

        // the received message is a different type as it contains some service set properties
        return await receiver.ReceiveMessageAsync();
    }

    public static async Task<ServiceBusReceivedMessage> GetMessageAsync(string connectionString, string queueName)
    {
        await using var client = new ServiceBusClient(connectionString);

        // create a receiver that we can use to receive the message
        ServiceBusReceiver receiver = client.CreateReceiver(queueName);

        // the received message is a different type as it contains some service set properties
        return await receiver.ReceiveMessageAsync();
    }

    public static async Task<ServiceBusReceivedMessage> GetMessageFromSessionAsync(string connectionString, string queueName, string sessionId)
    {
        await using var client = new ServiceBusClient(connectionString);

        ServiceBusSessionReceiver receiver = null;
        if (string.IsNullOrEmpty(sessionId))
            receiver = await client.AcceptNextSessionAsync(queueName);
        else
            receiver = await client.AcceptSessionAsync(queueName, sessionId);

        // the received message is a different type as it contains some service set properties
        return await receiver.ReceiveMessageAsync();
    }

    public static async Task CompleteOrAbandonMessageAsync(string connectionString, string queueName, bool abandon)
    {
        await using var client = new ServiceBusClient(connectionString);

        // create a receiver that we can use to receive the message
        ServiceBusReceiver receiver = client.CreateReceiver(queueName);

        // the received message is a different type as it contains some service set properties
        ServiceBusReceivedMessage receivedMessage = await receiver.ReceiveMessageAsync();

        Console.WriteLine($"Message received {receivedMessage.Body} to be abandoned {abandon}");

        if (!abandon) // complete the message, thereby deleting it from the service
            await receiver.CompleteMessageAsync(receivedMessage);
        else // abandon the message, thereby releasing the lock and allowing it to be received again by this or other receivers
            await receiver.AbandonMessageAsync(receivedMessage);
    }

    public static async Task SendMessageBatchAsync(string connectionString, string queueName, List<string> messageTexts)
    {
        await using var client = new ServiceBusClient(connectionString);

        // create the sender
        ServiceBusSender sender = client.CreateSender(queueName);

        // create the list of messages that we can send. UTF-8 encoding is used when providing a string.
        IList<ServiceBusMessage> messages = new List<ServiceBusMessage>();

        messageTexts.ForEach(msg => messages.Add(new ServiceBusMessage(msg)));

        // send the messages in a single call; this fails if together they exceed the maximum batch size
        await sender.SendMessagesAsync(messages);
    }

    public static async Task SendMessageSafeBatchAsync(string connectionString, string queueName, List<string> messageTexts)
    {
        await using var client = new ServiceBusClient(connectionString);

        // create the sender
        ServiceBusSender sender = client.CreateSender(queueName);

        // queue up the messages that we want to send. UTF-8 encoding is used when providing a string.
        Queue<ServiceBusMessage> messages = new Queue<ServiceBusMessage>();

        messageTexts.ForEach(msg => messages.Enqueue(new ServiceBusMessage(msg)));

        while (messages.Count > 0)
        {
            // start a new batch
            using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();

            // add the first message to the batch
            if (messageBatch.TryAddMessage(messages.Peek()))
            {
                // dequeue the message from the .NET queue once the message is added to the batch
                messages.Dequeue();
            }
            else
            {
                // if the first message can't fit, then it is too large for the batch
                throw new Exception($"Message {messageTexts.Count - messages.Count} is too large and cannot be sent.");
            }

            // add as many messages as possible to the current batch
            while (messages.Count > 0 && messageBatch.TryAddMessage(messages.Peek()))
            {
                // dequeue the message from the .NET queue as it has been added to the batch
                messages.Dequeue();
            }

            // now, send the batch
            await sender.SendMessagesAsync(messageBatch);
        }
    }
}

Publishing and Consuming

private static readonly string ConnectionString = "Endpoint=sb://sampleservbusnamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=+SXXUY4kweUSphE1l78avtHWrp9NyDGKPiePOi/KWfw=";
private static readonly string QueueName = "sampleQueue";
private static readonly string SessionQueueName = "sampleSessionQueue";
private static readonly string TopicName = "sampleTopic";
private static readonly string SubscriptionName = "sampleSubscription";

static async Task Main(string[] args)
{
    Console.WriteLine("Hello World!");

    #region Sending and Receiving Messages
    int count = 0;

    // sending messages
    await SendAndReceiveMessage.SendMessageAsync(ConnectionString, QueueName, $"Message {count++}");
    await SendAndReceiveMessage.SendMessageBatchAsync(ConnectionString, QueueName, new System.Collections.Generic.List<string> { $"Message {count++}", $"Message {count++}", $"Message {count++}", $"Message {count++}" });
    await SendAndReceiveMessage.SendMessageSafeBatchAsync(ConnectionString, QueueName, new System.Collections.Generic.List<string> { $"Message {count++}", $"Message {count++}", $"Message {count++}", $"Message {count++}" });
    long firstScheduledMessageNumber = await SendAndReceiveMessage.SendScheduledMessageAsync(ConnectionString, QueueName, $"Message {count++}", new DateTimeOffset(DateTime.Now.AddMinutes(10)));
    long secondScheduledMessageNumber = await SendAndReceiveMessage.SendScheduledMessageAsync(ConnectionString, QueueName, $"Message {count++}", new DateTimeOffset(DateTime.Now.AddMinutes(10)));
    await SendAndReceiveMessage.CancelScheduledMessageAsync(ConnectionString, QueueName, firstScheduledMessageNumber);
    long deferredMessageNumber = await SendAndReceiveMessage.DeferMessageAsync(ConnectionString, QueueName);
    Console.WriteLine((await SendAndReceiveMessage.GetDeferredMessageAsync(ConnectionString, QueueName, deferredMessageNumber)).Body);
    await SendAndReceiveMessage.DeadLetterMessageAsync(ConnectionString, QueueName);
    Console.WriteLine((await SendAndReceiveMessage.GetDeadLetterMessageAsync(ConnectionString, QueueName)).Body);
    Console.WriteLine((await SendAndReceiveMessage.GetMessageAsync(ConnectionString, QueueName)).Body);
    await SendAndReceiveMessage.CompleteOrAbandonMessageAsync(ConnectionString, QueueName, false);
    await SendAndReceiveMessage.CompleteOrAbandonMessageAsync(ConnectionString, QueueName, true);
    #endregion

    #region Sending and Receiving Session Messages
    await SendAndReceiveMessage.SendSessionMessageAsync(ConnectionString, SessionQueueName, $"Message {count++}", "session id 1");
    Console.WriteLine((await SendAndReceiveMessage.GetMessageFromSessionAsync(ConnectionString, SessionQueueName, "session id 1")).Body);
    #endregion
}
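
The examples above only use queues, although a topic and a subscription were also created. Publishing to a topic and receiving from one of its subscriptions follows the same pattern; the sketch below is a minimal illustration, with a hypothetical class name, method name, and message text.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper, for illustration only
public static class TopicSketch
{
    public static async Task PublishAndReceiveAsync(
        string connectionString, string topicName, string subscriptionName)
    {
        await using var client = new ServiceBusClient(connectionString);

        // A sender targets the topic exactly as it would target a queue
        ServiceBusSender sender = client.CreateSender(topicName);
        await sender.SendMessageAsync(new ServiceBusMessage("Hello subscribers"));

        // A receiver targets one subscription of the topic; every
        // subscription gets its own copy of the message
        ServiceBusReceiver receiver = client.CreateReceiver(topicName, subscriptionName);
        ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(5));

        if (message != null)
        {
            Console.WriteLine($"Received from subscription: {message.Body}");
            await receiver.CompleteMessageAsync(message);
        }
    }
}
```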

Results

[Image: Samplesession]

[Image: Samplequeue]

Using Service Bus Processor with .NET Core

Message Processor Method

using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

class Processor
{
    public static async Task RunProcessor(string connectionString, string queueName, TimeSpan timeSpan)
    {
        // since ServiceBusClient implements IAsyncDisposable we create it with "await using"
        await using var client = new ServiceBusClient(connectionString);

        // get the options to use for configuring the processor
        var options = new ServiceBusProcessorOptions
        {
            // By default after the message handler returns, the processor will complete the message
            // If I want more fine-grained control over settlement, I can set this to false.
            AutoCompleteMessages = false,

            // I can also allow for multi-threading
            MaxConcurrentCalls = 2
        };

        // create a processor that we can use to process the messages
        ServiceBusProcessor processor = client.CreateProcessor(queueName, options);

        processor.ProcessMessageAsync += MessageHandler;
        processor.ProcessErrorAsync += ErrorHandler;
        await processor.StartProcessingAsync();
        // the message handler runs in the background, so keep this method alive
        // for the requested time span to prevent the sample from ending immediately
        await Task.Delay(timeSpan);

        // stop processing once the time span has elapsed
        await processor.StopProcessingAsync();
    }

    private static async Task MessageHandler(ProcessMessageEventArgs args)
    {
        string body = args.Message.Body.ToString();
        Console.WriteLine($"Message received from processor: {body}");

        // we can evaluate application logic and use that to determine how to settle the message.
        await args.CompleteMessageAsync(args.Message);
    }
    private static Task ErrorHandler(ProcessErrorEventArgs args)
    {
        // the error source tells me at what point in the processing an error occurred
        Console.WriteLine(args.ErrorSource);
        // the fully qualified namespace is available
        Console.WriteLine(args.FullyQualifiedNamespace);
        // as well as the entity path
        Console.WriteLine(args.EntityPath);
        Console.WriteLine(args.Exception.ToString());
        return Task.CompletedTask;
    }
}
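
The processor above stops after a fixed time span. A common variation keeps it running until a CancellationToken is cancelled, for example when the host application shuts down. The following is a minimal sketch of that idea; the class and method names are hypothetical, and it relies on the processor's default behavior of completing each message automatically after the handler returns.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper, for illustration only
public static class ProcessorUntilCancelled
{
    public static async Task RunAsync(
        string connectionString, string queueName, CancellationToken cancellationToken)
    {
        await using var client = new ServiceBusClient(connectionString);
        await using ServiceBusProcessor processor = client.CreateProcessor(queueName);

        processor.ProcessMessageAsync += args =>
        {
            // with the default options the message is completed automatically
            Console.WriteLine($"Received: {args.Message.Body}");
            return Task.CompletedTask;
        };

        processor.ProcessErrorAsync += args =>
        {
            Console.WriteLine(args.Exception);
            return Task.CompletedTask;
        };

        await processor.StartProcessingAsync(cancellationToken);

        try
        {
            // wait here until the caller cancels the token
            await Task.Delay(Timeout.Infinite, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // expected when the token is cancelled
        }

        await processor.StopProcessingAsync();
    }
}
```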

Using Service Bus Session Processor with .NET Core

Session Processor Method

using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

class SessionProcessor
{
    public static async Task RunSessionProcessor(string connectionString, string queueName, TimeSpan timeSpan)
    {
        // since ServiceBusClient implements IAsyncDisposable we create it with "await using"
        await using var client = new ServiceBusClient(connectionString);

        // get the options to use for configuring the processor
        var options = new ServiceBusSessionProcessorOptions
        {
            // By default after the message handler returns, the processor will complete the message
            // If I want more fine-grained control over settlement, I can set this to false.
            AutoCompleteMessages = false,

            // I can also allow for processing multiple sessions
            MaxConcurrentSessions = 5,
            // By default, there will be a single concurrent call per session. I can
            // increase that here to enable parallel processing within each session.
            MaxConcurrentCallsPerSession = 2
        };

        // create a processor that we can use to process the messages
        ServiceBusSessionProcessor processor = client.CreateSessionProcessor(queueName, options);

        processor.ProcessMessageAsync += MessageHandler;
        processor.ProcessErrorAsync += ErrorHandler;
        await processor.StartProcessingAsync();
        // the message handler runs in the background, so keep this method alive
        // for the requested time span to prevent the sample from ending immediately
        await Task.Delay(timeSpan);

        // stop processing once the time span has elapsed
        await processor.StopProcessingAsync();
    }
    private static async Task MessageHandler(ProcessSessionMessageEventArgs args)
    {
        string body = args.Message.Body.ToString();
        Console.WriteLine($"Message received from session processor: {body}");

        // we can evaluate application logic and use that to determine how to settle the message.
        await args.CompleteMessageAsync(args.Message);

        // we can also set arbitrary session state using this receiver
        // the state is specific to the session, and not any particular message
        await args.SetSessionStateAsync(new BinaryData("some state"));
    }
    private static Task ErrorHandler(ProcessErrorEventArgs args)
    {
        // the error source tells me at what point in the processing an error occurred
        Console.WriteLine(args.ErrorSource);
        // the fully qualified namespace is available
        Console.WriteLine(args.FullyQualifiedNamespace);
        // as well as the entity path
        Console.WriteLine(args.EntityPath);
        Console.WriteLine(args.Exception.ToString());
        return Task.CompletedTask;
    }
}
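
The session state written by the handler above can be read back later by any receiver that accepts the same session. The following is a minimal sketch, with a hypothetical class and method name; it assumes that no other receiver or processor currently holds the lock on that session, otherwise accepting it will fail.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical helper, for illustration only
public static class SessionStateSketch
{
    public static async Task<string> ReadSessionStateAsync(
        string connectionString, string queueName, string sessionId)
    {
        await using var client = new ServiceBusClient(connectionString);

        // lock the specific session so that its state can be read
        await using ServiceBusSessionReceiver receiver =
            await client.AcceptSessionAsync(queueName, sessionId);

        // the state belongs to the session, not to any particular message
        BinaryData state = await receiver.GetSessionStateAsync();

        // null is returned here when no state has been set for the session
        return state?.ToString();
    }
}
```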

Azure Queue Storage queues with .NET Core

Prerequisites

  • NuGet package Azure.Storage.Queues;
  • An Azure Storage account created beforehand, here named "storageaccountnormal".

You can find the following project on GitHub.

Creating Queue Storage objects with .NET Core

Methods

using System.Threading.Tasks;
using Azure.Storage.Queues;

public class StorageQueueObjects
{
    //-------------------------------------------------
    // Create the queue
    //-------------------------------------------------
    public async static Task<QueueClient> CreateQueue(string queueName, string connectionString)
    {
        // Instantiate a QueueClient which will be used to create and manipulate the queue
        QueueClient queueClient = new QueueClient(connectionString, queueName);

        // Create the queue
        await queueClient.CreateIfNotExistsAsync();

        return queueClient;
    }

    //-------------------------------------------------
    // Delete the queue
    //-------------------------------------------------
    public async static Task<Azure.Response<bool>> DeleteQueue(string queueName, string connectionString)
    {
        // Instantiate a QueueClient which will be used to manipulate the queue
        QueueClient queueClient = new QueueClient(connectionString, queueName);

        // Delete the queue
        return await queueClient.DeleteIfExistsAsync();
    }
}

Creating the Objects

private readonly static string ConnectionString = "DefaultEndpointsProtocol=https;AccountName=storageaccountnormal;AccountKey=ypYvvkSb93K4ca3YYNvh92+oyOZZLmn6eJh0TBHc5h54DtFd22sOLGtgjeu2/30AyF948HaT125nbEj4HxCvWQ==;EndpointSuffix=core.windows.net";
private readonly static string QueueName = "samplestoragequeue";
static async Task Main(string[] args)
{
    Console.WriteLine("Hello World!");
    QueueClient queue = await StorageQueueObjects.CreateQueue(QueueName, ConnectionString);
}

Result

[Image: Storage]

Receiving and Publishing to Azure Queue Storage with .NET Core

Methods

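using System;
using System.Threading.Tasks;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;

public class SendAndReceiveMessage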
{
    //-------------------------------------------------
    // Send a message to the queue
    //-------------------------------------------------
    public static async Task<Azure.Response<SendReceipt>> InsertMessage(QueueClient queueClient, string message)
    {
        return await queueClient.SendMessageAsync(message);
    }
    //-------------------------------------------------
    // Peek at a message in the queue
    //-------------------------------------------------
    public static async Task<Azure.Response<PeekedMessage[]>> PeekMessage(QueueClient queueClient)
    {
        return await queueClient.PeekMessagesAsync();
    }
    //-------------------------------------------------
    // Process and remove one message from the queue
    //-------------------------------------------------
    public static async Task<QueueMessage[]> DequeueMessage(QueueClient queueClient)
    {
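        // Get the next message (the array is empty when the queue has no visible messages)
        QueueMessage[] retrievedMessage = await queueClient.ReceiveMessagesAsync();

        // Delete the message, if one was received
        if (retrievedMessage.Length > 0)
            await queueClient.DeleteMessageAsync(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);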

        return retrievedMessage;
    }

    //-----------------------------------------------------
    // Process and remove multiple messages from the queue
    //-----------------------------------------------------
    public static async Task<QueueMessage[]> DequeueMessages(QueueClient queueClient)
    {
        // Receive and process up to 20 messages, keeping them invisible to other consumers for 5 minutes
        QueueMessage[] receivedMessages = await queueClient.ReceiveMessagesAsync(20, TimeSpan.FromMinutes(5));
        foreach (QueueMessage message in receivedMessages)
        {
            // Delete the message
            await queueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
        }

        return receivedMessages;
    }
}

Receiving and Publishing Messages

private readonly static string ConnectionString = "DefaultEndpointsProtocol=https;AccountName=storageaccountnormal;AccountKey=ypYvvkSb93K4ca3YYNvh92+oyOZZLmn6eJh0TBHc5h54DtFd22sOLGtgjeu2/30AyF948HaT125nbEj4HxCvWQ==;EndpointSuffix=core.windows.net";
private readonly static string QueueName = "samplestoragequeue";
static async Task Main(string[] args)
{   
    Console.WriteLine("Hello World!");
    QueueClient queue = await StorageQueueObjects.CreateQueue(QueueName, ConnectionString);
    //insert 50 messages
    for (int i = 0; i < 50; i++)
    {   
        await SendAndReceiveMessage.InsertMessage(queue, "Message Number " + i);
    }   
    PeekedMessage[] peekedMessages = await SendAndReceiveMessage.PeekMessage(queue);
    peekedMessages.ToList().ForEach(x => Console.WriteLine("Message peeked: " + x.Body));

    QueueMessage[] message = await SendAndReceiveMessage.DequeueMessage(queue);
    Console.WriteLine("Single Message dequeued: " + message[0].Body);

    QueueMessage[] messages = await SendAndReceiveMessage.DequeueMessages(queue);
    messages.ToList().ForEach(x => Console.WriteLine("Message dequeued: " + x.Body));
}
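
After a run like the one above, part of the inserted messages is still in the queue. A quick way to check this from code is to read the approximate message count from the queue properties. The following is a minimal sketch with a hypothetical class and method name; as the property name suggests, the count is approximate.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;

// Hypothetical helper, for illustration only
public static class QueueCountSketch
{
    public static async Task<int> PrintApproximateCountAsync(QueueClient queueClient)
    {
        // fetch the queue properties, which include an approximate message count
        QueueProperties properties = await queueClient.GetPropertiesAsync();

        Console.WriteLine($"Approximate messages in queue: {properties.ApproximateMessagesCount}");
        return properties.ApproximateMessagesCount;
    }
}
```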

Result

[Image: Microsoft]

[Image: List]
