Apache Kafka with Dapr Bindings in .NET

What is Apache Kafka?

Apache Kafka is an open-source distributed event streaming platform used for building real-time data pipelines and streaming applications. Originally developed by LinkedIn and later open-sourced as an Apache Software Foundation project, Kafka provides a scalable, fault-tolerant, and high-throughput platform for handling real-time data feeds.

Key features of Apache Kafka

  1. Publish-Subscribe Messaging System: Kafka follows the publish-subscribe messaging pattern. Producers publish messages to topics, and consumers subscribe to those topics to receive messages (see the producer sketch after this list).
  2. Distributed: Kafka is designed to be distributed across multiple nodes or clusters for scalability and fault tolerance. It allows you to horizontally scale your system by adding more broker nodes.
  3. Partitioning: Topics in Kafka are divided into partitions, which allow data to be distributed and processed in parallel across multiple nodes. Each partition can be hosted on a different broker.
  4. Durability: Kafka is designed for durability, meaning that once data is written to a partition, it persists and can be retrieved even if a broker or node fails.
  5. Fault Tolerance: Kafka provides fault tolerance by replicating data across multiple brokers. If a broker fails, another broker can take over the responsibility of serving the data.
  6. High Throughput: Kafka is known for its high throughput and low-latency capabilities, making it suitable for use cases that require real-time data processing.
  7. Connectivity: Kafka has client libraries for various programming languages, making it easy to integrate with different applications and systems.
  8. Streaming APIs: Kafka includes streaming APIs that allow developers to build stream processing applications, enabling the transformation and analysis of data in real time.
  9. Ecosystem: Kafka has a rich ecosystem that includes connectors for integrating with various data sources and sinks, as well as tools like Kafka Streams for building stream processing applications.
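
To make the publish-subscribe model in point 1 concrete, here is a minimal producer sketch using the Confluent.Kafka client library. The broker address mirrors the one used later in this article, while the topic name and message value are illustrative placeholders, not part of the article's setup.

// Minimal Kafka producer sketch. Assumes the Confluent.Kafka NuGet package;
// the topic name "demo-topic" is an illustrative placeholder.
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:29092" };
using var producer = new ProducerBuilder<Null, string>(config).Build();

// Publish one message; every consumer subscribed to "demo-topic" receives it.
var result = await producer.ProduceAsync(
    "demo-topic",
    new Message<Null, string> { Value = "hello from a producer" });

Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");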

What is Dapr?

Dapr, or Distributed Application Runtime, is an open-source runtime that simplifies the development of microservices and distributed applications. It provides a set of building blocks, libraries, and best practices for developers to build scalable, resilient, and portable applications. Dapr abstracts away the complexities of distributed systems, allowing developers to focus on building business logic rather than dealing with the intricacies of distributed architectures.

Key features and components of Dapr

  1. Service Invocation: Dapr enables microservices to invoke each other using a simple and consistent API, abstracting away the underlying communication mechanisms (HTTP, gRPC, etc.); see the invocation sketch after this list.
  2. State Management: Dapr provides a distributed state management API, allowing applications to store and retrieve state consistently across multiple microservices.
  3. Pub/Sub Messaging: Dapr supports the publish-subscribe pattern for messaging, allowing microservices to communicate asynchronously through events.
  4. Event-driven Programming Model: Dapr promotes an event-driven programming model, where microservices can publish and subscribe to events, enabling loosely coupled architectures.
  5. Secrets Management: Dapr provides a secrets management API for securely accessing sensitive information, such as API keys, connection strings, etc.
  6. Bindings: Dapr introduces the concept of bindings, which allows microservices to interact with external systems (like databases, message brokers, etc.) through a simple and consistent API.
  7. Middleware: Dapr supports middleware, allowing developers to add cross-cutting concerns (like authentication, logging, and tracing) to their applications.
  8. Actors: Dapr includes an actor model for building stateful microservices, providing a programming model similar to frameworks like Microsoft Orleans.
  9. Observability: Dapr integrates with popular observability tools, making it easier to monitor and trace microservices in production.
  10. Cross-language and Cross-framework: Dapr is designed to be language-agnostic and can be used with any programming language. It also integrates with various frameworks and platforms.
  11. Cloud Agnostic: Dapr is cloud-agnostic, allowing applications to be deployed across various cloud providers or on-premises environments.
  12. Open Source and Community-Driven: Dapr is an open-source project with an active community of contributors. It is part of the Cloud Native Computing Foundation (CNCF).
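
As a small taste of the programming model, below is a minimal sketch of service invocation (point 1) using the Dapr .NET SDK. The app-id "order-service", the method name "orders", and the record types are illustrative assumptions, not part of this article's sample.

// Minimal Dapr service invocation sketch using the Dapr.Client package.
// "order-service" (app-id) and "orders" (method/route) are illustrative.
using Dapr.Client;

using var client = new DaprClientBuilder().Build();

// The sidecar resolves the target app and performs the call;
// by default this issues a POST to /orders on the "order-service" app.
var response = await client.InvokeMethodAsync<OrderRequest, OrderResponse>(
    "order-service", "orders", new OrderRequest(42));

Console.WriteLine(response.Status);

public record OrderRequest(int Id);
public record OrderResponse(int Id, string Status);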

What are Dapr Bindings?

Dapr Bindings are a set of abstractions and building blocks that simplify the integration of Dapr applications with external services, systems, and events. Bindings provide a consistent and language-agnostic way for Dapr applications to interact with various external resources without having to worry about the underlying implementation details of each specific service.

Key features and concepts of Dapr Bindings include

  1. Abstraction Layer: Bindings abstract away the complexity of working with external services by providing a standard programming model. Developers interact with the bindings using a consistent API regardless of the specific service being used.
  2. Supported Bindings: Dapr comes with built-in bindings for various external systems, including message brokers (like Kafka and RabbitMQ), databases (like Azure Cosmos DB and MongoDB), and other services (like Azure Blob Storage and Twilio). Additional bindings can be created and added by the community.
  3. Input and Output Bindings
    • Input Bindings: Allow Dapr applications to consume events or data from external sources, such as Kafka.
    • Output Bindings: Enable Dapr applications to send events or data to external destinations.
  4. Configuration: Bindings are configured through Dapr's configuration files (YAML or JSON). Configuration specifies details such as the type of binding, connection parameters, and other settings.
  5. Language-Agnostic: Dapr Bindings are designed to be language-agnostic. Whether you're writing your microservices in Java, .NET, Python, or any other supported language, you can interact with bindings using a consistent API.
  6. Event-Driven Model: Bindings are inherently event-driven. They allow Dapr applications to consume and produce events using the publish-subscribe pattern. This aligns well with Dapr's broader event-driven programming model.
  7. Extension Points: Developers can create custom bindings for their specific needs or integrate with services not covered by the built-in bindings. This flexibility allows Dapr to be extensible and adaptable to various scenarios.

Benefits of using Dapr Bindings for Kafka

  1. Consistent Abstraction: Dapr Bindings abstract away the complexities of interacting with Kafka, providing a consistent programming model across different languages and platforms.
  2. Input and Output Bindings: Dapr supports both Input and Output Bindings for Kafka.
    • Input Bindings: Allow Dapr applications to consume events from Kafka topics.
    • Output Bindings: Enable Dapr applications to produce events to Kafka topics.
  3. Event-Driven Programming: Dapr promotes an event-driven programming model, and Kafka fits well into this paradigm. Applications can publish events to Kafka topics or subscribe to topics to consume events.
  4. Configuration with Dapr Components: Kafka bindings are configured through Dapr components using YAML or JSON files. Configuration includes details such as Kafka brokers, topics, and authentication settings.
  5. Language-Agnostic: Dapr Bindings are designed to be language-agnostic. Whether you're using Java, .NET, Python, or other supported languages, you can interact with Kafka bindings using a consistent API.
  6. Built-In Kafka Binding: Dapr comes with a built-in Kafka binding that simplifies the configuration and interaction with Kafka topics. Developers can use this binding out of the box or create custom bindings for specific use cases.
  7. Event Sourcing and Stateful Processing: Kafka can be used for event-sourcing architectures, where changes to the state of an application are captured as events. Dapr's state management and actors can complement Kafka in building stateful microservices.
  8. Flexibility and Extensibility: Dapr Bindings provide flexibility and extensibility. Developers can use the built-in Kafka binding or create custom bindings for unique scenarios, allowing Dapr to integrate with a wide range of services and systems.
  9. Fault Tolerance and Scalability: Kafka's built-in features for fault tolerance and scalability align well with Dapr's goals. Dapr applications can benefit from Kafka's distributed and resilient nature.
  10. Integration with Dapr Ecosystem: Kafka bindings seamlessly integrate with the broader Dapr ecosystem, including service invocation, state management, and other Dapr building blocks. This allows developers to build comprehensive and scalable distributed applications.

Let's start implementing Kafka with Dapr Bindings using .NET

Prerequisites: To get started with Dapr, the Dapr CLI must be installed on your system. Refer to the link 'Install-dapr' to install Dapr.

Step 1. Setting up Kafka and running it on your system.

Follow steps 1 - 5 from my previous article "Introduction to Apache Kafka with .Net", which covers setting up Kafka.

Step 2. Now, let's create a Web API project using .NET minimal APIs.
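
If you prefer the command line, the project can be created with the dotnet CLI; the project name below is taken from the launch profile used later in this article:

dotnet new web -n Kafka_publish_subscript_via_DaprBindings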

Step 3. Install the below NuGet packages for the project.

  • Dapr.AspNetCore
  • Swashbuckle.AspNetCore (for the AddSwaggerGen/UseSwagger calls in Program.cs, if your template does not already include it)
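
Both can be installed from the command line:

dotnet add package Dapr.AspNetCore
dotnet add package Swashbuckle.AspNetCore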

Step 4. Create a folder called Components and place the below YAML files in it. These YAML files are used by Dapr for input and output bindings.

  • binding-kafka-input.yaml
    Use Dapr input bindings to trigger event-driven applications. With input bindings, you can trigger your application when an event from an external resource occurs. An external resource could be a queue, messaging pipeline, cloud service, filesystem, etc. An optional payload and metadata may be sent with the request. Input bindings are ideal for event-driven processing, data pipelines, or generally reacting to events and performing further processing. Dapr input bindings allow you to:
  • Receive events without including specific SDKs or libraries
  • Replace bindings without changing your code
  • Focus on business logic and not the event resource implementation
    Within the metadata section, configure the following Kafka-related properties:
  • The topic to which you’ll publish the message
  • The broker
    As mentioned above, below is the configuration for the Dapr input binding. This YAML file contains the Kafka broker, topics, consumer group, direction, and authentication type.
    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-binding-input
    spec:
      type: bindings.kafka
      version: v1
      metadata:
        # Kafka broker connection setting
        - name: brokers
          value: "localhost:29092"
        # consumer configuration: topic and consumer group
        - name: topics 
          value: "kafka_binging_11,kafka_binging_12"
        - name: consumerGroup
          value: "group2"
        - name: authRequired
          value: "false"
        - name: authType
          value: "none"
        - name: initialOffset # Optional. Used for input bindings.
          value: "newest"
        - name: version # Optional.
          value: "2.0.0"
        - name: direction
          value: "input"
  • binding-kafka-output.yaml
    Invoke external systems with output bindings. With output bindings, you can invoke external resources. An optional payload and metadata can be sent with the invocation request. Within the metadata section, configure the following Kafka-related properties:
  • The topic to which you’ll publish the message
  • The broker
    As mentioned above, below is the configuration for the Dapr output binding. This YAML file contains the Kafka broker, publish topic, direction, and authentication type.
    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: kafka-binding-output
    spec:
      type: bindings.kafka
      version: v1
      metadata:
        # Kafka broker connection setting
        - name: brokers
          value: "localhost:29092"
        # publisher configuration: topic
        - name: publishTopic 
          value: "kafka_binging_12"
        - name: authRequired
          value: "false"
        - name: authType
          value: "none"
        - name: direction
          value: "output"
    
  • configuration.yaml
    apiVersion: dapr.io/v1alpha1
    kind: Configuration
    metadata:
      name: configuration
    spec:
      features:
        - name: PubSub.Routing
          enabled: true
        - name: Resiliency
          enabled: true
    

Step 5. Create a folder 'Models' and add the below objects.

Client.cs

public class Client
{
    public int OrderId { get; set; }
    public string ClientName { get; set; }
}

Customer.cs

 public class Customer
 {
     public int OrderId { get; set; }
     public string CustomerName { get; set; }
 }

Step 6. Create a folder 'Services' and add the below service into it.

IAcl.cs

 public interface IAcl
 {
     Client MapToClientModel(Customer customer);
 }

Acl.cs

 public class Acl : IAcl
 {
     public Client MapToClientModel(Customer customer)
     {
         var client = new Client
         {
             OrderId = customer.OrderId,
             ClientName = customer.CustomerName,
         };

         return client;
     }
 }

Step 7. Create a folder called API and add the below files.

IWebApi.cs: These interfaces are used to register APIs.

 public interface IWebApi
 {
     void Register(WebApplication app);
 }

 public interface IWebApiAsync
 {
     Task RegisterAsync(WebApplication app);
 }

WebApiSetup.cs

 public static class WebApiSetup
 {
     public static void AddWebApi(this IServiceCollection services, Type markerType)
     {
         services.RegisterImplementationsOf<IWebApi>(markerType);
         services.RegisterImplementationsOf<IWebApiAsync>(markerType);
     }

     public static async Task RegisterWebApisAsync(this WebApplication app)
     {

         using (var scope = app.Services.CreateScope())
         {
             var scopedProvider = scope.ServiceProvider;

             var webApis = scopedProvider.GetServices<IWebApi>();
             foreach (var webApi in webApis)
             {
                 webApi.Register(app);
             }

             var asyncWebApis = scopedProvider.GetServices<IWebApiAsync>();
             await Task.WhenAll(asyncWebApis.Select(x => x.RegisterAsync(app)));
         }
     }
 }

ServiceCollectionExtensions.cs

public static class ServiceCollectionExtensions
{
    public static void RegisterImplementationsOf<T>(this IServiceCollection services, Type markerType, ServiceLifetime lifetime = ServiceLifetime.Transient) =>
        services.RegisterImplementationsOf(markerType, typeof(T), lifetime);

    public static void RegisterImplementationsOf(this IServiceCollection services, Type markerType, Type interfaceType, ServiceLifetime lifetime = ServiceLifetime.Transient) =>
        markerType.Assembly.GetTypes()
            .Where(x => x.DoesImplementInterfaceType(interfaceType))
            .ToList() // materialize so List<T>.ForEach is available without an extra extension method
            .ForEach(x => services.Add(new ServiceDescriptor(x.GetInterfaces()
                .First(y => y.IsGenericType ? y.GetGenericTypeDefinition() == interfaceType : y == interfaceType), x, lifetime)));

    public static bool DoesImplementInterfaceType(this Type type, Type interfaceType) =>
        !type.IsAbstract &&
        type.IsClass &&
        type.GetInterfaces().Any(y => y.IsGenericType ? y.GetGenericTypeDefinition() == interfaceType : y == interfaceType);
}

KafkaPublishviaDapr.cs

The below file is used to publish a message to the Kafka topic with the help of the Dapr output binding. Here we specify the metadata name of the Dapr output binding, and the operation 'create' is used to publish the message.

public class KafkaPublishviaDapr : IWebApi
{
    private readonly IAcl acl;

    public KafkaPublishviaDapr(IAcl acl)
    {
        this.acl = acl;
    }
    public void Register(WebApplication app)
    {
        app.MapPost("/publish-topics", async ([FromBody] Customer wcaModel) =>
        {
            var newModel = acl.MapToClientModel(wcaModel);

            string BINDING_NAME = "kafka-binding-output";
            string BINDING_OPERATION = "create";
            try
            {
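                // Note: the DaprClient registered via AddDaprClient in Program.cs
                // could be injected with [FromServices] instead of building a new
                // client on every request.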
                using var daprClient = new DaprClientBuilder().Build();
                await daprClient.InvokeBindingAsync(BINDING_NAME, BINDING_OPERATION, newModel
                    , metadata: 
                    new Dictionary<string, string>
                    {                       
                        { "partitionKey", "key1"}
                    }
                    );
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error during binding invocation: {ex.Message}");
                return Results.BadRequest();
            }
            return Results.Ok();
        })
       .WithMetadata(new EndpointNameMetadata("publish-topics"));
    }
}
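
With the application and its Dapr sidecar running (the launch profile in Step 12 uses app port 5013), this endpoint can be exercised with a request like the one below; the body shape matches the Customer model.

curl -X POST http://localhost:5013/publish-topics \
  -H "Content-Type: application/json" \
  -d '{ "orderId": 1, "customerName": "chethan_1" }'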

KafkaConsumeviaDapr.cs

The below code consumes, via the Dapr input binding, the messages published to the topic. The endpoint route of the API must match the metadata name of the Dapr input binding component; if it does not match, the input binding will not work.

public class KafkaConsumeviaDapr : IWebApi
{
    private readonly IAcl acl;

    public KafkaConsumeviaDapr(IAcl acl)
    {
        this.acl = acl;
    }
    public void Register(WebApplication app)
    {
        app.MapPost("/kafka-binding-input",  ([FromBody] Client message, [FromServices] DaprClient daprClient) =>
        {
            try
            {
                Console.WriteLine($"Received message: {message.ClientName}");
                Console.WriteLine($"Received message: {message.OrderId}");
                return Results.Ok();
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error during binding invocation: {ex.Message}");
                return Results.BadRequest();
            }
        })
       .WithMetadata(new EndpointNameMetadata("kafka-binding-input"));
    }
}
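
For context on how this endpoint gets called: the Dapr sidecar delivers each Kafka event by POSTing it to the route matching the binding name, and a 2xx response acknowledges the event. Conceptually, the sidecar issues a request like the following (port and payload shown are illustrative, based on this article's configuration).

POST http://localhost:5013/kafka-binding-input
Content-Type: application/json

{ "orderId": 1, "clientName": "chethan_1" }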

Step 8. Add the below code into Program.cs.

In the below code, I added DaprClient, WebApi, and ACL services.

public class Program
{
    public static async Task Main(string[] args)
    {
        var builder = WebApplication.CreateBuilder(args);
        builder.Services.AddDaprClient();
        builder.Services.AddEndpointsApiExplorer();
        builder.Services.AddSwaggerGen();
        builder.Services.AddWebApi(typeof(Program));       
        builder.Services.AddScoped<IAcl, Acl>();          
        var app = builder.Build();
        if (app.Environment.IsDevelopment())
        {
            app.UseSwagger();
            app.UseSwaggerUI();
        }
        app.UseHttpsRedirection();
        await app.RegisterWebApisAsync();
        await app.RunAsync();
    } 
}

Step 9. Run the project; it should run without any error, but the Swagger page will not load. This is because we are not running the application directly. Instead, we have attached a Dapr sidecar to the application, and the application runs via Dapr. If a Dapr sidecar is attached to the application, the application should be run using the dapr command, or else through the child process debugger.

Step 10. Run the Dapr-attached application through the child process debugger. Download and install the Child Process debugger through the official Microsoft link: Download Child Process debugger.

Step 11. Once the debugger is installed, navigate to Debug -> Other Debug Targets -> Child Process Debugging Settings and apply the following settings. Breakpoints can also be hit when the application is running through the child process debugger.

Step 12. Make the following changes in Properties -> launchSettings.json in the profiles section.

{
  "$schema": "https://json.schemastore.org/launchsettings.json",
  "iisSettings": {
    "windowsAuthentication": false,
    "anonymousAuthentication": true,
    "iisExpress": {
      "applicationUrl": "http://localhost:16456",
      "sslPort": 44385
    }
  },
  "profiles": {
    "Kafka_publish_subscript_via_DaprBindings": {
      "commandName": "Executable",
      "executablePath": "pwsh",
      "commandLineArgs": "-Command \"Start-Process http://localhost:5013/swagger; dapr run --app-id Kafka_publish_subscript_via_DaprBindings --resources-path components/ --config components/configuration.yaml --app-port 5013 --dapr-http-port 3512 --dapr-grpc-port 50013 --log-level Debug -- dotnet run --no-build --urls http://localhost:5013\"",
      "workingDirectory": ".",
      "environmentVariables": {
        "ASPNETCORE_ENVIRONMENT": "Development"
      },
      "nativeDebugging": true
    },
    "IIS Express": {
      "commandName": "IISExpress",
      "launchBrowser": true,
      "launchUrl": "swagger",
      "environmentVariables": {
        "ASPNETCORE_ENVIRONMENT": "Development"
      }
    }
  }
}

Step 13. Now run the application in normal mode, just like any other application. The Swagger page will not load; instead, use Postman or another tool to send the request. Observe the console window that opens; both the Dapr and application logs can be seen there.

Step 14. Place breakpoints in both APIs and send the request via Postman.

Step 15. First, the request hits the API in the KafkaPublishviaDapr.cs file, which publishes the message to Kafka through the Dapr output binding; Dapr then automatically calls the API present in KafkaConsumeviaDapr.cs via the Dapr input binding.

Step 16. The output can be verified in the console and in the Kafka UI.

Kafka UI

As mentioned in binding-kafka-output.yaml, the message is published on the topic configured there.

As mentioned in binding-kafka-input.yaml, the API bound to that YAML file listens to the topics mentioned there, as part of the configured consumer group.

Invoking through the Dapr endpoint without running the solution

The above method goes through an HTTP call to the publish-topics API. But we can also invoke Dapr bindings directly by making a call to the Dapr binding endpoint, as shown below.

Step 1. Open PowerShell and navigate to the project folder.

Step 2. Run the below command to start Dapr.

dapr run --app-id dapr_output_binding --resources-path components/ --app-port 6010 --dapr-http-port 3610 --dapr-grpc-port 60010 --app-ssl --log-level Debug -- dotnet run

Step 3. Send the below request with curl, or compose the equivalent POST request in Postman.

curl -X POST http://localhost:3610/v1.0/bindings/kafka-binding-output \
  -H "Content-Type: application/json" \
  -d '{
    "data": {
      "orderId": 1,
      "customerName": "chethan_1"
    },
    "metadata": {
      "partitionKey": "key1"
    },
    "operation": "create"
}'

Verify the above request in the Kafka UI.

Summary

Using Apache Kafka with Dapr Bindings provides a simplified and consistent way to integrate Dapr applications with Kafka, a powerful distributed event streaming platform. Dapr Bindings offer a high-level, language-agnostic abstraction for working with Kafka topics, enabling developers to focus on building resilient and scalable distributed systems without being burdened by the intricacies of Kafka's implementation details.

