Integrating Kafka Producers and Consumers with Celery

Apache Kafka

Apache Kafka is a distributed streaming platform capable of handling trillions of events a day. It enables you to publish and subscribe to streams of records, store records in a fault-tolerant way, and process them as they occur. Kafka is widely used for stream processing, log aggregation, and real-time analytics.

Celery

Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation but supports scheduling as well. The execution units, called tasks, are executed concurrently on one or more worker nodes using multiprocessing, Eventlet, or Gevent.

Setup and Requirements

Before diving into the implementation, ensure you have a running Kafka cluster and a Redis or RabbitMQ service to act as Celery's message broker. The examples in this guide use Redis at localhost:6379 and a Kafka broker at localhost:9092. You'll also need Python installed on your system. This guide assumes basic knowledge of Python and familiarity with command-line tools.
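
Before going further, it can help to confirm that both services are reachable. The following is a minimal sketch using only the standard library. It assumes the local default ports used in this guide (9092 for Kafka, 6379 for Redis), and the names port_is_open, SERVICES, and check_services are illustrative, not part of any library. An open port only shows that something is listening there, not that it is a healthy Kafka or Redis instance.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures
        return False


# Assumed local defaults; adjust to match your environment
SERVICES = {
    "Kafka broker": ("localhost", 9092),
    "Redis": ("localhost", 6379),
}


def check_services(services=SERVICES) -> dict:
    """Map each service name to whether its port accepted a connection."""
    return {
        name: port_is_open(host, port)
        for name, (host, port) in services.items()
    }
```

Calling check_services() returns a dictionary such as {"Kafka broker": True, "Redis": False}, which makes it easy to see which service still needs to be started.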

Step 1. Installing Dependencies

Install the necessary Python libraries for Kafka and Celery:

pip install confluent-kafka celery redis

Step 2. Creating a Kafka Producer

A Kafka producer sends messages to Kafka topics. The following Python script demonstrates a simple producer sending a message to a specific topic:

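from confluent_kafka import Producer

# Address of the Kafka broker(s) to connect to
config = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(config)

def delivery_report(err, msg):
    # Called once per message with the delivery result, during poll() or flush()
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()}')

topic = 'test_topic'
message = 'Hello, Kafka!'
# produce() is asynchronous: it only queues the message for sending
producer.produce(topic, message.encode('utf-8'), callback=delivery_report)
# Block until queued messages are delivered and their callbacks have run
producer.flush()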
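
The producer above sends a plain string, but Kafka message values are just bytes, so structured data has to be serialized first. The following is a small sketch that assumes JSON is an acceptable wire format for your events. The helper names encode_event and decode_event are hypothetical and not part of confluent-kafka.

```python
import json


def encode_event(event: dict) -> bytes:
    """Serialize a dict to compact UTF-8 JSON bytes."""
    # sort_keys gives a stable byte representation for equal dicts
    text = json.dumps(event, separators=(",", ":"), sort_keys=True)
    return text.encode("utf-8")


def decode_event(raw: bytes) -> dict:
    """Reverse encode_event: parse UTF-8 JSON bytes back into a dict."""
    return json.loads(raw.decode("utf-8"))
```

With these helpers, the produce call could pass encode_event({...}) in place of message.encode('utf-8'), and the consuming side could call decode_event on the bytes returned by msg.value().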

Step 3. Setting Up Celery

Define your Celery app and tasks in a celery_app.py file. The task will process messages consumed from Kafka:

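from celery import Celery

# Redis database 0 acts as the broker that carries tasks to the workers
app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def process_message(message):
    # Runs inside a Celery worker process, not in the Kafka consumer
    print(f"Processing message: {message}")
    # Insert message processing logic here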
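
The placeholder comment in the task is where the real work goes. One way to keep that logic easy to test is to put it in a plain function that the task calls. The following is a minimal sketch, assuming messages are either plain text or JSON objects. The function name handle_message and the shape of the returned dictionary are hypothetical choices for illustration.

```python
import json


def handle_message(message: str) -> dict:
    """Classify a raw message string as JSON object or plain text."""
    try:
        payload = json.loads(message)
    except json.JSONDecodeError:
        # Not valid JSON, e.g. 'Hello, Kafka!' from the producer in Step 2
        return {"kind": "text", "body": message}
    if not isinstance(payload, dict):
        # Valid JSON but not an object (a bare number, list, ...): keep as text
        return {"kind": "text", "body": message}
    return {"kind": "json", "body": payload}
```

Inside process_message, the task body could then call handle_message(message) and act on the result, while the function itself can be exercised in ordinary unit tests without a running worker.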

Step 4. Creating a Kafka Consumer to Offload Tasks to Celery

The Kafka consumer script consumes messages from the Kafka topic and uses Celery to process them asynchronously:

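from confluent_kafka import Consumer
from celery_app import process_message

config = {
    'bootstrap.servers': 'localhost:9092',
    # Consumers sharing a group.id split the topic's partitions between them
    'group.id': 'my_group',
    # With no committed offset yet, start from the oldest available message
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(config)
consumer.subscribe(['test_topic'])

try:
    while True:
        # Wait up to one second for the next message
        msg = consumer.poll(1.0)

        if msg is None:
            # Nothing arrived within the timeout
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        # Enqueue the task and return immediately; a worker does the processing
        process_message.delay(msg.value().decode('utf-8'))
except KeyboardInterrupt:
    # Ctrl+C stops the loop without a traceback
    pass
finally:
    # Leave the consumer group cleanly
    consumer.close()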
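
The loop in Step 4 can also be written as a function that receives the consumer and the dispatch callable as arguments, which makes it possible to test the control flow without a broker. The following is a sketch under that assumption. The name run_consumer_loop and its parameters are hypothetical; the only methods it relies on are poll() and close() on the consumer, and value() and error() on each message, as used in the script above. It also skips messages whose value is None, which the original loop does not guard against.

```python
from typing import Callable


def run_consumer_loop(
    consumer,
    dispatch: Callable[[str], object],
    should_stop: Callable[[], bool],
    poll_timeout: float = 1.0,
) -> int:
    """Poll until should_stop() is true, passing decoded values to dispatch.

    Returns the number of messages dispatched. Always closes the consumer.
    """
    dispatched = 0
    try:
        while not should_stop():
            msg = consumer.poll(poll_timeout)
            if msg is None:
                continue  # no message within the timeout
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
            value = msg.value()
            if value is None:
                continue  # record without a payload; nothing to decode
            dispatch(value.decode("utf-8"))
            dispatched += 1
    finally:
        consumer.close()
    return dispatched
```

In the real script, this could be called as run_consumer_loop(consumer, process_message.delay, stop.is_set), where stop is a threading.Event that a signal handler sets when the process should shut down.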

Step 5. Running the System

  1. Start the Kafka cluster and the Redis server, and ensure the topic test_topic exists.
  2. Start a Celery worker from your project directory:
    celery -A celery_app worker --loglevel=info
  3. Run the Kafka producer script to send messages to Kafka.
  4. Run the Kafka consumer script; it consumes messages from Kafka and offloads them to Celery for processing.

Conclusion

Integrating Kafka with Celery provides a practical architecture for processing data streams asynchronously. Kafka handles the high-throughput, distributed messaging, while Celery provides flexible, distributed task processing. Because the consumer only enqueues tasks, message intake and message processing can be scaled independently by adding consumers or workers. This setup suits applications that need real-time data processing and analytics, scalable microservices, or asynchronous task execution in distributed systems.

