Exploring the BlockingCollection<T> Class in .NET

Introduction

In the world of concurrent programming in .NET, developers often encounter scenarios where multiple threads need to communicate and synchronize access to shared data structures. The BlockingCollection<T> class, introduced in the System.Collections.Concurrent namespace, offers a powerful solution for such scenarios. In this article, we'll delve into the BlockingCollection<T> class, explore its features, and learn how it simplifies concurrent programming in .NET.

Understanding BlockingCollection<T>

The BlockingCollection<T> class is a thread-safe collection designed for scenarios involving producer-consumer patterns and concurrent access by multiple threads. It provides a wrapper around another collection type (such as ConcurrentQueue<T> or ConcurrentStack<T>) and offers blocking operations that automatically handle synchronization and thread coordination.

Key Features and Benefits

  1. Thread-Safe Operations: BlockingCollection<T> ensures thread safety by providing synchronized access to its underlying collection, allowing multiple threads to add and remove items concurrently without explicit locking mechanisms.
  2. Blocking Operations: BlockingCollection<T> offers blocking methods such as Add(), Take(), and TryTake(), which automatically block or wait for items to become available or space to become available in the collection, eliminating the need for busy-waiting or manual synchronization.
  3. Bounded and Unbounded Modes: BlockingCollection<T> supports both bounded and unbounded modes. In bounded mode, the collection has a maximum capacity, and adding an item blocks until space becomes available. In unbounded mode, the collection grows dynamically as needed.
  4. Integration with Parallel Programming: BlockingCollection<T> seamlessly integrates with other concurrent programming constructs in .NET, such as tasks and parallel loops, making it a versatile tool for building scalable and efficient parallel and concurrent applications.

Example Usage

The following example shows how to add and take items concurrently from a blocking collection:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class BlockingCollectionDemo
{
    static async Task Main()
    {
        await AddTakeDemo.BC_AddTakeCompleteAdding();
        TryTakeDemo.BC_TryTake();
        FromToAnyDemo.BC_FromToAny();
        await ConsumingEnumerableDemo.BC_GetConsumingEnumerable();
        Console.WriteLine("Press any key to exit.");
        Console.ReadKey();
    }
}
class AddTakeDemo
{
    // Demonstrates:
    //      BlockingCollection<T>.Add()
    //      BlockingCollection<T>.Take()
    //      BlockingCollection<T>.CompleteAdding()
    public static async Task BC_AddTakeCompleteAdding()
    {
        using (BlockingCollection<int> bc = new BlockingCollection<int>())
        {
            // Spin up a Task to populate the BlockingCollection
            Task t1 = Task.Run(() =>
            {
                bc.Add(1);
                bc.Add(2);
                bc.Add(3);
                bc.CompleteAdding();
            });

            // Spin up a Task to consume the BlockingCollection
            Task t2 = Task.Run(() =>
            {
                try
                {
                    // Consume the BlockingCollection
                    while (true) Console.WriteLine(bc.Take());
                }
                catch (InvalidOperationException)
                {
                    // An InvalidOperationException means that Take() was called on a completed collection
                    Console.WriteLine("That's All!");
                }
            });

            await Task.WhenAll(t1, t2);
        }
    }
}

class TryTakeDemo
{
    // Demonstrates:
    //      BlockingCollection<T>.Add()
    //      BlockingCollection<T>.CompleteAdding()
    //      BlockingCollection<T>.TryTake()
    //      BlockingCollection<T>.IsCompleted
    public static void BC_TryTake()
    {
        // Construct and fill our BlockingCollection
        using (BlockingCollection<int> bc = new BlockingCollection<int>())
        {
            int NUMITEMS = 10000;
            for (int i = 0; i < NUMITEMS; i++) bc.Add(i);
            bc.CompleteAdding();
            int outerSum = 0;

            // Delegate for consuming the BlockingCollection and adding up all items
            Action action = () =>
            {
                int localItem;
                int localSum = 0;

                while (bc.TryTake(out localItem)) localSum += localItem;
                Interlocked.Add(ref outerSum, localSum);
            };

            // Launch three parallel actions to consume the BlockingCollection
            Parallel.Invoke(action, action, action);

            Console.WriteLine("Sum[0..{0}) = {1}, should be {2}", NUMITEMS, outerSum, ((NUMITEMS * (NUMITEMS - 1)) / 2));
            Console.WriteLine("bc.IsCompleted = {0} (should be true)", bc.IsCompleted);
        }
    }
}

class FromToAnyDemo
{
    // Demonstrates:
    //      Bounded BlockingCollection<T>
    //      BlockingCollection<T>.TryAddToAny()
    //      BlockingCollection<T>.TryTakeFromAny()
    public static void BC_FromToAny()
    {
        BlockingCollection<int>[] bcs = new BlockingCollection<int>[2];
        bcs[0] = new BlockingCollection<int>(5); // collection bounded to 5 items
        bcs[1] = new BlockingCollection<int>(5); // collection bounded to 5 items

        // Should be able to add 10 items w/o blocking
        int numFailures = 0;
        for (int i = 0; i < 10; i++)
        {
            if (BlockingCollection<int>.TryAddToAny(bcs, i) == -1) numFailures++;
        }
        Console.WriteLine("TryAddToAny: {0} failures (should be 0)", numFailures);

        // Should be able to retrieve 10 items
        int numItems = 0;
        int item;
        while (BlockingCollection<int>.TryTakeFromAny(bcs, out item) != -1) numItems++;
        Console.WriteLine("TryTakeFromAny: retrieved {0} items (should be 10)", numItems);
    }
}

class ConsumingEnumerableDemo
{
    // Demonstrates:
    //      BlockingCollection<T>.Add()
    //      BlockingCollection<T>.CompleteAdding()
    //      BlockingCollection<T>.GetConsumingEnumerable()
    public static async Task BC_GetConsumingEnumerable()
    {
        using (BlockingCollection<int> bc = new BlockingCollection<int>())
        {
            // Kick off a producer task
            var producerTask = Task.Run(async () =>
            {
                for (int i = 0; i < 10; i++)
                {
                    bc.Add(i);
                    Console.WriteLine($"Producing: {i}");

                    await Task.Delay(100); // sleep 100 ms between adds
                }

                // Need to do this to keep foreach below from hanging
                bc.CompleteAdding();
            });

            // Now consume the blocking collection with foreach.
            // Use bc.GetConsumingEnumerable() instead of just bc because the
            // former will block waiting for completion and the latter will
            // simply take a snapshot of the current state of the underlying collection.
            foreach (var item in bc.GetConsumingEnumerable())
            {
                Console.WriteLine($"Consuming: {item}");
            }
            await producerTask; // Allow task to complete cleanup
        }
    }
}

Remarks

BlockingCollection<T> is a thread-safe collection class that provides the following:

Conclusion

The BlockingCollection<T> class in .NET provides a robust and efficient mechanism for building thread-safe producer-consumer scenarios and simplifying concurrent programming. By encapsulating synchronized access and offering blocking operations, BlockingCollection<T> streamlines the development of parallel and concurrent applications, enabling developers to focus on application logic rather than low-level synchronization concerns. Whether in bounded or unbounded mode, BlockingCollection<T> empowers developers to write scalable, responsive, and efficient code in .NET applications.


Similar Articles