ASP.NET Core  

ASP.NET Core Microservices gRPC Message Brokers Architecture Guide (Part- 39 of 40)

Microservices

Previous article: ASP.NET Core FluentValidation & AutoMapper Guide | Clean Data & Robust APIs (Part-38 of 40)

 Table of Contents

  1. Microservices Architecture Fundamentals

  2. gRPC Deep Dive

  3. Building gRPC Services

  4. Advanced gRPC Patterns

  5. Message Brokers with RabbitMQ

  6. Event-Driven Architecture

  7. Service Discovery & API Gateway

  8. Containerization & Orchestration

  9. Distributed Data Management

  10. Observability & Monitoring

  11. Security in Microservices

  12. Real-World E-Commerce Platform

1. Microservices Architecture Fundamentals

The Monolith to Microservices Journey

Real-World Scenario: Imagine an e-commerce platform that started as a monolith but now faces scaling challenges:

  
    // Traditional Monolithic Architecture Problems
public class ECommerceMonolith
{
    // Tightly coupled components
    public async Task<OrderResult> ProcessOrderAsync(OrderRequest request)
    {
        // Inventory management
        var inventoryResult = await _inventoryService.CheckStockAsync(request.Items);
        if (!inventoryResult.Success)
            return OrderResult.Failed("Insufficient stock");
        
        // Payment processing
        var paymentResult = await _paymentService.ProcessPaymentAsync(request.Payment);
        if (!paymentResult.Success)
            return OrderResult.Failed("Payment failed");
        
        // Order creation
        var order = await _orderService.CreateOrderAsync(request);
        
        // Shipping calculation
        var shipping = await _shippingService.CalculateShippingAsync(order);
        
        // Notification
        await _notificationService.SendOrderConfirmationAsync(order);
        
        // Analytics
        await _analyticsService.TrackOrderAsync(order);
        
        return OrderResult.Success(order);
    }
}
  

Microservices Core Principles

  
    // Microservices Definition
public class MicroservicesPrinciples
{
    public List<string> KeyPrinciples = new()
    {
        "Single Responsibility: Each service focuses on one business capability",
        "Independent Deployment: Services can be deployed separately",
        "Decentralized Data Management: Each service owns its data",
        "Infrastructure Automation: CI/CD and containerization",
        "Design for Failure: Resilience and fault tolerance",
        "Evolutionary Design: Services can evolve independently"
    };
}
  

Benefits and Trade-offs

Benefits

  • Independent scaling of services

  • Technology diversity across services

  • Faster development and deployment cycles

  • Improved fault isolation

  • Better team autonomy

Challenges

  • Distributed system complexity

  • Data consistency challenges

  • Network latency and reliability

  • Operational overhead

  • Testing complexity

2. gRPC Deep Dive

What is gRPC?

gRPC is a modern, high-performance RPC (Remote Procedure Call) framework that can run in any environment.

  
    // Protobuf definition for Product Service
syntax = "proto3";

import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";

option csharp_namespace = "ProductService.Grpc";

service ProductService {
  rpc GetProduct (GetProductRequest) returns (ProductResponse);
  rpc CreateProduct (CreateProductRequest) returns (ProductResponse);
  rpc UpdateProduct (UpdateProductRequest) returns (ProductResponse);
  rpc DeleteProduct (DeleteProductRequest) returns (google.protobuf.Empty);
  rpc ListProducts (ListProductsRequest) returns (stream ProductResponse);
  rpc BulkCreateProducts (stream CreateProductRequest) returns (BulkCreateResponse);
}

message GetProductRequest {
  int32 product_id = 1;
}

message CreateProductRequest {
  string name = 1;
  string description = 2;
  double price = 3;
  int32 stock_quantity = 4;
  string category_id = 5;
}

message UpdateProductRequest {
  int32 product_id = 1;
  string name = 2;
  string description = 3;
  double price = 4;
  int32 stock_quantity = 5;
}

message DeleteProductRequest {
  int32 product_id = 1;
}

message ListProductsRequest {
  int32 page_size = 1;
  string page_token = 2;
  string filter = 3;
}

message ProductResponse {
  int32 product_id = 1;
  string name = 2;
  string description = 3;
  double price = 4;
  int32 stock_quantity = 5;
  string category_id = 6;
  google.protobuf.Timestamp created_at = 7;
  google.protobuf.Timestamp updated_at = 8;
  ProductStatus status = 9;
}

message BulkCreateResponse {
  int32 created_count = 1;
  repeated string errors = 2;
}

enum ProductStatus {
  ACTIVE = 0;
  INACTIVE = 1;
  OUT_OF_STOCK = 2;
  DISCONTINUED = 3;
}
  

gRPC vs REST Comparison

  
    // Communication Pattern Comparison
public class CommunicationPatterns
{
    public void CompareProtocols()
    {
        var restCharacteristics = new
        {
            Protocol = "HTTP/1.1",
            PayloadFormat = "JSON/XML",
            Communication = "Request-Response",
            Streaming = "Limited (SSE, WebSockets)",
            Performance = "Good for human-readable APIs",
            UseCases = "Public APIs, Browsers, Mobile Apps"
        };

        var grpcCharacteristics = new
        {
            Protocol = "HTTP/2",
            PayloadFormat = "Protocol Buffers (Binary)",
            Communication = "Multiple patterns",
            Streaming = "Full support (Client, Server, Bidirectional)",
            Performance = "High for service-to-service",
            UseCases = "Microservices, Mobile, Real-time"
        };
    }
}
  

HTTP/2 Benefits

  
    // HTTP/2 Features that power gRPC
public class Http2Features
{
    public List<string> KeyFeatures = new()
    {
        "Binary Framing: More efficient than text-based HTTP/1.1",
        "Multiplexing: Multiple requests over single connection",
        "Header Compression: HPACK reduces overhead",
        "Server Push: Server can push resources to client",
        "Stream Prioritization: Important requests get priority",
        "Flow Control: Prevents resource exhaustion"
    };
}
  

3. Building gRPC Services

Complete Product Service Implementation

Project Structure

  
    Microservices/
├── src/
│   ├── ProductService/
│   │   ├── Protos/
│   │   ├── Services/
│   │   ├── Models/
│   │   └── Data/
│   ├── OrderService/
│   ├── InventoryService/
│   └── ApiGateway/
├── contracts/
└── docker-compose.yml
  

Service Setup and Configuration

  
    // ProductService/Program.cs
using ProductService.Data;
using ProductService.Services;
using Microsoft.EntityFrameworkCore;

var builder = WebApplication.CreateBuilder(args);

// Add services to container
builder.Services.AddGrpc(options =>
{
    options.EnableDetailedErrors = builder.Environment.IsDevelopment();
    options.Interceptors.Add<ExceptionInterceptor>();
    options.Interceptors.Add<LoggingInterceptor>();
});

builder.Services.AddDbContext<ProductContext>(options =>
{
    options.UseSqlServer(builder.Configuration.GetConnectionString("ProductDatabase"));
});

builder.Services.AddScoped<IProductRepository, ProductRepository>();
builder.Services.AddSingleton<IServiceDiscovery, ConsulServiceDiscovery>();

// Health checks
builder.Services.AddHealthChecks()
    .AddDbContextCheck<ProductContext>()
    .AddUrlGroup(new Uri("http://localhost:5000/health"), "self");

// Distributed caching
builder.Services.AddStackExchangeRedisCache(options =>
{
    options.Configuration = builder.Configuration.GetConnectionString("Redis");
});

var app = builder.Build();

// Configure the HTTP request pipeline
app.MapGrpcService<ProductGrpcService>();
app.MapHealthChecks("/health");

// gRPC-Web for browser compatibility
app.MapGrpcService<ProductGrpcService>().EnableGrpcWeb();

app.MapGet("/", () => "Product gRPC Service is running!");

app.Run();
  

gRPC Service Implementation

  
    // ProductService/Services/ProductGrpcService.cs
using Grpc.Core;
using ProductService.Data;
using ProductService.Models;
using Google.Protobuf.WellKnownTypes;

namespace ProductService.Services;

public class ProductGrpcService : ProductService.Grpc.ProductService.ProductServiceBase
{
    private readonly IProductRepository _productRepository;
    private readonly ILogger<ProductGrpcService> _logger;
    private readonly ICacheService _cacheService;

    public ProductGrpcService(
        IProductRepository productRepository,
        ILogger<ProductGrpcService> logger,
        ICacheService cacheService)
    {
        _productRepository = productRepository;
        _logger = logger;
        _cacheService = cacheService;
    }

    public override async Task<ProductResponse> GetProduct(
        GetProductRequest request, 
        ServerCallContext context)
    {
        _logger.LogInformation("Getting product with ID: {ProductId}", request.ProductId);

        // Check cache first
        var cacheKey = $"product_{request.ProductId}";
        var cachedProduct = await _cacheService.GetAsync<ProductResponse>(cacheKey);
        
        if (cachedProduct != null)
        {
            _logger.LogDebug("Cache hit for product {ProductId}", request.ProductId);
            return cachedProduct;
        }

        var product = await _productRepository.GetByIdAsync(request.ProductId);
        
        if (product == null)
        {
            _logger.LogWarning("Product with ID {ProductId} not found", request.ProductId);
            throw new RpcException(new Status(StatusCode.NotFound, 
                $"Product with ID {request.ProductId} not found"));
        }

        var response = MapToProductResponse(product);
        
        // Cache the response
        await _cacheService.SetAsync(cacheKey, response, TimeSpan.FromMinutes(30));
        
        return response;
    }

    public override async Task<ProductResponse> CreateProduct(
        CreateProductRequest request, 
        ServerCallContext context)
    {
        _logger.LogInformation("Creating new product: {ProductName}", request.Name);

        // Validate request
        if (string.IsNullOrWhiteSpace(request.Name))
        {
            throw new RpcException(new Status(StatusCode.InvalidArgument, 
                "Product name is required"));
        }

        if (request.Price <= 0)
        {
            throw new RpcException(new Status(StatusCode.InvalidArgument, 
                "Product price must be positive"));
        }

        var product = new Product
        {
            Name = request.Name,
            Description = request.Description,
            Price = (decimal)request.Price,
            StockQuantity = request.StockQuantity,
            CategoryId = request.CategoryId,
            Status = ProductStatus.Active,
            CreatedAt = DateTime.UtcNow,
            UpdatedAt = DateTime.UtcNow
        };

        await _productRepository.AddAsync(product);
        await _productRepository.SaveChangesAsync();

        _logger.LogInformation("Product created with ID: {ProductId}", product.Id);

        // Invalidate relevant caches
        await _cacheService.RemoveAsync("products_list_*");

        return MapToProductResponse(product);
    }

    public override async Task ListProducts(
        ListProductsRequest request, 
        IServerStreamWriter<ProductResponse> responseStream, 
        ServerCallContext context)
    {
        _logger.LogInformation("Streaming products with page size: {PageSize}", request.PageSize);

        var pageSize = request.PageSize > 0 ? request.PageSize : 50;
        var products = _productRepository.GetAllActiveProducts();

        if (!string.IsNullOrEmpty(request.Filter))
        {
            products = products.Where(p => p.Name.Contains(request.Filter) || 
                                         p.Description.Contains(request.Filter));
        }

        var count = 0;
        foreach (var product in products)
        {
            if (context.CancellationToken.IsCancellationRequested)
                break;

            await responseStream.WriteAsync(MapToProductResponse(product));
            count++;

            // Simulate some delay for demonstration
            if (count % 10 == 0)
                await Task.Delay(100, context.CancellationToken);
        }

        _logger.LogInformation("Streamed {Count} products", count);
    }

    public override async Task<BulkCreateResponse> BulkCreateProducts(
        IAsyncStreamReader<CreateProductRequest> requestStream, 
        ServerCallContext context)
    {
        _logger.LogInformation("Starting bulk product creation");

        var response = new BulkCreateResponse();
        var errors = new List<string>();

        await foreach (var request in requestStream.ReadAllAsync(context.CancellationToken))
        {
            try
            {
                var product = new Product
                {
                    Name = request.Name,
                    Description = request.Description,
                    Price = (decimal)request.Price,
                    StockQuantity = request.StockQuantity,
                    CategoryId = request.CategoryId,
                    Status = ProductStatus.Active,
                    CreatedAt = DateTime.UtcNow,
                    UpdatedAt = DateTime.UtcNow
                };

                await _productRepository.AddAsync(product);
                response.CreatedCount++;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to create product: {ProductName}", request.Name);
                errors.Add($"Failed to create {request.Name}: {ex.Message}");
            }
        }

        if (response.CreatedCount > 0)
        {
            await _productRepository.SaveChangesAsync();
            await _cacheService.RemoveAsync("products_list_*");
        }

        response.Errors.AddRange(errors);
        _logger.LogInformation("Bulk creation completed: {CreatedCount} created, {ErrorCount} errors", 
            response.CreatedCount, errors.Count);

        return response;
    }

    private ProductResponse MapToProductResponse(Product product)
    {
        return new ProductResponse
        {
            ProductId = product.Id,
            Name = product.Name,
            Description = product.Description ?? string.Empty,
            Price = (double)product.Price,
            StockQuantity = product.StockQuantity,
            CategoryId = product.CategoryId ?? string.Empty,
            CreatedAt = Timestamp.FromDateTime(product.CreatedAt),
            UpdatedAt = product.UpdatedAt.HasValue ? 
                Timestamp.FromDateTime(product.UpdatedAt.Value) : null,
            Status = MapProductStatus(product.Status)
        };
    }

    private ProductService.Grpc.ProductStatus MapProductStatus(ProductStatus status)
    {
        return status switch
        {
            ProductStatus.Active => ProductService.Grpc.ProductStatus.Active,
            ProductStatus.Inactive => ProductService.Grpc.ProductStatus.Inactive,
            ProductStatus.OutOfStock => ProductService.Grpc.ProductStatus.OutOfStock,
            ProductStatus.Discontinued => ProductService.Grpc.ProductStatus.Discontinued,
            _ => ProductService.Grpc.ProductStatus.Inactive
        };
    }
}
  

gRPC Interceptors

  
    // ProductService/Interceptors/ExceptionInterceptor.cs
public class ExceptionInterceptor : Interceptor
{
    private readonly ILogger<ExceptionInterceptor> _logger;

    public ExceptionInterceptor(ILogger<ExceptionInterceptor> logger)
    {
        _logger = logger;
    }

    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
        TRequest request,
        ServerCallContext context,
        UnaryServerMethod<TRequest, TResponse> continuation)
    {
        try
        {
            return await continuation(request, context);
        }
        catch (RpcException)
        {
            throw; // Already handled
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing gRPC call {Method}", context.Method);
            
            // Map to appropriate gRPC status
            var status = ex switch
            {
                ArgumentException => new Status(StatusCode.InvalidArgument, ex.Message),
                KeyNotFoundException => new Status(StatusCode.NotFound, ex.Message),
                UnauthorizedAccessException => new Status(StatusCode.PermissionDenied, ex.Message),
                _ => new Status(StatusCode.Internal, "An internal error occurred")
            };
            
            throw new RpcException(status);
        }
    }
}

// ProductService/Interceptors/LoggingInterceptor.cs
public class LoggingInterceptor : Interceptor
{
    private readonly ILogger<LoggingInterceptor> _logger;

    public LoggingInterceptor(ILogger<LoggingInterceptor> logger)
    {
        _logger = logger;
    }

    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
        TRequest request,
        ServerCallContext context,
        UnaryServerMethod<TRequest, TResponse> continuation)
    {
        var stopwatch = Stopwatch.StartNew();
        
        _logger.LogInformation("Starting gRPC call {Method} from {Peer}", 
            context.Method, context.Peer);

        try
        {
            var response = await continuation(request, context);
            
            stopwatch.Stop();
            _logger.LogInformation("Completed gRPC call {Method} in {ElapsedMs}ms", 
                context.Method, stopwatch.ElapsedMilliseconds);
                
            return response;
        }
        catch (Exception ex)
        {
            stopwatch.Stop();
            _logger.LogError(ex, "gRPC call {Method} failed after {ElapsedMs}ms", 
                context.Method, stopwatch.ElapsedMilliseconds);
            throw;
        }
    }
}
  

4. Advanced gRPC Patterns

Client-Side Implementation

  
    // ProductService client implementation
public class ProductServiceClient : IProductServiceClient, IDisposable
{
    private readonly ProductService.Grpc.ProductService.ProductServiceClient _client;
    private readonly ChannelBase _channel;
    private readonly ILogger<ProductServiceClient> _logger;

    public ProductServiceClient(
        string serviceUrl,
        ILogger<ProductServiceClient> logger)
    {
        _logger = logger;
        
        // Create channel with configuration
        _channel = GrpcChannel.ForAddress(serviceUrl, new GrpcChannelOptions
        {
            HttpHandler = new SocketsHttpHandler
            {
                PooledConnectionIdleTimeout = Timeout.InfiniteTimeSpan,
                KeepAlivePingDelay = TimeSpan.FromSeconds(60),
                KeepAlivePingTimeout = TimeSpan.FromSeconds(30),
                EnableMultipleHttp2Connections = true
            },
            ServiceConfig = new ServiceConfig
            {
                LoadBalancingConfigs = { new RoundRobinConfig() },
                MethodConfigs =
                {
                    new MethodConfig
                    {
                        Names = { MethodName.Default },
                        RetryPolicy = new RetryPolicy
                        {
                            MaxAttempts = 3,
                            InitialBackoff = TimeSpan.FromSeconds(1),
                            MaxBackoff = TimeSpan.FromSeconds(5),
                            BackoffMultiplier = 1.5,
                            RetryableStatusCodes = { StatusCode.Unavailable }
                        }
                    }
                }
            }
        });

        _client = new ProductService.Grpc.ProductService.ProductServiceClient(_channel);
    }

    public async Task<ProductResponse> GetProductAsync(int productId, CancellationToken cancellationToken = default)
    {
        try
        {
            var request = new GetProductRequest { ProductId = productId };
            return await _client.GetProductAsync(request, cancellationToken: cancellationToken);
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound)
        {
            _logger.LogWarning("Product {ProductId} not found", productId);
            throw new ProductNotFoundException(productId, ex);
        }
        catch (RpcException ex)
        {
            _logger.LogError(ex, "gRPC error getting product {ProductId}", productId);
            throw new ProductServiceException("Failed to get product", ex);
        }
    }

    public async IAsyncEnumerable<ProductResponse> StreamProductsAsync(
        int pageSize = 50, 
        string filter = null)
    {
        var request = new ListProductsRequest 
        { 
            PageSize = pageSize, 
            Filter = filter ?? string.Empty 
        };

        using var call = _client.ListProducts(request);
        
        await foreach (var product in call.ResponseStream.ReadAllAsync())
        {
            yield return product;
        }
    }

    public async Task<BulkCreateResponse> BulkCreateProductsAsync(
        IAsyncEnumerable<CreateProductRequest> products, 
        CancellationToken cancellationToken = default)
    {
        using var call = _client.BulkCreateProducts(cancellationToken: cancellationToken);
        
        await foreach (var product in products)
        {
            await call.RequestStream.WriteAsync(product);
        }

        await call.RequestStream.CompleteAsync();
        return await call.ResponseAsync;
    }

    public void Dispose()
    {
        _channel?.Dispose();
    }
}
  

Bidirectional Streaming Example

  
    // Real-time inventory update service
public class InventoryGrpcService : InventoryService.Grpc.InventoryService.InventoryServiceBase
{
    private readonly IInventoryRepository _inventoryRepository;
    private readonly ILogger<InventoryGrpcService> _logger;

    public InventoryGrpcService(
        IInventoryRepository inventoryRepository,
        ILogger<InventoryGrpcService> logger)
    {
        _inventoryRepository = inventoryRepository;
        _logger = logger;
    }

    public override async Task StreamInventoryUpdates(
        IAsyncStreamReader<InventoryUpdateRequest> requestStream,
        IServerStreamWriter<InventoryUpdateResponse> responseStream,
        ServerCallContext context)
    {
        _logger.LogInformation("Starting bidirectional inventory stream");

        // Read requests and send responses concurrently
        var readTask = ReadRequestsAsync(requestStream, context.CancellationToken);
        var writeTask = WriteResponsesAsync(responseStream, context.CancellationToken);

        await Task.WhenAll(readTask, writeTask);
    }

    private async Task ReadRequestsAsync(
        IAsyncStreamReader<InventoryUpdateRequest> requestStream,
        CancellationToken cancellationToken)
    {
        await foreach (var request in requestStream.ReadAllAsync(cancellationToken))
        {
            try
            {
                _logger.LogDebug("Processing inventory update for product {ProductId}", 
                    request.ProductId);

                // Process inventory update
                await _inventoryRepository.UpdateStockAsync(
                    request.ProductId, 
                    request.QuantityChange,
                    request.Reason);

                // You could also broadcast this update to other connected clients
                await BroadcastInventoryUpdate(request.ProductId, request.QuantityChange);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to process inventory update for product {ProductId}", 
                    request.ProductId);
            }
        }
    }

    private async Task WriteResponsesAsync(
        IServerStreamWriter<InventoryUpdateResponse> responseStream,
        CancellationToken cancellationToken)
    {
        // Simulate sending periodic inventory summaries
        var timer = new PeriodicTimer(TimeSpan.FromSeconds(30));
        
        while (await timer.WaitForNextTickAsync(cancellationToken))
        {
            try
            {
                var lowStockItems = await _inventoryRepository.GetLowStockItemsAsync(threshold: 10);
                
                var response = new InventoryUpdateResponse
                {
                    Summary = new InventorySummary
                    {
                        Timestamp = Timestamp.FromDateTime(DateTime.UtcNow),
                        LowStockItemCount = lowStockItems.Count,
                        TotalProducts = await _inventoryRepository.GetTotalProductCountAsync()
                    }
                };

                response.LowStockItems.AddRange(lowStockItems.Select(item => 
                    new LowStockItem
                    {
                        ProductId = item.ProductId,
                        ProductName = item.ProductName,
                        CurrentStock = item.CurrentStock,
                        MinimumRequired = item.MinimumRequired
                    }));

                await responseStream.WriteAsync(response);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Failed to send inventory summary");
            }
        }
    }

    private async Task BroadcastInventoryUpdate(int productId, int quantityChange)
    {
        // Implementation for broadcasting to other connected clients
        // This could use Redis pub/sub or other mechanisms
    }
}
  

gRPC Health Checks

  
    // Custom health check implementation
public class ProductServiceHealthCheck : HealthCheck.HealthBase
{
    private readonly ProductContext _dbContext;
    private readonly ILogger<ProductServiceHealthCheck> _logger;

    public ProductServiceHealthCheck(
        ProductContext dbContext,
        ILogger<ProductServiceHealthCheck> logger)
    {
        _dbContext = dbContext;
        _logger = logger;
    }

    public override async Task<HealthCheckResponse> Check(
        HealthCheckRequest request, 
        ServerCallContext context)
    {
        var status = HealthCheckResponse.Types.ServingStatus.Serving;
        var checks = new Dictionary<string, string>();

        // Check database connectivity
        try
        {
            await _dbContext.Database.CanConnectAsync(context.CancellationToken);
            checks["database"] = "healthy";
        }
        catch (Exception ex)
        {
            status = HealthCheckResponse.Types.ServingStatus.NotServing;
            checks["database"] = $"unhealthy: {ex.Message}";
            _logger.LogError(ex, "Database health check failed");
        }

        // Check memory usage
        var memoryUsage = GC.GetTotalMemory(forceFullCollection: false) / 1024 / 1024;
        checks["memory_mb"] = memoryUsage.ToString();
        
        if (memoryUsage > 500) // 500MB threshold
        {
            status = HealthCheckResponse.Types.ServingStatus.NotServing;
            checks["memory"] = "high_usage";
        }

        return new HealthCheckResponse
        {
            Status = status
        };
    }
}
  

5. Message Brokers with RabbitMQ

RabbitMQ Fundamentals

  
    // RabbitMQ configuration and setup
public static class RabbitMQExtensions
{
    public static IServiceCollection AddRabbitMQ(this IServiceCollection services, IConfiguration configuration)
    {
        var rabbitMQConfig = configuration.GetSection("RabbitMQ").Get<RabbitMQConfig>();
        
        services.AddSingleton(rabbitMQConfig);
        
        // Connection factory
        services.AddSingleton<IConnectionFactory>(sp =>
        {
            return new ConnectionFactory
            {
                HostName = rabbitMQConfig.HostName,
                Port = rabbitMQConfig.Port,
                UserName = rabbitMQConfig.UserName,
                Password = rabbitMQConfig.Password,
                VirtualHost = rabbitMQConfig.VirtualHost,
                DispatchConsumersAsync = true,
                AutomaticRecoveryEnabled = true,
                NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
            };
        });

        // Connection
        services.AddSingleton<IConnection>(sp =>
        {
            var factory = sp.GetRequiredService<IConnectionFactory>();
            return factory.CreateConnection();
        });

        // Channel
        services.AddSingleton<IModel>(sp =>
        {
            var connection = sp.GetRequiredService<IConnection>();
            var channel = connection.CreateModel();
            
            // Configure exchanges and queues
            ConfigureInfrastructure(channel, rabbitMQConfig);
            
            return channel;
        });

        services.AddScoped<IMessagePublisher, RabbitMQMessagePublisher>();
        services.AddHostedService<OrderCreatedEventConsumer>();

        return services;
    }

    private static void ConfigureInfrastructure(IModel channel, RabbitMQConfig config)
    {
        // Exchanges
        channel.ExchangeDeclare("order-events", ExchangeType.Topic, durable: true, autoDelete: false);
        channel.ExchangeDeclare("inventory-events", ExchangeType.Topic, durable: true, autoDelete: false);
        channel.ExchangeDeclare("dead-letter", ExchangeType.Fanout, durable: true, autoDelete: false);

        // Queues
        channel.QueueDeclare("order.created", durable: true, exclusive: false, autoDelete: false);
        channel.QueueDeclare("inventory.update", durable: true, exclusive: false, autoDelete: false);
        channel.QueueDeclare("dead-letter-queue", durable: true, exclusive: false, autoDelete: false);

        // Dead letter queue
        var deadLetterArgs = new Dictionary<string, object>
        {
            { "x-dead-letter-exchange", "dead-letter" },
            { "x-message-ttl", 30000 } // 30 seconds
        };

        channel.QueueDeclare("order.created.retry", durable: true, exclusive: false, autoDelete: false, arguments: deadLetterArgs);

        // Bindings
        channel.QueueBind("order.created", "order-events", "order.created");
        channel.QueueBind("inventory.update", "inventory-events", "inventory.updated");
        channel.QueueBind("dead-letter-queue", "dead-letter", "");
        channel.QueueBind("order.created.retry", "order-events", "order.created");
    }
}

public class RabbitMQConfig
{
    public string HostName { get; set; } = "localhost";
    public int Port { get; set; } = 5672;
    public string UserName { get; set; } = "guest";
    public string Password { get; set; } = "guest";
    public string VirtualHost { get; set; } = "/";
}
  

Message Publisher Implementation

  
    // Message publisher service
public class RabbitMQMessagePublisher : IMessagePublisher
{
    private readonly IModel _channel;
    private readonly ILogger<RabbitMQMessagePublisher> _logger;
    private readonly ISerializer _serializer;

    public RabbitMQMessagePublisher(
        IModel channel,
        ILogger<RabbitMQMessagePublisher> logger,
        ISerializer serializer)
    {
        _channel = channel;
        _logger = logger;
        _serializer = serializer;
    }

    public async Task PublishAsync<T>(T message, string exchange, string routingKey) where T : class
    {
        try
        {
            var body = _serializer.SerializeToBytes(message);
            var properties = _channel.CreateBasicProperties();
            
            properties.Persistent = true;
            properties.ContentType = "application/json";
            properties.MessageId = Guid.NewGuid().ToString();
            properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());

            _channel.BasicPublish(
                exchange: exchange,
                routingKey: routingKey,
                mandatory: true,
                basicProperties: properties,
                body: body);

            _logger.LogDebug("Published message to {Exchange} with routing key {RoutingKey}", 
                exchange, routingKey);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to publish message to {Exchange} with routing key {RoutingKey}", 
                exchange, routingKey);
            throw;
        }
    }

    public async Task PublishWithRetryAsync<T>(T message, string exchange, string routingKey, int maxRetries = 3) 
        where T : class
    {
        var retryCount = 0;
        
        while (retryCount < maxRetries)
        {
            try
            {
                await PublishAsync(message, exchange, routingKey);
                return;
            }
            catch (Exception ex) when (retryCount < maxRetries - 1)
            {
                retryCount++;
                _logger.LogWarning(ex, 
                    "Publish failed, retry {RetryCount}/{MaxRetries} after delay", 
                    retryCount, maxRetries);
                
                await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, retryCount)));
            }
        }
        
        throw new MessagePublishException($"Failed to publish message after {maxRetries} retries");
    }
}
  

Event Consumer Implementation

  
    // Order created event consumer
public class OrderCreatedEventConsumer : BackgroundService
{
    private readonly IModel _channel;
    private readonly ILogger<OrderCreatedEventConsumer> _logger;
    private readonly IServiceProvider _serviceProvider;
    private readonly ISerializer _serializer;

    public OrderCreatedEventConsumer(
        IModel channel,
        ILogger<OrderCreatedEventConsumer> logger,
        IServiceProvider serviceProvider,
        ISerializer serializer)
    {
        _channel = channel;
        _logger = logger;
        _serviceProvider = serviceProvider;
        _serializer = serializer;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var consumer = new AsyncEventingBasicConsumer(_channel);
        
        consumer.Received += async (model, ea) =>
        {
            try
            {
                await ProcessMessageAsync(ea);
                _channel.BasicAck(ea.DeliveryTag, multiple: false);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing message with delivery tag {DeliveryTag}", 
                    ea.DeliveryTag);
                
                // Negative acknowledgement with requeue
                _channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
            }
        };

        _channel.BasicConsume(
            queue: "order.created",
            autoAck: false,
            consumer: consumer);

        _logger.LogInformation("Started OrderCreatedEventConsumer");
        
        await Task.Delay(Timeout.Infinite, stoppingToken);
    }

    private async Task ProcessMessageAsync(BasicDeliverEventArgs ea)
    {
        using var scope = _serviceProvider.CreateScope();
        
        var message = _serializer.Deserialize<OrderCreatedEvent>(ea.Body.ToArray());
        var inventoryService = scope.ServiceProvider.GetRequiredService<IInventoryService>();
        var notificationService = scope.ServiceProvider.GetRequiredService<INotificationService>();

        _logger.LogInformation("Processing order created event for order {OrderId}", message.OrderId);

        // Reserve inventory
        await inventoryService.ReserveInventoryAsync(message.OrderId, message.Items);
        
        // Send notification
        await notificationService.SendOrderConfirmationAsync(message.OrderId, message.CustomerEmail);
        
        _logger.LogInformation("Successfully processed order created event for order {OrderId}", 
            message.OrderId);
    }
}

// Order created event model
public class OrderCreatedEvent
{
    public string EventId { get; set; } = Guid.NewGuid().ToString();
    public DateTime OccurredOn { get; set; } = DateTime.UtcNow;
    public int OrderId { get; set; }
    public int CustomerId { get; set; }
    public string CustomerEmail { get; set; }
    public List<OrderItem> Items { get; set; } = new();
    public decimal TotalAmount { get; set; }
}

public class OrderItem
{
    public int ProductId { get; set; }
    public string ProductName { get; set; }
    public int Quantity { get; set; }
    public decimal UnitPrice { get; set; }
}
  

6. Event-Driven Architecture

Event Sourcing with Microservices

  
    // Event-sourced order aggregate
public class OrderAggregate
{
    private readonly List<object> _changes = new();
    
    public string OrderId { get; private set; }
    public OrderStatus Status { get; private set; }
    public decimal TotalAmount { get; private set; }
    public string CustomerId { get; private set; }
    public DateTime CreatedAt { get; private set; }
    public DateTime? UpdatedAt { get; private set; }
    
    // Rehydrate from events
    public OrderAggregate(string orderId, IEnumerable<object> events)
    {
        OrderId = orderId;
        
        foreach (var @event in events)
        {
            Apply(@event, false);
        }
    }
    
    // Create new order
    public OrderAggregate(string orderId, string customerId, List<OrderItem> items)
    {
        if (string.IsNullOrWhiteSpace(orderId))
            throw new ArgumentException("Order ID is required", nameof(orderId));
            
        if (string.IsNullOrWhiteSpace(customerId))
            throw new ArgumentException("Customer ID is required", nameof(customerId));
            
        if (items == null || !items.Any())
            throw new ArgumentException("Order must have items", nameof(items));
            
        var orderCreated = new OrderCreatedEvent
        {
            OrderId = orderId,
            CustomerId = customerId,
            Items = items,
            TotalAmount = items.Sum(i => i.UnitPrice * i.Quantity),
            CreatedAt = DateTime.UtcNow
        };
        
        Apply(orderCreated);
    }
    
    public void UpdateShippingAddress(ShippingAddress address)
    {
        if (Status != OrderStatus.Created && Status != OrderStatus.Confirmed)
            throw new InvalidOperationException("Cannot update shipping address in current state");
            
        var @event = new OrderShippingAddressUpdatedEvent
        {
            OrderId = OrderId,
            ShippingAddress = address,
            UpdatedAt = DateTime.UtcNow
        };
        
        Apply(@event);
    }
    
    public void ConfirmOrder()
    {
        if (Status != OrderStatus.Created)
            throw new InvalidOperationException("Order can only be confirmed from created state");
            
        var @event = new OrderConfirmedEvent
        {
            OrderId = OrderId,
            ConfirmedAt = DateTime.UtcNow
        };
        
        Apply(@event);
    }
    
    private void Apply(object @event, bool isNew = true)
    {
        When(@event);
        
        if (isNew)
        {
            _changes.Add(@event);
        }
    }
    
    private void When(object @event)
    {
        switch (@event)
        {
            case OrderCreatedEvent e:
                OrderId = e.OrderId;
                CustomerId = e.CustomerId;
                TotalAmount = e.TotalAmount;
                Status = OrderStatus.Created;
                CreatedAt = e.CreatedAt;
                break;
                
            case OrderConfirmedEvent e:
                Status = OrderStatus.Confirmed;
                UpdatedAt = e.ConfirmedAt;
                break;
                
            case OrderShippingAddressUpdatedEvent e:
                UpdatedAt = e.UpdatedAt;
                break;
        }
    }
    
    public IReadOnlyCollection<object> GetUncommittedChanges() => _changes.AsReadOnly();
    
    public void MarkChangesAsCommitted() => _changes.Clear();
}

// Event store implementation
public class EventStore : IEventStore
{
    private readonly IEventStoreRepository _repository;
    private readonly IEventPublisher _eventPublisher;
    private readonly ILogger<EventStore> _logger;

    public EventStore(
        IEventStoreRepository repository,
        IEventPublisher eventPublisher,
        ILogger<EventStore> logger)
    {
        _repository = repository;
        _eventPublisher = eventPublisher;
        _logger = logger;
    }

    public async Task SaveAsync(string aggregateId, IEnumerable<object> events, int expectedVersion)
    {
        var eventData = events.Select(@event => new EventData
        {
            Id = Guid.NewGuid().ToString(),
            AggregateId = aggregateId,
            EventType = @event.GetType().Name,
            EventData = JsonSerializer.Serialize(@event, @event.GetType()),
            Version = ++expectedVersion,
            Timestamp = DateTime.UtcNow
        }).ToList();

        await _repository.AppendEventsAsync(aggregateId, eventData, expectedVersion - events.Count());
        
        // Publish events for other microservices
        foreach (var @event in events)
        {
            await _eventPublisher.PublishAsync(@event);
        }
        
        _logger.LogInformation("Saved {EventCount} events for aggregate {AggregateId}", 
            eventData.Count, aggregateId);
    }

    public async Task<List<object>> GetEventsAsync(string aggregateId)
    {
        var eventData = await _repository.GetEventsAsync(aggregateId);
        
        return eventData.Select(DeserializeEvent).ToList();
    }

    private object DeserializeEvent(EventData eventData)
    {
        var eventType = Type.GetType($"ECommerce.Domain.Events.{eventData.EventType}, ECommerce.Domain");
        if (eventType == null)
            throw new InvalidOperationException($"Unknown event type: {eventData.EventType}");
            
        return JsonSerializer.Deserialize(eventData.EventData, eventType);
    }
}
  

Saga Pattern Implementation

  
    // Order processing saga
public class OrderProcessingSaga : 
    IEventHandler<OrderCreatedEvent>,
    IEventHandler<PaymentProcessedEvent>,
    IEventHandler<InventoryReservedEvent>,
    IEventHandler<ShippingCreatedEvent>
{
    private readonly ISagaRepository _sagaRepository;
    private readonly IEventPublisher _eventPublisher;
    private readonly ILogger<OrderProcessingSaga> _logger;

    public OrderProcessingSaga(
        ISagaRepository sagaRepository,
        IEventPublisher eventPublisher,
        ILogger<OrderProcessingSaga> logger)
    {
        _sagaRepository = sagaRepository;
        _eventPublisher = eventPublisher;
        _logger = logger;
    }

    public async Task Handle(OrderCreatedEvent @event)
    {
        _logger.LogInformation("Starting order processing saga for order {OrderId}", @event.OrderId);

        var saga = new OrderSaga(@event.OrderId, @event.CustomerId, @event.TotalAmount);
        await _sagaRepository.SaveAsync(saga);

        // Start payment processing
        await _eventPublisher.PublishAsync(new ProcessPaymentCommand
        {
            OrderId = @event.OrderId,
            Amount = @event.TotalAmount,
            PaymentMethod = "CreditCard" // Would come from order
        });
    }

    public async Task Handle(PaymentProcessedEvent @event)
    {
        var saga = await _sagaRepository.GetByIdAsync(@event.OrderId);
        
        if (@event.Success)
        {
            saga.MarkPaymentProcessed();
            await _sagaRepository.SaveAsync(saga);

            // Proceed to inventory reservation
            await _eventPublisher.PublishAsync(new ReserveInventoryCommand
            {
                OrderId = @event.OrderId,
                Items = @event.Items
            });
        }
        else
        {
            saga.MarkPaymentFailed(@event.ErrorMessage);
            await _sagaRepository.SaveAsync(saga);

            // Compensating action
            await _eventPublisher.PublishAsync(new CancelOrderCommand
            {
                OrderId = @event.OrderId,
                Reason = "Payment failed: " + @event.ErrorMessage
            });
        }
    }

    public async Task Handle(InventoryReservedEvent @event)
    {
        var saga = await _sagaRepository.GetByIdAsync(@event.OrderId);
        saga.MarkInventoryReserved();
        await _sagaRepository.SaveAsync(saga);

        // Proceed to shipping
        await _eventPublisher.PublishAsync(new CreateShippingCommand
        {
            OrderId = @event.OrderId,
            ShippingAddress = @event.ShippingAddress,
            Items = @event.Items
        });
    }

    public async Task Handle(ShippingCreatedEvent @event)
    {
        var saga = await _sagaRepository.GetByIdAsync(@event.OrderId);
        saga.MarkShippingCreated();
        await _sagaRepository.SaveAsync(saga);

        // Order processing completed
        await _eventPublisher.PublishAsync(new CompleteOrderCommand
        {
            OrderId = @event.OrderId
        });
        
        _logger.LogInformation("Order processing saga completed for order {OrderId}", @event.OrderId);
    }
}
  

7. Service Discovery & API Gateway

Consul Service Discovery

  
    // Consul service registration
public class ConsulServiceDiscovery : IServiceDiscovery, IHostedService
{
    private readonly IConsulClient _consulClient;
    private readonly ServiceConfig _serviceConfig;
    private readonly ILogger<ConsulServiceDiscovery> _logger;
    private string _serviceId;

    public ConsulServiceDiscovery(
        IConsulClient consulClient,
        IConfiguration configuration,
        ILogger<ConsulServiceDiscovery> logger)
    {
        _consulClient = consulClient;
        _logger = logger;
        _serviceConfig = configuration.GetSection("ServiceDiscovery").Get<ServiceConfig>();
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        _serviceId = $"{_serviceConfig.ServiceName}-{Guid.NewGuid()}";
        
        var registration = new AgentServiceRegistration
        {
            ID = _serviceId,
            Name = _serviceConfig.ServiceName,
            Address = _serviceConfig.ServiceAddress,
            Port = _serviceConfig.ServicePort,
            Tags = _serviceConfig.Tags,
            Check = new AgentServiceCheck
            {
                HTTP = $"http://{_serviceConfig.ServiceAddress}:{_serviceConfig.ServicePort}/health",
                Interval = TimeSpan.FromSeconds(30),
                Timeout = TimeSpan.FromSeconds(10),
                DeregisterCriticalServiceAfter = TimeSpan.FromMinutes(1)
            }
        };

        await _consulClient.Agent.ServiceRegister(registration, cancellationToken);
        _logger.LogInformation("Service {ServiceName} registered with Consul", _serviceConfig.ServiceName);
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await _consulClient.Agent.ServiceDeregister(_serviceId, cancellationToken);
        _logger.LogInformation("Service {ServiceName} deregistered from Consul", _serviceConfig.ServiceName);
    }

    public async Task<ServiceEndpoint> GetServiceAsync(string serviceName)
    {
        var services = await _consulClient.Health.Service(serviceName, string.Empty, true);
        
        if (services.Response == null || !services.Response.Any())
        {
            throw new ServiceNotFoundException($"Service {serviceName} not found in Consul");
        }

        // Simple round-robin load balancing
        var service = services.Response[Random.Shared.Next(services.Response.Length)];
        
        return new ServiceEndpoint
        {
            Address = service.Service.Address,
            Port = service.Service.Port
        };
    }

    public async Task<List<ServiceEndpoint>> GetAllServicesAsync(string serviceName)
    {
        var services = await _consulClient.Health.Service(serviceName, string.Empty, true);
        
        return services.Response?.Select(s => new ServiceEndpoint
        {
            Address = s.Service.Address,
            Port = s.Service.Port
        }).ToList() ?? new List<ServiceEndpoint>();
    }
}
  

API Gateway with Ocelot

  
    // Ocelot configuration
public static class OcelotConfiguration
{
    public static WebApplicationBuilder AddOcelotGateway(this WebApplicationBuilder builder)
    {
        builder.Services.AddOcelot()
            .AddConsul()
            .AddConfigStoredInConsul();
            
        builder.Services.AddAuthentication("Bearer")
            .AddJwtBearer("Bearer", options =>
            {
                options.Authority = builder.Configuration["Identity:Authority"];
                options.RequireHttpsMetadata = false;
                options.TokenValidationParameters = new TokenValidationParameters
                {
                    ValidateAudience = false
                };
            });

        return builder;
    }
}

// ocelot.json configuration
{
  "Routes": [
    {
      "DownstreamPathTemplate": "/api/products/{everything}",
      "DownstreamScheme": "https",
      "DownstreamHostAndPorts": [
        {
          "Host": "product-service",
          "Port": 5001
        }
      ],
      "UpstreamPathTemplate": "/products/{everything}",
      "UpstreamHttpMethod": [ "GET", "POST", "PUT", "DELETE" ],
      "AuthenticationOptions": {
        "AuthenticationProviderKey": "Bearer",
        "AllowedScopes": []
      },
      "RateLimitOptions": {
        "ClientWhitelist": [],
        "EnableRateLimiting": true,
        "Period": "1s",
        "PeriodTimespan": 1,
        "Limit": 100
      }
    },
    {
      "DownstreamPathTemplate": "/api/orders/{everything}",
      "DownstreamScheme": "https",
      "DownstreamHostAndPorts": [
        {
          "Host": "order-service",
          "Port": 5002
        }
      ],
      "UpstreamPathTemplate": "/orders/{everything}",
      "UpstreamHttpMethod": [ "GET", "POST", "PUT", "DELETE" ],
      "AuthenticationOptions": {
        "AuthenticationProviderKey": "Bearer",
        "AllowedScopes": [ "orders" ]
      }
    },
    {
      "DownstreamPathTemplate": "/api/inventory/{everything}",
      "DownstreamScheme": "https",
      "DownstreamHostAndPorts": [
        {
          "Host": "inventory-service",
          "Port": 5003
        }
      ],
      "UpstreamPathTemplate": "/inventory/{everything}",
      "UpstreamHttpMethod": [ "GET", "POST", "PUT" ],
      "AuthenticationOptions": {
        "AuthenticationProviderKey": "Bearer",
        "AllowedScopes": [ "inventory" ]
      }
    }
  ],
  "GlobalConfiguration": {
    "BaseUrl": "https://localhost:7000",
    "ServiceDiscoveryProvider": {
      "Host": "consul",
      "Port": 8500,
      "Type": "Consul"
    }
  }
}
  

8. Containerization & Orchestration

Docker Configuration

  
    # Product Service Dockerfile
FROM mcr.microsoft.com/dotnet/aspnet:8.0 AS base
WORKDIR /app
EXPOSE 80
EXPOSE 443

# Install curl for health checks
RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/*

FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["src/ProductService/ProductService.csproj", "src/ProductService/"]
COPY ["contracts/ProductService.Grpc/ProductService.Grpc.csproj", "contracts/ProductService.Grpc/"]
RUN dotnet restore "src/ProductService/ProductService.csproj"
COPY . .
WORKDIR "/src/src/ProductService"
RUN dotnet build "ProductService.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "ProductService.csproj" -c Release -o /app/publish

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .

# Create non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser
RUN chown -R appuser:appuser /app
USER appuser

ENTRYPOINT ["dotnet", "ProductService.dll"]
  

Kubernetes Deployment

  
    # product-service-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: product-service
  namespace: ecommerce
  labels:
    app: product-service
    version: v1.0.0
spec:
  replicas: 3
  selector:
    matchLabels:
      app: product-service
  template:
    metadata:
      labels:
        app: product-service
        version: v1.0.0
      annotations:
        consul.hashicorp.com/connect-inject: "true"
        consul.hashicorp.com/service-tags: "version=v1.0.0"
    spec:
      containers:
      - name: product-service
        image: ecommerce/product-service:latest
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 80
          name: http
        - containerPort: 443
          name: https
        env:
        - name: ASPNETCORE_ENVIRONMENT
          value: "Production"
        - name: ConnectionStrings__ProductDatabase
          valueFrom:
            secretKeyRef:
              name: database-secrets
              key: product-connection-string
        - name: RabbitMQ__HostName
          value: "rabbitmq"
        - name: Consul__Host
          value: "consul-server"
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 80
          initialDelaySeconds: 30
          periodSeconds: 10
          timeoutSeconds: 5
        readinessProbe:
          httpGet:
            path: /health/ready
            port: 80
          initialDelaySeconds: 5
          periodSeconds: 5
        startupProbe:
          httpGet:
            path: /health/startup
            port: 80
          initialDelaySeconds: 10
          periodSeconds: 10
          failureThreshold: 3
        securityContext:
          allowPrivilegeEscalation: false
          runAsNonRoot: true
          runAsUser: 1000
          capabilities:
            drop:
            - ALL
      restartPolicy: Always
      terminationGracePeriodSeconds: 60
---
apiVersion: v1
kind: Service
metadata:
  name: product-service
  namespace: ecommerce
  labels:
    app: product-service
  annotations:
    consul.hashicorp.com/service-tags: "version=v1.0.0"
spec:
  selector:
    app: product-service
  ports:
  - name: http
    port: 80
    targetPort: 80
    protocol: TCP
  - name: grpc
    port: 5001
    targetPort: 5001
    protocol: TCP
  type: ClusterIP
  

Docker Compose for Development

  
    # docker-compose.yml
version: '3.8'

services:
  product-service:
    build:
      context: .
      dockerfile: src/ProductService/Dockerfile
    environment:
      - ASPNETCORE_ENVIRONMENT=Development
      - ConnectionStrings__ProductDatabase=Server=sql-server;Database=ProductDB;User Id=sa;Password=YourPassword123!;TrustServerCertificate=true;
      - RabbitMQ__HostName=rabbitmq
      - Consul__Host=consul
    ports:
      - "5001:80"
    depends_on:
      - sql-server
      - rabbitmq
      - consul
    networks:
      - ecommerce-network

  order-service:
    build:
      context: .
      dockerfile: src/OrderService/Dockerfile
    environment:
      - ASPNETCORE_ENVIRONMENT=Development
      - ConnectionStrings__OrderDatabase=Server=sql-server;Database=OrderDB;User Id=sa;Password=YourPassword123!;TrustServerCertificate=true;
      - RabbitMQ__HostName=rabbitmq
    ports:
      - "5002:80"
    depends_on:
      - sql-server
      - rabbitmq
    networks:
      - ecommerce-network

  api-gateway:
    build:
      context: .
      dockerfile: src/ApiGateway/Dockerfile
    environment:
      - ASPNETCORE_ENVIRONMENT=Development
    ports:
      - "7000:80"
    depends_on:
      - product-service
      - order-service
    networks:
      - ecommerce-network

  sql-server:
    image: mcr.microsoft.com/mssql/server:2022-latest
    environment:
      SA_PASSWORD: "YourPassword123!"
      ACCEPT_EULA: "Y"
      MSSQL_PID: "Express"
    ports:
      - "1433:1433"
    volumes:
      - sql-data:/var/opt/mssql
    networks:
      - ecommerce-network

  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - rabbitmq-data:/var/lib/rabbitmq
    networks:
      - ecommerce-network

  consul:
    image: consul:1.15
    ports:
      - "8500:8500"
    command: "agent -dev -client=0.0.0.0"
    networks:
      - ecommerce-network

  seq:
    image: datalust/seq:latest
    environment:
      - ACCEPT_EULA=Y
    ports:
      - "5341:5341"
      - "8081:80"
    volumes:
      - seq-data:/data
    networks:
      - ecommerce-network

volumes:
  sql-data:
  rabbitmq-data:
  seq-data:

networks:
  ecommerce-network:
    driver: bridge
  

9. Distributed Data Management

Database per Service Pattern

  
    // Product service database context
public class ProductContext : DbContext
{
    public ProductContext(DbContextOptions<ProductContext> options) : base(options)
    {
    }

    public DbSet<Product> Products => Set<Product>();
    public DbSet<Category> Categories => Set<Category>();
    public DbSet<ProductReview> ProductReviews => Set<ProductReview>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<Product>(entity =>
        {
            entity.HasKey(e => e.Id);
            entity.Property(e => e.Name).IsRequired().HasMaxLength(100);
            entity.Property(e => e.Description).HasMaxLength(500);
            entity.Property(e => e.Price).HasColumnType("decimal(18,2)");
            entity.HasIndex(e => e.Name);
            entity.HasIndex(e => e.CategoryId);
            entity.HasQueryFilter(e => e.Status == ProductStatus.Active);
        });

        modelBuilder.Entity<Category>(entity =>
        {
            entity.HasKey(e => e.Id);
            entity.Property(e => e.Name).IsRequired().HasMaxLength(50);
            entity.HasIndex(e => e.Name).IsUnique();
        });

        base.OnModelCreating(modelBuilder);
    }
}

// Order service database context
public class OrderContext : DbContext
{
    public OrderContext(DbContextOptions<OrderContext> options) : base(options)
    {
    }

    public DbSet<Order> Orders => Set<Order>();
    public DbSet<OrderItem> OrderItems => Set<OrderItem>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<Order>(entity =>
        {
            entity.HasKey(e => e.Id);
            entity.Property(e => e.TotalAmount).HasColumnType("decimal(18,2)");
            entity.Property(e => e.Status).HasConversion<string>();
            entity.HasIndex(e => e.CustomerId);
            entity.HasIndex(e => e.Status);
            entity.HasIndex(e => e.CreatedAt);
        });

        modelBuilder.Entity<OrderItem>(entity =>
        {
            entity.HasKey(e => e.Id);
            entity.Property(e => e.UnitPrice).HasColumnType("decimal(18,2)");
            entity.HasOne(e => e.Order)
                  .WithMany(o => o.Items)
                  .HasForeignKey(e => e.OrderId);
        });

        base.OnModelCreating(modelBuilder);
    }
}
  

Saga Data Management

  
    // Saga data model
public class OrderSaga
{
    public string OrderId { get; set; }
    public string CustomerId { get; set; }
    public decimal TotalAmount { get; set; }
    public OrderSagaState State { get; set; }
    public Dictionary<string, object> Context { get; set; } = new();
    public List<SagaStep> Steps { get; set; } = new();
    public DateTime CreatedAt { get; set; }
    public DateTime? UpdatedAt { get; set; }
    public DateTime? CompletedAt { get; set; }

    public void MarkPaymentProcessed()
    {
        State = OrderSagaState.PaymentProcessed;
        Steps.Add(new SagaStep { Name = "PaymentProcessed", CompletedAt = DateTime.UtcNow });
        UpdatedAt = DateTime.UtcNow;
    }

    public void MarkInventoryReserved()
    {
        State = OrderSagaState.InventoryReserved;
        Steps.Add(new SagaStep { Name = "InventoryReserved", CompletedAt = DateTime.UtcNow });
        UpdatedAt = DateTime.UtcNow;
    }

    public void MarkShippingCreated()
    {
        State = OrderSagaState.ShippingCreated;
        Steps.Add(new SagaStep { Name = "ShippingCreated", CompletedAt = DateTime.UtcNow });
        UpdatedAt = DateTime.UtcNow;
    }

    public void MarkCompleted()
    {
        State = OrderSagaState.Completed;
        CompletedAt = DateTime.UtcNow;
        UpdatedAt = DateTime.UtcNow;
    }

    public void MarkPaymentFailed(string error)
    {
        State = OrderSagaState.PaymentFailed;
        Context["PaymentError"] = error;
        UpdatedAt = DateTime.UtcNow;
    }
}

public enum OrderSagaState
{
    Started,
    PaymentProcessed,
    InventoryReserved,
    ShippingCreated,
    Completed,
    PaymentFailed,
    InventoryReservationFailed,
    ShippingCreationFailed
}

public class SagaStep
{
    public string Name { get; set; }
    public DateTime? CompletedAt { get; set; }
    public string Error { get; set; }
}
  

10. Observability & Monitoring

Distributed Tracing

  
    // OpenTelemetry configuration
public static class OpenTelemetryExtensions
{
    public static IServiceCollection AddOpenTelemetry(this IServiceCollection services, IConfiguration configuration)
    {
        services.AddOpenTelemetry()
            .WithTracing(tracing =>
            {
                tracing.AddAspNetCoreInstrumentation(options =>
                {
                    options.EnrichWithHttpRequest = (activity, httpRequest) =>
                    {
                        activity.SetTag("request.protocol", httpRequest.Protocol);
                    };
                    options.EnrichWithHttpResponse = (activity, httpResponse) =>
                    {
                        activity.SetTag("response.length", httpResponse.ContentLength);
                    };
                    options.RecordException = true;
                })
                .AddGrpcClientInstrumentation()
                .AddHttpClientInstrumentation()
                .AddEntityFrameworkCoreInstrumentation(options =>
                {
                    options.SetDbStatementForText = true;
                })
                .AddRedisInstrumentation()
                .AddSource("ProductService")
                .AddSource("OrderService")
                .AddSource("InventoryService")
                .SetSampler(new AlwaysOnSampler())
                .AddOtlpExporter(options =>
                {
                    options.Endpoint = new Uri(configuration["Otlp:Endpoint"]);
                })
                .AddConsoleExporter();
            })
            .WithMetrics(metrics =>
            {
                metrics.AddAspNetCoreInstrumentation()
                    .AddHttpClientInstrumentation()
                    .AddRuntimeInstrumentation()
                    .AddProcessInstrumentation()
                    .AddMeter("Microsoft.AspNetCore.Hosting")
                    .AddMeter("Microsoft.AspNetCore.Server.Kestrel")
                    .AddOtlpExporter(options =>
                    {
                        options.Endpoint = new Uri(configuration["Otlp:Endpoint"]);
                    });
            });

        return services;
    }
}
  

Health Checks with Dependencies

  
    // Comprehensive health checks
public static class HealthCheckExtensions
{
    public static IHealthChecksBuilder AddMicroserviceHealthChecks(
        this IServiceCollection services, 
        IConfiguration configuration)
    {
        return services.AddHealthChecks()
            // Database health checks
            .AddSqlServer(
                connectionString: configuration.GetConnectionString("ProductDatabase"),
                name: "product-database",
                tags: new[] { "ready", "live" })
            .AddRedis(
                redisConnectionString: configuration.GetConnectionString("Redis"),
                name: "redis",
                tags: new[] { "ready", "live" })
            // External services
            .AddUrlGroup(
                new Uri("https://api.paymentgateway.com/health"),
                name: "payment-gateway",
                tags: new[] { "ready" })
            // Message broker
            .AddRabbitMQ(
                rabbitConnectionString: configuration.GetConnectionString("RabbitMQ"),
                name: "rabbitmq",
                tags: new[] { "ready", "live" })
            // Disk storage
            .AddDiskStorageHealthCheck(s => 
                s.AddDrive("C:\\", 1024), 
                name: "storage",
                tags: new[] { "live" })
            // Memory check
            .AddProcessAllocatedMemoryHealthCheck(512, "process-memory")
            // Custom checks
            .AddCheck<GrpcHealthCheck>("grpc-endpoints", tags: new[] { "ready" })
            .AddCheck<DatabaseMigrationHealthCheck>("database-migrations", tags: new[] { "ready" });
    }
}

// Custom gRPC health check
public class GrpcHealthCheck : IHealthCheck
{
    private readonly IProductServiceClient _productServiceClient;
    private readonly ILogger<GrpcHealthCheck> _logger;

    public GrpcHealthCheck(
        IProductServiceClient productServiceClient,
        ILogger<GrpcHealthCheck> logger)
    {
        _productServiceClient = productServiceClient;
        _logger = logger;
    }

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context, 
        CancellationToken cancellationToken = default)
    {
        try
        {
            // Test gRPC connection by making a simple call
            await _productServiceClient.GetProductAsync(1, cancellationToken);
            return HealthCheckResult.Healthy("gRPC endpoints are responsive");
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "gRPC health check failed");
            return HealthCheckResult.Unhealthy("gRPC endpoints are not responsive");
        }
    }
}
  

11. Security in Microservices

Secure Service Communication

  
    // gRPC with authentication and authorization
public static class GrpcSecurityExtensions
{
    public static IServiceCollection AddSecureGrpc(this IServiceCollection services)
    {
        services.AddGrpc(options =>
        {
            options.Interceptors.Add<AuthInterceptor>();
            options.Interceptors.Add<LoggingInterceptor>();
        });

        services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
            .AddJwtBearer(options =>
            {
                options.Authority = "https://localhost:5001";
                options.TokenValidationParameters = new TokenValidationParameters
                {
                    ValidateAudience = false,
                    ValidTypes = new[] { "at+jwt" }
                };
            });

        services.AddAuthorization(options =>
        {
            options.AddPolicy("ProductRead", policy =>
                policy.RequireAuthenticatedUser().RequireClaim("scope", "product.read"));
            options.AddPolicy("ProductWrite", policy =>
                policy.RequireAuthenticatedUser().RequireClaim("scope", "product.write"));
        });

        return services;
    }
}

// Authentication interceptor
public class AuthInterceptor : Interceptor
{
    private readonly IHttpContextAccessor _httpContextAccessor;
    private readonly ILogger<AuthInterceptor> _logger;

    public AuthInterceptor(
        IHttpContextAccessor httpContextAccessor,
        ILogger<AuthInterceptor> logger)
    {
        _httpContextAccessor = httpContextAccessor;
        _logger = logger;
    }

    public override async Task<TResponse> UnaryServerHandler<TRequest, TResponse>(
        TRequest request,
        ServerCallContext context,
        UnaryServerMethod<TRequest, TResponse> continuation)
    {
        var httpContext = _httpContextAccessor.HttpContext;
        
        if (httpContext == null)
        {
            throw new RpcException(new Status(StatusCode.Unauthenticated, "Authentication required"));
        }

        // Extract token from gRPC metadata
        var token = context.RequestHeaders.FirstOrDefault(h => h.Key == "authorization")?.Value;
        
        if (string.IsNullOrEmpty(token))
        {
            throw new RpcException(new Status(StatusCode.Unauthenticated, "Authorization token required"));
        }

        // Validate token and set user context
        try
        {
            var user = await ValidateTokenAsync(token);
            context.UserState["User"] = user;
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Token validation failed");
            throw new RpcException(new Status(StatusCode.Unauthenticated, "Invalid token"));
        }

        return await continuation(request, context);
    }

    private async Task<ClaimsPrincipal> ValidateTokenAsync(string token)
    {
        // Token validation logic
        // This would typically use JWT validation with your identity provider
        return await Task.FromResult(new ClaimsPrincipal());
    }
}
  

Secure Message Brokers

  
    // Secure RabbitMQ configuration
public class SecureRabbitMQConnectionFactory
{
    public ConnectionFactory CreateSecureFactory(RabbitMQConfig config)
    {
        return new ConnectionFactory
        {
            HostName = config.HostName,
            Port = config.Port,
            UserName = config.UserName,
            Password = config.Password,
            VirtualHost = config.VirtualHost,
            
            // Security settings
            Ssl = new SslOption
            {
                Enabled = config.UseSsl,
                ServerName = config.HostName,
                CertificateValidationCallback = (sender, certificate, chain, sslPolicyErrors) =>
                {
                    // Custom certificate validation
                    return sslPolicyErrors == SslPolicyErrors.None;
                }
            },
            
            // Connection settings
            RequestedHeartbeat = TimeSpan.FromSeconds(60),
            NetworkRecoveryInterval = TimeSpan.FromSeconds(10),
            AutomaticRecoveryEnabled = true,
            TopologyRecoveryEnabled = true
        };
    }
}
  

12. Real-World E-Commerce Platform

Complete Microservices Architecture

  
    // Order service with event sourcing
public class OrderService : OrderService.Grpc.OrderService.OrderServiceBase
{
    private readonly IEventStore _eventStore;
    private readonly IOrderRepository _orderRepository;
    private readonly IEventPublisher _eventPublisher;
    private readonly ILogger<OrderService> _logger;

    public OrderService(
        IEventStore eventStore,
        IOrderRepository orderRepository,
        IEventPublisher eventPublisher,
        ILogger<OrderService> logger)
    {
        _eventStore = eventStore;
        _orderRepository = orderRepository;
        _eventPublisher = eventPublisher;
        _logger = logger;
    }

    public override async Task<CreateOrderResponse> CreateOrder(
        CreateOrderRequest request, 
        ServerCallContext context)
    {
        _logger.LogInformation("Creating order for customer {CustomerId}", request.CustomerId);

        try
        {
            // Validate products via gRPC call to ProductService
            var productService = context.GetHttpContext().RequestServices
                .GetRequiredService<IProductServiceClient>();
                
            foreach (var item in request.Items)
            {
                var product = await productService.GetProductAsync(item.ProductId);
                if (product.StockQuantity < item.Quantity)
                {
                    throw new RpcException(new Status(StatusCode.FailedPrecondition,
                        $"Insufficient stock for product {item.ProductId}"));
                }
            }

            // Create order aggregate
            var orderId = Guid.NewGuid().ToString();
            var orderItems = request.Items.Select(i => new OrderItem
            {
                ProductId = i.ProductId,
                ProductName = i.ProductName,
                Quantity = i.Quantity,
                UnitPrice = (decimal)i.UnitPrice
            }).ToList();

            var order = new OrderAggregate(orderId, request.CustomerId, orderItems);

            // Save events
            var events = order.GetUncommittedChanges();
            await _eventStore.SaveAsync(orderId, events, -1);
            order.MarkChangesAsCommitted();

            // Publish integration event
            await _eventPublisher.PublishAsync(new OrderCreatedIntegrationEvent
            {
                OrderId = orderId,
                CustomerId = request.CustomerId,
                Items = request.Items.ToList(),
                TotalAmount = (decimal)request.TotalAmount
            });

            _logger.LogInformation("Order {OrderId} created successfully", orderId);

            return new CreateOrderResponse
            {
                OrderId = orderId,
                Status = OrderStatus.Created
            };
        }
        catch (RpcException)
        {
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to create order for customer {CustomerId}", request.CustomerId);
            throw new RpcException(new Status(StatusCode.Internal, "Order creation failed"));
        }
    }
}
  

Integration Testing

  
    // Microservices integration tests
public class MicroservicesIntegrationTests : IClassFixture<MicroservicesTestFixture>
{
    private readonly MicroservicesTestFixture _fixture;
    private readonly HttpClient _apiGatewayClient;

    public MicroservicesIntegrationTests(MicroservicesTestFixture fixture)
    {
        _fixture = fixture;
        _apiGatewayClient = fixture.CreateClient();
    }

    [Fact]
    public async Task CreateOrder_ValidRequest_ShouldProcessSuccessfully()
    {
        // Arrange
        var createOrderRequest = new
        {
            CustomerId = 123,
            Items = new[]
            {
                new { ProductId = 1, Quantity = 2, UnitPrice = 25.99m },
                new { ProductId = 2, Quantity = 1, UnitPrice = 15.50m }
            },
            ShippingAddress = new
            {
                Street = "123 Test St",
                City = "Test City",
                Country = "Test Country",
                ZipCode = "12345"
            }
        };

        // Act
        var response = await _apiGatewayClient.PostAsJsonAsync("/orders", createOrderRequest);

        // Assert
        response.EnsureSuccessStatusCode();
        
        var orderResponse = await response.Content.ReadFromJsonAsync<CreateOrderResponse>();
        orderResponse.Should().NotBeNull();
        orderResponse.OrderId.Should().NotBeNullOrEmpty();
        orderResponse.Status.Should().Be(OrderStatus.Created);

        // Verify events were published
        await _fixture.WaitForEvent<OrderCreatedIntegrationEvent>(
            e => e.OrderId == orderResponse.OrderId, 
            TimeSpan.FromSeconds(10));

        // Verify inventory was updated
        var inventoryResponse = await _apiGatewayClient.GetAsync($"/inventory/products/1");
        inventoryResponse.EnsureSuccessStatusCode();
    }
}

public class MicroservicesTestFixture : WebApplicationFactory<Program>
{
    private readonly TestContainersFixture _containers;

    public MicroservicesTestFixture()
    {
        _containers = new TestContainersFixture();
    }

    protected override void ConfigureWebHost(IWebHostBuilder builder)
    {
        builder.ConfigureServices(services =>
        {
            // Replace real services with test doubles
            services.AddSingleton<IMessagePublisher, TestMessagePublisher>();
            services.AddSingleton<IProductServiceClient, TestProductServiceClient>();
        });

        builder.UseEnvironment("Testing");
    }

    protected override IHost CreateHost(IHostBuilder builder)
    {
        // Start test containers before host
        _containers.StartAsync().GetAwaiter().GetResult();
        
        return base.CreateHost(builder);
    }

    public async Task WaitForEvent<TEvent>(Func<TEvent, bool> predicate, TimeSpan timeout)
    {
        var testPublisher = Services.GetRequiredService<TestMessagePublisher>();
        var startTime = DateTime.UtcNow;
        
        while (DateTime.UtcNow - startTime < timeout)
        {
            if (testPublisher.PublishedEvents.OfType<TEvent>().Any(predicate))
                return;
                
            await Task.Delay(100);
        }
        
        throw new TimeoutException($"Event of type {typeof(TEvent).Name} not published within timeout");
    }
}
  

This comprehensive microservices foundation guide provides everything needed to architect, build, and deploy scalable microservices using gRPC and message brokers in  ASP.NET  Core. The patterns and practices demonstrated here form the foundation for building robust, maintainable distributed systems.