Concurrency And The ConcurrentBag In C#

Introduction

 
In today's article, we will look at the ConcurrentBag and how we can use it to store items that need to be accessed from different threads/tasks. We will also look at a scenario where using a ConcurrentBag alone does not make our code thread-safe, and then at how to fix it.

The ConcurrentBag in C#

 
The ConcurrentBag is one of the thread-safe collections introduced in .NET Framework 4.0. It stores objects in an unordered manner and allows duplicates. It is useful when we do not need to worry about the order in which objects are retrieved from the collection. To investigate the basic features of the ConcurrentBag and how to add and remove items, please refer to the MSDN documentation.
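As a quick orientation before the main example, here is a minimal sketch of the basic operations on a single thread. The class and method names (BagBasics, AddWithDuplicates) are made up for illustration and are not part of the original article. No expected output is shown for the peeked or taken values, because the bag makes no promise about which item comes back.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper class, used only for this illustration.
public static class BagBasics
{
    // Adds three values (one of them a duplicate) and returns the resulting count.
    public static int AddWithDuplicates(ConcurrentBag<int> bag)
    {
        bag.Add(1);
        bag.Add(2);
        bag.Add(2); // duplicates are allowed
        return bag.Count;
    }

    public static void Main()
    {
        var bag = new ConcurrentBag<int>();
        Console.WriteLine($"Count after adding: {AddWithDuplicates(bag)}");

        // TryPeek reads an item without removing it.
        if (bag.TryPeek(out int peeked))
        {
            Console.WriteLine($"Peeked {peeked}, count is still {bag.Count}");
        }

        // TryTake removes and returns an item; which item is not guaranteed.
        while (bag.TryTake(out int taken))
        {
            Console.WriteLine($"Took {taken}");
        }

        Console.WriteLine($"Empty: {bag.IsEmpty}");
    }
}
```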

Using the ConcurrentBag

 
Let us create a simple console application using .NET Core 3.1 in Visual Studio Community Edition, as below:

    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;

    namespace ConcurrentBag {
        class Program {
            static void Main(string[] args) {
                // Fill the bag with the values 1 to 50.
                ConcurrentBag<int> bag = new ConcurrentBag<int>();
                for (int i = 1; i <= 50; ++i) {
                    bag.Add(i);
                }

                // Two tasks take items from the same bag until it is empty.
                var task1 = Task.Factory.StartNew(() => {
                    while (!bag.IsEmpty) {
                        int item;
                        if (bag.TryTake(out item)) {
                            Console.WriteLine($"{item} was picked by {Task.CurrentId}");
                        }
                    }
                });
                var task2 = Task.Factory.StartNew(() => {
                    while (!bag.IsEmpty) {
                        int item;
                        if (bag.TryTake(out item)) {
                            Console.WriteLine($"{item} was picked by {Task.CurrentId}");
                        }
                    }
                });

                Task.WaitAll(task1, task2);
                Console.WriteLine("DONE");
            }
        }
    }

In the above code, we create a ConcurrentBag of int type and add 50 values to it. Then, we create two Tasks that extract values from this bag. When we run the code, each of the 50 values is printed once, along with the ID of the Task that picked it.
 
Each Task picks distinct values because the ConcurrentBag is thread-safe: an item taken by one Task cannot also be taken by the other. In this example, each Task just prints the value it has picked to the console. However, we can use the same logic to process an object in a more complex way. This is a scenario where the ConcurrentBag works perfectly.
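One way to check the "distinct values" claim is to collect what each Task takes and compare the totals. The sketch below is an illustration under assumptions, not the article's code: BagDrain and DrainInParallel are hypothetical names, and the loop relies on TryTake alone, since TryTake already returns false when the bag is empty.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper class, used only for this illustration.
public static class BagDrain
{
    // Drains the bag from workerCount tasks and returns every item that was taken.
    public static int[] DrainInParallel(ConcurrentBag<int> bag, int workerCount)
    {
        var taken = new ConcurrentQueue<int>();

        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(() =>
            {
                // TryTake returns false once the bag is empty,
                // so a separate IsEmpty check is not needed.
                while (bag.TryTake(out int item))
                {
                    taken.Enqueue(item);
                }
            }))
            .ToArray();

        Task.WaitAll(workers);
        return taken.ToArray();
    }

    public static void Main()
    {
        var bag = new ConcurrentBag<int>(Enumerable.Range(1, 50));
        int[] taken = DrainInParallel(bag, 2);

        Console.WriteLine($"Items taken: {taken.Length}");                // 50
        Console.WriteLine($"Distinct items: {taken.Distinct().Count()}"); // 50
    }
}
```

If an item were ever handed to both Tasks, the number of items taken would be larger than the number of distinct items.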

Where using the ConcurrentBag alone still causes an issue

 
Let us create another simple console application using .NET Core 3.1 in Visual Studio Community Edition, as below:

    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;

    namespace ConsoleAppConcurrentBag {
        public class Program {
            static void Main(string[] args) {
                RecordProcesssor rp = new RecordProcesssor(25);

                // Two tasks add 100,000 numbers each to the same processor.
                var a = Task.Factory.StartNew(() => {
                    for (var i = 0; i < 100000; i++) {
                        rp.Add(i);
                    }
                });
                var b = Task.Factory.StartNew(() => {
                    for (var i = 200000; i < 300000; i++) {
                        rp.Add(i);
                    }
                });

                Task.WaitAll(a, b);
                Console.WriteLine("Code run complete");
                Console.ReadLine();
            }
        }

        public class RecordProcesssor {
            private int _recordsToProcess = 0;
            private ConcurrentBag<int> numbers = new ConcurrentBag<int>();

            public RecordProcesssor(int recordsToProcess) {
                _recordsToProcess = recordsToProcess;
            }

            public void Add(int number) {
                numbers.Add(number);
                // Not thread-safe: another task can add an item
                // between the Add above and the checks below.
                if (numbers.Count == _recordsToProcess) {
                    Console.WriteLine($"Number is {numbers.Count}");
                    numbers.Clear();
                }
                else if (numbers.Count > _recordsToProcess) {
                    // Should never happen, but it does.
                    Console.WriteLine($"Number is {numbers.Count}");
                }
            }
        }
    }

In the above code, we add numbers from two tasks into a ConcurrentBag. When the number of items in the bag reaches a certain value (25 in this case), we print the count to the console, clear the ConcurrentBag, and continue adding numbers.
 
This application runs fine at first but will eventually hit the else if branch, where the count has gone past the threshold.
 
 
If we look at the “numbers.Count” value at that point, it will be 26. But we have specific code to clear the ConcurrentBag when the count is 25, so how did this happen? The problem here is not the ConcurrentBag but the gap between adding an item to the ConcurrentBag and the IF statement on the next line. Each call on the bag is thread-safe on its own, but the two calls together are not a single atomic step. When the count got to 25 on one thread/task, the other thread/task added another value before the first one executed its IF statement, and hence we see the value of 26.
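Because the real interleaving depends on timing, it is hard to reproduce on demand. The sketch below replays the same order of operations by hand on a single thread, so the result is deterministic. It is only an illustration: InterleavingReplay and Replay are hypothetical names, and the values 1000 and 2000 are arbitrary stand-ins for the items added by the two tasks.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper class, used only for this illustration.
public static class InterleavingReplay
{
    // Replays, on one thread, an order of operations that two tasks can produce.
    // Returns the count that task A observes when it finally runs its check.
    public static int Replay(int threshold)
    {
        var numbers = new ConcurrentBag<int>();

        // Start with the bag one item short of the threshold.
        for (int i = 0; i < threshold - 1; i++)
        {
            numbers.Add(i);
        }

        numbers.Add(1000);           // task A: Add (count reaches the threshold)
        numbers.Add(2000);           // task B: Add runs before task A's check
        int seenByA = numbers.Count; // task A: check now sees threshold + 1

        // Neither task ever observes Count == threshold, so Clear() is skipped.
        return seenByA;
    }

    public static void Main()
    {
        Console.WriteLine($"Number is {Replay(25)}"); // prints "Number is 26"
    }
}
```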
 
So, what do we do in this scenario? We must apply the synchronization ourselves. I have done this in the code below using a lock:

    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;

    namespace ConsoleAppConcurrentBag {
        public class Program {
            static void Main(string[] args) {
                RecordProcesssor rp = new RecordProcesssor(25);

                // Two tasks add 100,000 numbers each to the same processor.
                var a = Task.Factory.StartNew(() => {
                    for (var i = 0; i < 100000; i++) {
                        rp.Add(i);
                    }
                });
                var b = Task.Factory.StartNew(() => {
                    for (var i = 200000; i < 300000; i++) {
                        rp.Add(i);
                    }
                });

                Task.WaitAll(a, b);
                Console.WriteLine("Code run complete");
                Console.ReadLine();
            }
        }

        public class RecordProcesssor {
            private readonly object balanceLock = new object();
            private int _recordsToProcess = 0;
            private ConcurrentBag<int> numbers = new ConcurrentBag<int>();

            public RecordProcesssor(int recordsToProcess) {
                _recordsToProcess = recordsToProcess;
            }

            public void Add(int number) {
                // Add, check, and clear now run as one step per task.
                lock (balanceLock) {
                    numbers.Add(number);
                    if (numbers.Count == _recordsToProcess) {
                        Console.WriteLine($"Number is {numbers.Count}");
                        numbers.Clear();
                    }
                    else if (numbers.Count > _recordsToProcess) {
                        Console.WriteLine($"Number is {numbers.Count}");
                    }
                }
            }
        }
    }

In the above code, the statements that add a value, check whether the threshold has been reached, and clear the bag are all inside a locked block of code, meaning only one thread/task can execute this code at a time. Hence, if any thread adds the 25th value, its IF statement will be executed before the other thread/task can add any more values.
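Rather than reading console output, the effect of the lock can be checked by counting. The sketch below mirrors the locked Add method but replaces the Console.WriteLine calls with two counters. CountingProcessor, Batches, Overshoots, and Run are hypothetical names added for this illustration, and Clear() assumes a runtime where ConcurrentBag provides it, such as the .NET Core 3.1 target used in this article.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical variant of the article's processor that counts instead of printing.
public class CountingProcessor
{
    private readonly object _gate = new object();
    private readonly ConcurrentBag<int> _numbers = new ConcurrentBag<int>();
    private readonly int _recordsToProcess;

    public int Batches { get; private set; }    // times the threshold was hit exactly
    public int Overshoots { get; private set; } // times the count went past the threshold

    public CountingProcessor(int recordsToProcess)
    {
        _recordsToProcess = recordsToProcess;
    }

    public void Add(int number)
    {
        lock (_gate)
        {
            _numbers.Add(number);
            if (_numbers.Count == _recordsToProcess)
            {
                Batches++;
                _numbers.Clear();
            }
            else if (_numbers.Count > _recordsToProcess)
            {
                Overshoots++;
            }
        }
    }

    // Runs two tasks that each add perTask numbers, then returns the processor.
    public static CountingProcessor Run(int perTask)
    {
        var processor = new CountingProcessor(25);
        var a = Task.Run(() => { for (int i = 0; i < perTask; i++) processor.Add(i); });
        var b = Task.Run(() => { for (int i = 0; i < perTask; i++) processor.Add(i); });
        Task.WaitAll(a, b);
        return processor;
    }

    public static void Main()
    {
        CountingProcessor p = Run(100000);
        // 200,000 items in batches of 25 gives 8,000 batches and no overshoots.
        Console.WriteLine($"Batches: {p.Batches}, overshoots: {p.Overshoots}");
    }
}
```

Once every access goes through the same lock, the thread safety comes from the lock rather than from the collection, which is the design trade-off this fix accepts.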
 
With the lock in place, the count never exceeds 25, and the result is the same on every run: only "Number is 25" lines, followed by "Code run complete".
 
 

Summary

 
In this article, we looked at the ConcurrentBag and how it can be used when a collection needs to be accessed from multiple threads/tasks. We looked at a scenario where it works perfectly and at another where we face an issue, although not because of the ConcurrentBag itself. We then looked at how this can be fixed. The same rule applies to the other concurrent collection classes as well: individual operations are thread-safe, but a sequence of operations is not. Happy Coding!
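As one illustration of the same rule with another collection, consider a hit counter kept in a ConcurrentDictionary. This is a sketch under assumptions, not something taken from the article: DictionaryCounter, CountHits, and the "page" key are hypothetical. Reading a value and writing it back as two calls leaves the same kind of gap as before, while AddOrUpdate performs the update as a single operation on the dictionary.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper class, used only for this illustration.
public static class DictionaryCounter
{
    // Increments one counter from two tasks and returns the final value.
    public static int CountHits(int perTask)
    {
        var hits = new ConcurrentDictionary<string, int>();

        Action work = () =>
        {
            for (int i = 0; i < perTask; i++)
            {
                // A separate read followed by a write, such as
                // hits["page"] = hits["page"] + 1, would leave a gap between
                // the two calls. AddOrUpdate applies the change as one
                // operation and retries if another thread updated the value first.
                hits.AddOrUpdate("page", 1, (key, current) => current + 1);
            }
        };

        Task.WaitAll(Task.Run(work), Task.Run(work));
        return hits["page"];
    }

    public static void Main()
    {
        Console.WriteLine($"Hits: {CountHits(100000)}"); // prints "Hits: 200000"
    }
}
```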

