Building Real-Time Applications Using .NET Core And Kafka

In the previous article, Kafka features and use cases, we discussed the different features and use cases of Kafka.
 
In this article, we will build a real-time application using Kafka and .NET Core, covering the below topics:
  • Understanding the business scenario
  • Setting up Kafka broker and ZooKeeper locally
  • Working with Kafka using Command Line
  • Centralizing Kafka Producer and Consumer Code
Before we get started, let’s first understand the business scenario.
 

Business Scenario

 
Here we are going to develop two applications: one will act as a producer and the other will act as a consumer.
 
We will register a user on some website, say X. For that, we will produce a message to a Kafka topic containing the data required to register the user. On the other side, there will be a consumer service that consumes this data and does the real work of user registration. We can also map this to a more complex operation with multiple stages, where each stage is handled by a different service in the overall request chain; creating an order on amazon.com is a good example.
 
I have chosen this simple user registration flow to keep the process easy to follow and to help you understand the core concept. That is why the workflow is not designed with a lot of producers and consumers, as all of that may be difficult to understand in the beginning.
 
So, the pictorial representation will look like below,
 
 
Here we can see that we will be talking about the happy-path scenario for now. After user registration, our consumer service will produce another message to the topic UserRegistered, which can later be consumed by some other service to show a message or send a notification about successful registration to the user who registered.
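For illustration, the data carried by that first registration message could be as simple as the hypothetical model below; the class and field names are placeholders, not a final contract.
 
  // Hypothetical payload for the user-creation message; field names are illustrative only.
  public class UserCreated
  {
      public string Name { get; set; }
      public string Email { get; set; }
  }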
 
Now that we understand the workflow we are going to create, let’s quickly set up Kafka locally so that we can actually see this in action.
 

Setting up Kafka broker and ZooKeeper locally

 
To set up Kafka locally, we have written all the configuration in a docker-compose file. You just need to go to the location where the file resides and run the below command.
  docker-compose up  
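If you want to write the file yourself, a minimal docker-compose.yml for a single local broker could look something like the sketch below. The image tags, ports, and listener settings here are assumptions for illustration (advertising localhost:9092 works for clients running on the host machine); your file may differ.
 
  # Sketch only: a minimal single-broker setup; adjust image tags and listeners to your environment.
  version: '3'
  services:
    zookeeper:
      image: confluentinc/cp-zookeeper:7.0.0
      environment:
        ZOOKEEPER_CLIENT_PORT: 2181
    broker:
      image: confluentinc/cp-kafka:7.0.0
      depends_on:
        - zookeeper
      ports:
        - "9092:9092"
      environment:
        KAFKA_BROKER_ID: 1
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1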
We can see that both the broker and ZooKeeper are running successfully.
 
 

Working with Kafka using Command Line

 
As our setup is done, let’s now quickly see some commands which will be really helpful when working with Kafka.
 
First of all, see all the running containers using the below command.
  docker ps  
 
 
Then, go inside the broker container using the below command (replace b8e163422dfc with your broker container ID from docker ps),
  docker exec -it b8e163422dfc bash  
Once we are in, let’s see if we have any topics created. To LIST ALL KAFKA TOPICS, use the below command.
  kafka-topics --list --bootstrap-server localhost:9092  
 
We can see that, as of now, we just have some default internal topics.
 
So let’s CREATE A NEW KAFKA TOPIC using the command line. The below command starts a console producer for the topic “TestTopic”:
  kafka-console-producer --bootstrap-server localhost:9092 --topic TestTopic  
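Once the producer prompt is up, every line we type is published as a message to “TestTopic”. By default, producing to a topic that doesn’t exist yet creates it automatically; if you prefer to create the topic explicitly first, the standard tool for that is kafka-topics (the partition and replication values below are just sensible single-broker defaults):
  kafka-topics --create --bootstrap-server localhost:9092 --topic TestTopic --partitions 1 --replication-factor 1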
 
Now we can check whether our topic “TestTopic” has some saved messages. To READ VALUES FROM A KAFKA TOPIC, use the below command.
  kafka-console-consumer --bootstrap-server localhost:9092 --topic TestTopic --from-beginning  
 

Centralizing Kafka Producer and Consumer Code

 
We created a separate project to hold all the code related to the Kafka producer and consumer so that we don’t have to write it again and again in every service. As of now, we have placed this Kafka project in both our producer and consumer applications, but we could turn it into a NuGet package and install that package in every service that produces or consumes Kafka messages.
 
We have written a generic producer and consumer so that we just need to pass the key and value, based on which we can either add messages to a topic or consume messages from it.
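Both classes below depend on a few small interfaces from the Kafka.Interfaces namespace that are not reproduced in this article (the full code comes later). As a rough idea of their shape, inferred from how they are used, they could look like this sketch:
 
  using System;
  using System.Threading;
  using System.Threading.Tasks;
 
  namespace Kafka.Interfaces
  {
      // Sketch only: the exact definitions will be shared with the full source code.
      public interface IKafkaProducer<TKey, TValue> where TValue : class
      {
          Task ProduceAsync(string topic, TKey key, TValue value);
      }
 
      public interface IKafkaConsumer<TKey, TValue> : IDisposable where TValue : class
      {
          Task Consume(string topic, CancellationToken stoppingToken);
          void Close();
      }
 
      public interface IKafkaHandler<TKey, TValue>
      {
          Task HandleAsync(TKey key, TValue value);
      }
  }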
 
Producer Code
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Kafka.Interfaces;

namespace Kafka.Producer
{
    /// <summary>
    /// Base class for implementing a Kafka producer.
    /// </summary>
    /// <typeparam name="TKey">Indicates the message's key in the Kafka topic</typeparam>
    /// <typeparam name="TValue">Indicates the message's value in the Kafka topic</typeparam>
    public class KafkaProducer<TKey, TValue> : IDisposable, IKafkaProducer<TKey, TValue> where TValue : class
    {
        private readonly IProducer<TKey, TValue> _producer;

        /// <summary>
        /// Initializes the producer
        /// </summary>
        /// <param name="config">Indicates the producer configuration</param>
        public KafkaProducer(ProducerConfig config)
        {
            _producer = new ProducerBuilder<TKey, TValue>(config).SetValueSerializer(new KafkaSerializer<TValue>()).Build();
        }

        /// <summary>
        /// Produces a message with the given key and value to the specified Kafka topic.
        /// </summary>
        /// <param name="topic">Indicates the topic name</param>
        /// <param name="key">Indicates the message's key in the Kafka topic</param>
        /// <param name="value">Indicates the message's value in the Kafka topic</param>
        public async Task ProduceAsync(string topic, TKey key, TValue value)
        {
            await _producer.ProduceAsync(topic, new Message<TKey, TValue> { Key = key, Value = value });
        }

        /// <summary>
        /// Flushes any pending messages and releases all resources used by the producer.
        /// </summary>
        public void Dispose()
        {
            _producer.Flush();
            _producer.Dispose();
        }
    }
}
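The producer above plugs in a custom KafkaSerializer<TValue>, which is also part of the shared project and not shown here. A minimal JSON-based sketch (assuming System.Text.Json) could look like this:
 
  using System.Text.Json;
  using Confluent.Kafka;
 
  namespace Kafka
  {
      // Sketch only: a minimal JSON value serializer; the actual
      // implementation will be shared with the full source code.
      public class KafkaSerializer<T> : ISerializer<T> where T : class
      {
          public byte[] Serialize(T data, SerializationContext context)
          {
              // Convert the message value to UTF-8 encoded JSON bytes.
              return JsonSerializer.SerializeToUtf8Bytes(data);
          }
      }
  }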
Consumer Code
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Kafka.Interfaces;
using Microsoft.Extensions.DependencyInjection;

namespace Kafka.Consumer
{
    /// <summary>
    /// Base class for implementing a Kafka consumer.
    /// </summary>
    /// <typeparam name="TKey">Indicates the message's key in the Kafka topic</typeparam>
    /// <typeparam name="TValue">Indicates the message's value in the Kafka topic</typeparam>
    public class KafkaConsumer<TKey, TValue> : IKafkaConsumer<TKey, TValue> where TValue : class
    {
        private readonly ConsumerConfig _config;
        private IKafkaHandler<TKey, TValue> _handler;
        private IConsumer<TKey, TValue> _consumer;
        private string _topic;

        private readonly IServiceScopeFactory _serviceScopeFactory;

        /// <summary>
        /// Initializes the consumer with the serviceScopeFactory and ConsumerConfig
        /// </summary>
        /// <param name="config">Indicates the consumer configuration</param>
        /// <param name="serviceScopeFactory">Indicates the instance of serviceScopeFactory</param>
        public KafkaConsumer(ConsumerConfig config, IServiceScopeFactory serviceScopeFactory)
        {
            _serviceScopeFactory = serviceScopeFactory;
            _config = config;
        }

        /// <summary>
        /// Triggered when the service is ready to consume from the Kafka topic.
        /// </summary>
        /// <param name="topic">Indicates the Kafka topic</param>
        /// <param name="stoppingToken">Indicates the stopping token</param>
        public async Task Consume(string topic, CancellationToken stoppingToken)
        {
            using var scope = _serviceScopeFactory.CreateScope();

            _handler = scope.ServiceProvider.GetRequiredService<IKafkaHandler<TKey, TValue>>();
            _consumer = new ConsumerBuilder<TKey, TValue>(_config).SetValueDeserializer(new KafkaDeserializer<TValue>()).Build();
            _topic = topic;

            await Task.Run(() => StartConsumerLoop(stoppingToken), stoppingToken);
        }

        /// <summary>
        /// Closes the consumer, commits offsets and leaves the group cleanly.
        /// </summary>
        public void Close()
        {
            _consumer.Close();
        }

        /// <summary>
        /// Releases all resources used by the current instance of the consumer.
        /// </summary>
        public void Dispose()
        {
            _consumer.Dispose();
        }

        private async Task StartConsumerLoop(CancellationToken cancellationToken)
        {
            _consumer.Subscribe(_topic);

            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    var result = _consumer.Consume(cancellationToken);

                    if (result != null)
                    {
                        await _handler.HandleAsync(result.Message.Key, result.Message.Value);
                    }
                }
                catch (OperationCanceledException)
                {
                    break;
                }
                catch (ConsumeException e)
                {
                    // Consumer errors should generally be ignored (or logged) unless fatal.
                    Console.WriteLine($"Consume error: {e.Error.Reason}");

                    if (e.Error.IsFatal)
                    {
                        break;
                    }
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Unexpected error: {e}");
                    break;
                }
            }
        }
    }
}
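Similarly, the consumer relies on a KafkaDeserializer<TValue>, the JSON counterpart of the serializer shown earlier. It might look like the sketch below:
 
  using System;
  using System.Text.Json;
  using Confluent.Kafka;
 
  namespace Kafka
  {
      // Sketch only: the JSON counterpart to the serializer above.
      public class KafkaDeserializer<T> : IDeserializer<T> where T : class
      {
          public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
          {
              // Treat a null/empty payload as a missing value.
              if (isNull || data.IsEmpty)
              {
                  return null;
              }
 
              return JsonSerializer.Deserialize<T>(data);
          }
      }
  }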
This was just a glimpse of our own generic producer and consumer. I will be sharing the entire code in the upcoming articles.
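To give an early idea of how the consumer could be hosted in a .NET Core service, here is a hedged sketch of a BackgroundService wiring it up; the UserCreated type is the hypothetical model from the business scenario above, and the topic name is a placeholder:
 
  using System.Threading;
  using System.Threading.Tasks;
  using Kafka.Interfaces;
  using Microsoft.Extensions.Hosting;
 
  // Sketch only: a hosted service that runs the generic consumer;
  // the topic name and message type are illustrative.
  public class UserCreatedConsumerService : BackgroundService
  {
      private readonly IKafkaConsumer<string, UserCreated> _consumer;
 
      public UserCreatedConsumerService(IKafkaConsumer<string, UserCreated> consumer)
      {
          _consumer = consumer;
      }
 
      protected override async Task ExecuteAsync(CancellationToken stoppingToken)
      {
          // Start consuming from the topic until the host shuts down.
          await _consumer.Consume("UserCreated", stoppingToken);
      }
  }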
 

SUMMARY

 
In this article, we discussed the business scenario and saw how to set up Kafka locally. We also tried our hands at some of the commands used to interact with Kafka locally, and looked at our generically written producer and consumer. In the next article, we will write our producer and consumer application code and see the event/message streaming in action.
 
I hope you find this article helpful. Stay tuned for more … Cheers!!

