Use SQS To Offload Asynchonous Tasks

Introduction

 
Among the many goals of user-facing applications is performance. Whether designing a web site or a back-end service, users expect near-instant response times. HTTP response caching, application data caching, content-delivery networks, etc. are common methods to improve application performance.  While reviewing the steps involved in responding to a user request, identify any operations that can be offloaded to an asynchronous process. Good candidates include,
  • Audit and logging messages
  • Backend requests that kick off asynchronous processes like workflow
  • Requests to external systems, like SMS message requests or a request for a credit check
This article illustrates how to integrate with Amazon's Simple Queue Service (SQS) using .NET Core and Lambda Functions. This article assumes basic familiarity with the Amazon Web Services (AWS) administration console. You will need to download and install the AWS SDK for .NET and configure it with your AWS credentials. Once you install the SDK, open Visual Studio and select the Create New Profile icon.
 
 AWS Explorer
 
Enter your AWS Access Key ID and Secret Access Key.
 
Edit AWS Profile 

Designing for a SQS Standard Queue 

 
After creating an account at aws.amazon.com, log in and navigate to the SQS console and select "Create Queue". You're presented with a choice between a Standard Queue and a FIFO Queue. 
  • Standard Queue - Messages may not be delivered in the order they are added to the queue and there may be duplicates. Messages can be processed directly be a Lambda Function using a push approach. Provides higher throughput than FIFO queues.
  • FIFO Queue - Messages are delivered in the order they are added and there are no duplicates. An EC2 instance or Docker container running a Windows Service or Linux daemon needs to pull messages from the queue.
Standard queues require less infrastructure, but more application design consideration. An audit message should only be logged once even though a duplicate could be pushed to the Lambda. The queue message object includes a unique message id that can be used as a unique index in a DynamoDB table or a relational database table.
 
Message order is also a challenge. Adding a date time property to the message and set it before sending it to the queue. This provides a column that can be used to present the messages in a report in the order they were created even if they were received out of sequence.
  1. [DynamoDBTable("AuditMessages")]  
  2. [JsonObject]  
  3. public class AuditMessage  
  4. {  
  5.     [DynamoDBHashKey]  
  6.     [DynamoDBProperty(AttributeName = "messageId")]  
  7.     [JsonProperty(PropertyName = "messageId", NullValueHandling = NullValueHandling.Ignore)]  
  8.     public string MessageId { getset; }  
  9.   
  10.     [DynamoDBProperty(AttributeName = "auditText")]  
  11.     [JsonProperty(PropertyName = "auditText", NullValueHandling = NullValueHandling.Ignore)]  
  12.     public string AuditText { getset; }  
  13.   
  14.     [DynamoDBRangeKey]         
  15.     [DynamoDBProperty(AttributeName = "auditTime")]  
  16.     [JsonProperty(PropertyName ="auditTime")]  
  17.     public DateTime AuditTime { getset; }  
  18. }  
The AuditMessage class attributes are decorated with attributes that bind it the AuditMessages DynamoDB table. While this uses DynamoDB, another implementation could use the EntityFramework to bind the class definition to a relational table. 
  1. public class FunctionTest  
  2.   {  
  3.       // Change this to your region  
  4.       private string AWS_REGION = "us-east-1";  
  5.   
  6.       [Fact]  
  7.       public async Task CreateTable()  
  8.       {  
  9.           using (var client = new AmazonDynamoDBClient(Amazon.RegionEndpoint.GetBySystemName(AWS_REGION)))  
  10.           {  
  11.               string tableName = "AuditMessages";  
  12.   
  13.               // Attribute definitions  
  14.               var attributeDefinitions = new List<AttributeDefinition>()  
  15.               {  
  16.                   {new AttributeDefinition{  
  17.                       AttributeName = "messageId",  
  18.                       AttributeType = "S"}},  
  19.                   {new AttributeDefinition(){  
  20.                       AttributeName = "auditTime",  
  21.                       AttributeType = "S"}  
  22.                   }  
  23.               };  
  24.   
  25.               // Table key schema  
  26.               var tableKeySchema = new List<KeySchemaElement>()  
  27.               {  
  28.                   {new KeySchemaElement {  
  29.                       AttributeName = "messageId",  
  30.                       KeyType = "HASH"}},  
  31.                   {new KeySchemaElement {  
  32.                       AttributeName = "auditTime",  
  33.                       KeyType = "RANGE"}}  
  34.               };  
  35.   
  36.               CreateTableRequest createTableRequest = new CreateTableRequest  
  37.               {  
  38.                   TableName = tableName,  
  39.                   ProvisionedThroughput = new ProvisionedThroughput  
  40.                   {  
  41.                       ReadCapacityUnits = (long)5,  
  42.                       WriteCapacityUnits = (long)1  
  43.                   },  
  44.                   AttributeDefinitions = attributeDefinitions,  
  45.                   KeySchema = tableKeySchema  
  46.               };  
  47.   
  48.               CreateTableResponse response = await client.CreateTableAsync(createTableRequest);  
  49.           }  
  50.       }  
This unit test scaffolds the AuditMessages DynamoDB table to store the AuditMessage property values. Replace the AWS_REGION constant with the name of the AWS region in which you created your AWS account.
  1. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]  
  2. namespace SampleSqs.AuditReceiver  
  3. {  
  4.     public class Function  
  5.     {  
  6.         private RegionEndpoint _awsRegion = null;  
  7.   
  8.         public Function()  
  9.         {  
  10.             _awsRegion = Amazon.RegionEndpoint.GetBySystemName(System.Environment.GetEnvironmentVariable("AWS_REGION"));     
  11.         }  
  12.   
  13.         public async Task FunctionHandler(SQSEvent evnt, ILambdaContext context)  
  14.         {  
  15.             List<Exception> errors = new List<Exception>();  
  16.             context.Logger.LogLine($"Retrieved {evnt.Records.Count} message(s)");  
  17.   
  18.             foreach(var message in evnt.Records)  
  19.             {  
  20.                 try  
  21.                 {  
  22.                     context.Logger.LogLine($"Processing message {message.MessageId}");  
  23.                     await ProcessMessageAsync(message, context);  
  24.                 }  
  25.                 catch(Exception ex)  
  26.                 {  
  27.                     string errMsg = $"Error processing message {message.MessageId}";  
  28.                     errors.Add(new Exception(errMsg, ex));  
  29.                 }  
  30.             }  
  31.   
  32.             if (errors.Any())  
  33.                 throw new AggregateException(errors);  
  34.         }  
  35.   
  36.         private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context)  
  37.         {  
  38.             context.Logger.LogLine($"Processed message body {message.Body}");  
  39.   
  40.             AuditMessage auditMessage = JsonConvert.DeserializeObject<AuditMessage>(message.Body);  
  41.             auditMessage.MessageId = message.MessageId;  
  42.   
  43.             using (var dynamoContext = new DynamoDBContext(new AmazonDynamoDBClient(_awsRegion)))  
  44.             {  
  45.                 await dynamoContext.SaveAsync<AuditMessage>(auditMessage);  
  46.                 context.Logger.LogLine($"New audit message {message.MessageId}");  
  47.             }  
  48.         }  
  49.     }  
  50. }  
This is the code of the Lambda function that processes messages from the SQS queue. The AWS_REGION environment attribute is provided by Amazon. It contains the name of the AWS region in which the Lambda is deployed. Note that it's used when creating the DynamoDBContext on line 45. The AuditMessage DynamoDBTable property saves having to provide another configuration entry. It supplies the name of the DynamoDB table to the dynamoContext instance.
 
The SaveAsync method on line 45 will handle duplicate requests. If the message has already been written to the table, then it is updated.
 
After executing this code, open the AWS console and navigate to the DynamoDB dashboard. Select the AuditMessages table and copy the Amazon Resource Number (ARN) to Notepad. You'll need it later when configuring role permissions.
 
Table Details

Lambda Error Management

 
If you're dealing with a relational table, you'll need to handle the exception and not rethrow it. If you're using node.js or Python to create Lambdas that process SQS messages, then you need to explicitly delete the message after handling it. Otherwise, the message remains in the queue. The opposite applies to .NET Core Lambdas. AWS automatically deletes SQS messages processed by .NET Core Lambdas unless the Lambda throws an exception.
 
The example code above captures errors and then rethrows them in an AggregateException which puts all messages in the batch back into the queue, even if they were processed. Unfortunately, there is no elegant solution here. If one message fails, the batch fails.
 

Configure the SQS Queue

 
The auditmessagequeue is the destination of the audit messages and will be connected to the Lambda function. The ReceiveMessageWaitTimeSeconds attribute is set to the maximum limit of 20 seconds. By default, this is set to zero (0) seconds which causes SQS to use short polling.  While this nearly eliminates the time any message spends in the queue, it unnecessarily burns through Lambda executions. The AWS free tier allows for one million executions before charging a nominal rate. When I first configured a queue, I left the default value and found I used most of my free executions within a week.
 
As a best practice, set the ReceiveMessageWaitTime to more than 0. The Lambda will only be invoked when a message appears in the queue. At most, the message waits the number of seconds applied in the setting before being pushed to the Lambda function.
  1. [Fact]  
  2. public async Task CreateQueueAsync()  
  3. {  
  4.     string queueName = "auditmessagequeue";  
  5.     CreateQueueRequest deadLetterRequest = new CreateQueueRequest(string.Concat(queueName, "-deadletter"));  
  6.     deadLetterRequest.Attributes = new Dictionary<stringstring>();  
  7.     deadLetterRequest.Attributes.Add(QueueAttributeName.ReceiveMessageWaitTimeSeconds, "20");  
  8.     deadLetterRequest.Attributes.Add(QueueAttributeName.MessageRetentionPeriod, "864000");  
  9.   
  10.     string deadLetterArn = null;  
  11.     using (AmazonSQSClient sqsClient = new AmazonSQSClient(Amazon.RegionEndpoint.GetBySystemName(FunctionTest.AWS_REGION)))  
  12.     {              
  13.         var createResponse = await sqsClient.CreateQueueAsync(deadLetterRequest);  
  14.         GetQueueAttributesRequest queueReq = new GetQueueAttributesRequest();  
  15.         queueReq.QueueUrl = createResponse.QueueUrl;  
  16.         queueReq.AttributeNames.Add(QueueAttributeName.All);  
  17.         var queueAttribs = await sqsClient.GetQueueAttributesAsync(queueReq);                 
  18.         deadLetterArn = queueAttribs.QueueARN;  
  19.     }  
  20.   
  21.   
  22.     string redrivePolicy = $"{{\"deadLetterTargetArn\":\"{deadLetterArn}\",\"maxReceiveCount\":5}}";  
  23.   
  24.     CreateQueueRequest createQueueRequest = new CreateQueueRequest();  
  25.   
  26.     createQueueRequest.QueueName = queueName;  
  27.     createQueueRequest.Attributes = new Dictionary<stringstring>();  
  28.     createQueueRequest.Attributes.Add(QueueAttributeName.RedrivePolicy, redrivePolicy);  
  29.     createQueueRequest.Attributes.Add(QueueAttributeName.ReceiveMessageWaitTimeSeconds, "20");  
  30.   
  31.     using (AmazonSQSClient sqsClient = new AmazonSQSClient(Amazon.RegionEndpoint.GetBySystemName(FunctionTest.AWS_REGION)))  
  32.     {  
  33.         var createResponse = await sqsClient.CreateQueueAsync(createQueueRequest);  
  34.     }  
  35. }  
 This creation process runs through two steps:
  1. Create the auditmessagequeue-deadletter queue. This is where the audit message lands if it cannot be processed in the auditmessagequeue as per the RedrivePolicy setting  (line 22 and 28) on the auditmessagequeue. It will remain in the dead letter queue for ten days per the MessageRetentionPeriod setting on line 8. 
  2. Create the auditmessagequeue
After executing this logic, navigate to the SQS dashboard in the AWS console. Locate the auditmessagequeue and copy the Amazon Resource Number to Notepad. You'll need this in the next section where you grant access to the Lambda function to read from the queue.
 
SQS List 

Configuring the AuditReceiver Lambda Function

 
At this point, the queue is deployed along with the DynamoDB table. Next comes the AuditReceiver Lambda function. Before we deploy it and wire it into the queue, the function needs an IAM role with rights to read from the queue. Browse to the IAM dashboard in the AWS console, select Roles, then select Create Role.

Create the Role 

  1. On the first Create Role panel, select Lambda and click the Next: Permission button on the bottom of the screen.

    Create Lambda Role

  2. On the next screen, search for the AWSLambdaBasicExecutionRole. Select it and click the Next: Tags button at the bottom of the page. This will grant access to CloudWatch logs and other basic resources required for Lambda functionality.

    AWSLambdaBasicExecutionRole

  3. On the Tags screen, click the Next: Review button at the bottom of the screen.
  4. Enter lambda_auditmessagequeuereader for the role name and click Create Role at the bottom of the screen.

    Create Lambda Role
While this configures basic rights for the Lambda, it does not yet grant access the to queue. To grant rights to read from the queue, navigate to the newly created role and grant the minimum rights needed to process queue messages.
  1. In the IAM dashboard, select Roles and locate the lambda_auditmessagequeuereader role. Click the role.

    Find Role 

  2. Select Add Inline Policy.
    Select Add Inline Policy
  3. Select the SQS Service

    Select SQS Service

  4. Expand Actions and select GetQueueAttributes, ReceiveMessages, and DeleteMessages.

    Select Actions

  5. Expand Resources and select Add ARN.

    Add ARN

  6. Assign the ARN of the queue that was copied to Notepad from the prior steps taken when creating the queue and click Add.

    Assign Queue ARN

  7. One the next page, enter AuditQueuePolicy as the policy name and click Create Policy.
  8. Open the lambda_auditmessagequeuereader role again.
  9. Select Add Inline Police.
  10. Select the DynamoDB Service.
    Select DynamoDB Service

  11. Expand Actions and select DescribeTable, PutItem, and UpdateItem.

    Select DynamoDB Actions

  12. Expand Resources and select Add ARN,
  13. Apply the AuditMessages DynamoDB ARN that was copied in an earlier step after the DynamoDB table was created earlier and click Add.

    Apply DynamoDB ARN

  14. Enter AuditMessage_DynamoDBAccess for the policy name and click Create Policy.
Now that the role is in place, we're ready to deploy the Lambda.

Deploy the AuditReceiver Lambda 

  1. In Visual Studio, right click on the SampleSqs.AuditReceiver project and select Publish to AWS Lambda...
  2. Accept the default settings on the first page and click Next.
  3. On this page find lambda_auditmessagequeuereader role that was created in the prior steps.

    Select Lambda Role
  4. Click the Upload button.

Connect the AuditReceiver Lambda to the Queue

 
The queue is configured and the lambda function is deployed. The following code adds the SQS as an event source that can trigger the Lambda Function
  1.     [Fact]  
  2.      public async Task ConfigureLambdaWithQueueAsync()  
  3.      {  
  4.          string queueArn = null;  
  5.   
  6.          using (AmazonSQSClient sqsClient = new AmazonSQSClient(Amazon.RegionEndpoint.GetBySystemName(FunctionTest.AWS_REGION)))  
  7.          {  
  8.              GetQueueUrlRequest queueUrlReq = new GetQueueUrlRequest();  
  9.              queueUrlReq.QueueName = FunctionTest.QUEUE_NAME;  
  10.              GetQueueUrlResponse getQueueUrlResp = await sqsClient.GetQueueUrlAsync(queueUrlReq);  
  11.              GetQueueAttributesRequest queueAttribReq = new GetQueueAttributesRequest();  
  12.              queueAttribReq.AttributeNames.Add(QueueAttributeName.QueueArn);  
  13.              queueAttribReq.QueueUrl = getQueueUrlResp.QueueUrl;  
  14.              var queueAttribResp = await sqsClient.GetQueueAttributesAsync(queueAttribReq);  
  15.              queueArn = queueAttribResp.QueueARN;  
  16.          }  
  17.   
  18.          using (AmazonLambdaClient lambdaClient = new AmazonLambdaClient(Amazon.RegionEndpoint.GetBySystemName(FunctionTest.AWS_REGION)))  
  19.          {  
  20.              CreateEventSourceMappingRequest eventMappingReq = new CreateEventSourceMappingRequest();  
  21.              eventMappingReq.FunctionName = "AuditReceiver";  
  22.              eventMappingReq.BatchSize = 10;  
  23.              eventMappingReq.Enabled = true;  
  24.              eventMappingReq.EventSourceArn = queueArn;  
  25.              await lambdaClient.CreateEventSourceMappingAsync(eventMappingReq);  
  26.          }  
  27.      }  
After executing this code, navigate to the AuditReceiver Lambda configuration in the AWS console. SQS show as a valid event trigger.
 
Lambda Final Config 

Sending the Audit Message

 
The pieces are finally in place to send the audit message to the queue. 
  1. [Fact]  
  2. public async Task PostMessageToQueueTestAsync()  
  3. {  
  4.     System.Environment.SetEnvironmentVariable("AWS_REGION", AWS_REGION);  
  5.     AuditMessage message = new AuditMessage();  
  6.   
  7.     message.AuditTime = DateTime.UtcNow;  
  8.     message.AuditText = "Message posted from unit test";  
  9.   
  10.    using(var sqsClient = new AmazonSQSClient(Amazon.RegionEndpoint.GetBySystemName(FunctionTest.AWS_REGION)))  
  11.    {  
  12.         GetQueueUrlRequest urlReq = new GetQueueUrlRequest();  
  13.         urlReq.QueueName = QUEUE_NAME;  
  14.         var queueUrlResp = await sqsClient.GetQueueUrlAsync(urlReq);  
  15.         string messageBody = JsonConvert.SerializeObject(message);  
  16.         var sendMessageResp = await sqsClient.SendMessageAsync(queueUrlResp.QueueUrl, messageBody);  
  17.    }  
  18.   
  19. }  
Note that the queue URL is obtained first followed by a message to sent the message. In a production application, this could be addressed with a configuration value or a distributed cache that stores the queue.
 
DynamoDB Entry 

Summary

 
Congratulations on making it to the end.
 
The steps outlined in this article touched on a number of AWS services including SQS, DynamoDB, IAM, and Lambda functions. These cloud-native services support a durable queuing and messaging architecture that can offload resource-intensive workloads to asynchronous processes using standard SQS queues and Lambda functions. In practice, DynamoDB inserts are just as performant as posts to SQS queues. If you are already inserting into DynamoDB in your user-facing application, this wouldn't improve your response time.
 
Rather than posting to a DynamoDB, you may use a queue to kick off a backend workflow process or post to an external service. In my experience, I've used this approach to send SMS requests via Twilio through a Lambda configured to receive push request from a standard SQS queue.
 
All of the code samples are available in the Visual Studio solution attached to this article.


Similar Articles