.NET Core - C# Web API with Apache Kafka Integration

Introduction

 
This article walks through the steps to write a Kafka consumer listener in C# on .NET Core. It also explains how to integrate the consumer with an existing Web API.
 

Simple steps to create a Kafka consumer

  • Create a .NET Core console application in a new or existing solution and add a class named “MyKafkaConsumer”.
  • Install the NuGet package “Confluent.Kafka” (Confluent's .NET client for Apache Kafka), which provides the ConsumerConfig class.
  • Add the following lines to the Startup.cs file, where services are registered:
    // Start the Kafka consumer
    var consumerConfig = new ConsumerConfig();
    Configuration.Bind("consumer", consumerConfig);
    services.AddSingleton<ConsumerConfig>(consumerConfig);
    // Note: register every other service your business logic depends on here, for example
    // (add using directives for the namespaces that hold your interface and implementation):
    services.AddTransient<IMyBusinessServices, MyBusinessServices>();
    services.AddHostedService<MyKafkaConsumer>(); // important
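Configuration.Bind("consumer", consumerConfig) copies the values of a configuration section named "consumer" onto the matching ConsumerConfig properties. A minimal sketch of such a section is shown here. The broker address and group name are placeholder assumptions, and only three of the available ConsumerConfig properties are set:

```json
{
  "consumer": {
    "BootstrapServers": "localhost:9092",
    "GroupId": "my-consumer-group",
    "AutoOffsetReset": "Earliest"
  }
}
```

The binder matches keys to property names case-insensitively, so "bootstrapservers" and "groupid" would bind just as well.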
Modify the class “MyKafkaConsumer” to inherit from the base class Microsoft.Extensions.Hosting.BackgroundService and override the following base class methods (ExecuteAsync is abstract and must be overridden; StartAsync is virtual, so overriding it is optional):
    Task StartAsync(CancellationToken cancellationToken)
    Task ExecuteAsync(CancellationToken stoppingToken)
Write a constructor that uses dependency injection to receive ConsumerConfig, Microsoft.Extensions.Options.IOptions<AppSettings>, and any additional business logic services.
 
Example
    public MyKafkaConsumer(ConsumerConfig consumerConfig, IMyBusinessServices mybusinessServices, IOptions<AppSettings> appSettings) {
        // Store the injected dependencies in private fields of the class
        _consumerConfig = consumerConfig;
        _mybusinessServices = mybusinessServices;
        _appSettings = appSettings;
    }
Write the following in the StartConsumer method:
    private void StartConsumer(CancellationToken stoppingToken) {
        // Build the consumer and subscribe once, rather than on every loop iteration
        using (var consumer = new ConsumerBuilder<Ignore, string>(_consumerConfig).Build()) {
            consumer.Subscribe("kafkaListenTopic_From_Producer");
            while (!stoppingToken.IsCancellationRequested) {
                // Read the next message from Kafka; blocks until one arrives or the token is cancelled
                var message = consumer.Consume(stoppingToken).Message.Value;
                if (!string.IsNullOrEmpty(message)) {
                    // write your business logic to invoke IMyBusinessServices here
                }
            }
        }
    }
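Write the following code in the ExecuteAsync method:
    protected override Task ExecuteAsync(CancellationToken stoppingToken) {
        // Consume() blocks, so run the consumer loop on a background thread
        // and return that task so the host can track it
        return Task.Run(() => StartConsumer(stoppingToken), stoppingToken);
    }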
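Putting the pieces above together, the whole class might look like the sketch below. It is an illustration under assumptions, not a definitive implementation. The IMyBusinessServices interface and its Process method are hypothetical stand-ins for your own business service. The IOptions<AppSettings> parameter is left out to keep the example self-contained. The error handling and the Close() call on shutdown are additions that the steps above do not cover.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

// Hypothetical stand-in for the business service used in this article.
public interface IMyBusinessServices
{
    void Process(string message); // hypothetical method name
}

public class MyKafkaConsumer : BackgroundService
{
    private readonly ConsumerConfig _consumerConfig;
    private readonly IMyBusinessServices _mybusinessServices;

    public MyKafkaConsumer(ConsumerConfig consumerConfig, IMyBusinessServices mybusinessServices)
    {
        _consumerConfig = consumerConfig;
        _mybusinessServices = mybusinessServices;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume() blocks, so the loop runs on a thread-pool thread.
        return Task.Run(() => StartConsumer(stoppingToken), stoppingToken);
    }

    private void StartConsumer(CancellationToken stoppingToken)
    {
        using (var consumer = new ConsumerBuilder<Ignore, string>(_consumerConfig).Build())
        {
            consumer.Subscribe("kafkaListenTopic_From_Producer");
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        // Blocks until a message arrives or the host requests shutdown.
                        var result = consumer.Consume(stoppingToken);
                        var message = result.Message.Value;
                        if (!string.IsNullOrEmpty(message))
                        {
                            _mybusinessServices.Process(message);
                        }
                    }
                    catch (ConsumeException ex)
                    {
                        // Report the consume error and keep listening.
                        Console.Error.WriteLine($"Consume error: {ex.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // The host is shutting down; fall through to Close().
            }
            finally
            {
                // Leave the consumer group cleanly.
                consumer.Close();
            }
        }
    }
}
```

Creating the consumer once, outside the loop, avoids rejoining the consumer group for every message.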
Now the Kafka service is ready to test with a sample message. Your console application can be run independently.
 
Follow the steps below if you want to test the application:
 
Download Kafka from the kafka.apache.org website, follow steps 1 to 4 of https://kafka.apache.org/quickstart, and then start the MyKafkaConsumer application you wrote.
 
Place a breakpoint inside the StartConsumer method. When a producer sends a message, you should see it arrive in the consumer, and your business logic should run as normal.
 
Now your Kafka code is ready to integrate with the Web API. Follow the steps below:
  • Create a regular .NET Core Web API project with the default configuration and add it to a solution.

  • Create a class library project within the same solution (or a console application, if you want to run the service independently to confirm that it is up and listening). This is the project that will hold the logic for the Kafka consumer background service.

  • Create the Kafka consumer service by deriving from the base class BackgroundService.

  • Test the Kafka consumer service project on its own by sending it messages from a Kafka producer and validating the consumer's listening logic. Once this component is validated, we can integrate the Kafka background service with the Web API.

  • Move all the configuration entries (such as the consumer's bootstrapservers, groupid, etc.) from the Kafka class library/console application to the Web API's appsettings.json file. This file will be the single place from which all your configuration entries are read.

  • Now modify the Web API's Startup.cs file as follows:
    services.AddHostedService<MyKafkaConsumer>();
    // MyKafkaConsumer is the listener class to listen to Kafka messages.
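You will not be able to inject an EF Core context directly if you are using a background service, because a hosted service is a singleton while the context is registered as a scoped service by default. Instead, resolve the context from a service scope in your database repository. You can do this in the constructor as follows: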
    public class MyClassRepository {
        private readonly MyDatabaseContext _myDatabaseContext;
        private readonly IServiceScopeFactory _serviceScopeFactory;

        public MyClassRepository(IServiceScopeFactory serviceScopeFactory) {
            _serviceScopeFactory = serviceScopeFactory;
            // Create a scope and resolve the scoped DbContext from it
            IServiceScope scope = _serviceScopeFactory.CreateScope();
            _myDatabaseContext = scope.ServiceProvider.GetRequiredService<MyDatabaseContext>();
        }
        // now your dbcontext is ready to use
    }
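The constructor approach keeps one scope, and therefore one context instance, alive for the lifetime of the repository. A possible variation, sketched here as an assumption rather than as the article's method, creates a short-lived scope for each operation so that the context is disposed after every message. The ReceivedMessage entity, the ReceivedMessages set, and the SaveMessageAsync method are hypothetical names. MyDatabaseContext is redefined in minimal form only to keep the sketch self-contained.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical entity, used only for illustration.
public class ReceivedMessage
{
    public int Id { get; set; }
    public string Payload { get; set; }
}

// Minimal stand-in for the article's MyDatabaseContext.
public class MyDatabaseContext : DbContext
{
    public MyDatabaseContext(DbContextOptions<MyDatabaseContext> options) : base(options) { }

    public DbSet<ReceivedMessage> ReceivedMessages { get; set; }
}

public class MyClassRepository
{
    private readonly IServiceScopeFactory _serviceScopeFactory;

    public MyClassRepository(IServiceScopeFactory serviceScopeFactory)
    {
        _serviceScopeFactory = serviceScopeFactory;
    }

    // Hypothetical method: stores the payload of one Kafka message.
    public async Task SaveMessageAsync(string payload, CancellationToken cancellationToken)
    {
        // A fresh scope per call; disposing it also disposes the context resolved from it.
        using (var scope = _serviceScopeFactory.CreateScope())
        {
            var db = scope.ServiceProvider.GetRequiredService<MyDatabaseContext>();
            db.ReceivedMessages.Add(new ReceivedMessage { Payload = payload });
            await db.SaveChangesAsync(cancellationToken);
        }
    }
}
```

With this shape the repository itself can be registered as a singleton or transient service and injected into the background service, since it no longer holds a scoped dependency between calls.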

Summary

 
In this article, we learned how to write a Kafka consumer as a .NET Core background service in C# and how to integrate it with a Web API.