![CQRS]()
Previous Article: ASP.NET Core Future Trends 2025: MAUI, AI Integration, Blazor Hybrid & Cloud-Native Development (Part-35 of 40)
Table of Contents
Understanding the Problem
CQRS Fundamentals
MediatR Deep Dive
Basic Implementation
Advanced CQRS Patterns
Validation & Behavior
Performance Optimization
Event Sourcing Integration
Testing Strategies
Real-World E-Commerce Case Study
Microservices Integration
Production Best Practices
1. Understanding the Problem
The Traditional CRUD Challenges
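Real-World Scenario: Imagine building an e-commerce platform where a single product page needs to load the product, its inventory, active discounts and reviews, compute derived values, and record an analytics event, all inside one service: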
// Traditional monolithic service approach: a single class owns product reads,
// writes, validation, auditing and notifications.
public class ProductService : IProductService
{
    private readonly IProductRepository _productRepository;
    private readonly IInventoryRepository _inventoryRepository;
    private readonly IDiscountRepository _discountRepository;
    private readonly IReviewRepository _reviewRepository;
    private readonly IAnalyticsRepository _analyticsRepository;

    /// <summary>
    /// Loads a product together with inventory, discounts and reviews, records a
    /// product-view analytics event, and maps everything into one <c>ProductDTO</c>.
    /// </summary>
    /// <param name="productId">Identifier of the product to load.</param>
    /// <returns>The assembled product DTO.</returns>
    public async Task<ProductDTO> GetProductAsync(int productId)
    {
        // Four sequential repository round-trips for a single read.
        var entity = await _productRepository.GetByIdAsync(productId);
        var stock = await _inventoryRepository.GetByProductIdAsync(productId);
        var activeDiscounts = await _discountRepository.GetActiveDiscountsAsync(productId);
        var productReviews = await _reviewRepository.GetByProductIdAsync(productId);

        // Derived values are computed here, inside the service.
        var priceAfterDiscounts = CalculateDiscountedPrice(entity.Price, activeDiscounts);
        var availability = CalculateStockStatus(stock.Quantity);
        var rating = CalculateAverageRating(productReviews);

        // The read path also performs a write (analytics tracking).
        await _analyticsRepository.TrackProductViewAsync(productId);

        var dto = new ProductDTO
        {
            Id = entity.Id,
            Name = entity.Name,
            Price = entity.Price,
            DiscountedPrice = priceAfterDiscounts,
            StockStatus = availability,
            AverageRating = rating,
            Reviews = productReviews,
            // ... more properties
        };

        return dto;
    }

    /// <summary>
    /// Validates and applies a product update, then records price history, writes
    /// an audit entry and sends a price-change notification.
    /// </summary>
    /// <param name="update">The requested changes.</param>
    /// <exception cref="ArgumentException">The price is negative.</exception>
    /// <exception cref="UnauthorizedAccessException">
    /// The price exceeds 1000 and the updating user is not a premium user.
    /// </exception>
    public async Task UpdateProductAsync(ProductUpdateDTO update)
    {
        // Validation
        if (update.Price < 0)
        {
            throw new ArgumentException("Price cannot be negative");
        }

        // Business rule: the premium lookup only happens for high prices.
        if (update.Price > 1000)
        {
            var isPremium = await IsPremiumUserAsync(update.UpdatedBy);
            if (!isPremium)
            {
                throw new UnauthorizedAccessException("Only premium users can set high prices");
            }
        }

        // Multiple dependent writes, executed one after another.
        await _productRepository.UpdateAsync(update);
        await _priceHistoryRepository.AddAsync(update);
        await _auditRepository.LogUpdateAsync(update);

        // Notifications
        await _notificationService.NotifyPriceChangeAsync(update);
    }
}
Problems with the Traditional Approach
Single Responsibility Violation: Services handle too many concerns
Performance Issues: Multiple database calls in a single operation
Complex Testing: Difficult to unit test individual behaviors
Tight Coupling: Changes affect multiple parts of the system
Scalability Challenges: Read and write operations compete for resources
2. CQRS Fundamentals
What is CQRS?
Command Query Responsibility Segregation is a pattern that separates read and write operations into different models.
// CQRS Core Concepts
/// <summary>Marker interface for commands (write operations).</summary>
public interface ICommand
{
}

/// <summary>Marker interface for queries (read operations).</summary>
/// <typeparam name="TResponse">Type of the result the query produces.</typeparam>
public interface IQuery<TResponse>
{
}

/// <summary>Handles a single command type.</summary>
/// <typeparam name="TCommand">The command type this handler processes.</typeparam>
public interface ICommandHandler<TCommand>
    where TCommand : ICommand
{
    /// <summary>Executes the command.</summary>
    /// <param name="command">The command to execute.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    Task Handle(TCommand command, CancellationToken cancellationToken);
}

/// <summary>Handles a single query type and returns its result.</summary>
/// <typeparam name="TQuery">The query type this handler processes.</typeparam>
/// <typeparam name="TResponse">Type of the result returned to the caller.</typeparam>
public interface IQueryHandler<TQuery, TResponse>
    where TQuery : IQuery<TResponse>
{
    /// <summary>Executes the query.</summary>
    /// <param name="query">The query to execute.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>The query result.</returns>
    Task<TResponse> Handle(TQuery query, CancellationToken cancellationToken);
}
CQRS Architecture Patterns
// Level 1: Simple Separation
/// <summary>
/// Level 1 CQRS: commands and queries are separate types, each with its own
/// handler, grouped here in one containing class for illustration.
/// </summary>
public class SimpleCQRS
{
    /// <summary>Command (write operation) carrying the data for a new product.</summary>
    public class CreateProductCommand : ICommand
    {
        public string Name { get; set; }
        public decimal Price { get; set; }
        public string Description { get; set; }
    }

    /// <summary>Query (read operation) requesting one product by id.</summary>
    public class GetProductQuery : IQuery<ProductDTO>
    {
        public int ProductId { get; set; }
    }

    /// <summary>Write side: builds a product from the command and adds it to the repository.</summary>
    public class CreateProductCommandHandler : ICommandHandler<CreateProductCommand>
    {
        public async Task Handle(CreateProductCommand command, CancellationToken cancellationToken)
        {
            // Write logic only
            var newProduct = new Product(command.Name, command.Price, command.Description);
            await _productRepository.AddAsync(newProduct);
        }
    }

    /// <summary>Read side: returns the DTO straight from the read repository.</summary>
    public class GetProductQueryHandler : IQueryHandler<GetProductQuery, ProductDTO>
    {
        // Read logic only - optimized for reading
        public async Task<ProductDTO> Handle(GetProductQuery query, CancellationToken cancellationToken)
            => await _productReadRepository.GetByIdAsync(query.ProductId);
    }
}
When to Use CQRS
Perfect Scenarios:
High-traffic applications with different read/write patterns
Complex business logic requiring separation of concerns
Systems needing different data models for reads vs writes
Applications requiring event sourcing
Microservices architectures
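Not Recommended For: simple CRUD applications where reads and writes share the same model, because the extra separation adds complexity without a matching benefit.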
3. MediatR Deep Dive
MediatR Architecture
MediatR is a mediator pattern implementation for .NET that helps reduce coupling between components.
// MediatR Core Interfaces
/// <summary>Entry point for sending requests and publishing notifications.</summary>
public interface IMediator
{
    /// <summary>Sends a typed request and returns its response.</summary>
    Task<TResponse> Send<TResponse>(
        IRequest<TResponse> request,
        CancellationToken cancellationToken = default);

    /// <summary>Sends a request given as <see cref="object"/>; the response is untyped.</summary>
    Task<object?> Send(
        object request,
        CancellationToken cancellationToken = default);

    /// <summary>Publishes a notification object.</summary>
    Task Publish(
        object notification,
        CancellationToken cancellationToken = default);
}

/// <summary>Marker for a request that yields a <typeparamref name="TResponse"/>.</summary>
public interface IRequest<TResponse>
{
}

/// <summary>Handles one request type and produces its response.</summary>
public interface IRequestHandler<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    /// <summary>Processes the request.</summary>
    Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken);
}

/// <summary>Marker for a notification message.</summary>
public interface INotification
{
}

/// <summary>Handles one notification type.</summary>
public interface INotificationHandler<TNotification>
    where TNotification : INotification
{
    /// <summary>Processes the notification.</summary>
    Task Handle(TNotification notification, CancellationToken cancellationToken);
}
Setting Up MediatR
// Program.cs - Configuration
using MediatR;
var builder = WebApplication.CreateBuilder(args);

// Add MediatR: scan the assembly containing Program for handlers and register
// the pipeline behaviors.
builder.Services.AddMediatR(cfg =>
{
    cfg.RegisterServicesFromAssemblyContaining<Program>();

    // Behaviors are listed in the order they should wrap the handler:
    // logging first, then validation.
    cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(LoggingBehavior<,>));
    cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(ValidationBehavior<,>));
});

// NOTE: the behaviors are deliberately NOT registered a second time with
// builder.Services.AddTransient(typeof(IPipelineBehavior<,>), ...). A second
// registration adds another IPipelineBehavior entry to the container, so every
// request would be logged and validated twice.

var app = builder.Build();
MediatR Pipeline Behaviors
// Logging Behavior
/// <summary>
/// Pipeline behavior that logs each request before it is handled, logs success
/// afterwards, and logs then rethrows any exception from the rest of the pipeline.
/// </summary>
/// <typeparam name="TRequest">Request type flowing through the pipeline.</typeparam>
/// <typeparam name="TResponse">Response type produced for the request.</typeparam>
public class LoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    private readonly ILogger<LoggingBehavior<TRequest, TResponse>> _logger;

    public LoggingBehavior(ILogger<LoggingBehavior<TRequest, TResponse>> logger)
        => _logger = logger;

    /// <summary>Wraps the next pipeline step with before/after/error logging.</summary>
    /// <param name="request">The incoming request; logged with destructuring.</param>
    /// <param name="next">Delegate invoking the remainder of the pipeline.</param>
    /// <param name="cancellationToken">Cancellation token for the request.</param>
    /// <returns>The response produced by <paramref name="next"/>.</returns>
    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        var name = typeof(TRequest).Name;
        _logger.LogInformation("Handling {RequestName} with {@Request}", name, request);

        TResponse result;
        try
        {
            result = await next();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error handling {RequestName}", name);
            throw; // rethrow without resetting the stack trace
        }

        _logger.LogInformation("Handled {RequestName} successfully", name);
        return result;
    }
}
// Validation Behavior
/// <summary>
/// Pipeline behavior that runs every registered validator for the request and
/// throws a <c>ValidationException</c> when any of them reports a failure.
/// </summary>
/// <typeparam name="TRequest">Request type flowing through the pipeline.</typeparam>
/// <typeparam name="TResponse">Response type produced for the request.</typeparam>
public class ValidationBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    private readonly IEnumerable<IValidator<TRequest>> _validators;

    public ValidationBehavior(IEnumerable<IValidator<TRequest>> validators)
        => _validators = validators;

    /// <summary>Validates the request, then invokes the rest of the pipeline.</summary>
    /// <param name="request">The request to validate.</param>
    /// <param name="next">Delegate invoking the remainder of the pipeline.</param>
    /// <param name="cancellationToken">Token passed to each validator.</param>
    /// <returns>The response produced by <paramref name="next"/>.</returns>
    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        // Nothing registered for this request type: pass straight through.
        if (!_validators.Any())
        {
            return await next();
        }

        var validationContext = new ValidationContext<TRequest>(request);

        // All validators are started together and awaited as a group.
        var pending = _validators.Select(v => v.ValidateAsync(validationContext, cancellationToken));
        var results = await Task.WhenAll(pending);

        var errors = results
            .SelectMany(result => result.Errors)
            .Where(error => error != null)
            .ToList();

        if (errors.Count != 0)
        {
            throw new ValidationException(errors);
        }

        return await next();
    }
}
4. Basic Implementation
Complete E-Commerce Example
Let's implement a complete e-commerce system using CQRS and MediatR.
Project Structure
ECommerce.CQRS/
├── src/
│ ├── ECommerce.Application/
│ │ ├── Commands/
│ │ ├── Queries/
│ │ ├── Behaviors/
│ │ └── Models/
│ ├── ECommerce.Domain/
│ ├── ECommerce.Infrastructure/
│ └── ECommerce.API/
├── tests/
└── docker-compose.yml
Domain Models
// ECommerce.Domain/Entities/Product.cs
/// <summary>
/// Product entity. State changes only through the constructor and the methods
/// below, each of which rejects negative values and stamps <see cref="UpdatedAt"/>.
/// </summary>
public class Product : Entity
{
    public string Name { get; private set; }
    public string Description { get; private set; }
    public decimal Price { get; private set; }
    public int StockQuantity { get; private set; }
    public bool IsActive { get; private set; }
    public DateTime CreatedAt { get; private set; }
    public DateTime? UpdatedAt { get; private set; }

    private Product() { } // For EF Core

    /// <summary>Creates an active product with a UTC creation timestamp.</summary>
    /// <exception cref="ArgumentNullException">Name or description is null.</exception>
    /// <exception cref="ArgumentException">Price or stock quantity is negative.</exception>
    public Product(string name, string description, decimal price, int stockQuantity)
    {
        // Guards run in the same order as the assignments they protect.
        if (name == null)
        {
            throw new ArgumentNullException(nameof(name));
        }

        if (description == null)
        {
            throw new ArgumentNullException(nameof(description));
        }

        if (price < 0)
        {
            throw new ArgumentException("Price cannot be negative");
        }

        if (stockQuantity < 0)
        {
            throw new ArgumentException("Stock quantity cannot be negative");
        }

        Name = name;
        Description = description;
        Price = price;
        StockQuantity = stockQuantity;
        IsActive = true;
        CreatedAt = DateTime.UtcNow;
    }

    /// <summary>Replaces the price.</summary>
    /// <exception cref="ArgumentException">The new price is negative.</exception>
    public void UpdatePrice(decimal newPrice)
    {
        if (newPrice < 0)
        {
            throw new ArgumentException("Price cannot be negative");
        }

        Price = newPrice;
        UpdatedAt = DateTime.UtcNow;
    }

    /// <summary>Replaces the stock quantity.</summary>
    /// <exception cref="ArgumentException">The quantity is negative.</exception>
    public void UpdateStock(int quantity)
    {
        if (quantity < 0)
        {
            throw new ArgumentException("Stock quantity cannot be negative");
        }

        StockQuantity = quantity;
        UpdatedAt = DateTime.UtcNow;
    }

    /// <summary>Marks the product inactive.</summary>
    public void Deactivate()
    {
        IsActive = false;
        UpdatedAt = DateTime.UtcNow;
    }
}
// ECommerce.Domain/Common/Entity.cs
/// <summary>
/// Base class for entities: an integer identity plus a list of domain events
/// that derived types can add to and callers can read or clear.
/// </summary>
public abstract class Entity
{
    private readonly List<IDomainEvent> _domainEvents = new();

    public int Id { get; protected set; }

    /// <summary>Read-only wrapper over the recorded domain events.</summary>
    public IReadOnlyCollection<IDomainEvent> DomainEvents => _domainEvents.AsReadOnly();

    /// <summary>Records a domain event on this entity.</summary>
    protected void AddDomainEvent(IDomainEvent eventItem) => _domainEvents.Add(eventItem);

    /// <summary>Removes all recorded domain events.</summary>
    public void ClearDomainEvents() => _domainEvents.Clear();
}
// ECommerce.Domain/Common/IDomainEvent.cs
/// <summary>
/// A domain event. It extends <c>INotification</c>, so it can be handed to the
/// mediator's publish mechanism.
/// </summary>
public interface IDomainEvent : INotification
{
    /// <summary>Timestamp associated with the event.</summary>
    DateTime OccurredOn { get; }
}
Commands Implementation
// ECommerce.Application/Commands/CreateProductCommand.cs
/// <summary>Request to create a product; answered with a <c>CreateProductResponse</c>.</summary>
public class CreateProductCommand : IRequest<CreateProductResponse>
{
    public string Name { get; set; }
    public string Description { get; set; }
    public decimal Price { get; set; }
    public int StockQuantity { get; set; }

    /// <summary>Initializes every property from the matching argument.</summary>
    public CreateProductCommand(string name, string description, decimal price, int stockQuantity)
        => (Name, Description, Price, StockQuantity) = (name, description, price, stockQuantity);
}
/// <summary>Result returned after a product has been created.</summary>
public class CreateProductResponse
{
    public int ProductId { get; set; }
    public string Name { get; set; }
    public DateTime CreatedAt { get; set; }

    /// <summary>Initializes every property from the matching argument.</summary>
    public CreateProductResponse(int productId, string name, DateTime createdAt)
        => (ProductId, Name, CreatedAt) = (productId, name, createdAt);
}
// ECommerce.Application/Commands/CreateProductCommandValidator.cs
/// <summary>
/// FluentValidation rules for <c>CreateProductCommand</c>: required and
/// length-limited name and description, bounded price, non-negative stock.
/// </summary>
public class CreateProductCommandValidator : AbstractValidator<CreateProductCommand>
{
    public CreateProductCommandValidator()
    {
        RuleFor(command => command.Name)
            .NotEmpty()
                .WithMessage("Product name is required")
            .MaximumLength(100)
                .WithMessage("Product name cannot exceed 100 characters");

        RuleFor(command => command.Description)
            .NotEmpty()
                .WithMessage("Product description is required")
            .MaximumLength(500)
                .WithMessage("Description cannot exceed 500 characters");

        RuleFor(command => command.Price)
            .GreaterThanOrEqualTo(0)
                .WithMessage("Price cannot be negative")
            .LessThanOrEqualTo(1_000_000)
                .WithMessage("Price seems too high");

        RuleFor(command => command.StockQuantity)
            .GreaterThanOrEqualTo(0)
                .WithMessage("Stock quantity cannot be negative");
    }
}
// ECommerce.Application/Commands/CreateProductCommandHandler.cs
/// <summary>
/// Handles <c>CreateProductCommand</c>: constructs the product, adds it to the
/// repository, saves, and returns the new id, name and creation time.
/// </summary>
public class CreateProductCommandHandler : IRequestHandler<CreateProductCommand, CreateProductResponse>
{
    private readonly IProductRepository _productRepository;
    private readonly ILogger<CreateProductCommandHandler> _logger;

    public CreateProductCommandHandler(
        IProductRepository productRepository,
        ILogger<CreateProductCommandHandler> logger)
    {
        _logger = logger;
        _productRepository = productRepository;
    }

    /// <summary>Creates and persists the product described by the request.</summary>
    /// <param name="request">The creation command.</param>
    /// <param name="cancellationToken">Token passed to the save operation.</param>
    /// <returns>Identity, name and creation timestamp of the new product.</returns>
    public async Task<CreateProductResponse> Handle(
        CreateProductCommand request,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation("Creating product: {ProductName}", request.Name);

        // Business logic: the entity constructor enforces its own invariants.
        var created = new Product(request.Name, request.Description, request.Price, request.StockQuantity);

        // Persist
        await _productRepository.AddAsync(created);
        await _productRepository.SaveChangesAsync(cancellationToken);

        _logger.LogInformation("Product created with ID: {ProductId}", created.Id);

        var response = new CreateProductResponse(created.Id, created.Name, created.CreatedAt);
        return response;
    }
}
Queries Implementation
// ECommerce.Application/Queries/GetProductQuery.cs
/// <summary>Query for a single product by id; answered with a <c>ProductDTO</c>.</summary>
public class GetProductQuery : IRequest<ProductDTO>
{
    public int ProductId { get; set; }

    /// <summary>Creates the query for the given product id.</summary>
    public GetProductQuery(int productId) => ProductId = productId;
}
/// <summary>Read-side representation of a product returned to API callers.</summary>
public class ProductDTO
{
    public int Id { get; set; }

    public string Name { get; set; }

    public string Description { get; set; }

    public decimal Price { get; set; }

    public int StockQuantity { get; set; }

    public bool IsActive { get; set; }

    public DateTime CreatedAt { get; set; }

    public DateTime? UpdatedAt { get; set; }

    /// <summary>Display label for stock availability, filled in by the query handler.</summary>
    public string StockStatus { get; set; }
}
// ECommerce.Application/Queries/GetProductQueryHandler.cs
/// <summary>
/// Handles <c>GetProductQuery</c>: loads the product from the read repository and
/// maps it to a <c>ProductDTO</c>, adding a computed stock-status label.
/// </summary>
public class GetProductQueryHandler : IRequestHandler<GetProductQuery, ProductDTO>
{
    private readonly IProductReadRepository _productReadRepository;
    private readonly ILogger<GetProductQueryHandler> _logger;

    public GetProductQueryHandler(
        IProductReadRepository productReadRepository,
        ILogger<GetProductQueryHandler> logger)
    {
        _logger = logger;
        _productReadRepository = productReadRepository;
    }

    /// <summary>Returns the requested product as a DTO.</summary>
    /// <param name="request">Query carrying the product id.</param>
    /// <param name="cancellationToken">Cancellation token for the request.</param>
    /// <exception cref="ProductNotFoundException">The repository returned null.</exception>
    public async Task<ProductDTO> Handle(
        GetProductQuery request,
        CancellationToken cancellationToken)
    {
        var productId = request.ProductId;
        _logger.LogDebug("Retrieving product with ID: {ProductId}", productId);

        var found = await _productReadRepository.GetByIdAsync(productId);
        if (found == null)
        {
            _logger.LogWarning("Product with ID {ProductId} not found", productId);
            throw new ProductNotFoundException(productId);
        }

        // Map to DTO and calculate derived properties
        var dto = new ProductDTO
        {
            Id = found.Id,
            Name = found.Name,
            Description = found.Description,
            Price = found.Price,
            StockQuantity = found.StockQuantity,
            IsActive = found.IsActive,
            CreatedAt = found.CreatedAt,
            UpdatedAt = found.UpdatedAt,
            StockStatus = CalculateStockStatus(found.StockQuantity)
        };

        return dto;
    }

    /// <summary>
    /// Maps a quantity to a label: exactly 0 is "Out of Stock", any other value
    /// below 10 is "Low Stock", everything else is "In Stock".
    /// </summary>
    private string CalculateStockStatus(int stockQuantity)
    {
        if (stockQuantity == 0)
        {
            return "Out of Stock";
        }

        if (stockQuantity < 10)
        {
            return "Low Stock";
        }

        return "In Stock";
    }
}
// Custom exception
/// <summary>Thrown when a requested product does not exist.</summary>
public class ProductNotFoundException : Exception
{
    /// <summary>Creates the exception with a standard message naming the missing id.</summary>
    /// <param name="productId">Identifier that could not be found.</param>
    public ProductNotFoundException(int productId)
        : base($"Product with ID {productId} was not found.")
    {
    }

    /// <summary>Creates the exception with a caller-supplied message.</summary>
    /// <param name="message">The exception message.</param>
    public ProductNotFoundException(string message)
        : base(message)
    {
    }
}
API Controllers
csharp
// ECommerce.API/Controllers/ProductsController.cs
/// <summary>
/// HTTP endpoints for products. Each action builds or receives a request object
/// and delegates to the mediator.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class ProductsController : ControllerBase
{
    private readonly IMediator _mediator;
    private readonly ILogger<ProductsController> _logger;

    public ProductsController(IMediator mediator, ILogger<ProductsController> logger)
    {
        _logger = logger;
        _mediator = mediator;
    }

    /// <summary>Creates a product and answers 201 with a link to <see cref="GetProduct"/>.</summary>
    /// <param name="command">Creation data bound from the request body.</param>
    [HttpPost]
    [ProducesResponseType(typeof(CreateProductResponse), StatusCodes.Status201Created)]
    [ProducesResponseType(StatusCodes.Status400BadRequest)]
    public async Task<ActionResult<CreateProductResponse>> CreateProduct(
        [FromBody] CreateProductCommand command)
    {
        _logger.LogInformation("Creating new product: {ProductName}", command.Name);

        var created = await _mediator.Send(command);
        var routeValues = new { id = created.ProductId };

        return CreatedAtAction(nameof(GetProduct), routeValues, created);
    }

    /// <summary>Returns one product by id.</summary>
    /// <param name="id">Product identifier taken from the route.</param>
    [HttpGet("{id}")]
    [ProducesResponseType(typeof(ProductDTO), StatusCodes.Status200OK)]
    [ProducesResponseType(StatusCodes.Status404NotFound)]
    public async Task<ActionResult<ProductDTO>> GetProduct(int id)
    {
        var dto = await _mediator.Send(new GetProductQuery(id));
        return Ok(dto);
    }

    /// <summary>Returns a page of product summaries for the query-string parameters.</summary>
    /// <param name="query">Paging, filtering and sorting options bound from the query string.</param>
    [HttpGet]
    [ProducesResponseType(typeof(PaginatedList<ProductSummaryDTO>), StatusCodes.Status200OK)]
    public async Task<ActionResult<PaginatedList<ProductSummaryDTO>>> GetProducts(
        [FromQuery] GetProductsQuery query)
    {
        var page = await _mediator.Send(query);
        return Ok(page);
    }
}
5. Advanced CQRS Patterns
Domain Events with MediatR
csharp
// ECommerce.Domain/Events/ProductCreatedEvent.cs
/// <summary>Domain event describing a newly created product.</summary>
public class ProductCreatedEvent : IDomainEvent
{
    public int ProductId { get; }
    public string ProductName { get; }
    public decimal Price { get; }

    /// <summary>UTC time at which this event object was constructed.</summary>
    public DateTime OccurredOn { get; }

    /// <summary>Captures the product data and stamps the event with the current UTC time.</summary>
    public ProductCreatedEvent(int productId, string productName, decimal price)
    {
        (ProductId, ProductName, Price) = (productId, productName, price);
        OccurredOn = DateTime.UtcNow;
    }
}
// ECommerce.Application/DomainEventHandlers/ProductCreatedEventHandler.cs
/// <summary>
/// Reacts to <c>ProductCreatedEvent</c> by running three follow-up tasks together:
/// an admin e-mail, an analytics record, and a (simulated) search-index update.
/// </summary>
public class ProductCreatedEventHandler : INotificationHandler<ProductCreatedEvent>
{
    private readonly ILogger<ProductCreatedEventHandler> _logger;
    private readonly IEmailService _emailService;
    private readonly IAnalyticsRepository _analyticsRepository;

    public ProductCreatedEventHandler(
        ILogger<ProductCreatedEventHandler> logger,
        IEmailService emailService,
        IAnalyticsRepository analyticsRepository)
    {
        _analyticsRepository = analyticsRepository;
        _emailService = emailService;
        _logger = logger;
    }

    /// <summary>Starts all follow-up tasks and waits for every one of them to finish.</summary>
    /// <param name="notification">The event being handled.</param>
    /// <param name="cancellationToken">Token forwarded to each follow-up task.</param>
    public async Task Handle(ProductCreatedEvent notification, CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Handling product created event for product: {ProductName}",
            notification.ProductName);

        // The three side effects are started in this order and awaited as a group.
        var emailTask = SendAdminNotification(notification, cancellationToken);
        var analyticsTask = TrackAnalytics(notification, cancellationToken);
        var indexTask = UpdateSearchIndex(notification, cancellationToken);

        await Task.WhenAll(emailTask, analyticsTask, indexTask);
    }

    /// <summary>Sends the admin e-mail; failures are logged and swallowed.</summary>
    private async Task SendAdminNotification(ProductCreatedEvent notification, CancellationToken cancellationToken)
    {
        var productId = notification.ProductId;
        try
        {
            await _emailService.SendProductCreatedNotificationAsync(productId, notification.ProductName);
            _logger.LogInformation("Admin notification sent for product {ProductId}", productId);
        }
        catch (Exception ex)
        {
            // Don't throw - the other follow-up tasks should still complete.
            _logger.LogError(ex, "Failed to send admin notification for product {ProductId}", productId);
        }
    }

    /// <summary>Records the creation in analytics; exceptions propagate to the caller.</summary>
    private async Task TrackAnalytics(ProductCreatedEvent notification, CancellationToken cancellationToken)
        => await _analyticsRepository.TrackProductCreationAsync(notification.ProductId, notification.Price);

    /// <summary>Placeholder for a search-index update: waits 100 ms, then logs.</summary>
    private async Task UpdateSearchIndex(ProductCreatedEvent notification, CancellationToken cancellationToken)
    {
        await Task.Delay(100, cancellationToken); // Simulate work
        _logger.LogInformation("Search index updated for product {ProductId}", notification.ProductId);
    }
}
// Updated command handler to publish domain events
/// <summary>
/// Variant of the create-product handler that, after saving, publishes every
/// domain event recorded on the product and then clears them.
/// </summary>
public class CreateProductCommandHandler : IRequestHandler<CreateProductCommand, CreateProductResponse>
{
    private readonly IProductRepository _productRepository;
    private readonly ILogger<CreateProductCommandHandler> _logger;
    private readonly IPublisher _publisher;

    public CreateProductCommandHandler(
        IProductRepository productRepository,
        ILogger<CreateProductCommandHandler> logger,
        IPublisher publisher)
    {
        _publisher = publisher;
        _logger = logger;
        _productRepository = productRepository;
    }

    /// <summary>Creates, persists and announces a new product.</summary>
    /// <param name="request">The creation command.</param>
    /// <param name="cancellationToken">Token passed to saving and publishing.</param>
    /// <returns>Identity, name and creation timestamp of the new product.</returns>
    public async Task<CreateProductResponse> Handle(
        CreateProductCommand request,
        CancellationToken cancellationToken)
    {
        var created = new Product(request.Name, request.Description, request.Price, request.StockQuantity);

        await _productRepository.AddAsync(created);
        await _productRepository.SaveChangesAsync(cancellationToken);

        // Publish domain events one at a time, only after the save has completed.
        foreach (var pendingEvent in created.DomainEvents)
        {
            await _publisher.Publish(pendingEvent, cancellationToken);
        }

        created.ClearDomainEvents();

        var response = new CreateProductResponse(created.Id, created.Name, created.CreatedAt);
        return response;
    }
}
Separate Read/Write Models
// Write Model (Domain)
/// <summary>Write model: the entity carries behavior and business rules.</summary>
public class Product : Entity
{
    /// <summary>
    /// Reduces <c>Price</c> by the given percentage and records a
    /// <c>ProductPriceChangedEvent</c> carrying the new price.
    /// </summary>
    /// <param name="percentage">Discount in percent, from 0 to 100 inclusive.</param>
    /// <exception cref="ArgumentException">The percentage is outside 0..100.</exception>
    public void ApplyDiscount(decimal percentage)
    {
        var withinRange = percentage >= 0 && percentage <= 100;
        if (!withinRange)
        {
            throw new ArgumentException("Discount percentage must be between 0 and 100");
        }

        Price = Price * (100 - percentage) / 100;

        var priceChanged = new ProductPriceChangedEvent(Id, Price);
        AddDomainEvent(priceChanged);
    }
}
// Read Model (DTOs optimized for reading)
/// <summary>
/// Read model for a product detail page: a flat shape combining product,
/// category, rating, image, specification and variant data.
/// </summary>
public class ProductDetailView
{
    public int Id { get; set; }

    public string Name { get; set; }

    public string Description { get; set; }

    public decimal Price { get; set; }

    public decimal DiscountedPrice { get; set; }

    public string CategoryName { get; set; }

    public double AverageRating { get; set; }

    public int ReviewCount { get; set; }

    public string[] ImageUrls { get; set; }

    public Dictionary<string, string> Specifications { get; set; }

    public ProductVariantDTO[] Variants { get; set; }
}
// Separate read repository
/// <summary>Read-only data access for product views, kept separate from the write side.</summary>
public interface IProductReadRepository
{
    /// <summary>Loads the detail view for one product.</summary>
    /// <param name="productId">Identifier of the product.</param>
    Task<ProductDetailView> GetProductDetailAsync(int productId);

    /// <summary>Loads a page of product summaries matching the query.</summary>
    /// <param name="query">Paging, filtering and sorting options.</param>
    Task<PaginatedList<ProductSummaryView>> GetProductsAsync(ProductQuery query);

    /// <summary>Searches products by a search term.</summary>
    /// <param name="searchTerm">Text to search for.</param>
    Task<List<ProductSearchResult>> SearchProductsAsync(string searchTerm);
}
// Implementation with Dapper for performance
/// <summary>
/// Dapper-based read repository that fills <c>ProductDetailView</c> with a single
/// hand-written SQL statement.
/// </summary>
public class ProductReadRepository : IProductReadRepository
{
    private readonly IDbConnection _connection;

    public ProductReadRepository(IDbConnection connection)
    {
        _connection = connection;
    }

    /// <summary>
    /// Loads the detail view for one active product, including its category name,
    /// average review rating (0 when there are no reviews) and review count.
    /// </summary>
    /// <param name="productId">Identifier of the product; passed as a SQL parameter.</param>
    /// <returns>The view, or the default value when no active product matches.</returns>
    public async Task<ProductDetailView> GetProductDetailAsync(int productId)
    {
        // Fixes compared with the previous statement:
        //  * the select list no longer ends with a trailing comma before FROM,
        //    which was a SQL syntax error;
        //  * the LEFT JOIN on ProductImages was removed: no image column was
        //    selected, and the join repeated every review row once per image,
        //    inflating COUNT(r.Id). Image URLs need a separate query.
        const string sql = @"
SELECT
    p.Id, p.Name, p.Description, p.Price,
    c.Name AS CategoryName,
    COALESCE(AVG(r.Rating), 0) AS AverageRating,
    COUNT(r.Id) AS ReviewCount
FROM Products p
LEFT JOIN Categories c ON p.CategoryId = c.Id
LEFT JOIN Reviews r ON p.Id = r.ProductId
WHERE p.Id = @ProductId AND p.IsActive = 1
GROUP BY p.Id, p.Name, p.Description, p.Price, c.Name";

        return await _connection.QuerySingleOrDefaultAsync<ProductDetailView>(
            sql,
            new { ProductId = productId });
    }
}
6. Validation & Behavior
Advanced Validation Pipeline
// FluentValidation validators
/// <summary>
/// FluentValidation rules for <c>UpdateProductCommand</c>. Optional fields are
/// only checked when supplied, and one rule spans price and stock together.
/// </summary>
public class UpdateProductCommandValidator : AbstractValidator<UpdateProductCommand>
{
    public UpdateProductCommandValidator()
    {
        RuleFor(command => command.ProductId)
            .GreaterThan(0)
                .WithMessage("Product ID must be positive");

        RuleFor(command => command.Name)
            .NotEmpty()
                .When(command => command.Name != null)
            .MaximumLength(100)
                .WithMessage("Name cannot exceed 100 characters");

        RuleFor(command => command.Price)
            .GreaterThanOrEqualTo(0)
                .When(command => command.Price.HasValue)
                .WithMessage("Price cannot be negative");

        RuleFor(command => command.StockQuantity)
            .GreaterThanOrEqualTo(0)
                .When(command => command.StockQuantity.HasValue)
                .WithMessage("Stock quantity cannot be negative");

        // Cross-property validation, reported under the "Price" property name.
        RuleFor(command => command)
            .Must(command => !IsFreeWhileInStock(command))
            .WithMessage("Products with stock cannot have zero price")
            .OverridePropertyName("Price");
    }

    /// <summary>True when an explicit price of zero is combined with a positive stock quantity.</summary>
    private static bool IsFreeWhileInStock(UpdateProductCommand command)
    {
        return command.Price.HasValue && command.Price.Value == 0 && command.StockQuantity > 0;
    }
}
// Custom validation behavior
/// <summary>
/// Validation pipeline behavior with logging: runs all validators for the
/// request and throws <c>CustomValidationException</c> when any rule fails.
/// </summary>
/// <typeparam name="TRequest">Request type flowing through the pipeline.</typeparam>
/// <typeparam name="TResponse">Response type produced for the request.</typeparam>
public class CustomValidationBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    private readonly IEnumerable<IValidator<TRequest>> _validators;
    private readonly ILogger<CustomValidationBehavior<TRequest, TResponse>> _logger;

    public CustomValidationBehavior(
        IEnumerable<IValidator<TRequest>> validators,
        ILogger<CustomValidationBehavior<TRequest, TResponse>> logger)
    {
        _logger = logger;
        _validators = validators;
    }

    /// <summary>Validates the request, then invokes the rest of the pipeline.</summary>
    /// <param name="request">The request to validate.</param>
    /// <param name="next">Delegate invoking the remainder of the pipeline.</param>
    /// <param name="cancellationToken">Token passed to each validator.</param>
    /// <returns>The response produced by <paramref name="next"/>.</returns>
    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        var name = typeof(TRequest).Name;

        // No validators registered for this request type: pass straight through.
        if (!_validators.Any())
        {
            return await next();
        }

        _logger.LogDebug("Validating request {RequestName}", name);

        var validationContext = new ValidationContext<TRequest>(request);
        var pending = _validators.Select(v => v.ValidateAsync(validationContext, cancellationToken));
        var results = await Task.WhenAll(pending);

        var errors = results
            .SelectMany(result => result.Errors)
            .Where(error => error != null)
            .ToList();

        if (errors.Count != 0)
        {
            _logger.LogWarning(
                "Validation failed for {RequestName} with {ErrorCount} errors",
                name, errors.Count);
            throw new CustomValidationException(errors);
        }

        return await next();
    }
}
// Custom validation exception
/// <summary>
/// Exception carrying validation failures grouped by property name, with the
/// fixed message "Validation failed".
/// </summary>
public class CustomValidationException : Exception
{
    /// <summary>Error messages keyed by the property name they belong to.</summary>
    public IDictionary<string, string[]> Errors { get; }

    /// <summary>Groups the failures' messages by property name.</summary>
    /// <param name="failures">The individual validation failures.</param>
    public CustomValidationException(IEnumerable<ValidationFailure> failures)
        : base("Validation failed")
    {
        var messagesByProperty = failures.GroupBy(
            failure => failure.PropertyName,
            failure => failure.ErrorMessage);

        Errors = messagesByProperty.ToDictionary(
            group => group.Key,
            group => group.ToArray());
    }
}
Authorization Behavior
// Authorization behavior
/// <summary>
/// Pipeline behavior that enforces <c>[Authorize]</c> attributes placed on request
/// types. Requests without the attribute pass through unchanged.
/// </summary>
/// <typeparam name="TRequest">Request type flowing through the pipeline.</typeparam>
/// <typeparam name="TResponse">Response type produced for the request.</typeparam>
public class AuthorizationBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    private readonly ICurrentUserService _currentUserService;
    private readonly IIdentityService _identityService;
    private readonly ILogger<AuthorizationBehavior<TRequest, TResponse>> _logger;

    public AuthorizationBehavior(
        ICurrentUserService currentUserService,
        IIdentityService identityService,
        ILogger<AuthorizationBehavior<TRequest, TResponse>> logger)
    {
        _currentUserService = currentUserService;
        _identityService = identityService;
        _logger = logger;
    }

    /// <summary>
    /// Checks authentication, then roles, then policies, and only afterwards
    /// invokes the rest of the pipeline.
    /// </summary>
    /// <param name="request">The request whose runtime type is inspected for attributes.</param>
    /// <param name="next">Delegate invoking the remainder of the pipeline.</param>
    /// <param name="cancellationToken">Cancellation token for the request.</param>
    /// <returns>The response produced by <paramref name="next"/>.</returns>
    /// <exception cref="UnauthorizedAccessException">
    /// The user is not authenticated, holds none of the listed roles, or fails a policy.
    /// </exception>
    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        // Check for authorization attributes
        var authorizeAttributes = request.GetType()
            .GetCustomAttributes<AuthorizeAttribute>()
            .ToList();

        if (authorizeAttributes.Count == 0)
        {
            return await next();
        }

        // Ensure user is authenticated
        if (_currentUserService.UserId == null)
        {
            throw new UnauthorizedAccessException("User is not authenticated");
        }

        await EnsureInAnyRoleAsync(authorizeAttributes);
        await EnsurePoliciesAsync(authorizeAttributes);

        return await next();
    }

    /// <summary>
    /// Passes when no attribute names roles, or when the user holds at least one
    /// role named by any attribute. Stops at the first matching role.
    /// </summary>
    private async Task EnsureInAnyRoleAsync(IReadOnlyCollection<AuthorizeAttribute> authorizeAttributes)
    {
        var attributesWithRoles = authorizeAttributes
            .Where(a => !string.IsNullOrWhiteSpace(a.Roles))
            .ToList();

        if (attributesWithRoles.Count == 0)
        {
            return;
        }

        // Blank entries such as the one after the comma in "Admin," are skipped
        // instead of being sent to the identity service as an empty role name.
        var candidateRoles = attributesWithRoles.SelectMany(a => a.Roles.Split(
            ',',
            StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries));

        foreach (var role in candidateRoles)
        {
            // Returning here ends the search on the first match; the previous
            // nested loops only broke out of the inner loop and kept querying
            // the identity service for the remaining attributes.
            if (await _identityService.IsInRoleAsync(_currentUserService.UserId, role))
            {
                return;
            }
        }

        _logger.LogWarning(
            "User {UserId} is not authorized for request {RequestName}",
            _currentUserService.UserId, typeof(TRequest).Name);
        throw new UnauthorizedAccessException("User is not authorized");
    }

    /// <summary>Requires the user to satisfy every policy named by the attributes.</summary>
    private async Task EnsurePoliciesAsync(IReadOnlyCollection<AuthorizeAttribute> authorizeAttributes)
    {
        var policies = authorizeAttributes
            .Where(a => !string.IsNullOrWhiteSpace(a.Policy))
            .Select(a => a.Policy);

        foreach (var policy in policies)
        {
            var authorized = await _identityService.AuthorizeAsync(
                _currentUserService.UserId, policy);

            if (!authorized)
            {
                _logger.LogWarning(
                    "User {UserId} failed policy {Policy} for request {RequestName}",
                    _currentUserService.UserId, policy, typeof(TRequest).Name);
                throw new UnauthorizedAccessException("Policy requirement failed");
            }
        }
    }
}
// Usage with authorization
// Placeholder showing how a request type is decorated for role-based checks:
// the attribute names the roles "Admin" and "ProductManager".
[Authorize(Roles = "Admin,ProductManager")]
public class CreateProductCommand : IRequest<CreateProductResponse>
{
// Command properties
}
/// <summary>
/// Request to change a product's price. The attribute names the
/// "CanUpdateProductPrice" policy; the request produces no value (<c>Unit</c>).
/// </summary>
[Authorize(Policy = "CanUpdateProductPrice")]
public class UpdateProductPriceCommand : IRequest<Unit>
{
    /// <summary>Identifier of the product to re-price.</summary>
    public int ProductId { get; set; }

    /// <summary>The price to apply.</summary>
    public decimal NewPrice { get; set; }
}
7. Performance Optimization
Caching Strategies
// Cache behavior
/// <summary>
/// Pipeline behavior that serves cacheable requests from a distributed cache and
/// stores fresh responses as JSON. Cache failures are logged and never fail the
/// request; the handler itself runs at most once per request.
/// </summary>
/// <typeparam name="TRequest">Request type; supplies the cache key and duration.</typeparam>
/// <typeparam name="TResponse">Response type; serialized with <c>JsonSerializer</c>.</typeparam>
public class CachingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
    where TRequest : ICacheableRequest
{
    private readonly IDistributedCache _cache;
    private readonly ILogger<CachingBehavior<TRequest, TResponse>> _logger;

    public CachingBehavior(
        IDistributedCache cache,
        ILogger<CachingBehavior<TRequest, TResponse>> logger)
    {
        _cache = cache;
        _logger = logger;
    }

    /// <summary>
    /// Returns the cached response when present; otherwise invokes the rest of the
    /// pipeline once and stores the result.
    /// </summary>
    /// <param name="request">The cacheable request.</param>
    /// <param name="next">Delegate invoking the remainder of the pipeline.</param>
    /// <param name="cancellationToken">Token passed to the cache operations.</param>
    /// <returns>The cached or freshly produced response.</returns>
    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        var cacheKey = request.GetCacheKey();

        var (found, cached) = await TryReadAsync(cacheKey, cancellationToken);
        if (found)
        {
            return cached;
        }

        // Invoked outside any try/catch on purpose. Previously a handler exception
        // (or a failed cache write after a successful handler call) landed in a
        // catch-all that called next() again, running the handler twice.
        var response = await next();

        await TryWriteAsync(cacheKey, response, request.GetCacheDuration(), cancellationToken);
        return response;
    }

    /// <summary>Reads and deserializes a cache entry; any cache error counts as a miss.</summary>
    private async Task<(bool Found, TResponse Value)> TryReadAsync(
        string cacheKey,
        CancellationToken cancellationToken)
    {
        try
        {
            // Try to get from cache
            var payload = await _cache.GetStringAsync(cacheKey, cancellationToken);
            if (payload != null)
            {
                var value = JsonSerializer.Deserialize<TResponse>(payload);

                // A payload that deserializes to null is treated as a miss, so
                // callers never receive null from a cache hit.
                if (value != null)
                {
                    _logger.LogDebug("Cache hit for {CacheKey}", cacheKey);
                    return (true, value);
                }
            }

            _logger.LogDebug("Cache miss for {CacheKey}", cacheKey);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Cancellation propagates; every other cache problem falls back to the handler.
            _logger.LogError(ex, "Error in caching behavior for {CacheKey}", cacheKey);
        }

        return (false, default(TResponse)!);
    }

    /// <summary>Serializes and stores the response; failures are logged, not thrown.</summary>
    private async Task TryWriteAsync(
        string cacheKey,
        TResponse response,
        TimeSpan? duration,
        CancellationToken cancellationToken)
    {
        try
        {
            var cacheOptions = new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = duration
            };

            await _cache.SetStringAsync(
                cacheKey,
                JsonSerializer.Serialize(response),
                cacheOptions,
                cancellationToken);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogError(ex, "Error in caching behavior for {CacheKey}", cacheKey);
        }
    }
}
// Cacheable request interface
/// <summary>Implemented by requests whose responses may be cached.</summary>
public interface ICacheableRequest
{
    /// <summary>Returns the key under which the response is stored.</summary>
    string GetCacheKey();

    /// <summary>Returns how long the cached response stays valid, or null.</summary>
    TimeSpan? GetCacheDuration();
}
// Cacheable query example
/// <summary>
/// Cacheable single-product query: the key is "product_" followed by the id and
/// the entry is kept for 30 minutes.
/// </summary>
public class GetProductQuery : IRequest<ProductDTO>, ICacheableRequest
{
    public int ProductId { get; set; }

    /// <summary>Builds the cache key from the product id.</summary>
    public string GetCacheKey()
    {
        return $"product_{ProductId}";
    }

    /// <summary>Cache lifetime for a single product.</summary>
    public TimeSpan? GetCacheDuration()
    {
        return TimeSpan.FromMinutes(30);
    }
}
Query Optimization
// Optimized query with projections
/// <summary>
/// Cacheable, paged product-list query with optional category and price filters
/// and a sort order. Entries are cached for 5 minutes.
/// </summary>
public class GetProductsQuery : IRequest<PaginatedList<ProductSummaryDTO>>, ICacheableRequest
{
    public int PageNumber { get; set; } = 1;
    public int PageSize { get; set; } = 20;
    public string Category { get; set; }
    public decimal? MinPrice { get; set; }
    public decimal? MaxPrice { get; set; }
    public string SortBy { get; set; } = "name";
    public bool SortDescending { get; set; }

    /// <summary>
    /// Builds a cache key from every option that affects the result. Optional
    /// filters add a segment only when they are set.
    /// </summary>
    /// <returns>The cache key for this combination of options.</returns>
    public string GetCacheKey()
    {
        // Prices are formatted with the invariant culture. Interpolating a decimal
        // directly uses the current culture, so the same query could produce
        // "9.99" on one thread and "9,99" on another and miss the cache.
        var invariant = System.Globalization.CultureInfo.InvariantCulture;

        var key = $"products_page_{PageNumber}_size_{PageSize}";
        if (!string.IsNullOrEmpty(Category)) key += $"_cat_{Category}";
        if (MinPrice.HasValue) key += "_min_" + MinPrice.Value.ToString(invariant);
        if (MaxPrice.HasValue) key += "_max_" + MaxPrice.Value.ToString(invariant);
        key += $"_sort_{SortBy}_{(SortDescending ? "desc" : "asc")}";
        return key;
    }

    /// <summary>Cache lifetime for product lists.</summary>
    public TimeSpan? GetCacheDuration() => TimeSpan.FromMinutes(5);
}
// Optimized query handler
/// <summary>
/// Handles <c>GetProductsQuery</c>: forwards the paging, filter and sort options to
/// the read repository and wraps the result in a <c>PaginatedList</c>.
/// </summary>
public class GetProductsQueryHandler : IRequestHandler<GetProductsQuery, PaginatedList<ProductSummaryDTO>>
{
    private readonly IProductReadRepository _readRepository;
    private readonly ILogger<GetProductsQueryHandler> _logger;

    public GetProductsQueryHandler(
        IProductReadRepository readRepository,
        ILogger<GetProductsQueryHandler> logger)
    {
        _logger = logger;
        _readRepository = readRepository;
    }

    /// <summary>Returns one page of product summaries.</summary>
    /// <param name="request">Paging, filtering and sorting options.</param>
    /// <param name="cancellationToken">Cancellation token for the request.</param>
    /// <returns>The requested page with total-count metadata.</returns>
    public async Task<PaginatedList<ProductSummaryDTO>> Handle(
        GetProductsQuery request,
        CancellationToken cancellationToken)
    {
        // Copy the request options one-to-one into the repository's query object.
        var criteria = new ProductQuery
        {
            PageNumber = request.PageNumber,
            PageSize = request.PageSize,
            Category = request.Category,
            MinPrice = request.MinPrice,
            MaxPrice = request.MaxPrice,
            SortBy = request.SortBy,
            SortDescending = request.SortDescending
        };

        var (items, total) = await _readRepository.GetProductsAsync(criteria);

        var page = new PaginatedList<ProductSummaryDTO>(
            items,
            total,
            request.PageNumber,
            request.PageSize);

        return page;
    }
}
// Paginated list
/// <summary>One page of items together with paging metadata.</summary>
/// <typeparam name="T">Type of the items on the page.</typeparam>
public class PaginatedList<T>
{
    /// <summary>The items on this page.</summary>
    public List<T> Items { get; }

    /// <summary>The page number passed to the constructor.</summary>
    public int PageNumber { get; }

    /// <summary>Total item count divided by the page size, rounded up.</summary>
    public int TotalPages { get; }

    /// <summary>Total number of items across all pages.</summary>
    public int TotalCount { get; }

    /// <summary>True when the page number is greater than 1.</summary>
    public bool HasPreviousPage => PageNumber > 1;

    /// <summary>True when the page number is below the total page count.</summary>
    public bool HasNextPage => PageNumber < TotalPages;

    /// <summary>Creates a page and computes the total page count.</summary>
    /// <param name="items">Items on this page.</param>
    /// <param name="count">Total number of items across all pages.</param>
    /// <param name="pageNumber">Page number of this page.</param>
    /// <param name="pageSize">Maximum items per page; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="pageSize"/> is zero or negative.
    /// </exception>
    public PaginatedList(List<T> items, int count, int pageNumber, int pageSize)
    {
        // A page size of 0 makes the division yield Infinity or NaN, whose cast to
        // int is not a meaningful page count; a negative size gives a negative
        // count. Reject both instead of storing a garbage TotalPages.
        if (pageSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(pageSize), pageSize, "Page size must be greater than zero.");
        }

        PageNumber = pageNumber;
        TotalPages = (int)Math.Ceiling(count / (double)pageSize);
        TotalCount = count;
        Items = items;
    }
}
8. Event Sourcing Integration
Event Sourced Aggregates
// Event-sourced product aggregate
/// <summary>
/// Event-sourced product aggregate. Public methods validate their input and raise
/// events through <c>Apply</c>; state is changed only inside <see cref="When"/>.
/// </summary>
public class EventSourcedProduct : AggregateRoot
{
    public string Name { get; private set; }
    public string Description { get; private set; }
    public decimal Price { get; private set; }
    public int StockQuantity { get; private set; }
    public bool IsActive { get; private set; }

    /// <summary>Creates a new aggregate by applying a <c>ProductCreatedEvent</c> with a fresh id.</summary>
    /// <exception cref="ArgumentException">The name is null, empty or whitespace.</exception>
    public EventSourcedProduct(string name, string description, decimal price, int stockQuantity)
    {
        if (string.IsNullOrWhiteSpace(name))
        {
            throw new ArgumentException("Product name cannot be empty", nameof(name));
        }

        var created = new ProductCreatedEvent(Guid.NewGuid(), name, description, price, stockQuantity);
        Apply(created);
    }

    // Constructor for rebuilding from events
    private EventSourcedProduct() { }

    /// <summary>Raises a price-updated event when the price actually changes.</summary>
    /// <exception cref="ArgumentException">The new price is negative.</exception>
    public void UpdatePrice(decimal newPrice)
    {
        if (newPrice < 0)
        {
            throw new ArgumentException("Price cannot be negative");
        }

        if (newPrice == Price)
        {
            return; // unchanged: no event
        }

        Apply(new ProductPriceUpdatedEvent(Id, newPrice, Price));
    }

    /// <summary>Raises a stock-updated event when the quantity actually changes.</summary>
    /// <exception cref="ArgumentException">The quantity is negative.</exception>
    public void UpdateStock(int quantity)
    {
        if (quantity < 0)
        {
            throw new ArgumentException("Stock quantity cannot be negative");
        }

        if (quantity == StockQuantity)
        {
            return; // unchanged: no event
        }

        Apply(new ProductStockUpdatedEvent(Id, quantity, StockQuantity));
    }

    /// <summary>Applies one event to the aggregate's state; unknown event types are ignored.</summary>
    /// <param name="event">The event to apply.</param>
    protected override void When(object @event)
    {
        if (@event is ProductCreatedEvent created)
        {
            Id = created.ProductId;
            Name = created.Name;
            Description = created.Description;
            Price = created.Price;
            StockQuantity = created.StockQuantity;
            IsActive = true;
        }
        else if (@event is ProductPriceUpdatedEvent priceUpdated)
        {
            Price = priceUpdated.NewPrice;
        }
        else if (@event is ProductStockUpdatedEvent stockUpdated)
        {
            StockQuantity = stockUpdated.NewQuantity;
        }
        else if (@event is ProductDeactivatedEvent)
        {
            IsActive = false;
        }
    }

    /// <summary>Checks the aggregate's invariants: a name, and non-negative price and stock.</summary>
    /// <exception cref="InvalidOperationException">An invariant is violated.</exception>
    protected override void EnsureValidState()
    {
        if (string.IsNullOrWhiteSpace(Name))
        {
            throw new InvalidOperationException("Product must have a name");
        }

        if (Price < 0)
        {
            throw new InvalidOperationException("Price cannot be negative");
        }

        if (StockQuantity < 0)
        {
            throw new InvalidOperationException("Stock quantity cannot be negative");
        }
    }
}
// Base aggregate root
/// <summary>
/// Base class for event-sourced aggregates. Tracks events raised since the last commit
/// and the version of the last event known to be persisted.
/// </summary>
public abstract class AggregateRoot
{
    private readonly List<object> _changes = new();

    /// <summary>Aggregate identifier; set by derived classes when they handle their creation event.</summary>
    public Guid Id { get; protected set; }

    /// <summary>
    /// Zero-based version of the last event that was replayed or committed;
    /// -1 while no event has been replayed or committed.
    /// </summary>
    public int Version { get; private set; } = -1;

    /// <summary>
    /// Returns a snapshot of the events raised since the last commit.
    /// A copy is returned so the result stays stable if
    /// <see cref="MarkChangesAsCommitted"/> is called while a caller still holds it.
    /// </summary>
    public IReadOnlyCollection<object> GetUncommittedChanges() => _changes.ToArray();

    /// <summary>
    /// Clears the pending events and advances <see cref="Version"/> by their count, so a
    /// later save from the same instance starts from the correct expected version.
    /// </summary>
    public void MarkChangesAsCommitted()
    {
        Version += _changes.Count;
        _changes.Clear();
    }

    /// <summary>Applies a new event to the aggregate, validates the result and queues the event.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="event"/> is null.</exception>
    protected void Apply(object @event)
    {
        if (@event is null)
        {
            throw new ArgumentNullException(nameof(@event));
        }

        When(@event);
        EnsureValidState();
        _changes.Add(@event);
    }

    /// <summary>
    /// Replays already-persisted events. Each event bumps <see cref="Version"/> and is
    /// not queued as an uncommitted change; invariants are not re-checked during replay.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="history"/> is null.</exception>
    public void LoadFromHistory(IEnumerable<object> history)
    {
        if (history is null)
        {
            throw new ArgumentNullException(nameof(history));
        }

        foreach (var @event in history)
        {
            When(@event);
            Version++;
        }
    }

    /// <summary>Mutates aggregate state according to <paramref name="event"/>.</summary>
    protected abstract void When(object @event);

    /// <summary>Throws when the aggregate's invariants are violated.</summary>
    protected abstract void EnsureValidState();
}
Event Store Implementation
// Event store interface
/// <summary>
/// Contract for saving and loading the events that belong to an aggregate.
/// </summary>
public interface IEventStore
{
/// <summary>Saves <paramref name="events"/> for the aggregate identified by <paramref name="aggregateId"/>.</summary>
/// <param name="aggregateId">Identifier of the aggregate the events belong to.</param>
/// <param name="events">Events to store.</param>
/// <param name="expectedVersion">
/// Version the caller believes the aggregate is at.
/// NOTE(review): the contract does not say whether a mismatch must be rejected — confirm with the implementation.
/// </param>
Task SaveAsync(Guid aggregateId, IEnumerable<object> events, int expectedVersion);
/// <summary>Returns the events stored for <paramref name="aggregateId"/>.</summary>
Task<List<object>> GetEventsAsync(Guid aggregateId);
/// <summary>Reports whether the store knows the aggregate identified by <paramref name="aggregateId"/>.</summary>
Task<bool> AggregateExistsAsync(Guid aggregateId);
}
// Event store implementation
/// <summary>
/// <see cref="IEventStore"/> implementation that serializes events to JSON and delegates
/// persistence to an <c>IEventStoreRepository</c>.
/// </summary>
public class EventStore : IEventStore
{
    private readonly IEventStoreRepository _repository;
    private readonly ILogger<EventStore> _logger;

    public EventStore(IEventStoreRepository repository, ILogger<EventStore> logger)
    {
        _repository = repository;
        _logger = logger;
    }

    /// <summary>
    /// Serializes <paramref name="events"/> and appends them, numbering them
    /// <c>expectedVersion + 1</c>, <c>expectedVersion + 2</c>, ... in enumeration order.
    /// </summary>
    /// <param name="aggregateId">Aggregate the events belong to.</param>
    /// <param name="events">Events to append; must not be null.</param>
    /// <param name="expectedVersion">Version of the aggregate's last stored event (-1 when none).</param>
    /// <exception cref="ArgumentNullException"><paramref name="events"/> is null.</exception>
    /// <remarks>
    /// NOTE(review): <paramref name="expectedVersion"/> is only used to number the new events;
    /// no comparison with the stored version happens here. Presumably the repository enforces
    /// optimistic concurrency — confirm.
    /// </remarks>
    public async Task SaveAsync(Guid aggregateId, IEnumerable<object> events, int expectedVersion)
    {
        if (events is null)
        {
            throw new ArgumentNullException(nameof(events));
        }

        // Derive each version from the element index rather than mutating the captured
        // parameter inside the lambda, so the numbering does not depend on how often or
        // in what order the projection is evaluated.
        var eventData = events.Select((@event, index) => new EventData
        {
            Id = Guid.NewGuid(),
            AggregateId = aggregateId,
            EventType = @event.GetType().Name,
            EventData = JsonSerializer.Serialize(@event, @event.GetType()),
            Timestamp = DateTime.UtcNow,
            Version = expectedVersion + index + 1
        }).ToList();

        await _repository.AppendEventsAsync(eventData);

        _logger.LogInformation(
            "Saved {EventCount} events for aggregate {AggregateId}",
            eventData.Count, aggregateId);
    }

    /// <summary>Loads and deserializes all events stored for <paramref name="aggregateId"/>.</summary>
    /// <exception cref="InvalidOperationException">A stored event type cannot be resolved or its payload is null.</exception>
    public async Task<List<object>> GetEventsAsync(Guid aggregateId)
    {
        var eventData = await _repository.GetEventsAsync(aggregateId);
        return eventData.Select(DeserializeEvent).ToList();
    }

    /// <summary>
    /// Reports whether at least one event is stored for <paramref name="aggregateId"/>.
    /// Required by <see cref="IEventStore"/>; built on the repository's event query because
    /// that is the only read operation this class uses.
    /// </summary>
    public async Task<bool> AggregateExistsAsync(Guid aggregateId)
    {
        var eventData = await _repository.GetEventsAsync(aggregateId);
        return eventData.Any();
    }

    // Resolves the CLR type from the stored short type name and deserializes the payload.
    private object DeserializeEvent(EventData eventData)
    {
        var eventType = Type.GetType($"ECommerce.Domain.Events.{eventData.EventType}, ECommerce.Domain");
        if (eventType == null)
        {
            throw new InvalidOperationException($"Unknown event type: {eventData.EventType}");
        }

        // Deserialize returns null for a JSON "null" payload; fail here instead of
        // handing a null event to the aggregate replay.
        return JsonSerializer.Deserialize(eventData.EventData, eventType)
            ?? throw new InvalidOperationException($"Event payload is null for event type: {eventData.EventType}");
    }
}
9. Testing Strategies
Unit Testing Commands and Queries
// Unit tests for command handler
/// <summary>
/// Unit tests for <c>CreateProductCommandHandler</c> with its repository, logger and
/// publisher replaced by mocks.
/// </summary>
public class CreateProductCommandHandlerTests
{
    private readonly Mock<IProductRepository> _productRepositoryMock = new();
    private readonly Mock<ILogger<CreateProductCommandHandler>> _loggerMock = new();
    private readonly Mock<IPublisher> _publisherMock = new();
    private readonly CreateProductCommandHandler _handler;

    public CreateProductCommandHandlerTests()
    {
        _handler = new CreateProductCommandHandler(
            _productRepositoryMock.Object,
            _loggerMock.Object,
            _publisherMock.Object);
    }

    [Fact]
    public async Task Handle_ValidCommand_ShouldCreateProduct()
    {
        // Arrange
        var request = new CreateProductCommand("Test Product", "Test Description", 99.99m, 10);

        _productRepositoryMock
            .Setup(repo => repo.AddAsync(It.IsAny<Product>()))
            .Returns(Task.CompletedTask);
        _productRepositoryMock
            .Setup(repo => repo.SaveChangesAsync(It.IsAny<CancellationToken>()))
            .Returns(Task.CompletedTask);

        // Act
        var response = await _handler.Handle(request, CancellationToken.None);

        // Assert: the response reflects the command...
        response.Should().NotBeNull();
        response.ProductId.Should().BeGreaterThan(0);
        response.Name.Should().Be(request.Name);

        // ...and the product was added and saved exactly once.
        _productRepositoryMock.Verify(
            repo => repo.AddAsync(It.Is<Product>(added =>
                added.Name == request.Name &&
                added.Price == request.Price)),
            Times.Once);
        _productRepositoryMock.Verify(
            repo => repo.SaveChangesAsync(It.IsAny<CancellationToken>()),
            Times.Once);
    }

    [Fact]
    public async Task Handle_InvalidPrice_ShouldThrowException()
    {
        // Arrange
        var request = new CreateProductCommand(
            "Test Product",
            "Test Description",
            -10m, // Invalid price
            10);

        // Act
        Func<Task> act = () => _handler.Handle(request, CancellationToken.None);

        // Assert
        await Assert.ThrowsAsync<ArgumentException>(act);
    }
}
// Integration tests
/// <summary>
/// Integration tests that call the products endpoint through an in-memory test server
/// created by <c>WebApplicationFactory</c>.
/// </summary>
public class ProductCommandsIntegrationTests : IClassFixture<WebApplicationFactory<Program>>
{
    private readonly WebApplicationFactory<Program> _factory;
    private readonly HttpClient _client;

    public ProductCommandsIntegrationTests(WebApplicationFactory<Program> factory)
    {
        _factory = factory;
        _client = factory.CreateClient();
    }

    [Fact]
    public async Task CreateProduct_ValidRequest_ShouldReturnCreated()
    {
        // Arrange
        var payload = new CreateProductCommand(
            "Integration Test Product",
            "Integration Test Description",
            49.99m,
            25);
        // Response property names may differ in casing from the DTO's.
        var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };

        // Act
        var httpResponse = await _client.PostAsJsonAsync("/api/products", payload);

        // Assert
        httpResponse.StatusCode.Should().Be(HttpStatusCode.Created);

        var body = await httpResponse.Content.ReadAsStringAsync();
        var created = JsonSerializer.Deserialize<CreateProductResponse>(body, jsonOptions);

        created.Should().NotBeNull();
        created.ProductId.Should().BeGreaterThan(0);
    }
}
Behavior Testing
// Testing pipeline behaviors
/// <summary>
/// Tests for <c>ValidationBehavior</c> using <see cref="TestCommand"/> and
/// <see cref="TestCommandValidator"/> as fixtures.
/// </summary>
public class ValidationBehaviorTests
{
    // Builds the behavior under test with the single test validator.
    private static ValidationBehavior<TestCommand, TestResponse> CreateBehavior()
    {
        var validators = new List<IValidator<TestCommand>>
        {
            new TestCommandValidator()
        };
        return new ValidationBehavior<TestCommand, TestResponse>(validators);
    }

    [Fact]
    public async Task Handle_ValidRequest_ShouldPassThrough()
    {
        // Arrange
        var sut = CreateBehavior();
        var validCommand = new TestCommand { Name = "Valid Name", Value = 10 };

        // Act
        var response = await sut.Handle(
            validCommand,
            () => Task.FromResult(new TestResponse()),
            CancellationToken.None);

        // Assert
        response.Should().NotBeNull();
    }

    [Fact]
    public async Task Handle_InvalidRequest_ShouldThrowValidationException()
    {
        // Arrange
        var sut = CreateBehavior();
        var invalidCommand = new TestCommand { Name = "", Value = -1 }; // Invalid

        // Act
        Func<Task> act = () => sut.Handle(
            invalidCommand,
            () => Task.FromResult(new TestResponse()),
            CancellationToken.None);

        // Assert
        await Assert.ThrowsAsync<ValidationException>(act);
    }
}
// Test command and validator
/// <summary>
/// Minimal request type used as a fixture by the validation behavior tests;
/// its response type is <see cref="TestResponse"/>.
/// </summary>
public class TestCommand : IRequest<TestResponse>
{
/// <summary>Checked by <see cref="TestCommandValidator"/>: must not be empty.</summary>
public string Name { get; set; }
/// <summary>Checked by <see cref="TestCommandValidator"/>: must be greater than zero.</summary>
public int Value { get; set; }
}
/// <summary>Response type paired with <see cref="TestCommand"/> in the behavior tests.</summary>
public class TestResponse
{
/// <summary>Result text; defaults to "Success".</summary>
public string Result { get; set; } = "Success";
}
/// <summary>
/// Validation rules for <see cref="TestCommand"/>: a non-empty name and a value above zero.
/// </summary>
public class TestCommandValidator : AbstractValidator<TestCommand>
{
public TestCommandValidator()
{
// Name must not be empty.
RuleFor(x => x.Name).NotEmpty();
// Value must be strictly positive.
RuleFor(x => x.Value).GreaterThan(0);
}
}
10. Real-World E-Commerce Case Study
Complete Order Processing System
// Order processing commands
/// <summary>
/// Command requesting that an order be created; produces a <c>CreateOrderResponse</c>.
/// </summary>
public class CreateOrderCommand : IRequest<CreateOrderResponse>
{
/// <summary>Identifier of the customer placing the order.</summary>
public int CustomerId { get; set; }
/// <summary>Order lines. Not initialized here, so it is null unless the caller sets it.</summary>
public List<OrderItemDTO> Items { get; set; }
/// <summary>Address the order should be shipped to.</summary>
public ShippingAddressDTO ShippingAddress { get; set; }
/// <summary>Payment method chosen for the order.</summary>
public PaymentMethodDTO PaymentMethod { get; set; }
}
/// <summary>
/// Handles <see cref="CreateOrderCommand"/>: validates products and stock, creates the
/// order, reduces product stock, saves, and publishes the order's domain events.
/// </summary>
public class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, CreateOrderResponse>
{
    private readonly IOrderRepository _orderRepository;
    private readonly IProductRepository _productRepository;
    private readonly IPublisher _publisher;
    private readonly ILogger<CreateOrderCommandHandler> _logger;

    public CreateOrderCommandHandler(
        IOrderRepository orderRepository,
        IProductRepository productRepository,
        IPublisher publisher,
        ILogger<CreateOrderCommandHandler> logger)
    {
        _orderRepository = orderRepository;
        _productRepository = productRepository;
        _publisher = publisher;
        _logger = logger;
    }

    /// <summary>Creates the order described by <paramref name="request"/>.</summary>
    /// <param name="request">Customer, order lines, shipping address and payment method.</param>
    /// <param name="cancellationToken">Forwarded to the save and publish calls.</param>
    /// <returns>Id, total amount and status of the created order.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
    /// <exception cref="ProductNotFoundException">An order line references an unknown product.</exception>
    /// <exception cref="InsufficientStockException">
    /// The quantity requested for a product, summed over all order lines, exceeds its stock.
    /// </exception>
    public async Task<CreateOrderResponse> Handle(
        CreateOrderCommand request,
        CancellationToken cancellationToken)
    {
        ArgumentNullException.ThrowIfNull(request);

        _logger.LogInformation("Creating order for customer {CustomerId}", request.CustomerId);

        // Validate products and inventory
        var productIds = request.Items.Select(i => i.ProductId).Distinct().ToList();
        var products = await _productRepository.GetByIdsAsync(productIds);

        // Index products once instead of scanning the list for every order line.
        // GroupBy/First keeps the first match, like the FirstOrDefault lookup it replaces.
        var productsById = products
            .GroupBy(p => p.Id)
            .ToDictionary(g => g.Key, g => g.First());

        // The same product may appear on several lines. Stock must cover the SUM of those
        // lines; checking each line on its own lets the combined quantity exceed the stock
        // and then fail half-way through the reservation loop below.
        var requestedByProduct = request.Items
            .GroupBy(i => i.ProductId)
            .ToDictionary(g => g.Key, g => g.Sum(i => i.Quantity));

        // NOTE(review): non-positive quantities are not rejected here — presumably a
        // validator in the pipeline covers that; confirm.
        var orderItems = new List<OrderItem>();
        foreach (var item in request.Items)
        {
            if (!productsById.TryGetValue(item.ProductId, out var product))
            {
                throw new ProductNotFoundException(item.ProductId);
            }

            var totalRequested = requestedByProduct[item.ProductId];
            if (product.StockQuantity < totalRequested)
            {
                throw new InsufficientStockException(product.Id, product.StockQuantity, totalRequested);
            }

            orderItems.Add(new OrderItem(product.Id, product.Name, product.Price, item.Quantity));
        }

        // Create order
        // NOTE(review): MapToShippingAddress / MapToPaymentMethod are not defined in the
        // visible part of this class — confirm where they live.
        var order = new Order(
            request.CustomerId,
            orderItems,
            MapToShippingAddress(request.ShippingAddress),
            MapToPaymentMethod(request.PaymentMethod));

        // Reserve inventory. The aggregate check above guarantees stock never goes negative.
        foreach (var item in orderItems)
        {
            var product = productsById[item.ProductId];
            product.UpdateStock(product.StockQuantity - item.Quantity);
        }

        // Save changes
        // NOTE(review): two separate saves; whether they share one transaction depends on
        // the repositories — confirm.
        await _orderRepository.AddAsync(order);
        await _productRepository.SaveChangesAsync(cancellationToken);
        await _orderRepository.SaveChangesAsync(cancellationToken);

        // Publish events
        foreach (var domainEvent in order.DomainEvents)
        {
            await _publisher.Publish(domainEvent, cancellationToken);
        }

        _logger.LogInformation("Order {OrderId} created successfully", order.Id);
        return new CreateOrderResponse(order.Id, order.TotalAmount, order.Status);
    }
}
// Order created event and handlers
/// <summary>
/// Domain event describing a newly created order. Immutable; the timestamp is taken
/// in UTC when the event object is constructed.
/// </summary>
public class OrderCreatedEvent : IDomainEvent
{
    /// <summary>Identifier of the created order.</summary>
    public int OrderId { get; }

    /// <summary>Identifier of the customer who placed the order.</summary>
    public int CustomerId { get; }

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; }

    /// <summary>UTC time at which this event object was created.</summary>
    public DateTime OccurredOn { get; }

    public OrderCreatedEvent(int orderId, int customerId, decimal totalAmount)
    {
        (OrderId, CustomerId, TotalAmount, OccurredOn) =
            (orderId, customerId, totalAmount, DateTime.UtcNow);
    }
}
// Multiple event handlers for order created
/// <summary>
/// Sends the order confirmation e-mail when an order is created. Failures are logged
/// and swallowed, so a mail problem does not propagate to the publisher.
/// </summary>
public class OrderCreatedEventHandler : INotificationHandler<OrderCreatedEvent>
{
    private readonly IEmailService _emailService;
    private readonly ILogger<OrderCreatedEventHandler> _logger;

    public OrderCreatedEventHandler(
        IEmailService emailService,
        ILogger<OrderCreatedEventHandler> logger)
    {
        _logger = logger;
        _emailService = emailService;
    }

    /// <summary>Sends the confirmation for the order carried by <paramref name="notification"/>.</summary>
    public async Task Handle(OrderCreatedEvent notification, CancellationToken cancellationToken)
    {
        var orderId = notification.OrderId;

        try
        {
            await _emailService.SendOrderConfirmationAsync(orderId, notification.CustomerId);
            _logger.LogInformation("Order confirmation sent for order {OrderId}", orderId);
        }
        catch (Exception ex)
        {
            // Best effort: log and continue rather than rethrow.
            _logger.LogError(ex, "Failed to send order confirmation for order {OrderId}", orderId);
        }
    }
}
/// <summary>
/// Asks the inventory service to reserve inventory when an order is created.
/// Exceptions from the service are not caught here.
/// </summary>
public class UpdateInventoryEventHandler : INotificationHandler<OrderCreatedEvent>
{
    private readonly IInventoryService _inventoryService;
    private readonly ILogger<UpdateInventoryEventHandler> _logger;

    public UpdateInventoryEventHandler(
        IInventoryService inventoryService,
        ILogger<UpdateInventoryEventHandler> logger)
    {
        _logger = logger;
        _inventoryService = inventoryService;
    }

    /// <summary>Reserves inventory for the order carried by <paramref name="notification"/>.</summary>
    public async Task Handle(OrderCreatedEvent notification, CancellationToken cancellationToken)
    {
        var orderId = notification.OrderId;

        await _inventoryService.ReserveInventoryForOrderAsync(orderId);

        _logger.LogInformation("Inventory reserved for order {OrderId}", orderId);
    }
}
/// <summary>
/// Processes payment when an order is created, then asks the payment service to publish
/// either a payment-succeeded or a payment-failed event depending on the outcome.
/// </summary>
public class ProcessPaymentEventHandler : INotificationHandler<OrderCreatedEvent>
{
    private readonly IPaymentService _paymentService;
    private readonly ILogger<ProcessPaymentEventHandler> _logger;

    public ProcessPaymentEventHandler(
        IPaymentService paymentService,
        ILogger<ProcessPaymentEventHandler> logger)
    {
        _logger = logger;
        _paymentService = paymentService;
    }

    /// <summary>Processes payment for the order carried by <paramref name="notification"/>.</summary>
    public async Task Handle(OrderCreatedEvent notification, CancellationToken cancellationToken)
    {
        var outcome = await _paymentService.ProcessPaymentAsync(notification.OrderId);

        if (!outcome.Success)
        {
            _logger.LogWarning("Payment failed for order {OrderId}: {Error}",
                notification.OrderId, outcome.ErrorMessage);

            // Publish payment failed event
            await _paymentService.PublishPaymentFailedEvent(notification.OrderId, outcome.ErrorMessage);
            return;
        }

        _logger.LogInformation("Payment processed successfully for order {OrderId}", notification.OrderId);

        // Publish payment succeeded event
        await _paymentService.PublishPaymentSucceededEvent(notification.OrderId);
    }
}
11. Microservices Integration
Cross-Service Communication
// Integration events
/// <summary>
/// Marker contract for integration events. Extends <c>INotification</c>, so implementations
/// can be handled by <c>INotificationHandler</c> implementations.
/// </summary>
public interface IIntegrationEvent : INotification
{
/// <summary>Identifier of this event instance.</summary>
Guid Id { get; }
/// <summary>Time at which the event occurred.</summary>
DateTime OccurredOn { get; }
}
/// <summary>
/// Integration event announcing that an order was created. Each instance gets a fresh
/// <see cref="Id"/> and a UTC <see cref="OccurredOn"/> timestamp at construction.
/// </summary>
public class OrderCreatedIntegrationEvent : IIntegrationEvent
{
    /// <summary>Unique identifier of this event instance.</summary>
    public Guid Id { get; }

    /// <summary>Identifier of the created order.</summary>
    public int OrderId { get; }

    /// <summary>Identifier of the customer who placed the order.</summary>
    public int CustomerId { get; }

    /// <summary>Total amount of the order.</summary>
    public decimal TotalAmount { get; }

    /// <summary>UTC time at which this event object was created.</summary>
    public DateTime OccurredOn { get; }

    /// <summary>Order lines carried with the event.</summary>
    public List<OrderItemDTO> Items { get; }

    /// <summary>
    /// Address the order ships to; null when the publisher did not supply one.
    /// Added because the shipping handler for this event reads
    /// <c>notification.ShippingAddress</c>, which the event did not previously declare.
    /// NOTE(review): the property type is assumed to match what <c>CreateShippingRequest</c> expects — confirm.
    /// </summary>
    public ShippingAddressDTO ShippingAddress { get; }

    /// <summary>Creates the event.</summary>
    /// <param name="orderId">Identifier of the created order.</param>
    /// <param name="customerId">Identifier of the customer.</param>
    /// <param name="totalAmount">Total amount of the order.</param>
    /// <param name="items">Order lines.</param>
    /// <param name="shippingAddress">Optional shipping address; existing four-argument callers keep working.</param>
    public OrderCreatedIntegrationEvent(
        int orderId,
        int customerId,
        decimal totalAmount,
        List<OrderItemDTO> items,
        ShippingAddressDTO shippingAddress = null)
    {
        Id = Guid.NewGuid();
        OrderId = orderId;
        CustomerId = customerId;
        TotalAmount = totalAmount;
        OccurredOn = DateTime.UtcNow;
        Items = items;
        ShippingAddress = shippingAddress;
    }
}
// Integration event handler in shipping service
/// <summary>
/// Shipping-side handler for <see cref="OrderCreatedIntegrationEvent"/>: builds a
/// <c>CreateShippingRequest</c> from the event and passes it to the shipping service.
/// Failures are logged and rethrown.
/// </summary>
public class OrderCreatedIntegrationEventHandler : INotificationHandler<OrderCreatedIntegrationEvent>
{
    private readonly IShippingService _shippingService;
    private readonly ILogger<OrderCreatedIntegrationEventHandler> _logger;

    public OrderCreatedIntegrationEventHandler(
        IShippingService shippingService,
        ILogger<OrderCreatedIntegrationEventHandler> logger)
    {
        _logger = logger;
        _shippingService = shippingService;
    }

    /// <summary>Creates a shipment for the order carried by <paramref name="notification"/>.</summary>
    public async Task Handle(OrderCreatedIntegrationEvent notification, CancellationToken cancellationToken)
    {
        _logger.LogInformation(
            "Processing shipping for order {OrderId}",
            notification.OrderId);

        try
        {
            var totalWeight = CalculateTotalWeight(notification.Items);

            await _shippingService.CreateShippingAsync(new CreateShippingRequest
            {
                OrderId = notification.OrderId,
                CustomerId = notification.CustomerId,
                TotalWeight = totalWeight,
                ShippingAddress = notification.ShippingAddress
            });

            _logger.LogInformation(
                "Shipping created for order {OrderId}",
                notification.OrderId);
        }
        catch (Exception ex)
        {
            _logger.LogError(
                ex,
                "Failed to create shipping for order {OrderId}",
                notification.OrderId);
            throw; // rethrow so the publisher sees the failure
        }
    }

    // Total weight = sum over all lines of quantity * unit weight.
    private decimal CalculateTotalWeight(List<OrderItemDTO> items) =>
        items.Sum(line => line.Quantity * line.UnitWeight);
}
Saga Pattern Implementation
// Order processing saga
/// <summary>
/// Coordinates order processing as a sequence of steps: payment, inventory reservation,
/// shipping, completion. Each handled event records progress on the persisted saga and
/// dispatches the command for the next step (or a cancellation when payment fails).
/// </summary>
public class OrderProcessingSaga :
    IEventHandler<OrderCreatedEvent>,
    IEventHandler<PaymentProcessedEvent>,
    IEventHandler<InventoryReservedEvent>,
    IEventHandler<ShippingCreatedEvent>
{
    private readonly ISagaRepository _sagaRepository;
    private readonly IMediator _mediator;
    private readonly ILogger<OrderProcessingSaga> _logger;

    public OrderProcessingSaga(
        ISagaRepository sagaRepository,
        IMediator mediator,
        ILogger<OrderProcessingSaga> logger)
    {
        _sagaRepository = sagaRepository;
        _mediator = mediator;
        _logger = logger;
    }

    // Commands below are dispatched with Send rather than Publish: in MediatR, Publish is
    // the notification channel and Send routes a request to its handler.
    // NOTE(review): assumes the *Command types implement IRequest — confirm.

    /// <summary>Starts the saga for a new order and requests payment processing.</summary>
    public async Task Handle(OrderCreatedEvent @event, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Starting order processing saga for order {OrderId}", @event.OrderId);

        var saga = new OrderSaga(@event.OrderId, @event.CustomerId, @event.TotalAmount);
        await _sagaRepository.SaveAsync(saga);

        // Start payment processing
        await _mediator.Send(new ProcessPaymentCommand(@event.OrderId, @event.TotalAmount), cancellationToken);
    }

    /// <summary>Records the payment outcome; continues to inventory or cancels the order.</summary>
    /// <exception cref="InvalidOperationException">No saga is stored for the order.</exception>
    public async Task Handle(PaymentProcessedEvent @event, CancellationToken cancellationToken)
    {
        var saga = RequireSaga(await _sagaRepository.GetByOrderIdAsync(@event.OrderId), @event.OrderId);

        if (@event.Success)
        {
            saga.MarkPaymentProcessed();
            await _sagaRepository.SaveAsync(saga);

            // Proceed to inventory reservation
            await _mediator.Send(new ReserveInventoryCommand(@event.OrderId), cancellationToken);
        }
        else
        {
            saga.MarkPaymentFailed(@event.ErrorMessage);
            await _sagaRepository.SaveAsync(saga);

            // Compensating action
            await _mediator.Send(new CancelOrderCommand(@event.OrderId, "Payment failed"), cancellationToken);
        }
    }

    /// <summary>Records the inventory reservation and requests shipping.</summary>
    /// <exception cref="InvalidOperationException">No saga is stored for the order.</exception>
    public async Task Handle(InventoryReservedEvent @event, CancellationToken cancellationToken)
    {
        var saga = RequireSaga(await _sagaRepository.GetByOrderIdAsync(@event.OrderId), @event.OrderId);

        saga.MarkInventoryReserved();
        await _sagaRepository.SaveAsync(saga);

        // Proceed to shipping
        await _mediator.Send(new CreateShippingCommand(@event.OrderId), cancellationToken);
    }

    /// <summary>Records shipping creation and requests order completion.</summary>
    /// <exception cref="InvalidOperationException">No saga is stored for the order.</exception>
    public async Task Handle(ShippingCreatedEvent @event, CancellationToken cancellationToken)
    {
        var saga = RequireSaga(await _sagaRepository.GetByOrderIdAsync(@event.OrderId), @event.OrderId);

        saga.MarkShippingCreated();
        await _sagaRepository.SaveAsync(saga);

        // Order processing completed
        await _mediator.Send(new CompleteOrderCommand(@event.OrderId), cancellationToken);

        _logger.LogInformation("Order processing saga completed for order {OrderId}", @event.OrderId);
    }

    // Turns a missing saga into a descriptive error instead of a NullReferenceException
    // on the first member access.
    private static TSaga RequireSaga<TSaga>(TSaga saga, object orderId)
    {
        if (saga == null)
        {
            throw new InvalidOperationException($"No order processing saga found for order {orderId}");
        }

        return saga;
    }
}
12. Production Best Practices
Configuration and Dependency Injection
// Dependency injection setup
/// <summary>
/// Service registration for the application layer: MediatR with its pipeline behaviors,
/// FluentValidation validators, user/identity services and a distributed cache.
/// </summary>
public static class DependencyInjection
{
    /// <summary>Registers the application-layer services.</summary>
    /// <param name="services">Collection to add the registrations to.</param>
    /// <returns>The same <paramref name="services"/> instance, for chaining.</returns>
    public static IServiceCollection AddApplication(this IServiceCollection services)
    {
        var applicationAssembly = Assembly.GetExecutingAssembly();

        // MediatR
        // Pipeline behaviors are registered here and ONLY here. Registering them again with
        // services.AddTransient adds a second descriptor for each one, so every behavior
        // (logging, validation, authorization, caching) would run twice per request.
        // They execute in the order listed.
        services.AddMediatR(cfg =>
        {
            cfg.RegisterServicesFromAssembly(applicationAssembly);
            cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(LoggingBehavior<,>));
            cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(ValidationBehavior<,>));
            cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(AuthorizationBehavior<,>));
            cfg.AddBehavior(typeof(IPipelineBehavior<,>), typeof(CachingBehavior<,>));
        });

        // FluentValidation
        services.AddValidatorsFromAssembly(applicationAssembly);

        // Services
        services.AddScoped<ICurrentUserService, CurrentUserService>();
        services.AddScoped<IIdentityService, IdentityService>();

        // Caching
        services.AddDistributedMemoryCache(); // Or Redis in production

        return services;
    }
}
// Program.cs configuration
// Application entry point (top-level statements): build the host, register services,
// configure logging, then assemble the HTTP middleware pipeline.
var builder = WebApplication.CreateBuilder(args);
// Add services
// Each call registers one group of services: application, infrastructure, web.
builder.Services.AddApplication();
builder.Services.AddInfrastructure(builder.Configuration);
builder.Services.AddWebServices();
// Configure logging
// Three log providers: console, debug output and Application Insights.
builder.Logging.AddConsole();
builder.Logging.AddDebug();
builder.Logging.AddApplicationInsights();
var app = builder.Build();
// Configure pipeline
// Middleware runs in the order it is added, so the order below is significant.
if (app.Environment.IsDevelopment())
{
// Development only: detailed exception page.
app.UseDeveloperExceptionPage();
}
else
{
// Other environments: route unhandled exceptions to /error and enable HSTS.
app.UseExceptionHandler("/error");
app.UseHsts();
}
app.UseHttpsRedirection();
app.UseRouting();
// Authentication is added before authorization.
app.UseAuthentication();
app.UseAuthorization();
app.MapControllers();
app.Run();
Monitoring and Diagnostics
// Enhanced logging behavior
/// <summary>
/// Pipeline behavior that wraps each request in an <c>Activity</c> and a logging scope,
/// and logs how long the request took on both success and failure.
/// </summary>
/// <typeparam name="TRequest">Request type flowing through the pipeline.</typeparam>
/// <typeparam name="TResponse">Response type produced by the handler.</typeparam>
public class DiagnosticBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
{
    private readonly ILogger<DiagnosticBehavior<TRequest, TResponse>> _logger;
    private readonly ICurrentUserService _currentUserService;

    public DiagnosticBehavior(
        ILogger<DiagnosticBehavior<TRequest, TResponse>> logger,
        ICurrentUserService currentUserService)
    {
        _currentUserService = currentUserService;
        _logger = logger;
    }

    /// <summary>Runs the rest of the pipeline with tracing, scoped logging and timing.</summary>
    /// <param name="request">The request being handled.</param>
    /// <param name="next">Delegate that invokes the rest of the pipeline.</param>
    /// <param name="cancellationToken">Not used by this behavior.</param>
    /// <returns>The response produced by <paramref name="next"/>.</returns>
    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        var requestName = typeof(TRequest).Name;
        var requestId = Guid.NewGuid();
        var userId = _currentUserService.UserId ?? "Anonymous";

        using var activity = DiagnosticsConfig.ActivitySource.StartActivity(requestName);
        activity?.SetTag("request.id", requestId);
        activity?.SetTag("user.id", userId);

        // Every log entry written while handling the request carries these values.
        var scopeState = new Dictionary<string, object>
        {
            ["RequestId"] = requestId,
            ["RequestName"] = requestName,
            ["UserId"] = userId
        };
        using var logScope = _logger.BeginScope(scopeState);

        var timer = Stopwatch.StartNew();
        _logger.LogInformation(
            "Handling request {RequestName} for user {UserId}",
            requestName, userId);

        // Tags the activity with the elapsed time and the final status.
        void FinishActivity(ActivityStatusCode status)
        {
            activity?.SetTag("duration", timer.ElapsedMilliseconds);
            activity?.SetStatus(status);
        }

        try
        {
            var result = await next();
            timer.Stop();

            _logger.LogInformation(
                "Request {RequestName} completed in {ElapsedMilliseconds}ms",
                requestName, timer.ElapsedMilliseconds);
            FinishActivity(ActivityStatusCode.Ok);

            return result;
        }
        catch (Exception ex)
        {
            timer.Stop();

            _logger.LogError(
                ex,
                "Request {RequestName} failed after {ElapsedMilliseconds}ms",
                requestName, timer.ElapsedMilliseconds);
            FinishActivity(ActivityStatusCode.Error);
            activity?.RecordException(ex);

            throw;
        }
    }
}
// Diagnostics configuration
/// <summary>
/// Shared tracing configuration: the service name and the <see cref="System.Diagnostics.ActivitySource"/>
/// created from it.
/// </summary>
public static class DiagnosticsConfig
{
    /// <summary>Name used for the activity source.</summary>
    public static readonly string ServiceName = "ECommerce.API";

    /// <summary>
    /// Activity source named after <see cref="ServiceName"/>. Declared after
    /// <see cref="ServiceName"/> because static field initializers run in textual order.
    /// </summary>
    public static readonly ActivitySource ActivitySource = new ActivitySource(ServiceName);
}
Performance Monitoring
// Performance monitoring behavior
/// <summary>
/// Pipeline behavior that measures how long each request takes, logs a warning for slow
/// requests, and records the duration through <c>IPerformanceMetrics</c>.
/// </summary>
/// <typeparam name="TRequest">Request type flowing through the pipeline.</typeparam>
/// <typeparam name="TResponse">Response type produced by the handler.</typeparam>
public class PerformanceBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
{
    // Requests slower than this many milliseconds are logged as warnings.
    private const long SlowRequestThresholdMilliseconds = 500;

    private readonly ILogger<PerformanceBehavior<TRequest, TResponse>> _logger;
    private readonly IPerformanceMetrics _performanceMetrics;

    public PerformanceBehavior(
        ILogger<PerformanceBehavior<TRequest, TResponse>> logger,
        IPerformanceMetrics performanceMetrics)
    {
        _logger = logger;
        _performanceMetrics = performanceMetrics;
    }

    /// <summary>Times the rest of the pipeline and reports the duration.</summary>
    /// <param name="request">The request being handled.</param>
    /// <param name="next">Delegate that invokes the rest of the pipeline.</param>
    /// <param name="cancellationToken">Not used by this behavior.</param>
    /// <returns>The response produced by <paramref name="next"/>.</returns>
    /// <remarks>
    /// If <paramref name="next"/> throws, the exception propagates and no duration is recorded.
    /// </remarks>
    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        // A stopwatch local to this call. A shared field started with Start() and never
        // reset keeps accumulating, so a behavior instance that handles more than one
        // request would report ever-growing, wrong durations.
        var timer = Stopwatch.StartNew();
        var response = await next();
        timer.Stop();

        var elapsedMilliseconds = timer.ElapsedMilliseconds;
        var requestName = typeof(TRequest).Name;

        // Log slow requests
        if (elapsedMilliseconds > SlowRequestThresholdMilliseconds)
        {
            _logger.LogWarning(
                "Slow request detected: {RequestName} took {ElapsedMilliseconds}ms",
                requestName, elapsedMilliseconds);
        }

        // Track metrics
        _performanceMetrics.RecordRequestDuration(requestName, elapsedMilliseconds);

        return response;
    }
}
This comprehensive CQRS and MediatR masterclass provides a complete implementation guide for building scalable, maintainable ASP.NET Core applications. The patterns and practices demonstrated here will help you tackle complex business domains while maintaining clean, testable code architecture.