Azure Event Hub Implementation Using .Net Core Console App

Azure Event Hubs is a data streaming platform and event ingestion service. An event hub can receive streaming data from one or more sources (event producers), and that data can be stored and processed by one or more consumers. Event data can be captured to Azure Blob Storage or Azure Data Lake Storage for long-term retention and later processing. An event hub can also be integrated with Azure Stream Analytics and Azure Functions.
 
Some important terms to understand before starting an Azure Event Hub implementation:
 

Event producer

 
Producers generate event data and send it to the event hub. One or more producers can send data to the same event hub.
 
Event receiver/consumer
 
Consumers listen for event data from a given event hub. One or more consumers can read a single partition or all of the event data from the event hub.
 
Consumer group
 
Event consumers can be grouped based on business logic. Each consumer group reads the event stream independently of the others, and a consumer client must specify the consumer group it belongs to in order to read event data from the event hub.
 

Partition

 
Because data arrives at the event hub as a stream, the event hub divides it into partitions. A consumer can read a single partition by its partition id or read the event data from all partitions. The partition count is directly related to the number of consumers that are going to listen for event data.
 
Partition Key
 
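A value supplied by the sender that is hashed to decide which partition an event is stored in, so events with the same partition key go to the same partition.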
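
To make the idea concrete, here is a minimal sketch of how a partition key can map to a partition. It is not the hashing algorithm that Event Hubs uses internally; the `PartitionKeySketch` class, the `PickPartition` method, and the choice of the FNV-1a hash are assumptions made only for this illustration. The point it shows is that the same key always lands in the same partition.

```csharp
using System;
using System.Text;

static class PartitionKeySketch
{
    // Stable 32-bit FNV-1a hash. Chosen only because it is short and deterministic;
    // the real service uses its own hashing scheme.
    static uint Fnv1a(string key)
    {
        unchecked
        {
            uint hash = 2166136261;
            foreach (byte b in Encoding.UTF8.GetBytes(key))
            {
                hash ^= b;
                hash *= 16777619;
            }
            return hash;
        }
    }

    // Maps a key to a partition index in the range [0, partitionCount).
    public static int PickPartition(string partitionKey, int partitionCount)
    {
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));
        return (int)(Fnv1a(partitionKey) % (uint)partitionCount);
    }

    static void Main()
    {
        // "user-1" appears twice and is assigned the same partition both times.
        foreach (string userId in new[] { "user-1", "user-2", "user-1" })
        {
            Console.WriteLine($"{userId} -> partition {PickPartition(userId, 2)}");
        }
    }
}
```

In the real SDK the sender only supplies the key; the service does the mapping.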
 
More details can be found in the Azure documentation on the official Microsoft site.
 
Where to use:
  • Application monitoring
  • Anomaly detection
  • Streaming data capture and analysis
Some real-world applications that can use Azure Event Hub:
 

Reading sensor data

 
Suppose an automatic traffic system has been installed in a city. The application records traffic data in real time with its cameras and sensors. In a normal situation the system works on a schedule, but in an abnormal situation such as a traffic jam or a natural disaster it should be able to redirect road traffic to other routes automatically and alert the traffic police. Because sensors all over the city (multiple event sources) generate huge volumes of data at the same time (maybe 10 million events or more per second), a conventional system will not be able to capture all the events.
 
Here Azure Event Hub can ingest the streaming data coming from all the traffic signal centers (city squares) so that it can be analyzed and the automatic traffic system can decide what to do. In this case, real-time data is captured by the system's sensors and sent to the Azure event hub for storage and analysis, and the system makes its decisions based on that analysis.
 

Ecommerce application that captures user data to boost business

 
Ecommerce applications capture user data such as user id, mobile number, device identification, location, the items the user has searched for, and user preferences. Based on this data, product promotions can be shown in the user's notification area and in the advertisement areas of the user's social sites. Users can access ecommerce applications from different devices such as mobiles, desktops, laptops, and tablets, so the application must be able to capture all user data in real time. Again, Azure Event Hub can capture all the event data, save it, and pass it to other systems for analysis, based on which product promotions can be sent to individual users' devices.
 
Nowadays, applications are expected not only to execute business logic properly (such as purchase requests) but also to take the business to the next level by providing information from which they can predict and suggest items a user is likely to buy in the future.
 
Here is what we are going to learn:
 
Azure Event Hub implementation using a .NET Core console application.
 
We are going to use the ecommerce application for this demonstration.
 
 

How to create an Event Hub in Azure

 
To create an event hub, an Event Hubs namespace is needed first.
  • Log in to the Azure portal and navigate to Azure Event Hubs.
  • Click the Add button to create a namespace.


  • The rest of the configuration can be left at the default settings. Now review and create the namespace.

Once the Event Hubs namespace is created, we can proceed with creating the event hub. Navigate to the newly created Event Hubs namespace and click on it.
 
Click on Event Hub and fill in the mandatory fields. As we are using the Basic tier for the Event Hubs namespace, some options will be disabled.
 
 
The Capture option cannot be configured because it is not available in the Basic tier. Partition count is an important property and is directly related to the event consumers. In this demo we are implementing two consumers to listen to the event hub.
 
After creating the namespace and the event hub, we need to create shared access policies (SAS policies) to access the event hub instance from our console application.
 
A shared access policy can carry three claims. One policy can have all three or a combination of them, but I suggest creating separate policies based on responsibility, according to what the application needs.
 
SAS claims
  • Manage
  • Send
  • Listen
For this demo, two policies were created: one for producers with the Send claim and one for consumers with the Listen claim. Please note that policies can be defined at the event hub instance level as well as at the Event Hubs namespace level. In this demo, policies are defined at the namespace level.
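
Because the producer and the consumers use different policies, each needs its own connection string. One way to keep those keys out of source code is to read them from environment variables. A minimal sketch follows; the variable names `EVENTHUB_SEND_CONNECTION` and `EVENTHUB_LISTEN_CONNECTION` and the `ConnectionSettings` class are assumptions made for this illustration, not something the Azure SDK defines.

```csharp
using System;

static class ConnectionSettings
{
    // Reads a required setting and fails early with a clear message if it is missing.
    public static string Require(string variableName)
    {
        string value = Environment.GetEnvironmentVariable(variableName);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException($"Environment variable '{variableName}' is not set.");
        }
        return value;
    }

    static void Main()
    {
        try
        {
            // Hypothetical variable names: one per shared access policy.
            string sendConnection = Require("EVENTHUB_SEND_CONNECTION");
            string listenConnection = Require("EVENTHUB_LISTEN_CONNECTION");
            Console.WriteLine("Both connection strings were found.");
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine(ex.Message);
        }
    }
}
```

The returned strings can then be passed to the producer and consumer clients in place of the hard-coded placeholders.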
 
 
In this example, I cover only the event producer and the event consumers. I have used .NET Core 3.1 with Visual Studio 2019. It is always good practice to implement code with the latest version.
 
We are going to implement an ecommerce application that collects user data from user devices and sends it to an event hub. The user data contains user info, device info, and the items the user is searching for in the ecommerce application and may want to buy in the near future. We have created two consumers that listen to the event hub and consume the event data. The event data can then be passed to any analysis application to generate business leads.
 
There are 3 parts in the console application.
 
Entry point or main method
 
Code from Program.cs:

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    namespace AzureEventHubMutliProducerConsumer {
        class Program {
            static void Main(string[] args) {
                Program program = new Program();
                Task[] tasks = new Task[3];
                tasks[0] = Task.Run(() => {
                    Thread.Sleep(1000);
                    // Run the producer
                    program.RunProducer();
                });
                tasks[1] = Task.Run(() => {
                    Thread.Sleep(1000);
                    // Run the consumer that reads from all partitions
                    program.RunEventHubConsumerReadEvent();
                });
                tasks[2] = Task.Run(() => {
                    Thread.Sleep(1000);
                    // Run the consumer that reads from a single partition
                    program.RunEventHubConsumerReadEventPartitionEvent();
                });
                Task.WaitAll(tasks);
                Console.WriteLine("Press any key to end program");
                Console.ReadKey();
            }

            public void RunProducer() {
                // Create the producer and send the events
                EventProducer eventProducer = new EventProducer();
                eventProducer.Init();
                eventProducer.GenerateEvent().Wait();
            }

            public void RunEventHubConsumerReadEvent() {
                // Read from all partitions using the default consumer group
                EventHubConsumerClientDemo eventHubConsumer = new EventHubConsumerClientDemo();
                eventHubConsumer.ConsumerReadEvent("$Default").Wait();
            }

            public void RunEventHubConsumerReadEventPartitionEvent() {
                // Read only partition "1" using the default consumer group
                EventHubConsumerClientDemo eventHubConsumer = new EventHubConsumerClientDemo();
                eventHubConsumer.ConsumerReadEventPartitionEvent("$Default", "1").Wait();
            }
        }
    }

The console program starts three tasks so that the event producer and the two consumers can run in parallel.
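
As a possible alternative to blocking with `Thread.Sleep` and `.Wait()`, the same fan-out can be written with an `async Main` and `Task.WhenAll`. The sketch below uses hypothetical stand-in methods (`RunProducerAsync`, `RunConsumerAsync`, `RunAllAsync`) that only simulate work with `Task.Delay`, so it runs without an Azure subscription; in the real program they would call the producer and consumer classes.

```csharp
using System;
using System.Threading.Tasks;

class AsyncEntryPointSketch
{
    // Stand-in for the producer: simulates sending events.
    public static async Task<string> RunProducerAsync()
    {
        await Task.Delay(100);
        return "producer finished";
    }

    // Stand-in for a consumer: simulates reading events.
    public static async Task<string> RunConsumerAsync(string name)
    {
        await Task.Delay(200);
        return $"{name} finished";
    }

    // Starts all three operations at once and waits for every one of them to complete.
    public static Task<string[]> RunAllAsync()
    {
        return Task.WhenAll(
            RunProducerAsync(),
            RunConsumerAsync("all-partitions consumer"),
            RunConsumerAsync("partition 1 consumer"));
    }

    static async Task Main()
    {
        foreach (string result in await RunAllAsync())
        {
            Console.WriteLine(result);
        }
    }
}
```

With this shape no thread is blocked while the producer and consumers wait on the network.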
 
Event Producer
 
Code for EventProducer.cs

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;

    namespace AzureEventHubMutliProducerConsumer {
        public class EventProducer {
            // Placeholders: use the connection string of the shared access policy with the Send claim
            string connectionString = "-- Connection string from the shared access policy --";
            string eventHubName = "-- Event hub name --";
            List<string> device = new List<string>();
            EventHubProducerClient producerClient;

            public void Init() {
                producerClient = new EventHubProducerClient(connectionString, eventHubName);
                device.Add("Mobile");
                device.Add("Laptop");
                device.Add("Desktop");
                device.Add("Tablet");
            }

            public async Task GenerateEvent() {
                try {
                    // Send one batch per device, alternating between the two partitions
                    int partitionId = 0;
                    foreach (var eachDevice in device) {
                        var batchOptions = new CreateBatchOptions() {
                            PartitionId = partitionId.ToString()
                        };
                        using EventDataBatch generateData = await producerClient.CreateBatchAsync(batchOptions);
                        string message = string.Format("Search triggered for iPhone 21 from device {0} ", eachDevice);
                        var eveData = new EventData(Encoding.UTF8.GetBytes(message));
                        // All values should be dynamic in a real application
                        eveData.Properties.Add("UserId", "UserId");
                        eveData.Properties.Add("Location", "North India");
                        eveData.Properties.Add("DeviceType", eachDevice);
                        // TryAdd returns false when the event does not fit in the batch
                        if (!generateData.TryAdd(eveData)) {
                            Console.WriteLine("Event for device {0} is too large for the batch", eachDevice);
                            continue;
                        }
                        await producerClient.SendAsync(generateData);
                        // Wrap the partition id: the event hub was created with two partitions (0 and 1)
                        partitionId++;
                        if (partitionId > 1) partitionId = 0;
                    }
                } catch (Exception exp) {
                    Console.WriteLine("Error occurred {0}. Try again later", exp.Message);
                }
            }
        }
    }

The Init() method initializes the EventHubProducerClient with the connection string and the event hub name. It also adds 4 devices to the device list. In the real world there might be millions of devices and millions of users using the ecommerce application at the same time; the device list is used to simulate that scenario when generating events.
 
The GenerateEvent() method generates event data for each device with some user data. Each batch is sent to partition id 0 or 1, which a consumer can later use to read event data by partition id. Event data can be sent as a single event or as a batch of events; here we send a batch with a partition id.
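
The producer above assumes that the event hub has exactly two partitions. A variation that asks the service for its partition ids instead could look like the sketch below. It is an illustration under assumptions rather than the article's implementation: the `RoundRobinProducerSketch` class name and the message text are made up, and the connection string and event hub name are placeholders you must fill in.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

class RoundRobinProducerSketch
{
    static async Task Main()
    {
        // Placeholders: supply your own values, as in EventProducer.cs.
        string connectionString = "<connection string>";
        string eventHubName = "<event hub name>";
        string[] devices = { "Mobile", "Laptop", "Desktop", "Tablet" };

        await using var producer = new EventHubProducerClient(connectionString, eventHubName);

        // Ask the service which partitions exist instead of assuming "0" and "1".
        string[] partitionIds = await producer.GetPartitionIdsAsync();

        for (int i = 0; i < devices.Length; i++)
        {
            // Cycle through the partitions in round-robin order.
            var options = new CreateBatchOptions { PartitionId = partitionIds[i % partitionIds.Length] };
            using EventDataBatch batch = await producer.CreateBatchAsync(options);

            var eventData = new EventData(Encoding.UTF8.GetBytes($"Search from {devices[i]}"));
            if (!batch.TryAdd(eventData))
            {
                // TryAdd returns false when the event does not fit in the batch.
                Console.WriteLine("Event is too large for the batch and was skipped.");
                continue;
            }
            await producer.SendAsync(batch);
        }
    }
}
```

If the order of events per user matters more than an explicit partition, `CreateBatchOptions.PartitionKey` can be set instead of `PartitionId`, and the service picks the partition.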
 
Event Consumer
 
Code for EventHubConsumerClientDemo.cs

    using Azure.Messaging.EventHubs.Consumer;
    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    namespace AzureEventHubMutliProducerConsumer {
        public class EventHubConsumerClientDemo {
            // Placeholders: use the connection string of the shared access policy with the Listen claim
            string connectionString = "-- Connection string from the shared access policy --";
            string eventHubName = "-- Event hub name --";

            // Read events from all partitions. No blob container is needed; the consumer reads events directly.
            public async Task ConsumerReadEvent(string consumerGroup) {
                try {
                    using CancellationTokenSource cancellationSource = new CancellationTokenSource();
                    cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
                    await using EventHubConsumerClient eventConsumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
                    await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsAsync(cancellationSource.Token)) {
                        Console.WriteLine("---Execution from ConsumerReadEvent method---");
                        Console.WriteLine("------");
                        if (partitionEvent.Data != null) {
                            Console.WriteLine("Event Data received {0} ", Encoding.UTF8.GetString(partitionEvent.Data.Body.ToArray()));
                            if (partitionEvent.Data.Properties != null) {
                                foreach (var keyValue in partitionEvent.Data.Properties) {
                                    Console.WriteLine("Event data key = {0}, Event data value = {1}", keyValue.Key, keyValue.Value);
                                }
                            }
                        }
                    }
                } catch (OperationCanceledException) {
                    // Expected: the 30-second limit on the cancellation token elapsed
                    Console.WriteLine("ConsumerReadEvent end");
                } catch (Exception exp) {
                    Console.WriteLine("Error occurred {0}. Try again later", exp.Message);
                }
            }

            // Read events from a single partition, selected by partitionId
            public async Task ConsumerReadEventPartitionEvent(string consumerGroup, string partitionId) {
                try {
                    using CancellationTokenSource cancellationSource = new CancellationTokenSource();
                    cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));
                    await using EventHubConsumerClient eventConsumer = new EventHubConsumerClient(consumerGroup, connectionString, eventHubName);
                    ReadEventOptions readEventOptions = new ReadEventOptions() {
                        MaximumWaitTime = TimeSpan.FromSeconds(30)
                    };
                    await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsFromPartitionAsync(partitionId, EventPosition.Latest, readEventOptions, cancellationSource.Token)) {
                        Console.WriteLine("---Execution from ConsumerReadEventPartitionEvent method---");
                        Console.WriteLine("------");
                        if (partitionEvent.Data != null) {
                            Console.WriteLine("Event Data received {0} ", Encoding.UTF8.GetString(partitionEvent.Data.Body.ToArray()));
                            if (partitionEvent.Data.Properties != null) {
                                foreach (var keyValue in partitionEvent.Data.Properties) {
                                    Console.WriteLine("Event data key = {0}, Event data value = {1}", keyValue.Key, keyValue.Value);
                                }
                            }
                        }
                    }
                } catch (OperationCanceledException) {
                    // Expected: the 30-second limit on the cancellation token elapsed
                    Console.WriteLine("ConsumerReadEventPartitionEvent end");
                } catch (Exception exp) {
                    Console.WriteLine("Error occurred {0}. Try again later", exp.Message);
                }
            }
        }
    }

Event consumer with ReadEventsAsync method

 
The ConsumerReadEvent() method takes the consumer group name as its input parameter and connects to the event hub with the connection string and the event hub name. A CancellationTokenSource is used so that reading stops automatically after the given time period. (ReadEventOptions, which can also set a wait time, is used only in the partition-based method.) An EventHubConsumerClient is used to read events from the event hub, and its ReadEventsAsync method reads the event data from all partitions of the event hub.

    await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsAsync(cancellationSource.Token))

The rest of the method processes the event data and its properties and displays them in the console. The consumer keeps reading until the 30-second period set on the cancellation token elapses; at that point the read loop is cancelled with an exception, which the catch block handles, and the method ends.
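
The cancellation behavior can be tried without an event hub. The sketch below replaces ReadEventsAsync with a hypothetical stand-in, `FakeReadEventsAsync`, that yields two events and then waits for more that never arrive; the `CancellationSketch` class and the `ReadForAsync` helper are also assumptions made for this illustration. It shows how an `await foreach` loop ends when the token is cancelled.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

class CancellationSketch
{
    // Hypothetical stand-in for ReadEventsAsync: yields two events, then waits indefinitely.
    public static async IAsyncEnumerable<string> FakeReadEventsAsync(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        yield return "event-1";
        yield return "event-2";
        // Throws TaskCanceledException (an OperationCanceledException) once the token is cancelled.
        await Task.Delay(Timeout.Infinite, token);
    }

    // Reads until the time limit elapses and returns how many events were seen.
    public static async Task<int> ReadForAsync(TimeSpan limit)
    {
        using var cancellationSource = new CancellationTokenSource();
        cancellationSource.CancelAfter(limit);
        int count = 0;
        try
        {
            await foreach (string evt in FakeReadEventsAsync(cancellationSource.Token))
            {
                Console.WriteLine($"Received {evt}");
                count++;
            }
        }
        catch (OperationCanceledException)
        {
            // Reaching the time limit is the expected way for the loop to end.
            Console.WriteLine("Time limit reached; reader stopped.");
        }
        return count;
    }

    static async Task Main()
    {
        int count = await ReadForAsync(TimeSpan.FromSeconds(2));
        Console.WriteLine($"Events read: {count}");
    }
}
```

Catching OperationCanceledException separately keeps an expected timeout from being reported as an error.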
 

Event consumer with ReadEventsFromPartitionAsync method

 
It is similar to the ConsumerReadEvent method; the main differences are the input parameters and the method used, which reads event data by partition id. When the event hub was created in the Azure portal we set only 2 partitions, so the partition id can only be 0 or 1. Data is retrieved from the event hub for the partition id passed to the method; the other partition is not consumed by this consumer.

    await foreach (PartitionEvent partitionEvent in eventConsumer.ReadEventsFromPartitionAsync(partitionId, EventPosition.Latest, readEventOptions, cancellationSource.Token))

EventPosition can be set to Latest, Earliest, and some other options as well.
 
In summary, we are running two consumer clients in parallel. One reads all event data, and the other reads only the event data of one partition id.
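
For reference, here is a short sketch of some ways to build an EventPosition. The class name `EventPositionSketch`, the ten-minute window, and the sequence number 100 are arbitrary values chosen for illustration. Note that with EventPosition.Latest a reader only sees events enqueued after it starts reading, so events sent before the consumer connects are not returned.

```csharp
using System;
using Azure.Messaging.EventHubs.Consumer;

class EventPositionSketch
{
    static void Main()
    {
        // Start from the oldest event still retained in the partition.
        EventPosition fromStart = EventPosition.Earliest;

        // Skip what is already there and read only events that arrive from now on.
        EventPosition onlyNew = EventPosition.Latest;

        // Start from events enqueued at or after a point in time (here: ten minutes ago).
        EventPosition recent = EventPosition.FromEnqueuedTime(DateTimeOffset.UtcNow.AddMinutes(-10));

        // Start just after a known sequence number (100 is an arbitrary example value).
        EventPosition afterKnown = EventPosition.FromSequenceNumber(100, isInclusive: false);

        Console.WriteLine("Created four example positions.");
    }
}
```

Any of these values can be passed as the second argument of ReadEventsFromPartitionAsync.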
 

How to run the application with this code

 
Option 1
 
Prerequisites: Visual Studio 2019 and .NET Core 3.1. Update your shared access key (connection string) and event hub name in the producer and consumer classes.
  • Open Visual Studio and click New -> Project.
  • Select .NET Core on the left and select the Console App (.NET Core) template.
  • Enter the path where the solution will be created and click the OK button.


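    A new console app project will be created with a default Program.cs file.

    Now, in the NuGet package manager, search for Azure.Messaging.EventHubs.Processor and install it for the selected project. (The code in this article only uses types from Azure.Messaging.EventHubs, which is installed with it as a dependency.)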

  • Add a new .cs file to the project and name it EventProducer.cs.
  • Copy the content from the section “Code for EventProducer.cs” and paste it into the EventProducer.cs file.
  • Add a new .cs file to the project and name it EventHubConsumerClientDemo.cs.
  • Copy the content from the section “Code for EventHubConsumerClientDemo.cs” and paste it into the EventHubConsumerClientDemo.cs file.
  • Copy the content of the main entry point (only the C# code) and paste it into the Program.cs file.
Build the solution and run the application.
 
Option 2
 
Download the zip file from here and unzip it on your local system. Open the solution file, build the project, and run the application.
 
Thanks, and happy coding 😊😊😊😊