Unit Testing The Azure Cosmos DB Change Feed In xUnit And C#

Introduction 

 
Writing basic unit tests for Azure Functions triggered by the Change Feed is straightforward with xUnit.
 
While refactoring some of our microservices at work, I came across a service that didn’t have any unit tests! This service uses the Azure Cosmos DB Change Feed to listen to one of our write-optimized containers related to customers. When a new customer is created in that container, we pick up that Customer document and insert it into a read-optimized container (acting as an aggregate store), which has a read-friendly partition key value.
 
This read-optimized container is then utilized by other services within our pipeline when we need to query the aggregate for our Customer data.
Most of our services are triggered by message brokers (Event Hubs, Service Bus, etc.), so our process for unit testing these services is pretty standardized. But for whatever reason, this service didn’t have any unit tests, which is pretty bad in my opinion. So I thought I’d have a crack at it.
 
Turns out, it’s fairly straightforward.
 
So in this article, I’m going to show you how to write basic unit tests for an Azure Function that is triggered by the Azure Cosmos DB Change Feed using xUnit. This is the unit testing framework we use at work, and I also use it in my side projects since it’s free, open-source, and super easy to get your head around.
 

Wait, What is the Change Feed again?

 
The Azure Cosmos DB Change Feed is a persistent record of changes that take place in a container, in the order that they occur. It records the changes made to a container and outputs a sorted list of the changed documents in the order in which they were modified.
 

Let’s dive in!

 
Let’s imagine that we’re working with our Customer containers. We have an Azure Function that uses a Cosmos DB trigger to listen to changes within our write-optimized container and inserts the changed documents into our read-optimized container. It might look something like this:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using PizzaParlour.Core.Models;
using PizzaParlour.CustomerManager.Aggregate.Repositories;

namespace PizzaParlour.CustomerManager.Aggregate.Functions
{
    public class CustomerFeed
    {
        private readonly ILogger<CustomerFeed> _logger;
        private readonly ICustomerAggregateRepository _customerAggregateRepository;

        public CustomerFeed(
            ILogger<CustomerFeed> logger,
            ICustomerAggregateRepository customerAggregateRepository)
        {
            _logger = logger;
            _customerAggregateRepository = customerAggregateRepository;
        }

        [FunctionName(nameof(CustomerFeed))]
        public async Task Run([CosmosDBTrigger(
            databaseName: "PizzaParlourDB",
            collectionName: "Customers",
            ConnectionStringSetting = "CosmosDBConnectionString",
            LeaseCollectionName = "leases",
            CreateLeaseCollectionIfNotExists = true,
            LeaseCollectionPrefix = "Customers")]IReadOnlyList<Document> input)
        {
            try
            {
                if (input != null && input.Count > 0)
                {
                    foreach (var document in input)
                    {
                        var customer = JsonConvert.DeserializeObject<Customer>(document.ToString());

                        await _customerAggregateRepository.UpsertCustomer(customer);
                    }
                }
            }
            catch (Exception ex)
            {
                _logger.LogError($"Exception thrown: {ex.Message}");
                throw;
            }
        }
    }
}
This function does the following:
  • Listens to the ‘Customers’ container inside the ‘PizzaParlourDB’ database.
  • Creates a lease collection if it doesn’t exist. The lease collection stores the checkpoints that track how far the function has read through the Change Feed.
  • Receives a list of changed documents, sorted in the order in which they were modified.
  • If there are documents in the list, the Function iterates through each document, attempts to deserialize the document into a Customer object, and then upserts that Customer into our read-optimized store.
This is a much-simplified version of the service that we developed at work (not going to share that with you, sorry!). But since we’re working with a simple function here, we can write a simple unit test for it.
 

Writing our Change Feed Unit Test

 
Let’s write up a simple unit test for our Change Feed function. We could write the following:
using Microsoft.Azure.Documents;
using Microsoft.Extensions.Logging;
using Moq;
using Newtonsoft.Json;
using PizzaParlour.Core.Models;
using PizzaParlour.CustomerManager.Aggregate.Functions;
using PizzaParlour.CustomerManager.Aggregate.Repositories;
using PizzaParlour.CustomerManager.Aggregate.UnitTests.Helpers;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Xunit;

namespace PizzaParlour.CustomerManager.Aggregate.UnitTests.FunctionTests
{
    public class CustomerFeedShould
    {
        private Mock<ILogger<CustomerFeed>> _loggerMock;
        private Mock<ICustomerAggregateRepository> _customerAggregateRepoMock;

        private CustomerFeed _func;

        public CustomerFeedShould()
        {
            _loggerMock = new Mock<ILogger<CustomerFeed>>();
            _customerAggregateRepoMock = new Mock<ICustomerAggregateRepository>();

            _func = new CustomerFeed(
                _loggerMock.Object,
                _customerAggregateRepoMock.Object);
        }

        [Fact]
        public async Task UpsertNewDocument()
        {
            // Arrange
            var documentList = new List<Document>();
            var testCustomer = TestDataGenerator.GenerateCustomer();
            var customerDocument = ConvertCustomerObjectToDocument(testCustomer);
            documentList.Add(customerDocument);

            // Act
            await _func.Run(documentList);

            // Assert
            _customerAggregateRepoMock.Verify(
                r => r.UpsertCustomer(It.IsAny<Customer>()), Times.Once);
        }

        private Document ConvertCustomerObjectToDocument(Customer customer)
        {
            var customerJSON = JsonConvert.SerializeObject(customer);
            var document = new Document();
            document.LoadFrom(new JsonTextReader(new StringReader(customerJSON)));

            return document;
        }
    }
}
Let’s step through our UpsertNewDocument() test.
 
First, we create a new list of Documents. This will be our list that we use to invoke our Function. We then want to generate a test customer to add to our list. Before we can add this test customer to our list, we’ll need to convert it to a Document.
 
For this purpose, I’ve got a private method that takes a Customer object, serializes it to JSON, and then loads that JSON object into our Document and returns it.
 
Once our customer has been converted into a Document type, we then add it to our list. We then pass our list with our Customer document to invoke the function.
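The TestDataGenerator helper used in the Arrange step isn’t shown in this article, so here is a minimal sketch of what it might look like. Everything in it is an assumption for illustration: the Customer class is a stand-in for the real PizzaParlour.Core.Models.Customer, and the Id, Name and Email properties are hypothetical. The real helper in the sample may well differ.

```csharp
using System;

namespace PizzaParlour.CustomerManager.Aggregate.UnitTests.Helpers
{
    // Stand-in so this sketch compiles on its own. In the real project the model
    // comes from PizzaParlour.Core.Models, and its properties may be different.
    public class Customer
    {
        public string Id { get; set; }
        public string Name { get; set; }
        public string Email { get; set; }
    }

    // Hypothetical helper: builds a Customer with a unique Id for each test.
    public static class TestDataGenerator
    {
        public static Customer GenerateCustomer()
        {
            var id = Guid.NewGuid().ToString();

            return new Customer
            {
                Id = id,
                Name = "Test Customer",
                Email = $"{id}@example.com"
            };
        }
    }
}
```

Generating a fresh Id per call keeps tests independent of each other, which helps once a test adds more than one customer to the list.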
 
In my function, I have a Repository class that takes care of the Upsert logic for me, so I’m just verifying that the Repository method fires exactly once (Times.Once), since I only have one Customer document in my list.
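The same mocks can cover the other branches of Run: an empty or null list, and a failing repository. The class below is a sketch rather than code from the sample. The class name and test names are hypothetical, and it assumes that Customer can be deserialized from a document that only carries an id.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Extensions.Logging;
using Moq;
using PizzaParlour.Core.Models;
using PizzaParlour.CustomerManager.Aggregate.Functions;
using PizzaParlour.CustomerManager.Aggregate.Repositories;
using Xunit;

namespace PizzaParlour.CustomerManager.Aggregate.UnitTests.FunctionTests
{
    // Hypothetical companion to CustomerFeedShould.
    public class CustomerFeedEdgeCasesShould
    {
        private readonly Mock<ICustomerAggregateRepository> _customerAggregateRepoMock;
        private readonly CustomerFeed _func;

        public CustomerFeedEdgeCasesShould()
        {
            _customerAggregateRepoMock = new Mock<ICustomerAggregateRepository>();
            _func = new CustomerFeed(
                new Mock<ILogger<CustomerFeed>>().Object,
                _customerAggregateRepoMock.Object);
        }

        [Fact]
        public async Task NotUpsertWhenInputIsEmpty()
        {
            await _func.Run(new List<Document>());

            _customerAggregateRepoMock.Verify(
                r => r.UpsertCustomer(It.IsAny<Customer>()), Times.Never);
        }

        [Fact]
        public async Task NotUpsertWhenInputIsNull()
        {
            await _func.Run(null);

            _customerAggregateRepoMock.Verify(
                r => r.UpsertCustomer(It.IsAny<Customer>()), Times.Never);
        }

        [Fact]
        public async Task RethrowWhenRepositoryFails()
        {
            // Make the mocked repository fail on any upsert.
            _customerAggregateRepoMock
                .Setup(r => r.UpsertCustomer(It.IsAny<Customer>()))
                .ThrowsAsync(new InvalidOperationException("Simulated failure"));

            // Assumption: a document with only an id deserializes into Customer.
            var documentList = new List<Document> { new Document { Id = "customer-1" } };

            // The function logs the error and rethrows, so the exception reaches the test.
            await Assert.ThrowsAsync<InvalidOperationException>(
                () => _func.Run(documentList));
        }
    }
}
```

Rethrowing matters here because a swallowed exception would let the Functions host treat the batch as processed, so it is worth pinning that behaviour down with a test.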
 

Conclusion

 
This was a very basic example of how we can write unit tests for our Change Feed functions. Depending on how you are using and configuring Change Feed functions, your tests might need to be more comprehensive than this basic sample.
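As one example of a more comprehensive test, the sketch below sends a batch of three documents and checks that each one is upserted exactly once. It is not taken from the sample: the class and test names are hypothetical, and it assumes that Customer has a string Id property that maps to the document’s id field.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Extensions.Logging;
using Moq;
using PizzaParlour.Core.Models;
using PizzaParlour.CustomerManager.Aggregate.Functions;
using PizzaParlour.CustomerManager.Aggregate.Repositories;
using Xunit;

namespace PizzaParlour.CustomerManager.Aggregate.UnitTests.FunctionTests
{
    // Hypothetical test class for multi-document batches.
    public class CustomerFeedBatchShould
    {
        [Fact]
        public async Task UpsertEveryDocumentInABatch()
        {
            // Arrange
            var repoMock = new Mock<ICustomerAggregateRepository>();
            var func = new CustomerFeed(
                new Mock<ILogger<CustomerFeed>>().Object,
                repoMock.Object);

            var ids = new[] { "customer-1", "customer-2", "customer-3" };
            List<Document> documentList = ids
                .Select(id => new Document { Id = id })
                .ToList();

            // Act
            await func.Run(documentList);

            // Assert: one upsert per document in total...
            repoMock.Verify(
                r => r.UpsertCustomer(It.IsAny<Customer>()),
                Times.Exactly(ids.Length));

            // ...and exactly one upsert for each individual customer.
            // Assumption: Customer.Id is a string populated from the "id" field.
            foreach (var id in ids)
            {
                repoMock.Verify(
                    r => r.UpsertCustomer(It.Is<Customer>(c => c.Id == id)),
                    Times.Once);
            }
        }
    }
}
```

Matching on It.Is rather than It.IsAny is a little stricter, since it also catches a bug where the same customer is upserted several times while another is skipped.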
 
Hopefully, this gives you some guidance on how you can write unit tests for Azure Functions that are triggered by the Change Feed.
The code shown is taken from this sample on my GitHub if you want to refer to it.
 
If you have any questions, please feel free to comment below or reach out to me on Twitter.