Azure Event Hubs: An Overview

This post is about Azure Event Hubs, a big data streaming platform for sending and receiving event data.

Introduction

Unlike Azure Service Bus (covered in my article on Service Bus queues), where messages wait in a queue to be consumed, Event Hubs streams the data to its subscribers. Event Hubs can ingest millions of events per second, which makes it a good fit for telemetry (for example, from IoT devices) and logging.

Event Hubs is best suited to one-way traffic from publisher to subscriber; that is, the subscriber sends no acknowledgment back to the publisher.

Azure Event Hubs is built on the partitioned consumer pattern, which decouples message delivery by splitting the event stream into partitions. A partition is an ordered sequence of events that holds the producer's event data for a finite retention period (typically hours to days). Adding partitions spreads the data across more storage and lets more consumers read in parallel. A producer can send to a specific partition, supply a partition key that the service maps to a partition, or let the service distribute events across the partitions.
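To make the routing idea concrete, here is a minimal sketch of how a partition key could be mapped to a partition. The PartitionRouter class and its hash function are hypothetical illustrations and not the algorithm Event Hubs uses internally; the only point is that the same key always lands in the same partition, so events for one order stay in order.

```csharp
using System;

public static class PartitionRouter
{
    // Hypothetical stable hash (FNV-1a); the service's real hashing is internal.
    public static int PartitionFor(string partitionKey, int partitionCount)
    {
        if (partitionKey == null) throw new ArgumentNullException(nameof(partitionKey));
        if (partitionCount <= 0) throw new ArgumentOutOfRangeException(nameof(partitionCount));

        unchecked
        {
            uint hash = 2166136261;
            foreach (char c in partitionKey)
            {
                hash ^= c;
                hash *= 16777619;
            }
            // The remainder is always in the range [0, partitionCount).
            return (int)(hash % (uint)partitionCount);
        }
    }
}

public static class RouterDemo
{
    public static void Main()
    {
        const int partitionCount = 4;
        // "order-1" appears twice and is routed to the same partition both times.
        foreach (var key in new[] { "order-1", "order-2", "order-1" })
        {
            Console.WriteLine($"{key} -> partition {PartitionRouter.PartitionFor(key, partitionCount)}");
        }
    }
}
```

In the pizza scenario, using the order number as the key would keep every update for a given order in a single partition.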

The consumer, on the other hand, reads data from one or more partitions. It keeps track of the offset, that is, the position up to which it has read the event data in a partition, so a consumer that stops can later resume from where it left off. When a consumer reads several partitions, it visits them in turn; with two partitions, for example, it reads from Partition 1 and then from Partition 2.
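The offset idea can be sketched with a small in-memory model. InMemoryPartition and OffsetTrackingConsumer are hypothetical names made up for this illustration, and the integer index is a simplification: in the real service, offsets are assigned and interpreted by Event Hubs.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for one partition: an append-only list of events.
public class InMemoryPartition
{
    private readonly List<string> _events = new List<string>();

    public int Count => _events.Count;
    public void Append(string eventData) => _events.Add(eventData);
    public string ReadAt(int offset) => _events[offset];
}

public class OffsetTrackingConsumer
{
    // Position of the next unread event in the partition.
    public int Offset { get; private set; }

    // Reads everything from the stored offset to the end, then advances the offset.
    public List<string> ReadAvailable(InMemoryPartition partition)
    {
        var batch = new List<string>();
        while (Offset < partition.Count)
        {
            batch.Add(partition.ReadAt(Offset));
            Offset++;
        }
        return batch;
    }
}

public static class OffsetDemo
{
    public static void Main()
    {
        var partition = new InMemoryPartition();
        partition.Append("order received");
        partition.Append("pizza in oven");

        var consumer = new OffsetTrackingConsumer();
        // prints "order received, pizza in oven"
        Console.WriteLine(string.Join(", ", consumer.ReadAvailable(partition)));

        partition.Append("out for delivery");
        // The second read resumes at the stored offset and prints "out for delivery"
        Console.WriteLine(string.Join(", ", consumer.ReadAvailable(partition)));
    }
}
```

Because the offset lives outside the read loop, a consumer that pauses and comes back never sees the same event twice in this model.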

We can increase the number of consumers; however, within a consumer group, adding more consumers than there are partitions brings no benefit, because each partition should have only one active reader at a time.

Background

I will use the same “Pizza Order Delivery” system for demonstration. In the context of Event Hubs, I will modernize the delivery system by providing regular updates to the customer.

For example, when you order a pizza and opt for home delivery, you will receive an update on each event: how far along the pizza is, when it is ready for delivery, and where the order is on its way to your home. To demonstrate how this works, I will develop a console application, “producer”, to produce events and another app, “consumer”, to consume them.
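As a rough illustration of what one such update could look like on the wire, the sketch below models an order status event and converts it to and from the UTF-8 bytes an event body carries. The OrderStatusEvent type, the OrderStage values, and the pipe-delimited format are all assumptions made for this example; a real system would more likely use JSON.

```csharp
using System;
using System.Globalization;
using System.Text;

// Hypothetical stages of a pizza order.
public enum OrderStage { Received, Baking, ReadyForDelivery, OutForDelivery, Delivered }

// Hypothetical event payload for one status update.
public class OrderStatusEvent
{
    public string OrderId { get; }
    public OrderStage Stage { get; }
    public DateTime TimestampUtc { get; }

    public OrderStatusEvent(string orderId, OrderStage stage, DateTime timestampUtc)
    {
        OrderId = orderId;
        Stage = stage;
        TimestampUtc = timestampUtc;
    }

    // Encodes the event as "orderId|stage|timestamp" in UTF-8.
    public byte[] ToBytes() =>
        Encoding.UTF8.GetBytes($"{OrderId}|{Stage}|{TimestampUtc:O}");

    // Decodes an event produced by ToBytes.
    public static OrderStatusEvent FromBytes(byte[] body)
    {
        var parts = Encoding.UTF8.GetString(body).Split('|');
        return new OrderStatusEvent(
            parts[0],
            (OrderStage)Enum.Parse(typeof(OrderStage), parts[1]),
            DateTime.Parse(parts[2], CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind));
    }
}

public static class OrderEventDemo
{
    public static void Main()
    {
        var sent = new OrderStatusEvent("order-42", OrderStage.Baking, DateTime.UtcNow);
        var received = OrderStatusEvent.FromBytes(sent.ToBytes());
        // prints "order-42 is now Baking"
        Console.WriteLine($"{received.OrderId} is now {received.Stage}");
    }
}
```

The producer and consumer built later in this article exchange plain random strings instead, which keeps the focus on the Event Hubs plumbing.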

Implementation

Let’s start by creating an Event Hubs namespace in Azure. I named it “OrderProcessingEventHub” and chose the Basic pricing tier.

Click "Create" and it will do the necessary background work to create a namespace. 

Then, select “Event Hubs” from Entities and create one for the project.

I have created it with the name “ordereventhub”. Note down the event hub name and the “Connection string – primary key” value. Notice that the partition count defaults to 2. Change it now if you need more partitions, because it cannot be changed after the event hub is created. If in doubt, consider setting it to the maximum of 32.

Event hub setup in Azure is complete. Now, we need one producer/publisher to publish events.

For that, let’s create an application to generate the event data.

I will create a console application, as it is the simplest way to demonstrate the concept. Add a NuGet reference to “WindowsAzure.ServiceBus”.

Update “Program.cs” as follows, replacing the connection string placeholder with the “Connection string – primary key” value you copied earlier.

Once run, this program sends a random message to the event hub every second.

    using System;
    using System.Text;
    using System.Threading;
    using Microsoft.ServiceBus.Messaging; // from the WindowsAzure.ServiceBus package

    class Program
    {
        static void Main(string[] args)
        {
            GenerateRandomMessages();
        }

        public static void GenerateRandomMessages()
        {
            // Paste the "Connection string – primary key" value here. With this
            // single-argument overload, the string should also name the event hub
            // (EntityPath=ordereventhub).
            var primaryConnectionString = "<connection string - primary key>";
            var client = EventHubClient.CreateFromConnectionString(primaryConnectionString);
            Random randomNumberGenerator = new Random();

            while (true)
            {
                try
                {
                    // Generate a random number from 1 to 999 (the upper bound is exclusive)
                    var randomMessage = string.Format("Message {0}", randomNumberGenerator.Next(1, 1000));

                    Console.WriteLine("Generated message: {0}", randomMessage);

                    // Send the message to the event hub as UTF-8 bytes
                    client.Send(new EventData(Encoding.UTF8.GetBytes(randomMessage)));
                }
                catch (Exception exception)
                {
                    // Log the failure and keep going with the next message
                    Console.WriteLine(exception.Message);
                }

                // Wait one second before sending the next message
                Thread.Sleep(1000);
            }
        }
    }

That’s it for a producer/publisher. Now, we will create a consumer/subscriber.

The consumer in this article uses the Event Processor Host, which records how far it has read in each partition (its checkpoints), along with its partition leases, in an Azure Storage blob container. So, we need a storage account before we can build the consumer.

I will go ahead and search for “Storage” in Azure.

Then, select the Storage account.

I will create a storage account with the name “eventprochoststorage”. Note down the storage account name.

Once the storage account is created, copy one of its access keys. In this case, I have copied “key 1”.

Next, go to the services of the storage account, select “Blobs”, and create a container. In this case, I named it “eventprochostcontainer”. Note down the container name as well.

Create and assign an access policy for the container.

Now let’s create the consumer application. I will go ahead and create another console application, which will act as the consumer.

Add NuGet references to “Microsoft.Azure.EventHubs” and “Microsoft.Azure.EventHubs.Processor”.

Once done, add a class “Processor” implementing “IEventProcessor”. Modify the class to be like the following.

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Microsoft.Azure.EventHubs;
    using Microsoft.Azure.EventHubs.Processor;

    public class Processor : IEventProcessor
    {
        // Called when the host stops processing a partition
        public Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Console.WriteLine($"Processor shutting down. Partition: '{context.PartitionId}', Reason: '{reason}'.");
            return Task.CompletedTask;
        }

        // Called when the host starts processing a partition
        public Task OpenAsync(PartitionContext context)
        {
            Console.WriteLine($"Processor initialized. Partition: '{context.PartitionId}'");
            return Task.CompletedTask;
        }

        // Called when an error occurs while receiving from a partition
        public Task ProcessErrorAsync(PartitionContext context, Exception error)
        {
            Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
            return Task.CompletedTask;
        }

        // Called with each batch of events read from a partition
        public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var eventData in messages)
            {
                // Decode the UTF-8 body written by the producer
                var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
                Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
            }

            // Record the position reached so that a restart resumes from here
            return context.CheckpointAsync();
        }
    }

This class provides the mechanism to read data from a partition. Now, let’s consume the event data through the “Processor” class by changing the “Program” class.

    using System;
    using System.Threading.Tasks;
    using Microsoft.Azure.EventHubs;
    using Microsoft.Azure.EventHubs.Processor;

    class Program
    {
        // Replace the placeholders with the values noted down earlier
        private const string EventHubConnectionString = "<connection string - primary key>";
        private const string EventHubName = "ordereventhub";

        private const string StorageContainerName = "eventprochostcontainer";
        private const string StorageAccountName = "eventprochoststorage";
        private const string StorageAccountKey = "<key 1>";

        private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);

        private static async Task MainAsync(string[] args)
        {
            Console.WriteLine("Registering EventProcessor...");

            var eventProcessorHost = new EventProcessorHost(
                EventHubName,
                PartitionReceiver.DefaultConsumerGroupName,
                EventHubConnectionString,
                StorageConnectionString,
                StorageContainerName);

            // Registers the Event Processor Host and starts receiving messages
            await eventProcessorHost.RegisterEventProcessorAsync<Processor>();

            Console.WriteLine("Receiving. Press ENTER to stop worker.");
            Console.ReadLine();

            // Disposes of the Event Processor Host
            await eventProcessorHost.UnregisterEventProcessorAsync();
        }

        static void Main(string[] args)
        {
            // Block until the asynchronous entry point completes
            MainAsync(args).GetAwaiter().GetResult();
        }
    }

Compile the program.

Once both the producer and the consumer are set up, run the two applications at the same time.

If everything is set up correctly, you will see the producer generating messages while the consumer receives them.

Happy programming!!!