Introduction to Apache Kafka with .NET

Introduction

Apache Kafka is a distributed event streaming platform that was originally developed at LinkedIn and is now maintained as an open-source project under the Apache Software Foundation. It is designed to handle real-time data feeds and provides a publish-subscribe model for building real-time data pipelines and streaming applications.

Key features of Apache Kafka

  1. Distributed System: Kafka is designed as a distributed system, allowing it to scale horizontally across multiple servers and clusters. This ensures fault tolerance and high availability.
  2. Publish-Subscribe Model: Kafka follows a publish-subscribe messaging pattern. Producers publish messages to topics, and consumers subscribe to topics to receive those messages.
  3. Fault Tolerance: Kafka provides fault tolerance by replicating data across multiple brokers (servers). If one broker fails, another can take over, ensuring continuous availability of the data.
  4. Scalability: Kafka scales easily by adding more brokers to the cluster. This makes it suitable for handling large amounts of data and high-throughput scenarios.
  5. Durability: Kafka retains messages for a configurable retention period. This durability ensures that even if a consumer goes down temporarily, it can catch up on missed messages when it comes back online.
  6. Streams Processing: Kafka Streams, a Java client library that ships with Kafka, enables real-time processing of data streams. This allows developers to build applications that can process and react to data as it flows through the Kafka cluster.
  7. Connectors: Kafka Connect provides a framework for building and running reusable connectors that connect Kafka with various external systems such as databases, file systems, and more.
  8. Open Source: Kafka is an open-source project, meaning its source code is freely available, and it benefits from a large community of developers contributing to its development and improvement.
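
To make the publish-subscribe and durability points above concrete, here is a minimal in-memory sketch in C#. It is not how Kafka is implemented, and the ToyTopic class and its members are hypothetical names used only for illustration. It models a topic as an append-only list and lets each consumer group track its own offset, which is why a consumer that was offline can catch up later.

```csharp
using System;
using System.Collections.Generic;

// Toy model of a single topic partition: an append-only log plus one
// committed offset per consumer group. Hypothetical illustration only;
// a real broker persists the log to disk and replicates it.
public class ToyTopic
{
    private readonly List<string> log = new();
    private readonly Dictionary<string, int> committedOffsets = new();

    // A producer appends a message and gets back the offset it was stored at.
    public int Publish(string message)
    {
        log.Add(message);
        return log.Count - 1;
    }

    // A consumer group reads everything after its last committed offset,
    // then commits the new position.
    public IReadOnlyList<string> Poll(string groupId)
    {
        committedOffsets.TryGetValue(groupId, out var next); // 0 for a new group
        var batch = log.GetRange(next, log.Count - next);
        committedOffsets[groupId] = log.Count;
        return batch;
    }
}
```

For example, if a producer publishes "a" and "b", the first Poll("billing") returns both. If "c" is published while that consumer is idle, the next Poll("billing") returns only "c", while a new group such as "audit" still receives all three messages.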

Installation and Setup of Kafka

Step 1. Docker is one of the most popular container engines used in the software industry to create, package, and deploy applications.

Set up Docker Desktop on your machine and make sure the Docker engine is up and running.

Step 2. A single-node Kafka broker setup meets most local development needs. This tutorial uses the ZooKeeper-based deployment mode, in which a ZooKeeper server must be started before the Kafka server. (Kafka 4.0 and later have removed ZooKeeper in favor of KRaft mode, so this setup applies to earlier versions.)

We can configure this dependency in a docker-compose.yml file (attached to this article), which ensures that the ZooKeeper server always starts before the Kafka server and stops after it.

Let’s create a simple docker-compose.yml file with two services, zookeeper and kafka. Place the docker-compose.yml file in a folder on your machine.

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

In this setup, the ZooKeeper server listens on port 2181 for the Kafka service, which runs on the same Docker network. For any client running on the host, it is exposed on port 22181.

Similarly, the Kafka service is exposed to host applications on port 29092, while other containers reach it on port 9092 under the hostname kafka. Both addresses are declared in the KAFKA_ADVERTISED_LISTENERS property.
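
Because of the two advertised listeners, the bootstrap address a client must use depends on where the client runs. The following is a minimal sketch of that rule; the KafkaAddress class and its Resolve method are hypothetical helpers, not part of any Kafka library.

```csharp
// Hypothetical helper that picks the bootstrap address matching the
// advertised listener reachable from the caller.
public static class KafkaAddress
{
    public static string Resolve(bool runningInsideComposeNetwork) =>
        runningInsideComposeNetwork
            ? "kafka:9092"        // PLAINTEXT listener, resolvable only inside the Docker network
            : "localhost:29092";  // PLAINTEXT_HOST listener, published to the host machine
}
```

The API built later in this article runs on the host machine, so it corresponds to the second case and connects to localhost:29092.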

Step 3. Let’s start the Kafka server by spinning up the containers with the docker-compose command.

Open Windows PowerShell, navigate to the folder that contains the above docker-compose.yml file, and execute the below command.

docker-compose up -d

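Step 4. Now let’s use the nc command to verify that both servers are listening on their respective ports. nc is available on Linux, macOS, WSL, and Git Bash; in Windows PowerShell, Test-NetConnection localhost -Port 22181 performs the same check.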

$ nc -z localhost 22181
Connection to localhost port 22181 [tcp/*] succeeded!
$ nc -z localhost 29092
Connection to localhost port 29092 [tcp/*] succeeded!

We can also check the logs while the containers are starting up and verify that the Kafka server is up:

$ docker-compose logs kafka | grep -i started
kafka_1      | [2021-04-10 22:57:40,413] DEBUG [ReplicaStateMachine controllerId=1] Started replica state machine with initial state -> HashMap() (kafka.controller.ZkReplicaStateMachine)
kafka_1      | [2021-04-10 22:57:40,418] DEBUG [PartitionStateMachine controllerId=1] Started partition state machine with initial state -> HashMap() (kafka.controller.ZkPartitionStateMachine)
kafka_1      | [2021-04-10 22:57:40,447] INFO [SocketServer brokerId=1] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT) (kafka.network.SocketServer)
kafka_1      | [2021-04-10 22:57:40,448] INFO [SocketServer brokerId=1] Started socket server acceptors and processors (kafka.network.SocketServer)
kafka_1      | [2021-04-10 22:57:40,458] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)

With that, our Kafka setup is ready for use.

Step 5. Verify in Docker Desktop that both containers are running.

Step 6. Connect using Kafka Tool.

Next, let’s use the Kafka Tool GUI utility to establish a connection with our newly created Kafka server so that we can inspect this setup.

Note that, from the host machine, we need to set the Bootstrap servers property to localhost:29092 to connect to the Kafka server.

Once connected, the cluster appears in the left sidebar.

The entries for Topics and Consumers are empty at this point because it’s a new setup. Once topics are created, we should be able to see their data across partitions. Moreover, if there are active consumers connected to our Kafka server, we can view their details too.

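Step 7. Now, let's create a Web API project using the .NET Minimal API style.

Step 8. Add the below changes to the appsettings.json file to configure the Kafka broker. BootstrapServers points to the port that the broker exposes to the host. GroupId is simply the name of the consumer group; any string is accepted, and it does not need to be a server address.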

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "KafkaProducerConfig": {
    "bootstrapServers": "localhost:29092"
  },
  "KafkaConsumerConfig": {
    "GroupId": "localhost:22181",
    "BootstrapServers": "localhost:29092",
    "AutoOffsetRest": "Earliest",
    "EnableAutoOffsetStore": false
  }
}
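
As an optional alternative to reading each key by hand, the consumer section can be bound to a small settings class. This is only a sketch: KafkaConsumerSettings and KafkaSettingsLoader are hypothetical names, the property names mirror the keys above (including the "AutoOffsetRest" spelling used in the file), and it assumes the Microsoft.Extensions.Configuration binder that ASP.NET Core projects already reference.

```csharp
using System;
using Microsoft.Extensions.Configuration;

// Hypothetical settings type; property names match the keys in appsettings.json.
public sealed class KafkaConsumerSettings
{
    public string GroupId { get; set; } = "";
    public string BootstrapServers { get; set; } = "";
    public string AutoOffsetRest { get; set; } = "Earliest";
    public bool EnableAutoOffsetStore { get; set; }
}

public static class KafkaSettingsLoader
{
    // Binds the "KafkaConsumerConfig" section and fails fast when it is incomplete.
    public static KafkaConsumerSettings Load(IConfiguration configuration)
    {
        var settings = configuration.GetSection("KafkaConsumerConfig").Get<KafkaConsumerSettings>()
            ?? throw new InvalidOperationException("Missing KafkaConsumerConfig section.");

        if (string.IsNullOrWhiteSpace(settings.BootstrapServers))
            throw new InvalidOperationException("KafkaConsumerConfig:BootstrapServers is required.");

        return settings;
    }
}
```

In a Minimal API project this could be called as KafkaSettingsLoader.Load(builder.Configuration), so that a missing or misspelled section is reported at startup rather than when the first message is consumed.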

Step 9. Install the below NuGet packages for the project.

Confluent.Kafka
Newtonsoft.Json

Step 10. Create a folder called 'Kafka' and add interface and class files for Kafka Producer and Consumer.

IKafkaProducer.cs

public interface IKafkaProducer
{
    public void SendMessage<T>(string topic, string key, T message);
}

KafkaProducer.cs: This is the producer class. Its SendMessage method uses the Confluent.Kafka package to publish messages to a topic on the Kafka broker.

using Confluent.Kafka;
using Newtonsoft.Json;

public class KafkaProducer : IKafkaProducer, IDisposable
{
    private readonly IProducer<string, string> kafkaProducer;

    public KafkaProducer(string bootstrapServers)
    {
        var producerConfig = new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
        };

        this.kafkaProducer = new ProducerBuilder<string, string>(producerConfig).Build();
    }

    public void SendMessage<T>(string topic, string key, T message)
    {
        // Serialize the payload to JSON; Produce() queues the message and returns immediately
        var serializedMessage = JsonConvert.SerializeObject(message);
        kafkaProducer.Produce(topic, new Message<string, string> { Key = key, Value = serializedMessage });
    }

    public void Dispose()
    {
        // Wait (up to 10 seconds) for queued messages to be delivered before releasing the producer
        kafkaProducer?.Flush(TimeSpan.FromSeconds(10));
        kafkaProducer?.Dispose();
    }
}
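
Produce() is fire-and-forget: it queues the message and reports nothing back to the caller. When the caller needs to know where the message landed, or whether delivery failed, Confluent.Kafka also offers ProduceAsync. The following is a minimal sketch of that variant, not part of the project above; the class name KafkaProducerSketch and the method SendAndConfirmAsync are hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
using Newtonsoft.Json;

public static class KafkaProducerSketch
{
    // Serializes the payload as JSON, waits for the broker acknowledgement,
    // and returns the topic, partition, and offset the message was written to.
    public static async Task<TopicPartitionOffset> SendAndConfirmAsync<T>(
        IProducer<string, string> producer, string topic, string key, T message)
    {
        var kafkaMessage = new Message<string, string>
        {
            Key = key,
            Value = JsonConvert.SerializeObject(message)
        };

        try
        {
            DeliveryResult<string, string> result = await producer.ProduceAsync(topic, kafkaMessage);
            return result.TopicPartitionOffset;
        }
        catch (ProduceException<string, string> ex)
        {
            // Delivery failed, for example because the broker is unreachable
            Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
            throw;
        }
    }
}
```

Awaiting every message costs throughput compared with Produce(), so this form suits low-volume request/response code such as an API endpoint more than bulk publishing.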

IKafkaConsumer.cs

public interface IKafkaConsumer
{
    public void StartConsuming(string topic);
}

KafkaConsumer.cs: This is the consumer class. Its StartConsuming method uses the Confluent.Kafka package to read messages from a topic on the Kafka broker.

using Confluent.Kafka;

public class KafkaConsumer : IKafkaConsumer, IDisposable
{
    private readonly IConsumer<string, string> kafkaConsumer;
    private readonly bool autoOffsetStore;

    public KafkaConsumer(string bootstrapServers, string groupId, string autoOffsetRest, string enableAutoOffsetStore)
    {
        autoOffsetStore = Convert.ToBoolean(enableAutoOffsetStore);

        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = bootstrapServers,
            GroupId = groupId,
            AutoOffsetReset = ConvertToAutoOffsetReset(autoOffsetRest),
            EnableAutoOffsetStore = autoOffsetStore,
        };

        this.kafkaConsumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
    }

    // Maps the configured string (case-insensitive) to the AutoOffsetReset enum
    static AutoOffsetReset ConvertToAutoOffsetReset(string value)
    {
        return value.ToLowerInvariant() switch
        {
            "earliest" => AutoOffsetReset.Earliest,
            "latest" => AutoOffsetReset.Latest,
            "error" => AutoOffsetReset.Error,
            _ => throw new ArgumentException($"Invalid AutoOffsetReset value: {value}")
        };
    }

    public void StartConsuming(string topic)
    {
        // Subscribe to the Kafka topic
        kafkaConsumer.Subscribe(topic);

        // Start an infinite loop to consume messages
        while (true)
        {
            try
            {
                // Block until the next message arrives on the Kafka topic
                var consumeResult = kafkaConsumer.Consume(CancellationToken.None);

                // Process the consumed message (implement your logic here)
                Console.WriteLine($"Offset: {consumeResult.Offset} \n");
                Console.WriteLine($"Key: {consumeResult.Message.Key} \n");
                Console.WriteLine($"Value: {consumeResult.Message.Value} \n\n\n");

                // With EnableAutoOffsetStore = false, the offset must be stored explicitly
                // after processing; otherwise it is never committed for the consumer group
                if (!autoOffsetStore)
                {
                    kafkaConsumer.StoreOffset(consumeResult);
                }
            }
            catch (ConsumeException ex)
            {
                // Handle any errors that occur during message consumption
                Console.WriteLine($"Error consuming message: {ex.Error.Reason}");
            }
        }
    }

    public void Dispose()
    {
        // Leave the consumer group cleanly, then release the consumer's resources
        kafkaConsumer?.Close();
        kafkaConsumer?.Dispose();
    }
}

Step 11. Let's add two APIs to the Program.cs file: one that publishes a message to a Kafka topic and one that consumes it.

a) Add the below code to Program.cs, before the call to builder.Build(), to register the Kafka producer and consumer with the settings from appsettings.json. WebApplication.CreateBuilder already loads appsettings.json into IConfiguration, so a separate ConfigurationBuilder is not needed. Declaring another top-level variable named configuration would also clash with the variables of the same name inside the lambdas below.

// Kafka producer and consumer, configured from appsettings.json
builder.Services.AddSingleton<IKafkaProducer>(provider =>
{
    var configuration = provider.GetRequiredService<IConfiguration>();
    var bootstrapServers = configuration["KafkaProducerConfig:bootstrapServers"];

    return new KafkaProducer(bootstrapServers);
});

builder.Services.AddSingleton<IKafkaConsumer>(provider =>
{
    var configuration = provider.GetRequiredService<IConfiguration>();
    var bootstrapServers = configuration["KafkaConsumerConfig:BootstrapServers"];
    var groupId = configuration["KafkaConsumerConfig:GroupId"];
    var autoOffsetRest = configuration["KafkaConsumerConfig:AutoOffsetRest"];
    var enableAutoOffsetStore = configuration["KafkaConsumerConfig:EnableAutoOffsetStore"];

    return new KafkaConsumer(bootstrapServers, groupId, autoOffsetRest, enableAutoOffsetStore);
});

b) Use the existing weatherforecast API, which was created along with the project, to publish a message to Kafka.

In the below code, the forecast is published to the topic message_weatherforecast with the key Key_1. The [FromServices] attribute requires a using Microsoft.AspNetCore.Mvc; directive at the top of Program.cs.

app.MapGet("/weatherforecast", ([FromServices] IKafkaProducer kafkaProducer) =>
{
    var forecast = Enumerable.Range(1, 2).Select(index =>
        new WeatherForecast
        (
            DateTime.Now.AddDays(index),
            Random.Shared.Next(-20, 55),
            summaries[Random.Shared.Next(summaries.Length)]
        ))
        .ToArray();

    var topic = "message_weatherforecast";
    var key = "Key_1";
    kafkaProducer.SendMessage(topic, key, forecast);

    return forecast;
})
.WithName("GetWeatherForecast");

internal record WeatherForecast(DateTime Date, int TemperatureC, string? Summary)
{
    public int TemperatureF => 32 + (int)(TemperatureC / 0.5556);
}

c) Create a new API, 'getForecastDetails', to read the messages that were published to Kafka.

In the below code, messages are read from the topic message_weatherforecast. Note that StartConsuming loops indefinitely, so this request does not return; it keeps consuming for as long as the application runs.

app.MapGet("/getForecastDetails", ([FromServices] IKafkaConsumer kafkaConsumer) =>
{
    var topic = "message_weatherforecast";

    kafkaConsumer.StartConsuming(topic);
})
.WithName("GetForecastDetails");
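
Because the endpoint above never completes its HTTP request, a common alternative is to run the consume loop in a hosted background service that starts and stops with the application. The following is a minimal sketch under stated assumptions, not part of the project above: the class name WeatherForecastConsumerService is hypothetical, and it expects a ConsumerConfig (with BootstrapServers and GroupId set) to be registered in the service container.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

// Hypothetical hosted service: consumes in the background for the lifetime of the app.
public sealed class WeatherForecastConsumerService : BackgroundService
{
    private const string Topic = "message_weatherforecast";
    private readonly ConsumerConfig config;

    public WeatherForecastConsumerService(ConsumerConfig config) => this.config = config;

    // Consume() blocks, so the loop runs on a worker thread to avoid delaying host startup.
    protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
        Task.Run(() => ConsumeLoop(stoppingToken), stoppingToken);

    private void ConsumeLoop(CancellationToken stoppingToken)
    {
        using var consumer = new ConsumerBuilder<string, string>(config).Build();
        consumer.Subscribe(Topic);

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var result = consumer.Consume(stoppingToken);
                    Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
                }
                catch (ConsumeException ex)
                {
                    Console.WriteLine($"Error consuming message: {ex.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the application is shutting down
        }
        finally
        {
            consumer.Close(); // leave the consumer group cleanly
        }
    }
}
```

Under these assumptions, the service would be registered in Program.cs with builder.Services.AddSingleton(new ConsumerConfig { BootstrapServers = "localhost:29092", GroupId = "weather-demo" }) followed by builder.Services.AddHostedService<WeatherForecastConsumerService>(), where "weather-demo" is a placeholder group name. Passing the stopping token to Consume lets the loop exit promptly on shutdown instead of blocking forever.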

Step 12. Run the project. The Swagger page should open and list both APIs.

Step 13. Now execute the 'WeatherForecast' API and check that a response is returned. Then use the Kafka UI tool to confirm that the same data has been published to Kafka under the topic name.

Verify in the Kafka UI that the topic has been created.

Verify in the Data tab that the message is present in the topic.

If the value is displayed in an encoded format, change the Content Types to String in the Properties tab.

Step 14. Now execute the 'GetForecastDetails' API. It runs continuously and consumes the messages available in Kafka for the subscribed topic.

The output of the above API can be viewed in the application console. It matches the data shown in the Value column of the Kafka UI.
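
The value printed by the consumer is the JSON string that the producer wrote. If the consumer needs typed objects instead, the value can be deserialized with Newtonsoft.Json. This is a minimal sketch: ForecastMessage and ForecastJson are hypothetical names, and the record mirrors the Date, TemperatureC, and Summary fields of the WeatherForecast record used by the producer.

```csharp
using System;
using Newtonsoft.Json;

// Hypothetical consumer-side shape of one forecast entry.
// Extra JSON properties such as TemperatureF are ignored during deserialization.
public record ForecastMessage(DateTime Date, int TemperatureC, string? Summary);

public static class ForecastJson
{
    // Parses the JSON array found in the message value; returns an empty array for a JSON null.
    public static ForecastMessage[] Parse(string json) =>
        JsonConvert.DeserializeObject<ForecastMessage[]>(json) ?? Array.Empty<ForecastMessage>();
}
```

Inside the consume loop this could be used as ForecastJson.Parse(consumeResult.Message.Value), after which each entry's TemperatureC and Summary are available as typed properties.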

Summary

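In this article, we set up a single-node Kafka broker with Docker, connected to it with Kafka Tool, and built a .NET Minimal API that publishes messages to a Kafka topic and consumes them with the Confluent.Kafka package. Kafka is widely used in industries such as finance, telecommunications, and retail for use cases including log aggregation, data integration, real-time analytics, and event-driven microservices architectures.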