SQLClient Raw Performance Part 7: Mastering Data Streaming & Bulk Operations in ASP.NET Core
Table of Contents
Introduction to High-Volume Data Processing
Understanding Data Streaming Concepts
SqlDataReader Streaming Techniques
Bulk Copy Operations with SqlBulkCopy
Memory-Efficient Large Object Handling
Table-Valued Parameters for Bulk Operations
Asynchronous Streaming Patterns
Real-World ETL Pipeline Implementation
Performance Monitoring and Optimization
Best Practices and Common Pitfalls
1. Introduction to High-Volume Data Processing
In modern applications, dealing with large datasets is increasingly common. Traditional ORM approaches often fall short when processing millions of records due to memory constraints and performance overhead.
The Challenge with ORMs
// Problematic ORM approach for large datasets
var allUsers = context.Users.ToList(); // Loads ALL records into memory!
foreach (var user in allUsers)
{
// Process user
}
SqlClient Advantage
// Efficient SqlClient streaming approach
using var connection = new SqlConnection(connectionString);
await connection.OpenAsync();
using var command = new SqlCommand("SELECT * FROM Users", connection);
using var reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess);
while (await reader.ReadAsync())
{
// Process one record at a time
ProcessUser(reader);
}
2. Understanding Data Streaming Concepts
Sequential Access vs Random Access
Random Access (Default)
// Default behavior - loads entire row into memory
using var reader = await command.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
var name = reader["Name"]; // Can access columns in any order
var email = reader["Email"];
}
Sequential Access (Memory Efficient)
// Sequential access - streams data efficiently
using var reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess);
while (await reader.ReadAsync())
{
var name = reader.GetString(0); // Must access columns in order
var email = reader.GetString(1);
// Large data fields are streamed, not fully loaded
}
Memory Usage Comparison
Let's create a comprehensive example demonstrating the memory differences:
Program.cs
using System.Data;
using Microsoft.Data.SqlClient;
namespace DataStreamingDemo
{
class Program
{
static async Task Main(string[] args)
{
const string connectionString = "Server=.;Database=StreamingDemo;Trusted_Connection=True;TrustServerCertificate=True;";
await InitializeDatabase(connectionString);
Console.WriteLine("=== Memory Usage Comparison ===");
// Test random access memory usage
var randomMemory = await TestRandomAccessMemory(connectionString);
Console.WriteLine($"Random Access Memory: {randomMemory} MB");
// Test sequential access memory usage
var sequentialMemory = await TestSequentialAccessMemory(connectionString);
Console.WriteLine($"Sequential Access Memory: {sequentialMemory} MB");
Console.WriteLine($"Memory saved: {randomMemory - sequentialMemory} MB");
}
static async Task<long> TestRandomAccessMemory(string connectionString)
{
var memoryBefore = GC.GetTotalMemory(true);
using var connection = new SqlConnection(connectionString);
await connection.OpenAsync();
using var command = new SqlCommand("SELECT * FROM LargeDataTable", connection);
using var reader = await command.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
// Access columns randomly
var id = reader["Id"];
var largeData = reader["LargeData"];
var timestamp = reader["Timestamp"];
}
var memoryAfter = GC.GetTotalMemory(true);
return (memoryAfter - memoryBefore) / (1024 * 1024);
}
static async Task<long> TestSequentialAccessMemory(string connectionString)
{
var memoryBefore = GC.GetTotalMemory(true);
using var connection = new SqlConnection(connectionString);
await connection.OpenAsync();
using var command = new SqlCommand("SELECT * FROM LargeDataTable", connection);
using var reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess);
while (await reader.ReadAsync())
{
// Must access columns sequentially
var id = reader.GetInt32(0);
var largeData = reader.GetString(1);
var timestamp = reader.GetDateTime(2);
}
var memoryAfter = GC.GetTotalMemory(true);
return (memoryAfter - memoryBefore) / (1024 * 1024);
}
static async Task InitializeDatabase(string connectionString)
{
using var connection = new SqlConnection(connectionString);
await connection.OpenAsync();
// Create test table
var createTableSql = @"
IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='LargeDataTable' AND xtype='U')
CREATE TABLE LargeDataTable (
Id INT IDENTITY PRIMARY KEY,
LargeData NVARCHAR(MAX),
Timestamp DATETIME2
)";
using var createCommand = new SqlCommand(createTableSql, connection);
await createCommand.ExecuteNonQueryAsync();
// Insert sample data
for (int i = 0; i < 10000; i++)
{
var insertSql = "INSERT INTO LargeDataTable (LargeData, Timestamp) VALUES (@data, @time)";
using var insertCommand = new SqlCommand(insertSql, connection);
insertCommand.Parameters.AddWithValue("@data", new string('X', 10000)); // 10KB per row
insertCommand.Parameters.AddWithValue("@time", DateTime.Now);
await insertCommand.ExecuteNonQueryAsync();
}
}
}
}
3. SqlDataReader Streaming Techniques
Efficient Large Object Streaming
LargeTextStreamer.cs
using System.Data;
using Microsoft.Data.SqlClient;
using System.IO;
using System.Text;
namespace DataStreamingDemo.Services
{
public class LargeTextStreamer
{
private readonly string _connectionString;
public LargeTextStreamer(string connectionString)
{
_connectionString = connectionString;
}
public async IAsyncEnumerable<string> StreamLargeTextDataAsync()
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
const string sql = @"
SELECT
Id,
LargeTextContent,
CreatedDate
FROM DocumentContents
WHERE LargeTextContent IS NOT NULL";
using var command = new SqlCommand(sql, connection);
using var reader = await command.ExecuteReaderAsync(
CommandBehavior.SequentialAccess | CommandBehavior.CloseConnection);
var buffer = new char[4096]; // 4KB buffer
var stringBuilder = new StringBuilder();
while (await reader.ReadAsync())
{
var id = reader.GetInt32(0);
// Stream large text content
using var textReader = reader.GetTextReader(1);
stringBuilder.Clear();
int charsRead;
while ((charsRead = await textReader.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
stringBuilder.Append(buffer, 0, charsRead);
// Yield when we have a reasonable chunk
if (stringBuilder.Length > 8192) // 8KB chunks
{
yield return $"Document {id}: {stringBuilder}";
stringBuilder.Clear();
}
}
// Yield remaining content
if (stringBuilder.Length > 0)
{
yield return $"Document {id}: {stringBuilder}";
}
}
}
public async Task ProcessLargeDocumentsInBatchesAsync(
Func<string, Task> processChunk,
int batchSize = 1000)
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
var currentBatch = new List<string>();
await foreach (var chunk in StreamLargeTextDataAsync())
{
currentBatch.Add(chunk);
if (currentBatch.Count >= batchSize)
{
// Process batch concurrently
var tasks = currentBatch.Select(processChunk);
await Task.WhenAll(tasks);
currentBatch.Clear();
}
}
// Process remaining items
if (currentBatch.Count > 0)
{
var tasks = currentBatch.Select(processChunk);
await Task.WhenAll(tasks);
}
}
}
}
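A minimal usage sketch (the connection string variable is a placeholder): consuming the async stream with await foreach keeps only one chunk of document text in memory at a time.
var streamer = new LargeTextStreamer(connectionString);
await foreach (var chunk in streamer.StreamLargeTextDataAsync())
{
// Each chunk is roughly 8KB of document text plus its document id prefix
Console.WriteLine($"Received {chunk.Length} characters");
}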
Binary Data Streaming
BinaryStreamProcessor.cs
using System.Data;
using Microsoft.Data.SqlClient;
using System.IO;
namespace DataStreamingDemo.Services
{
public class BinaryStreamProcessor
{
private readonly string _connectionString;
public BinaryStreamProcessor(string connectionString)
{
_connectionString = connectionString;
}
public async Task StreamLargeBinaryToFileAsync(int documentId, string outputFilePath)
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
const string sql = @"
SELECT DocumentContent
FROM Documents
WHERE Id = @Id";
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@Id", documentId);
using var reader = await command.ExecuteReaderAsync(
CommandBehavior.SequentialAccess | CommandBehavior.SingleRow);
if (await reader.ReadAsync())
{
using var stream = reader.GetStream(0);
using var fileStream = File.Create(outputFilePath);
await stream.CopyToAsync(fileStream);
}
}
public async Task<long> UploadLargeBinaryAsync(string filePath, string fileName)
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
using var transaction = (SqlTransaction)await connection.BeginTransactionAsync();
try
{
const string sql = @"
INSERT INTO Documents (FileName, DocumentContent, FileSize, UploadDate)
OUTPUT INSERTED.Id
VALUES (@FileName, @DocumentContent, @FileSize, @UploadDate)";
using var command = new SqlCommand(sql, connection, transaction);
command.Parameters.AddWithValue("@FileName", fileName);
command.Parameters.AddWithValue("@FileSize", new FileInfo(filePath).Length);
command.Parameters.AddWithValue("@UploadDate", DateTime.UtcNow);
// Use streaming for the binary parameter
using var fileStream = File.OpenRead(filePath);
command.Parameters.Add("@DocumentContent", SqlDbType.VarBinary, -1).Value = fileStream;
var documentId = (int)await command.ExecuteScalarAsync();
await transaction.CommitAsync();
return documentId;
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
public async IAsyncEnumerable<byte[]> StreamBinaryInChunksAsync(int documentId, int chunkSize = 81920)
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
const string sql = @"
SELECT DocumentContent
FROM Documents
WHERE Id = @Id";
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@Id", documentId);
using var reader = await command.ExecuteReaderAsync(
CommandBehavior.SequentialAccess | CommandBehavior.SingleRow);
if (await reader.ReadAsync())
{
using var stream = reader.GetStream(0);
var buffer = new byte[chunkSize];
int bytesRead;
while ((bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
// Copy into a right-sized array so the reusable buffer is never exposed to callers
var chunk = new byte[bytesRead];
Array.Copy(buffer, chunk, bytesRead);
yield return chunk;
}
}
}
}
}
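A short usage sketch (file paths and the document id are placeholders): upload a file as a streamed VARBINARY(MAX) parameter, then stream it back to disk without buffering the whole document in memory.
var processor = new BinaryStreamProcessor(connectionString);
var documentId = await processor.UploadLargeBinaryAsync(@"C:\temp\report.pdf", "report.pdf");
await processor.StreamLargeBinaryToFileAsync((int)documentId, @"C:\temp\report-copy.pdf");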
4. Bulk Copy Operations with SqlBulkCopy
High-Performance Data Import
BulkDataImporter.cs
using System.Data;
using Microsoft.Data.SqlClient;
using System.Collections.Concurrent;
namespace DataStreamingDemo.Services
{
public class BulkDataImporter
{
private readonly string _connectionString;
public BulkDataImporter(string connectionString)
{
_connectionString = connectionString;
}
public async Task BulkInsertAsync<T>(
IEnumerable<T> data,
string tableName,
Func<T, object[]> mapper,
string[] columnNames = null)
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
using var bulkCopy = new SqlBulkCopy(connection)
{
DestinationTableName = tableName,
BatchSize = 5000, // Rows per round trip; 1,000-5,000 is a common sweet spot
BulkCopyTimeout = 300, // 5 minutes
EnableStreaming = true // Enable streaming for large datasets
};
// Create data table
var dataTable = CreateDataTable(data, mapper, columnNames);
// Configure column mappings if provided
if (columnNames != null)
{
foreach (var columnName in columnNames)
{
bulkCopy.ColumnMappings.Add(columnName, columnName);
}
}
await bulkCopy.WriteToServerAsync(dataTable);
}
private DataTable CreateDataTable<T>(
IEnumerable<T> data,
Func<T, object[]> mapper,
string[] columnNames)
{
var dataTable = new DataTable();
// Add columns based on first item
var firstItem = data.FirstOrDefault();
if (firstItem != null)
{
var firstRow = mapper(firstItem);
for (int i = 0; i < firstRow.Length; i++)
{
var columnName = columnNames?[i] ?? $"Column{i}";
dataTable.Columns.Add(columnName, firstRow[i]?.GetType() ?? typeof(string));
}
// Add rows
foreach (var item in data)
{
var row = mapper(item);
dataTable.Rows.Add(row);
}
}
return dataTable;
}
public async Task BulkInsertWithTransactionAsync<T>(
IEnumerable<T> data,
string tableName,
Func<T, object[]> mapper,
int batchSize = 10000)
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
using var transaction = (SqlTransaction)await connection.BeginTransactionAsync();
try
{
var batches = data.Batch(batchSize);
foreach (var batch in batches)
{
using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, transaction)
{
DestinationTableName = tableName,
BatchSize = 5000,
BulkCopyTimeout = 300
};
var dataTable = CreateDataTable(batch, mapper, null);
await bulkCopy.WriteToServerAsync(dataTable);
}
await transaction.CommitAsync();
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
public async Task ParallelBulkInsertAsync<T>(
IEnumerable<T> data,
string tableName,
Func<T, object[]> mapper,
int parallelDegree = 4)
{
var batches = data.Batch(10000).ToList();
var tasks = new List<Task>();
var semaphore = new SemaphoreSlim(parallelDegree);
foreach (var batch in batches)
{
await semaphore.WaitAsync();
tasks.Add(Task.Run(async () =>
{
try
{
await BulkInsertAsync(batch, tableName, mapper);
}
finally
{
semaphore.Release();
}
}));
}
await Task.WhenAll(tasks);
}
}
// Extension method for batching
public static class EnumerableExtensions
{
public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int batchSize)
{
var batch = new List<T>(batchSize);
foreach (var item in source)
{
batch.Add(item);
if (batch.Count >= batchSize)
{
yield return batch;
batch = new List<T>(batchSize);
}
}
if (batch.Count > 0)
yield return batch;
}
}
}
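Before the real-world example below, here is a hedged usage sketch of the parallel variant; the Products table, its columns, and the products collection are illustrative assumptions.
var importer = new BulkDataImporter(connectionString);
await importer.ParallelBulkInsertAsync(
products, // any IEnumerable of your source items
"Products",
p => new object[] { p.Name, p.UnitPrice }, // array order must match the target column order
parallelDegree: 4);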
Real-World Bulk Import Example
CustomerDataImporter.cs
using System.Data;
using System.Collections.Concurrent;
using System.Globalization;
using System.IO;
using Microsoft.Data.SqlClient;
using CsvHelper;
namespace DataStreamingDemo.Services
{
public class CustomerData
{
public string FirstName { get; set; }
public string LastName { get; set; }
public string Email { get; set; }
public string Phone { get; set; }
public string Address { get; set; }
public string City { get; set; }
public string Country { get; set; }
public DateTime RegistrationDate { get; set; }
}
public class CustomerDataImporter
{
private readonly string _connectionString;
private readonly BulkDataImporter _bulkImporter;
public CustomerDataImporter(string connectionString)
{
_connectionString = connectionString;
_bulkImporter = new BulkDataImporter(connectionString);
}
public async Task ImportCustomersFromCsvAsync(string csvFilePath)
{
using var reader = new StreamReader(csvFilePath);
using var csv = new CsvReader(reader, CultureInfo.InvariantCulture);
var customers = csv.GetRecords<CustomerData>();
await _bulkImporter.BulkInsertAsync(
customers,
"Customers",
customer => new object[]
{
customer.FirstName,
customer.LastName,
customer.Email,
customer.Phone,
customer.Address,
customer.City,
customer.Country,
customer.RegistrationDate
},
new[] { "FirstName", "LastName", "Email", "Phone", "Address", "City", "Country", "RegistrationDate" });
}
public async Task<int> ImportCustomersWithValidationAsync(string csvFilePath)
{
var validCustomers = new ConcurrentBag<CustomerData>();
var invalidCustomers = new ConcurrentBag<(CustomerData Customer, string Error)>();
// Read and validate data
await foreach (var customer in ReadCustomersAsync(csvFilePath))
{
var validationResult = ValidateCustomer(customer);
if (validationResult.IsValid)
{
validCustomers.Add(customer);
}
else
{
invalidCustomers.Add((customer, validationResult.ErrorMessage));
}
}
// Import valid customers
if (validCustomers.Any())
{
await _bulkImporter.BulkInsertAsync(
validCustomers,
"Customers",
customer => new object[]
{
customer.FirstName,
customer.LastName,
customer.Email,
customer.Phone,
customer.Address,
customer.City,
customer.Country,
customer.RegistrationDate
});
}
// Log invalid customers
await LogInvalidCustomersAsync(invalidCustomers);
return validCustomers.Count;
}
private async IAsyncEnumerable<CustomerData> ReadCustomersAsync(string csvFilePath)
{
using var reader = new StreamReader(csvFilePath);
using var csv = new CsvReader(reader, CultureInfo.InvariantCulture);
await foreach (var record in csv.GetRecordsAsync<CustomerData>())
{
yield return record;
}
}
private (bool IsValid, string ErrorMessage) ValidateCustomer(CustomerData customer)
{
if (string.IsNullOrWhiteSpace(customer.Email))
return (false, "Email is required");
if (string.IsNullOrWhiteSpace(customer.FirstName))
return (false, "First name is required");
if (customer.RegistrationDate > DateTime.Now)
return (false, "Registration date cannot be in the future");
return (true, string.Empty);
}
private async Task LogInvalidCustomersAsync(
IEnumerable<(CustomerData Customer, string Error)> invalidCustomers)
{
using var writer = new StreamWriter("invalid_customers.log", append: true);
foreach (var (customer, error) in invalidCustomers)
{
await writer.WriteLineAsync(
$"{DateTime.Now}: {customer.Email} - {error}. Data: {System.Text.Json.JsonSerializer.Serialize(customer)}");
}
}
}
}
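A usage sketch, assuming a customers.csv file whose headers match the CustomerData properties:
var csvImporter = new CustomerDataImporter(connectionString);
var importedCount = await csvImporter.ImportCustomersWithValidationAsync("customers.csv");
Console.WriteLine($"Imported {importedCount} valid customers; see invalid_customers.log for rejected rows");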
5. Memory-Efficient Large Object Handling
Streaming Large Text Content
LargeTextProcessor.cs
using System.Data;
using Microsoft.Data.SqlClient;
using System.Text;
namespace DataStreamingDemo.Services
{
public class LargeTextProcessor
{
private readonly string _connectionString;
public LargeTextProcessor(string connectionString)
{
_connectionString = connectionString;
}
public async Task<string> StreamLargeTextAsync(int recordId)
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
const string sql = @"
SELECT LargeTextColumn
FROM LargeTextTable
WHERE Id = @Id";
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@Id", recordId);
using var reader = await command.ExecuteReaderAsync(
CommandBehavior.SequentialAccess | CommandBehavior.SingleRow);
if (await reader.ReadAsync())
{
using var textReader = reader.GetTextReader(0);
var stringBuilder = new StringBuilder();
var buffer = new char[4096];
int charsRead;
while ((charsRead = await textReader.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
stringBuilder.Append(buffer, 0, charsRead);
}
return stringBuilder.ToString();
}
return string.Empty;
}
public async IAsyncEnumerable<string> ProcessLargeTextInChunksAsync(
int recordId,
int chunkSize = 8192)
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
const string sql = @"
SELECT LargeTextColumn
FROM LargeTextTable
WHERE Id = @Id";
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@Id", recordId);
using var reader = await command.ExecuteReaderAsync(
CommandBehavior.SequentialAccess | CommandBehavior.SingleRow);
if (await reader.ReadAsync())
{
using var textReader = reader.GetTextReader(0);
var buffer = new char[chunkSize];
int charsRead;
while ((charsRead = await textReader.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
yield return new string(buffer, 0, charsRead);
}
}
}
public async Task<int> InsertLargeTextAsync(string largeText)
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
const string sql = @"
INSERT INTO LargeTextTable (LargeTextColumn, CreatedDate)
OUTPUT INSERTED.Id
VALUES (@LargeText, @CreatedDate)";
using var command = new SqlCommand(sql, connection);
command.Parameters.AddWithValue("@CreatedDate", DateTime.UtcNow);
// Use streaming parameter for large text
using var textReader = new StringReader(largeText);
command.Parameters.Add("@LargeText", SqlDbType.NText).Value = textReader;
return (int)await command.ExecuteScalarAsync();
}
}
}
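A usage sketch for the chunked reader (the record id is a placeholder); each iteration surfaces at most one 8KB chunk.
var textProcessor = new LargeTextProcessor(connectionString);
await foreach (var chunk in textProcessor.ProcessLargeTextInChunksAsync(recordId: 42))
{
Console.WriteLine($"Chunk of {chunk.Length} characters");
}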
Memory Monitoring Helper
MemoryMonitor.cs
using System.Diagnostics;
namespace DataStreamingDemo.Utilities
{
public class MemoryMonitor : IDisposable
{
private readonly string _operationName;
private readonly Stopwatch _stopwatch;
private long _initialMemory;
public MemoryMonitor(string operationName)
{
_operationName = operationName;
_stopwatch = Stopwatch.StartNew();
_initialMemory = GC.GetTotalMemory(true);
Console.WriteLine($"Starting: {_operationName}");
Console.WriteLine($"Initial Memory: {_initialMemory / 1024 / 1024} MB");
}
public void Dispose()
{
_stopwatch.Stop();
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
var finalMemory = GC.GetTotalMemory(true);
var memoryUsed = finalMemory - _initialMemory;
Console.WriteLine($"Completed: {_operationName}");
Console.WriteLine($"Time Taken: {_stopwatch.Elapsed}");
Console.WriteLine($"Memory Used: {memoryUsed / 1024 / 1024} MB");
Console.WriteLine($"Final Memory: {finalMemory / 1024 / 1024} MB");
Console.WriteLine("---");
}
public static MemoryMonitor Start(string operationName)
{
return new MemoryMonitor(operationName);
}
}
}
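A usage sketch: wrap any streaming operation in the monitor so its elapsed time and memory delta are printed when the using block ends.
using (MemoryMonitor.Start("Stream LargeDataTable sequentially"))
{
var streamer = new LargeTextStreamer(connectionString);
await foreach (var chunk in streamer.StreamLargeTextDataAsync())
{
// Process each chunk here
}
}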
6. Table-Valued Parameters for Bulk Operations
Efficient Bulk Parameter Passing
TableValuedParameterHelper.cs
using System.Data;
using Microsoft.Data.SqlClient;
namespace DataStreamingDemo.Services
{
public class OrderItem
{
public int ProductId { get; set; }
public int Quantity { get; set; }
public decimal UnitPrice { get; set; }
public string ProductName { get; set; }
}
public class TableValuedParameterHelper
{
private readonly string _connectionString;
public TableValuedParameterHelper(string connectionString)
{
_connectionString = connectionString;
}
// First, create the user-defined table type in SQL Server
public async Task CreateOrderItemTableTypeAsync()
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
var createTypeSql = @"
IF NOT EXISTS (SELECT * FROM sys.types WHERE name = 'OrderItemTableType')
CREATE TYPE OrderItemTableType AS TABLE
(
ProductId INT,
Quantity INT,
UnitPrice DECIMAL(18,2),
ProductName NVARCHAR(100)
)";
using var command = new SqlCommand(createTypeSql, connection);
await command.ExecuteNonQueryAsync();
}
public async Task<int> CreateOrderWithItemsAsync(
int customerId,
IEnumerable<OrderItem> orderItems)
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
// Create data table for TVP
var itemsTable = new DataTable();
itemsTable.Columns.Add("ProductId", typeof(int));
itemsTable.Columns.Add("Quantity", typeof(int));
itemsTable.Columns.Add("UnitPrice", typeof(decimal));
itemsTable.Columns.Add("ProductName", typeof(string));
foreach (var item in orderItems)
{
itemsTable.Rows.Add(
item.ProductId,
item.Quantity,
item.UnitPrice,
item.ProductName);
}
using var command = new SqlCommand("CreateOrderWithItems", connection);
command.CommandType = CommandType.StoredProcedure;
command.Parameters.AddWithValue("@CustomerId", customerId);
command.Parameters.Add(new SqlParameter
{
ParameterName = "@OrderItems",
SqlDbType = SqlDbType.Structured,
TypeName = "dbo.OrderItemTableType",
Value = itemsTable
});
var orderIdParam = new SqlParameter("@OrderId", SqlDbType.Int)
{
Direction = ParameterDirection.Output
};
command.Parameters.Add(orderIdParam);
await command.ExecuteNonQueryAsync();
return (int)orderIdParam.Value;
}
public async Task BatchUpdateProductPricesAsync(IEnumerable<(int ProductId, decimal NewPrice)> priceUpdates)
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
// Create data table for TVP
var updatesTable = new DataTable();
updatesTable.Columns.Add("ProductId", typeof(int));
updatesTable.Columns.Add("NewPrice", typeof(decimal));
foreach (var (productId, newPrice) in priceUpdates)
{
updatesTable.Rows.Add(productId, newPrice);
}
using var command = new SqlCommand("BatchUpdateProductPrices", connection);
command.CommandType = CommandType.StoredProcedure;
command.Parameters.Add(new SqlParameter
{
ParameterName = "@PriceUpdates",
SqlDbType = SqlDbType.Structured,
TypeName = "dbo.ProductPriceUpdateType",
Value = updatesTable
});
await command.ExecuteNonQueryAsync();
}
}
}
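BatchUpdateProductPricesAsync references a dbo.ProductPriceUpdateType table type that is not created above. A minimal sketch of a creation method, following the same pattern as CreateOrderItemTableTypeAsync and assuming the two columns used by the stored procedure below, could be added to TableValuedParameterHelper:
public async Task CreateProductPriceUpdateTypeAsync()
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
var createTypeSql = @"
IF NOT EXISTS (SELECT * FROM sys.types WHERE name = 'ProductPriceUpdateType')
CREATE TYPE ProductPriceUpdateType AS TABLE
(
ProductId INT,
NewPrice DECIMAL(18,2)
)";
using var command = new SqlCommand(createTypeSql, connection);
await command.ExecuteNonQueryAsync();
}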
SQL Server Stored Procedures
CreateOrderWithItems Stored Procedure
CREATE OR ALTER PROCEDURE CreateOrderWithItems
@CustomerId INT,
@OrderItems OrderItemTableType READONLY,
@OrderId INT OUTPUT
AS
BEGIN
SET NOCOUNT ON;
BEGIN TRANSACTION;
BEGIN TRY
-- Create the order
INSERT INTO Orders (CustomerId, OrderDate, TotalAmount)
SELECT
@CustomerId,
GETUTCDATE(),
SUM(Quantity * UnitPrice)
FROM @OrderItems;
SET @OrderId = SCOPE_IDENTITY();
-- Insert order items
INSERT INTO OrderItems (OrderId, ProductId, Quantity, UnitPrice, ProductName)
SELECT
@OrderId,
ProductId,
Quantity,
UnitPrice,
ProductName
FROM @OrderItems;
COMMIT TRANSACTION;
END TRY
BEGIN CATCH
ROLLBACK TRANSACTION;
THROW;
END CATCH
END
BatchUpdateProductPrices Stored Procedure
CREATE OR ALTER PROCEDURE BatchUpdateProductPrices
@PriceUpdates ProductPriceUpdateType READONLY
AS
BEGIN
SET NOCOUNT ON;
UPDATE p
SET p.UnitPrice = pu.NewPrice,
p.LastUpdated = GETUTCDATE()
FROM Products p
INNER JOIN @PriceUpdates pu ON p.Id = pu.ProductId;
END
7. Asynchronous Streaming Patterns
Advanced Async Streaming Implementation
AsyncDataStreamer.cs
using System.Data;
using Microsoft.Data.SqlClient;
using System.Collections.Concurrent;
using System.Threading.Channels;
namespace DataStreamingDemo.Services
{
public class AsyncDataStreamer<T>
{
private readonly string _connectionString;
private readonly Func<SqlDataReader, T> _mapper;
public AsyncDataStreamer(string connectionString, Func<SqlDataReader, T> mapper)
{
_connectionString = connectionString;
_mapper = mapper;
}
public IAsyncEnumerable<T> StreamDataAsync(string query, params SqlParameter[] parameters)
{
return StreamDataAsync(query, CommandType.Text, parameters);
}
public async IAsyncEnumerable<T> StreamDataAsync(
string query,
CommandType commandType,
params SqlParameter[] parameters)
{
using var connection = new SqlConnection(_connectionString);
await connection.OpenAsync();
using var command = new SqlCommand(query, connection)
{
CommandType = commandType
};
if (parameters.Length > 0)
{
command.Parameters.AddRange(parameters);
}
using var reader = await command.ExecuteReaderAsync(
CommandBehavior.SequentialAccess | CommandBehavior.CloseConnection);
while (await reader.ReadAsync())
{
yield return _mapper(reader);
}
}
public ChannelReader<T> CreateDataChannel(
string query,
int channelCapacity = 1000,
params SqlParameter[] parameters)
{
var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(channelCapacity)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = true
});
_ = Task.Run(async () =>
{
try
{
await foreach (var item in StreamDataAsync(query, parameters))
{
await channel.Writer.WriteAsync(item);
}
channel.Writer.Complete();
}
catch (Exception ex)
{
channel.Writer.Complete(ex);
}
});
return channel.Reader;
}
public async Task ProcessDataInParallelAsync(
string query,
Func<T, Task> processor,
int maxDegreeOfParallelism = 4,
params SqlParameter[] parameters)
{
var channelReader = CreateDataChannel(query, maxDegreeOfParallelism * 2, parameters);
var tasks = Enumerable.Range(0, maxDegreeOfParallelism)
.Select(_ => ProcessChannelAsync(channelReader, processor))
.ToArray();
await Task.WhenAll(tasks);
}
private async Task ProcessChannelAsync(ChannelReader<T> reader, Func<T, Task> processor)
{
await foreach (var item in reader.ReadAllAsync())
{
await processor(item);
}
}
public async Task<DataStreamingResult<T>> StreamDataWithStatsAsync(
string query,
params SqlParameter[] parameters)
{
var result = new DataStreamingResult<T>
{
StartTime = DateTime.UtcNow
};
var items = new List<T>();
await foreach (var item in StreamDataAsync(query, parameters))
{
items.Add(item);
result.RecordsProcessed++;
}
result.EndTime = DateTime.UtcNow;
result.Data = items;
return result;
}
}
public class DataStreamingResult<T>
{
public DateTime StartTime { get; set; }
public DateTime EndTime { get; set; }
public TimeSpan Duration => EndTime - StartTime;
public long RecordsProcessed { get; set; }
public double RecordsPerSecond => Duration.TotalSeconds > 0 ? RecordsProcessed / Duration.TotalSeconds : 0;
public List<T> Data { get; set; } = new List<T>();
}
}
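A minimal usage sketch (the Users table and its columns are illustrative assumptions): supply a mapper delegate and consume the stream with await foreach.
var userStreamer = new AsyncDataStreamer<(int Id, string Name)>(
connectionString,
reader => (reader.GetInt32(0), reader.GetString(1))); // read columns in order to stay compatible with SequentialAccess
await foreach (var user in userStreamer.StreamDataAsync("SELECT Id, Name FROM Users"))
{
Console.WriteLine($"{user.Id}: {user.Name}");
}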
Real-Time Data Streaming Example
RealTimeDataProcessor.cs
using System.Data;
using Microsoft.Data.SqlClient;
using System.Threading.Channels;
namespace DataStreamingDemo.Services
{
public class SensorReading
{
public int SensorId { get; set; }
public double Value { get; set; }
public DateTime Timestamp { get; set; }
public string SensorType { get; set; }
public string Location { get; set; }
}
public class RealTimeDataProcessor
{
private readonly string _connectionString;
private readonly AsyncDataStreamer<SensorReading> _streamer;
public RealTimeDataProcessor(string connectionString)
{
_connectionString = connectionString;
_streamer = new AsyncDataStreamer<SensorReading>(
connectionString,
MapSensorReading);
}
private SensorReading MapSensorReading(SqlDataReader reader)
{
return new SensorReading
{
SensorId = reader.GetInt32(0),
Value = reader.GetDouble(1),
Timestamp = reader.GetDateTime(2),
SensorType = reader.GetString(3),
Location = reader.GetString(4)
};
}
public async Task ProcessRealTimeSensorDataAsync(
DateTime startTime,
DateTime endTime,
Func<SensorReading, Task> processor)
{
const string query = @"
SELECT SensorId, Value, Timestamp, SensorType, Location
FROM SensorReadings
WHERE Timestamp BETWEEN @StartTime AND @EndTime
ORDER BY Timestamp";
var parameters = new[]
{
new SqlParameter("@StartTime", startTime),
new SqlParameter("@EndTime", endTime)
};
await _streamer.ProcessDataInParallelAsync(
query,
processor,
maxDegreeOfParallelism: 8,
parameters);
}
public async Task<ChannelReader<SensorReading>> CreateRealTimeDataChannelAsync(
string sensorType = null,
string location = null)
{
var query = @"
SELECT SensorId, Value, Timestamp, SensorType, Location
FROM SensorReadings
WHERE Timestamp >= DATEADD(hour, -1, GETUTCDATE())";
var parameters = new List<SqlParameter>();
if (!string.IsNullOrEmpty(sensorType))
{
query += " AND SensorType = @SensorType";
parameters.Add(new SqlParameter("@SensorType", sensorType));
}
if (!string.IsNullOrEmpty(location))
{
query += " AND Location = @Location";
parameters.Add(new SqlParameter("@Location", location));
}
query += " ORDER BY Timestamp DESC";
return _streamer.CreateDataChannel(query, 5000, parameters.ToArray());
}
public async Task GenerateRealTimeAlertsAsync()
{
var channel = await CreateRealTimeDataChannelAsync();
await foreach (var reading in channel.ReadAllAsync())
{
// Check for alert conditions
if (IsAlertCondition(reading))
{
await SendAlertAsync(reading);
}
// Process the reading
await ProcessSensorReadingAsync(reading);
}
}
private bool IsAlertCondition(SensorReading reading)
{
// Define alert conditions based on sensor type
return reading.SensorType switch
{
"Temperature" => reading.Value > 100.0, // High temperature alert
"Pressure" => reading.Value > 150.0, // High pressure alert
"Humidity" => reading.Value > 90.0, // High humidity alert
_ => false
};
}
private async Task SendAlertAsync(SensorReading reading)
{
// Implement alert sending logic
Console.WriteLine($"ALERT: Sensor {reading.SensorId} at {reading.Location} " +
$"reported {reading.Value} for {reading.SensorType} at {reading.Timestamp}");
await Task.Delay(100); // Simulate async operation
}
private async Task ProcessSensorReadingAsync(SensorReading reading)
{
// Implement reading processing logic
await Task.Delay(50); // Simulate processing time
}
}
}
8. Real-World ETL Pipeline Implementation
Complete ETL Pipeline Example
ETLPipeline.cs
using System.Data;
using Microsoft.Data.SqlClient;
using System.Text.Json;
namespace DataStreamingDemo.Pipelines
{
public class ETLPipelineConfiguration
{
public string SourceConnectionString { get; set; }
public string TargetConnectionString { get; set; }
public string SourceQuery { get; set; }
public string TargetTable { get; set; }
public int BatchSize { get; set; } = 10000;
public int MaxDegreeOfParallelism { get; set; } = 4;
public bool EnableTransaction { get; set; } = true;
public TimeSpan CommandTimeout { get; set; } = TimeSpan.FromMinutes(30);
}
public class ETLPipelineResult
{
public bool Success { get; set; }
public long RecordsProcessed { get; set; }
public TimeSpan Duration { get; set; }
public DateTime StartTime { get; set; }
public DateTime EndTime { get; set; }
public Exception Error { get; set; }
public Dictionary<string, object> Metrics { get; set; } = new Dictionary<string, object>();
}
public class ETLPipeline<T>
{
private readonly ETLPipelineConfiguration _config;
private readonly Func<SqlDataReader, T> _mapper;
private readonly Func<T, object[]> _targetMapper;
public ETLPipeline(
ETLPipelineConfiguration config,
Func<SqlDataReader, T> mapper,
Func<T, object[]> targetMapper)
{
_config = config;
_mapper = mapper;
_targetMapper = targetMapper;
}
public async Task<ETLPipelineResult> ExecuteAsync()
{
var result = new ETLPipelineResult
{
StartTime = DateTime.UtcNow
};
try
{
using var sourceConnection = new SqlConnection(_config.SourceConnectionString);
using var targetConnection = new SqlConnection(_config.TargetConnectionString);
await sourceConnection.OpenAsync();
await targetConnection.OpenAsync();
using var sourceCommand = new SqlCommand(_config.SourceQuery, sourceConnection)
{
CommandTimeout = (int)_config.CommandTimeout.TotalSeconds
};
using var reader = await sourceCommand.ExecuteReaderAsync(
CommandBehavior.SequentialAccess | CommandBehavior.CloseConnection);
SqlTransaction transaction = null;
if (_config.EnableTransaction)
{
transaction = (SqlTransaction)await targetConnection.BeginTransactionAsync();
}
using var bulkCopy = new SqlBulkCopy(targetConnection, SqlBulkCopyOptions.Default, transaction)
{
DestinationTableName = _config.TargetTable,
BatchSize = _config.BatchSize,
BulkCopyTimeout = (int)_config.CommandTimeout.TotalSeconds,
EnableStreaming = true
};
// Configure column mappings if needed
await ConfigureBulkCopyMappingsAsync(bulkCopy, targetConnection);
var dataTable = new DataTable();
var currentBatch = new List<T>();
long totalRecords = 0;
while (await reader.ReadAsync())
{
var item = _mapper(reader);
currentBatch.Add(item);
if (currentBatch.Count >= _config.BatchSize)
{
dataTable = CreateDataTable(currentBatch);
await bulkCopy.WriteToServerAsync(dataTable);
totalRecords += currentBatch.Count;
currentBatch.Clear();
dataTable.Clear();
}
}
// Process remaining records
if (currentBatch.Count > 0)
{
dataTable = CreateDataTable(currentBatch);
await bulkCopy.WriteToServerAsync(dataTable);
totalRecords += currentBatch.Count;
}
if (transaction != null)
{
await transaction.CommitAsync();
}
result.RecordsProcessed = totalRecords;
result.Success = true;
}
catch (Exception ex)
{
result.Success = false;
result.Error = ex;
}
finally
{
result.EndTime = DateTime.UtcNow;
result.Duration = result.EndTime - result.StartTime;
}
return result;
}
private DataTable CreateDataTable(List<T> data)
{
var dataTable = new DataTable();
if (data.Any())
{
var firstRow = _targetMapper(data.First());
for (int i = 0; i < firstRow.Length; i++)
{
dataTable.Columns.Add($"Column{i}", firstRow[i]?.GetType() ?? typeof(object));
}
foreach (var item in data)
{
var row = _targetMapper(item);
dataTable.Rows.Add(row);
}
}
return dataTable;
}
private async Task ConfigureBulkCopyMappingsAsync(SqlBulkCopy bulkCopy, SqlConnection connection)
{
// Auto-configure column mappings based on target table schema
using var command = new SqlCommand(
$"SELECT TOP 1 * FROM {_config.TargetTable}", connection);
using var reader = await command.ExecuteReaderAsync(CommandBehavior.SchemaOnly);
var schemaTable = reader.GetSchemaTable();
if (schemaTable != null)
{
var ordinal = 0;
foreach (DataRow row in schemaTable.Rows)
{
var columnName = row["ColumnName"].ToString();
// CreateDataTable names source columns Column0, Column1, ... so map them to target columns by position
bulkCopy.ColumnMappings.Add($"Column{ordinal}", columnName);
ordinal++;
}
}
}
public async Task<ETLPipelineResult> ExecuteParallelAsync()
{
var result = new ETLPipelineResult
{
StartTime = DateTime.UtcNow
};
try
{
// Read all source data IDs first
var sourceIds = await ReadSourceIdsAsync();
// Process in parallel batches
var batches = sourceIds.Batch(_config.BatchSize);
var tasks = new List<Task<long>>();
var semaphore = new SemaphoreSlim(_config.MaxDegreeOfParallelism);
foreach (var batch in batches)
{
await semaphore.WaitAsync();
tasks.Add(Task.Run(async () =>
{
try
{
return await ProcessBatchAsync(batch.ToList());
}
finally
{
semaphore.Release();
}
}));
}
var batchResults = await Task.WhenAll(tasks);
result.RecordsProcessed = batchResults.Sum();
result.Success = true;
}
catch (Exception ex)
{
result.Success = false;
result.Error = ex;
}
finally
{
result.EndTime = DateTime.UtcNow;
result.Duration = result.EndTime - result.StartTime;
}
return result;
}
private async Task<List<int>> ReadSourceIdsAsync()
{
var ids = new List<int>();
using var connection = new SqlConnection(_config.SourceConnectionString);
await connection.OpenAsync();
var idQuery = _config.SourceQuery.Replace("*", "Id");
using var command = new SqlCommand(idQuery, connection);
using var reader = await command.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
ids.Add(reader.GetInt32(0));
}
return ids;
}
private async Task<long> ProcessBatchAsync(List<int> batchIds)
{
if (!batchIds.Any()) return 0;
using var sourceConnection = new SqlConnection(_config.SourceConnectionString);
using var targetConnection = new SqlConnection(_config.TargetConnectionString);
await sourceConnection.OpenAsync();
await targetConnection.OpenAsync();
var batchQuery = $"{_config.SourceQuery} WHERE Id IN ({string.Join(",", batchIds)})";
using var sourceCommand = new SqlCommand(batchQuery, sourceConnection);
using var reader = await sourceCommand.ExecuteReaderAsync(CommandBehavior.SequentialAccess);
using var bulkCopy = new SqlBulkCopy(targetConnection)
{
DestinationTableName = _config.TargetTable,
BatchSize = 1000,
EnableStreaming = true
};
var dataTable = new DataTable();
var recordsProcessed = 0L;
while (await reader.ReadAsync())
{
var item = _mapper(reader);
var row = _targetMapper(item);
if (dataTable.Columns.Count == 0)
{
for (int i = 0; i < row.Length; i++)
{
dataTable.Columns.Add($"Column{i}", row[i]?.GetType() ?? typeof(object));
}
}
dataTable.Rows.Add(row);
recordsProcessed++;
if (dataTable.Rows.Count >= 1000)
{
await bulkCopy.WriteToServerAsync(dataTable);
dataTable.Clear();
}
}
if (dataTable.Rows.Count > 0)
{
await bulkCopy.WriteToServerAsync(dataTable);
}
return recordsProcessed;
}
}
}
ETL Pipeline Usage Example
CustomerMigrationPipeline.cs
using System.Data;
using Microsoft.Data.SqlClient;
namespace DataStreamingDemo.Pipelines
{
public class CustomerMigrationPipeline
{
public async Task<ETLPipelineResult> MigrateCustomersAsync()
{
var config = new ETLPipelineConfiguration
{
SourceConnectionString = "SourceDBConnectionString",
TargetConnectionString = "TargetDBConnectionString",
SourceQuery = @"
SELECT
CustomerId,
FirstName,
LastName,
Email,
Phone,
Address,
City,
Country,
RegistrationDate,
LastActivityDate,
TotalOrders,
TotalSpent
FROM Customers
WHERE RegistrationDate >= '2020-01-01'",
TargetTable = "MigratedCustomers",
BatchSize = 5000,
MaxDegreeOfParallelism = 3,
EnableTransaction = true,
CommandTimeout = TimeSpan.FromMinutes(15)
};
var pipeline = new ETLPipeline<CustomerData>(config, MapSourceCustomer, MapTargetCustomer);
return await pipeline.ExecuteParallelAsync();
}
private CustomerData MapSourceCustomer(SqlDataReader reader)
{
return new CustomerData
{
CustomerId = reader.GetInt32(0),
FirstName = reader.GetString(1),
LastName = reader.GetString(2),
Email = reader.GetString(3),
Phone = reader.IsDBNull(4) ? null : reader.GetString(4),
Address = reader.GetString(5),
City = reader.GetString(6),
Country = reader.GetString(7),
RegistrationDate = reader.GetDateTime(8),
LastActivityDate = reader.IsDBNull(9) ? (DateTime?)null : reader.GetDateTime(9),
TotalOrders = reader.GetInt32(10),
TotalSpent = reader.GetDecimal(11)
};
}
private object[] MapTargetCustomer(CustomerData customer)
{
return new object[]
{
customer.CustomerId,
customer.FirstName,
customer.LastName,
customer.Email,
customer.Phone ?? (object)DBNull.Value,
customer.Address,
customer.City,
customer.Country,
customer.RegistrationDate,
customer.LastActivityDate ?? (object)DBNull.Value,
customer.TotalOrders,
customer.TotalSpent,
DateTime.UtcNow // MigrationTimestamp
};
}
}
public class CustomerData
{
public int CustomerId { get; set; }
public string FirstName { get; set; }
public string LastName { get; set; }
public string Email { get; set; }
public string Phone { get; set; }
public string Address { get; set; }
public string City { get; set; }
public string Country { get; set; }
public DateTime RegistrationDate { get; set; }
public DateTime? LastActivityDate { get; set; }
public int TotalOrders { get; set; }
public decimal TotalSpent { get; set; }
}
}
9. Performance Monitoring and Optimization
Comprehensive Performance Monitor
PerformanceMonitor.cs
using System.Diagnostics;
using System.Text.Json;
namespace DataStreamingDemo.Monitoring
{
public class PerformanceMetrics
{
public string OperationName { get; set; }
public DateTime StartTime { get; set; }
public DateTime EndTime { get; set; }
public TimeSpan Duration => EndTime - StartTime;
public long MemoryBefore { get; set; }
public long MemoryAfter { get; set; }
public long MemoryUsed => MemoryAfter - MemoryBefore;
public long RecordsProcessed { get; set; }
public double RecordsPerSecond => Duration.TotalSeconds > 0 ? RecordsProcessed / Duration.TotalSeconds : 0;
public Exception Error { get; set; }
public Dictionary<string, object> CustomMetrics { get; set; } = new Dictionary<string, object>();
public List<PerformanceMetrics> ChildOperations { get; set; } = new List<PerformanceMetrics>();
}
public class PerformanceMonitor : IDisposable
{
private readonly PerformanceMetrics _metrics;
private readonly Stopwatch _stopwatch;
private readonly bool _enableMemoryTracking;
public PerformanceMonitor(string operationName, bool enableMemoryTracking = true)
{
_metrics = new PerformanceMetrics
{
OperationName = operationName,
StartTime = DateTime.UtcNow
};
_enableMemoryTracking = enableMemoryTracking;
_stopwatch = Stopwatch.StartNew();
if (_enableMemoryTracking)
{
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
_metrics.MemoryBefore = GC.GetTotalMemory(true);
}
}
public void AddCustomMetric(string key, object value)
{
_metrics.CustomMetrics[key] = value;
}
public void RecordProgress(long recordsProcessed)
{
_metrics.RecordsProcessed = recordsProcessed;
}
public void AddChildOperation(PerformanceMetrics childMetrics)
{
_metrics.ChildOperations.Add(childMetrics);
}
public void Dispose()
{
_stopwatch.Stop();
_metrics.EndTime = DateTime.UtcNow;
if (_enableMemoryTracking)
{
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
_metrics.MemoryAfter = GC.GetTotalMemory(true);
}
// Log metrics
LogMetrics(_metrics);
}
public void SetError(Exception error)
{
_metrics.Error = error;
}
public PerformanceMetrics GetMetrics()
{
return _metrics;
}
private void LogMetrics(PerformanceMetrics metrics)
{
var logEntry = new
{
Timestamp = DateTime.UtcNow,
Operation = metrics.OperationName,
DurationSeconds = metrics.Duration.TotalSeconds,
MemoryUsedMB = metrics.MemoryUsed / 1024 / 1024,
RecordsProcessed = metrics.RecordsProcessed,
RecordsPerSecond = metrics.RecordsPerSecond,
Error = metrics.Error?.Message,
CustomMetrics = metrics.CustomMetrics,
ChildOperations = metrics.ChildOperations.Count
};
var logMessage = JsonSerializer.Serialize(logEntry, new JsonSerializerOptions
{
WriteIndented = true
});
Console.WriteLine("=== PERFORMANCE METRICS ===");
Console.WriteLine(logMessage);
Console.WriteLine("===========================");
// Also write to file for persistence
File.AppendAllText("performance_metrics.log", $"{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss} - {logMessage}{Environment.NewLine}");
}
public static PerformanceMonitor Start(string operationName, bool enableMemoryTracking = true)
{
return new PerformanceMonitor(operationName, enableMemoryTracking);
}
}
public class BulkOperationMonitor
{
public static async Task<PerformanceMetrics> MonitorBulkOperationAsync(
string operationName,
Func<Task<long>> operation)
{
using var monitor = PerformanceMonitor.Start(operationName);
try
{
var recordsProcessed = await operation();
monitor.RecordProgress(recordsProcessed);
return monitor.GetMetrics();
}
catch (Exception ex)
{
monitor.SetError(ex);
throw;
}
}
public static PerformanceMetrics MonitorBulkOperation(
string operationName,
Func<long> operation)
{
using var monitor = PerformanceMonitor.Start(operationName);
try
{
var recordsProcessed = operation();
monitor.RecordProgress(recordsProcessed);
return monitor.GetMetrics();
}
catch (Exception ex)
{
monitor.SetError(ex);
throw;
}
}
}
}
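A usage sketch combining the monitor with the earlier CSV importer (the connection string and file path are placeholders):
var metrics = await BulkOperationMonitor.MonitorBulkOperationAsync(
"Customer CSV import",
async () =>
{
var importer = new CustomerDataImporter(connectionString);
return await importer.ImportCustomersWithValidationAsync("customers.csv");
});
Console.WriteLine($"{metrics.RecordsProcessed} records in {metrics.Duration} ({metrics.RecordsPerSecond:F0}/sec)");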
Performance Optimization Tips
OptimizationTips.cs
namespace DataStreamingDemo.Optimization
{
public static class SqlClientOptimizationTips
{
public static class DataReaderOptimizations
{
public const string UseSequentialAccess =
"Use CommandBehavior.SequentialAccess for large BLOB/CLOB data";
public const string UseTypedAccessors =
"Use GetInt32, GetString etc. instead of indexer for better performance";
public const string CloseReaderEarly =
"Close DataReader as soon as possible to release database resources";
public const string UseAsyncMethods =
"Use ExecuteReaderAsync for non-blocking database operations";
}
public static class BulkCopyOptimizations
{
public const string BatchSize =
"Set optimal BatchSize (typically 1000-5000) for SqlBulkCopy";
public const string EnableStreaming =
"Set EnableStreaming = true for large data transfers";
public const string UseTransactions =
"Wrap bulk operations in transactions for data consistency";
public const string TableLock =
"Consider using SqlBulkCopyOptions.TableLock for faster bulk inserts";
}
public static class ConnectionOptimizations
{
public const string ConnectionPooling =
"Use connection pooling (enabled by default) for better performance";
public const string AsyncConnections =
"Use OpenAsync() for non-blocking connection establishment";
public const string ProperDisposal =
"Always dispose connections to return them to the pool";
public const string ConnectionTimeout =
"Set appropriate connection timeout based on network latency";
}
public static class MemoryOptimizations
{
public const string StreamLargeObjects =
"Stream large objects instead of loading them entirely into memory";
public const string UseDataTableEfficiently =
"Clear DataTable after use and set appropriate initial capacity";
public const string BufferManagement =
"Use appropriate buffer sizes (4KB-8KB) for streaming operations";
public const string GarbageCollection =
"Force GC collection after large operations if memory pressure is high";
}
public static class ParallelismOptimizations
{
public const string DegreeOfParallelism =
"Set optimal degree of parallelism based on database and system resources";
public const string ResourceContention =
"Monitor for resource contention when using parallel operations";
public const string BatchProcessing =
"Process data in batches to balance memory usage and performance";
public const string ChannelCapacity =
"Set appropriate channel capacity for producer-consumer scenarios";
}
}
public class PerformanceChecklist
{
public List<string> PreOperationChecks { get; } = new List<string>
{
"✓ Verify database connection string",
"✓ Check network connectivity to database server",
"✓ Validate SQL query performance with execution plans",
"✓ Ensure adequate memory availability",
"✓ Configure appropriate timeouts",
"✓ Set optimal batch sizes",
"✓ Enable connection pooling"
};
public List<string> DuringOperationChecks { get; } = new List<string>
{
"✓ Monitor memory usage during large operations",
"✓ Track operation progress and estimated completion time",
"✓ Watch for database blocking and deadlocks",
"✓ Monitor network throughput",
"✓ Check for exceptions and errors",
"✓ Validate data consistency at checkpoints"
};
public List<string> PostOperationChecks { get; } = new List<string>
{
"✓ Verify all records were processed",
"✓ Check for data integrity issues",
"✓ Analyze performance metrics",
"✓ Review error logs",
"✓ Clean up temporary resources",
"✓ Document performance findings"
};
public void PrintChecklist(string phase)
{
var checks = phase.ToLower() switch
{
"pre" => PreOperationChecks,
"during" => DuringOperationChecks,
"post" => PostOperationChecks,
_ => PreOperationChecks.Concat(DuringOperationChecks).Concat(PostOperationChecks).ToList()
};
Console.WriteLine($"=== {phase.ToUpper()} OPERATION CHECKS ===");
foreach (var check in checks)
{
Console.WriteLine(check);
}
Console.WriteLine("=================================");
}
}
}
10. Best Practices and Common Pitfalls
Comprehensive Best Practices Guide
BestPracticesGuide.cs
using System.Data;
using Microsoft.Data.SqlClient;
namespace DataStreamingDemo.BestPractices
{
public class SqlClientBestPractices
{
public static class ConnectionManagement
{
public static async Task<SqlConnection> CreateAndOpenConnectionAsync(string connectionString)
{
var connection = new SqlConnection(connectionString);
await connection.OpenAsync();
return connection;
}
public static void UseConnectionPooling(string connectionString)
{
// Connection pooling is enabled by default
// Ensure connection string doesn't have "Pooling=false"
var builder = new SqlConnectionStringBuilder(connectionString)
{
MaxPoolSize = 100, // Adjust based on your needs
MinPoolSize = 0,
Pooling = true
};
}
}
public static class CommandExecution
{
public static SqlCommand CreateCommandWithTimeout(
SqlConnection connection,
string query,
TimeSpan timeout)
{
return new SqlCommand(query, connection)
{
CommandTimeout = (int)timeout.TotalSeconds
};
}
public static void UseStoredProceduresForComplexOperations(SqlConnection connection)
{
// Prefer stored procedures for:
// - Complex business logic
// - Multiple operations
// - Transactions
// - Security (parameterized queries)
}
public static void AddParametersSafely(SqlCommand command, params SqlParameter[] parameters)
{
foreach (var parameter in parameters)
{
command.Parameters.Add(parameter);
}
}
}
public static class DataReading
{
public static async Task<List<T>> ReadDataEfficientlyAsync<T>(
SqlConnection connection,
string query,
Func<SqlDataReader, T> mapper)
{
var results = new List<T>();
using var command = new SqlCommand(query, connection);
using var reader = await command.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
results.Add(mapper(reader));
}
return results;
}
public static async IAsyncEnumerable<T> StreamDataEfficientlyAsync<T>(
SqlConnection connection,
string query,
Func<SqlDataReader, T> mapper)
{
using var command = new SqlCommand(query, connection);
using var reader = await command.ExecuteReaderAsync(CommandBehavior.SequentialAccess);
while (await reader.ReadAsync())
{
yield return mapper(reader);
}
}
}
public static class BulkOperations
{
public static SqlBulkCopy CreateOptimizedBulkCopy(SqlConnection connection, string tableName)
{
return new SqlBulkCopy(connection)
{
DestinationTableName = tableName,
BatchSize = 5000,
BulkCopyTimeout = 300,
EnableStreaming = true
};
}
public static async Task BulkInsertWithTransactionAsync(
SqlConnection connection,
DataTable data,
string tableName)
{
using var transaction = (SqlTransaction)await connection.BeginTransactionAsync();
try
{
// Enlist the bulk copy in the transaction so commit/rollback covers the whole write
using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, transaction)
{
DestinationTableName = tableName,
BatchSize = 5000,
BulkCopyTimeout = 300,
EnableStreaming = true
};
await bulkCopy.WriteToServerAsync(data);
await transaction.CommitAsync();
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}
public static class ErrorHandling
{
public static async Task<T> ExecuteWithRetryAsync<T>(
Func<Task<T>> operation,
int maxRetries = 3,
TimeSpan initialDelay = default)
{
if (initialDelay == default)
initialDelay = TimeSpan.FromSeconds(1);
var retries = 0;
var delay = initialDelay;
while (true)
{
try
{
return await operation();
}
catch (SqlException ex) when (IsTransientError(ex) && retries < maxRetries)
{
retries++;
await Task.Delay(delay);
delay = TimeSpan.FromSeconds(delay.TotalSeconds * 2); // Exponential backoff
}
}
}
private static bool IsTransientError(SqlException ex)
{
// SQL Server transient error numbers
int[] transientErrors = { 4060, 40197, 40501, 40613, 49918, 49919, 49920, 11001 };
return transientErrors.Contains(ex.Number);
}
public static void LogSqlException(SqlException ex, string operation)
{
Console.WriteLine($"SQL Error in {operation}:");
Console.WriteLine($"Message: {ex.Message}");
Console.WriteLine($"Number: {ex.Number}");
Console.WriteLine($"State: {ex.State}");
Console.WriteLine($"Class: {ex.Class}");
Console.WriteLine($"Server: {ex.Server}");
Console.WriteLine($"Procedure: {ex.Procedure}");
Console.WriteLine($"LineNumber: {ex.LineNumber}");
}
}
}
public class CommonPitfalls
{
public static class MemoryLeaks
{
public static void AvoidConnectionLeaks(string connectionString)
{
// BAD: Connection not disposed
// var connection = new SqlConnection(connectionString);
// GOOD: Using statement ensures disposal
using var connection = new SqlConnection(connectionString);
}
public static void AvoidDataReaderLeaks(SqlCommand command)
{
// BAD: Reader not disposed
// var reader = command.ExecuteReader();
// GOOD: Using statement ensures disposal
using var reader = command.ExecuteReader();
}
public static void AvoidLargeObjectAllocations()
{
// BAD: Loading large data into memory
// var hugeList = context.BigTable.ToList();
// GOOD: Stream data
// await foreach (var item in StreamDataAsync()) { }
}
}
public static class PerformanceIssues
{
public static void AvoidNPlusOneQueries()
{
// BAD: Separate query for each item
// foreach (var user in users) {
// var orders = GetOrdersForUser(user.Id);
// }
// GOOD: Single query with JOIN or WHERE IN
// var allOrders = GetOrdersForUsers(userIds);
}
public static void UseAppropriateIsolationLevels(SqlConnection connection)
{
// BAD: Unnecessarily high isolation level
// using var transaction = connection.BeginTransaction(IsolationLevel.Serializable);
// GOOD: Use appropriate isolation level
using var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
}
public static void OptimizeBatchSizes()
{
// BAD: Too small or too large batch sizes
// bulkCopy.BatchSize = 100000; // Too large
// bulkCopy.BatchSize = 10; // Too small
// GOOD: Optimal batch size
// bulkCopy.BatchSize = 5000; // Just right
}
}
public static class ResourceManagement
{
public static void ManageParallelism<T>(IEnumerable<T> items, Action<T> process)
{
// BAD: Unlimited parallelism
// Parallel.ForEach(items, item => process(item));
// GOOD: Controlled parallelism
var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };
Parallel.ForEach(items, options, item => process(item));
}
public static void HandleCancellationProperly()
{
// BAD: Ignoring cancellation
// await LongRunningOperation();
// GOOD: Supporting cancellation
// await LongRunningOperation(cancellationToken);
}
public static void CleanupTemporaryResources(string tempFile, string data)
{
// BAD: Leaving temporary files
// File.WriteAllText(tempFile, data);
// GOOD: Clean up temporary resources
try
{
File.WriteAllText(tempFile, data);
// Process file
}
finally
{
if (File.Exists(tempFile))
File.Delete(tempFile);
}
}
}
}
public class RealWorldScenarioExamples
{
public async Task ProcessLargeCustomerExportAsync()
{
var bestPractices = new List<string>
{
"Use streaming instead of loading all data into memory",
"Process data in batches to manage memory",
"Use async/await for non-blocking operations",
"Implement proper error handling and logging",
"Monitor performance and resource usage",
"Clean up resources properly",
"Support cancellation for long-running operations"
};
// Implementation would go here...
await Task.CompletedTask;
}
public async Task HandleDatabaseMigrationAsync()
{
var migrationSteps = new List<string>
{
"1. Analyze source and target schema differences",
"2. Create efficient data extraction queries",
"3. Implement data transformation logic",
"4. Use bulk operations for data loading",
"5. Implement data validation checks",
"6. Create rollback strategy",
"7. Test with production-like data volumes",
"8. Monitor performance during migration",
"9. Verify data integrity post-migration",
"10. Document the migration process"
};
// Implementation would go here...
await Task.CompletedTask;
}
public async Task BuildRealTimeDashboardAsync()
{
var dashboardComponents = new List<string>
{
"Use efficient queries with proper indexes",
"Implement data caching where appropriate",
"Use streaming for real-time data updates",
"Implement proper connection management",
"Use parameterized queries for security",
"Monitor query performance regularly",
"Implement graceful degradation",
"Use async operations for responsiveness"
};
// Implementation would go here...
await Task.CompletedTask;
}
}
}
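A short usage sketch of the retry helper (the query and connection string are placeholders); transient SQL errors are retried with exponential backoff.
var orderCount = await SqlClientBestPractices.ErrorHandling.ExecuteWithRetryAsync(async () =>
{
using var connection = new SqlConnection(connectionString);
await connection.OpenAsync();
using var command = new SqlCommand("SELECT COUNT(*) FROM Orders", connection);
return (int)await command.ExecuteScalarAsync();
});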
This comprehensive Part 7 covers advanced data streaming and bulk operations with SqlClient, providing real-world examples, performance optimizations, and best practices for handling high-volume data scenarios in ASP.NET Core applications. The examples demonstrate efficient memory usage, parallel processing, and enterprise-grade ETL pipeline implementations.
📘 ASP.NET Core Mastery with Latest Features: 40-Part Series