Receiving Event Data From An Azure Eventhub

Introduction

 
This article is a continuation of my Sending Event Data To Azure Event Hub article. Here we are going to be building an application that receives data from an Azure EventHub in real time. The scenario we previously built was that we are sending data from a weather station collected via sensors to an event hub. Now we are at the end where data will be sent to from the EventHub. Here after we receive the data we can do anything with it to simply just storing it in an excel file or building complex graphs and reports using that data. Let’s get started.
 

Road Map

  1. Build a .NET Core Console App and Initialize an EventHubClient
  2. Create a PartitionReceiver to Receive Events
  3. Transformation of data.

Build a .Net Core Console App and initialize an EventHubClient

 
Start off by creating a .net core console application. After that include the following nuget package to your project: Microsoft.Azure.EventHubs. This will be the library we will use to interact with the azure EventHub.
 
Start of by adding two class libraries to your project. One library will be storing our models. In the other, we will be writing all our logic which handles the receiving of the data from the EventHub. We'll also include the files as named in the following image.
 
Receiving Event Data From An Azure Eventhub
 
Add your model class to the WeatherData.cs file.
  1. public class WeatherData  
  2.     {  
  3.         public int Temperature { getset; }  
  4.         public int WindSpeed { getset; }  
  5.         public Direction WindDirection { getset; }  
  6.   
  7.         public enum Direction   
  8.         {  
  9.             North, South, East, West   
  10.         }  
  11.   
  12.         public override string ToString()  
  13.         {  
  14.             return "{ Temp: " + Temperature.ToString() + " *C "   
  15.                  + "| Windspeed: " + WindSpeed.ToString() + " km/h "  
  16.                  + "| Wind Direction: " + WindDirection.ToString() + " }";  
  17.         }  
  18.     }  

Create a PartitionReceiver to Receive Events

 
So in order to receive events we have to write a ParitionReceiver. An event hub can contain more than 1 partition that acts as a queue where event data is stored. The EventHub manages by itself to which partition it wants to send the newly received data. It uses a round robin method of distributing the messages.
 
Thus when receiving data from an event hub we have to subscribe to each partition so that we can receive all the data that is being sent to the EventHub and not just parts of it that is being sent only to the partition that we might have subscribed to.
 
Additionally, an EventHub consists of consumer groups. A Consumer Group is sort of like a gateway into the EventHub’s partitions. The app into which you have to receive the data from the EventHub firstly needs to have a consumer group associated with it through which the app will be contacting the EventHub. For more details on consumer groups try this.
 
Add the following code to your Program.cs file in WeatherData.EventHub.Receivers.Direct project.
  1. class Program  
  2.   {  
  3.     const string EventHubConnectionString = "<your event hub connectionstring>";     
  4.     static void Main(string[] args)  
  5.     {  
  6.       MainAsync().Wait();  
  7.     }  
  8.     private static async Task MainAsync()  
  9.     {  
  10.       Console.WriteLine("Connecting to the Event Hub...");  
  11.       var EventHubClient =  
  12.         EventHubClient.CreateFromConnectionString(EventHubConnectionString);  
  13.       var runtimeInformation = await EventHubClient.GetRuntimeInformationAsync();  
  14.       var partitionReceivers = runtimeInformation.PartitionIds.Select(partitionId =>  
  15.           EventHubClient.CreateReceiver("$Default",  
  16.           partitionId, DateTime.Now)).ToList();  
  17.       Console.WriteLine("Waiting for incoming events...");  
  18.       foreach (var partitionReceiver in partitionReceivers)  
  19.       {  
  20.         partitionReceiver.SetReceiveHandler(  
  21.           new WeatherDataPartitionReceiveHandler(partitionReceiver.PartitionId));  
  22.       }  
  23.       Console.WriteLine("Press any key to shutdown");  
  24.       Console.ReadLine();  
  25.       await EventHubClient.CloseAsync();  
  26.     }  
  27.   }  
Now in this code, the line that contains “$Default” is the line where we are using the default consumer group that our EventHub contains and through that consumer group we are getting all the partitions of our event hub and storing their ids in a list.
 
You can find your existing consumer groups and create new ones in the consumer groups option once you go into the options for you event hub on the Azure portal.
 
Receiving Event Data From An Azure Eventhub
 
Simply add a new consumer group and put the name of that consumer group in place of $Default in your code. This also provides some insight on the parallelism of EventHubs. You can have multiple applications listening in on the same EventHub through different consumer groups and do different tasks with that data all at the same time.
 

Transformation of Data

 
Add the following code into your WeatherDataPartitionReceiveHandler.cs file.
  1. public class WeatherDataPartitionReceiveHandler : IPartitionReceiveHandler  
  2.   {  
  3.     public WeatherDataPartitionReceiveHandler(string partitionId)  
  4.     {  
  5.       PartitionId = partitionId;  
  6.     }  
  7.     public int MaxBatchSize => 10;  
  8.     public string PartitionId { get; }  
  9.     public Task ProcessErrorAsync(Exception error)  
  10.     {  
  11.       Console.WriteLine($"Exception: {error.Message}");  
  12.       return Task.CompletedTask;  
  13.     }  
  14.     public async Task ProcessEventsAsync(IEnumerable<EventData> eventDatas)  
  15.     {  
  16.       if (eventDatas != null)  
  17.       {  
  18.         foreach (var eventData in eventDatas)  
  19.         {  
  20.           var dataAsJson = Encoding.UTF8.GetString(eventData.Body.Array);  
  21.           var weatherData =  
  22.             JsonConvert.DeserializeObject<WeatherData>(dataAsJson);  
  23.           Console.WriteLine(weatherData);                
  24.         }  
  25.       }  
  26.     }  
  27.   }  
This logic is responsible for actually processing the received data, transforming it and then presenting it on the console. Thus, through this process we have accomplished reading data from an EventHub.
 
Now since this app will be asynchronously listening on the EventHub for any new messages in the partitions you can leave it running and send messages to your event hub.
 
This application will receive messages from the EventHub as soon as the EventHub receives the messages.
 

Summary

 
In this article we looked at what are partitions, consumer groups and how an app we created can use these two things to receive data from an Azure event hub.


Similar Articles