Building Real Time Applications Using .Net Core And Kafka (Continuation)

In the previous article, Building Real-Time Applications Using .Net Core And Kafka, we discussed the business scenario, set up Kafka locally, and walked through the code that centralizes the Kafka producer and consumer.
 
In this article, we will cover the following topics:
  • Understanding Producer Application
  • Understanding Consumer Application
  • Running Kafka Locally
  • Producing and Consuming the message 
So, let’s get started.
 

Understanding Producer Application

 
To demonstrate the producer application, we created a .Net Core Web API project and kept it simple: a Users controller exposes an endpoint that produces a message to the RegisterUser topic in Kafka.
 
Also, as mentioned in the previous article, we created a .Net Core Class Library project that centralizes the Kafka-related code. This project can be packaged independently as a NuGet package and installed in every service that interacts with the Kafka brokers.
 
Now, let's look at the UsersController code. On receiving a request to register a User, it passes the request details along as a message to the RegisterUser topic, which some other service can consume to do the actual work.
    using System.Threading.Tasks;
    using Kafka.Constants;
    using Kafka.Interfaces;
    using Kafka.Messages.UserRegistration;
    using Microsoft.AspNetCore.Http;
    using Microsoft.AspNetCore.Mvc;
    using Swashbuckle.AspNetCore.Annotations;

    namespace Kafka_ProducerApplication.Controllers
    {
        [ApiController]
        [Route("api/[controller]")]
        public class UsersController : ControllerBase
        {
            // Producer from the shared Kafka class library, supplied through dependency injection
            private readonly IKafkaProducer<string, RegisterUser> _kafkaProducer;

            public UsersController(IKafkaProducer<string, RegisterUser> kafkaProducer)
            {
                _kafkaProducer = kafkaProducer;
            }

            [HttpPost]
            [Route("Register")]
            [ProducesResponseType(StatusCodes.Status200OK)]
            [SwaggerOperation("Register User", "This endpoint can be used to register a User, but for the demo it produces a dummy message in the Kafka topic")]
            public async Task<IActionResult> ProduceMessage(RegisterUser request)
            {
                // Publish the request to the RegisterUser topic (no message key is set in this demo)
                await _kafkaProducer.ProduceAsync(KafkaTopics.RegisterUser, null, request);

                return Ok("User Registration In Progress");
            }
        }
    }
Similarly, in the real world, different services can each perform their own local transaction and, once done, produce message(s) to Kafka topics. Other services eventually consume these messages to complete the distributed transaction.
 

Understanding Consumer Application

 
Since our consumer app needs to consume messages in real time, we created a .Net Core Class Library project containing a background service that picks up new messages as they arrive on the Kafka topic.
 
Here we have created a Consumer class that is bound to a Kafka topic, and a Handler that contains the actual code for processing each message the consumer receives.
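
As a rough mental model of how a topic-bound consumer hands messages to a handler, consider the in-memory sketch below. It is not the shared library's implementation: `InMemoryConsumer` and `RecordingHandler` are hypothetical names, a `Channel` stands in for the Kafka topic, and only the `HandleAsync(key, value)` shape is taken from the handler shown later in this article.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Handler contract, mirroring the HandleAsync(key, value) shape used in this article.
public interface IKafkaHandler<TKey, TValue>
{
    Task HandleAsync(TKey key, TValue value);
}

// Hypothetical in-memory stand-in for a topic-bound consumer (assumption, not the real library).
public class InMemoryConsumer<TKey, TValue>
{
    // The channel plays the role of the Kafka topic.
    private readonly Channel<KeyValuePair<TKey, TValue>> _topic =
        Channel.CreateUnbounded<KeyValuePair<TKey, TValue>>();

    private readonly IKafkaHandler<TKey, TValue> _handler;

    public InMemoryConsumer(IKafkaHandler<TKey, TValue> handler) => _handler = handler;

    // Simulates a producer writing a message to the topic.
    public void Publish(TKey key, TValue value) =>
        _topic.Writer.TryWrite(new KeyValuePair<TKey, TValue>(key, value));

    // Marks the topic as finished so the consume loop can end (useful for demos and tests).
    public void Complete() => _topic.Writer.Complete();

    // Reads messages until cancelled or completed and passes each one to the handler.
    public async Task Consume(CancellationToken stoppingToken)
    {
        await foreach (var message in _topic.Reader.ReadAllAsync(stoppingToken))
        {
            await _handler.HandleAsync(message.Key, message.Value);
        }
    }
}

// Hypothetical handler that simply records what it receives.
public class RecordingHandler : IKafkaHandler<string, string>
{
    public List<string> Seen { get; } = new List<string>();

    public Task HandleAsync(string key, string value)
    {
        Seen.Add(value);
        return Task.CompletedTask;
    }
}
```

The point of the split is that the consumer only knows how to read and dispatch, while the handler holds the business logic, so handlers can be swapped or tested without touching the consume loop.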
 
As with the Producer app, we have also created a .Net Core Class Library project for Kafka.
 
This is the background service code:
    using System;
    using System.Net;
    using System.Threading;
    using System.Threading.Tasks;
    using Kafka.Constants;
    using Kafka.Interfaces;
    using Kafka.Messages.UserRegistration;
    using Microsoft.Extensions.Hosting;

    namespace Kafka_ConsumerApplication.Core.kafkaEvents.UserRegistration.Consumers
    {
        public class RegisterUserConsumer : BackgroundService
        {
            // Consumer from the shared Kafka class library, supplied through dependency injection
            private readonly IKafkaConsumer<string, RegisterUser> _consumer;

            public RegisterUserConsumer(IKafkaConsumer<string, RegisterUser> kafkaConsumer)
            {
                _consumer = kafkaConsumer;
            }

            protected override async Task ExecuteAsync(CancellationToken stoppingToken)
            {
                try
                {
                    // Keep consuming from the RegisterUser topic until the host signals shutdown
                    await _consumer.Consume(KafkaTopics.RegisterUser, stoppingToken);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"{(int)HttpStatusCode.InternalServerError} ConsumeFailedOnTopic - {KafkaTopics.RegisterUser}, {ex}");
                }
            }

            public override void Dispose()
            {
                // Close the consumer before disposing it
                _consumer.Close();
                _consumer.Dispose();

                base.Dispose();
            }
        }
    }
And this is the handler code:
    using System;
    using System.Threading.Tasks;
    using Kafka.Constants;
    using Kafka.Interfaces;
    using Kafka.Messages.UserRegistration;

    namespace Kafka_ConsumerApplication.Core.kafkaEvents.UserRegistration.Handlers
    {
        public class RegisterUserHandler : IKafkaHandler<string, RegisterUser>
        {
            private readonly IKafkaProducer<string, UserRegistered> _producer;

            public RegisterUserHandler(IKafkaProducer<string, UserRegistered> producer)
            {
                _producer = producer;
            }

            public async Task HandleAsync(string key, RegisterUser value)
            {
                // Here we can actually write the code to register a User
                Console.WriteLine($"Consuming RegisterUser topic message with the below data\n FirstName: {value.FirstName}\n LastName: {value.LastName}\n UserName: {value.UserName}\n EmailId: {value.EmailId}");

                // After a successful operation, suppose the registered user has User Id 1;
                // we then produce a message for other services to consume.
                // Awaiting the call ensures a failed produce surfaces as an exception.
                await _producer.ProduceAsync(KafkaTopics.UserRegistered, "", new UserRegistered { UserId = 1 });
            }
        }
    }
Here you can see that after reading the message and printing it to the console, we produce another message to a different topic, UserRegistered, to announce that a user has been registered.
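
One way to check this behaviour without a running broker is to hand the handler a fake producer. The sketch below is an illustration under stated assumptions, not the article's attached code: `FakeProducer` is hypothetical, the `IKafkaProducer` signature is inferred from the `ProduceAsync` calls shown in this article, the message classes are reduced to the fields used here, and the topic name is assumed to be the literal string "UserRegistered".

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Message shapes reduced to the fields this article uses (assumed, not copied from the library).
public class RegisterUser
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public string UserName { get; set; }
    public string EmailId { get; set; }
}

public class UserRegistered
{
    public int UserId { get; set; }
}

// Signature inferred from the ProduceAsync(topic, key, value) calls in the article.
public interface IKafkaProducer<TKey, TValue>
{
    Task ProduceAsync(string topic, TKey key, TValue value);
}

// Hypothetical test double: records messages instead of sending them to Kafka.
public class FakeProducer<TKey, TValue> : IKafkaProducer<TKey, TValue>
{
    public List<(string Topic, TKey Key, TValue Value)> Sent { get; } =
        new List<(string Topic, TKey Key, TValue Value)>();

    public Task ProduceAsync(string topic, TKey key, TValue value)
    {
        Sent.Add((topic, key, value));
        return Task.CompletedTask;
    }
}

// Simplified handler: after the (omitted) registration work, it announces the result.
public class RegisterUserHandler
{
    private readonly IKafkaProducer<string, UserRegistered> _producer;

    public RegisterUserHandler(IKafkaProducer<string, UserRegistered> producer) => _producer = producer;

    public async Task HandleAsync(string key, RegisterUser value)
    {
        // Real registration logic would run here; the demo assumes the new user gets Id 1.
        await _producer.ProduceAsync("UserRegistered", "", new UserRegistered { UserId = 1 });
    }
}
```

Because the handler depends only on the producer interface, a test can construct it with `new FakeProducer<string, UserRegistered>()`, call `HandleAsync`, and then inspect `Sent` to confirm that exactly one UserRegistered message was emitted.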
 
This topic can in turn be consumed by a service that, for example, notifies the User that registration is complete. In this way, we can define our own workflows to suit the requirements and benefit from Kafka.
 

Running Kafka Locally

 
Now that we understand what our Producer and Consumer apps contain, let's quickly start the Kafka broker and Zookeeper locally and watch this transaction happen. The prerequisite is that Docker must be installed on your system.
 
To run Kafka locally, go to the folder containing the docker-compose.yml file (shared in the previous article) and execute the command below.
    docker-compose up
If you already have these containers, simply run them again.
 
 
We can see that both are running.
 
 
Producing and Consuming the message
 
With the groundwork done, let's run both the producer and consumer apps and see this in action.
 
Below is the interface of our Producer App.
 
 
Here, using Swagger, we submit our request to register a User with the above-mentioned details.
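
For readers following along without the Swagger UI, the request body can be approximated in code. This is a sketch under assumptions: the `RegisterUser` fields are inferred from what the handler prints (FirstName, LastName, UserName, EmailId), the sample values are made up, and `System.Text.Json` is used here only for illustration; the shared library from the previous article may serialize messages differently.

```csharp
using System;
using System.Text.Json;

// Assumed message shape, inferred from the fields the consumer-side handler prints.
public class RegisterUser
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public string UserName { get; set; }
    public string EmailId { get; set; }
}

public static class RegisterUserPayload
{
    // Builds a request with made-up sample values.
    public static RegisterUser Sample() => new RegisterUser
    {
        FirstName = "Jane",
        LastName = "Doe",
        UserName = "jdoe",
        EmailId = "jane.doe@example.com"
    };

    // Serializes the request to the JSON text that could be pasted into the Swagger request body.
    public static string ToJson(RegisterUser request) => JsonSerializer.Serialize(request);

    // Reads the JSON text back into a RegisterUser instance.
    public static RegisterUser FromJson(string json) => JsonSerializer.Deserialize<RegisterUser>(json);
}
```

Calling `RegisterUserPayload.ToJson(RegisterUserPayload.Sample())` yields a JSON object with the four fields, which can be posted to the `api/Users/Register` route defined by the controller.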
 
We can see that the topic RegisterUser has been created successfully and we have a message produced in the topic as well.
 
 
As soon as the message is produced, the background service in the Consumer app picks it up and prints it to the console.
 
 
And after printing the message, we are producing another message in the UserRegistered topic informing that a User has been registered successfully with the UserId as 1.
 
Now, let's check whether the new UserRegistered topic has been created.
 
 
Yes, we can see the topic is created successfully and the message is also produced by the consumer app.
 
 
So, we have successfully established communication between our different apps to complete the process of User registration.
 
NOTE
All the commands which we have used have already been explained in the previous article. Also, the entire source code is attached to this article.
 

SUMMARY

 
To sum up, we have built two applications using .Net Core that communicate with each other through Kafka, which sits between them as the message streaming platform.
 
I hope you find this article helpful. Stay tuned for more … Cheers!!
 