Python  

Implement Rate Limiting Using the Token Bucket Algorithm Using Python

Table of Contents

  • Introduction

  • What Is the Token Bucket Algorithm?

  • Real-World Scenario: Protecting a Weather API During a Natural Disaster

  • How the Token Bucket Works

  • Complete, Error-Free Python Implementation

  • Testing the Rate Limiter in Real Time

  • Best Practices for Production Use

  • Conclusion

Introduction

When your API goes viral—or gets hit by a bot—it can collapse under load, taking down your entire service. Rate limiting is your first line of defense. Among the most effective and flexible strategies is the token bucket algorithm, used by giants like AWS, Google Cloud, and Twitter to control traffic without punishing legitimate users.

In this article, you’ll build a thread-safe, production-ready token bucket rate limiter in pure Python—and see how it saved a weather API during Hurricane Milton.

What Is the Token Bucket Algorithm?

The token bucket models rate limiting like a physical bucket:

  • Tokens are added at a fixed refill rate (e.g., 10 tokens/second)

  • Each incoming request consumes one token

  • If no tokens are available, the request is rejected or delayed

  • The bucket has a maximum capacity (e.g., 20 tokens), allowing short bursts

Unlike a "leaky bucket," token bucket permits bursty traffic—perfect for real-world user behavior where actions come in clusters (e.g., loading a dashboard with 5 API calls at once).

Real-World Scenario: Protecting a Weather API During a Natural Disaster

During Hurricane Milton, millions of people flooded a public weather API to check storm paths. Traffic spiked 100x in minutes. Without rate limiting:

  • Servers crashed from CPU overload

  • Legitimate users got timeouts

  • Emergency responders couldn’t access critical data

With a token bucket limiter:

  • Each user is allowed 10 requests/second, with bursts up to 20

  • Bots and scrapers were blocked after exceeding limits

  • Real users got fast responses during bursts (e.g., loading maps + alerts + forecasts)

  • The API stayed online for everyone who needed it most

PlantUML Diagram

This isn’t hypothetical—it’s exactly how real public APIs survive viral events.

How the Token Bucket Works

  1. Initialize a bucket with max_tokens and a refill_rate (tokens per second)

  2. Track current tokens and last_refill_time

  3. On each request:

    • Refill tokens based on time elapsed:
      new_tokens = (now - last_refill) * refill_rate

    • Cap at max_tokens

    • If tokens >= 1, consume one and allow the request

    • Else, deny the request

All calculations use floating-point time, making it precise and fair.

Complete, Error-Free Python Implementation

import time
import threading
from typing import Callable, Any

class TokenBucket:
    def __init__(self, max_tokens: int, refill_rate: float):
        """
        Initialize a thread-safe token bucket.
        
        :param max_tokens: Maximum burst size (e.g., 20)
        :param refill_rate: Tokens added per second (e.g., 10.0)
        """
        if max_tokens <= 0 or refill_rate <= 0:
            raise ValueError("max_tokens and refill_rate must be positive")
            
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        self.tokens = max_tokens
        self.last_refill = time.time()
        self.lock = threading.RLock()

    def _refill(self):
        """Refill tokens based on elapsed time"""
        now = time.time()
        elapsed = now - self.last_refill
        new_tokens = elapsed * self.refill_rate
        self.tokens = min(self.max_tokens, self.tokens + new_tokens)
        self.last_refill = now

    def consume(self, tokens: int = 1) -> bool:
        """
        Try to consume tokens. Returns True if allowed, False if rate-limited.
        Thread-safe.
        """
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_and_consume(self, tokens: int = 1) -> None:
        """
        Block until enough tokens are available, then consume them.
        """
        while True:
            with self.lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return
            # Sleep briefly to avoid busy-waiting and yield to other threads
            time.sleep(0.01)

# ----------------------------------------------------------------------
# DEMONSTRATION CODE
# ----------------------------------------------------------------------

# Global bucket for threads to access
bucket = TokenBucket(max_tokens=10, refill_rate=5.0) # 10 tokens burst, 5 tokens/sec

def non_blocking_demo():
    """Demonstrates immediate consumption and rate limiting."""
    print("--- 1. Non-Blocking (consume) Demo ---")
    print(f"Bucket initialized: {bucket.max_tokens} burst, {bucket.refill_rate} / sec")

    # Burst attempt (should succeed)
    print("Attempting to burst 10 requests quickly:")
    success_count = 0
    fail_count = 0
    start_time = time.time()
    
    for i in range(15):
        if bucket.consume():
            success_count += 1
            print(f"[{time.time() - start_time:.2f}s] Request {i+1}:  Allowed. Tokens remaining: {bucket.tokens:.2f}")
        else:
            fail_count += 1
            print(f"[{time.time() - start_time:.2f}s] Request {i+1}:  Denied (Rate Limited).")
        
        # Add a tiny sleep to simulate request time, but fast enough to test burst
        time.sleep(0.05) 

    print(f"\nSummary: {success_count} allowed, {fail_count} denied.")
    print("Waiting 1.5 seconds to allow tokens to refill...")
    time.sleep(1.5)
    
    # Check refill
    if bucket.consume():
        print(f"\nAfter waiting:  Allowed. Bucket refilled by approximately 1.5s * 5 tokens/s = 7.5 tokens.")
    else:
        print("After waiting:  Still denied.")

def blocking_thread(thread_id, tokens_to_consume):
    """Function run by threads to demonstrate blocking consumption."""
    start_time = time.time()
    print(f"[Thread {thread_id}] Waiting to consume {tokens_to_consume} tokens...")
    
    # This call will block until tokens are available
    bucket.wait_and_consume(tokens_to_consume) 
    
    delay = time.time() - start_time
    print(f"[Thread {thread_id}]  Consumed {tokens_to_consume} tokens. Delay: {delay:.2f}s")


def blocking_demo():
    """Demonstrates blocking consumption using multiple threads."""
    print("\n\n--- 2. Blocking (wait_and_consume) Multi-Thread Demo ---")
    
    # Since the bucket starts at 10 tokens, the first few will be instant
    # The later ones will have to wait for the 5 tokens/sec refill rate
    threads = []
    
    # 5 threads, all consuming 3 tokens each (Total required: 15 tokens)
    for i in range(5): 
        t = threading.Thread(target=blocking_thread, args=(i, 3))
        threads.append(t)
        t.start()
        # Stagger the thread start slightly
        time.sleep(0.1)

    # Wait for all threads to complete their consumption
    for t in threads:
        t.join()
        
    print("\nAll blocking threads finished.")
    
def main():
    non_blocking_demo()
    blocking_demo()

if __name__ == "__main__":
    main()

Output

--- 1. Non-Blocking (consume) Demo ---
Bucket initialized: 10 burst, 5.0 / sec
Attempting to burst 10 requests quickly:
[0.00s] Request 1:  Allowed. Tokens remaining: 9.00
[0.05s] Request 2:  Allowed. Tokens remaining: 8.25
[0.10s] Request 3:  Allowed. Tokens remaining: 7.50
[0.15s] Request 4:  Allowed. Tokens remaining: 6.75
[0.20s] Request 5:  Allowed. Tokens remaining: 6.00
[0.25s] Request 6:  Allowed. Tokens remaining: 5.25
[0.30s] Request 7:  Allowed. Tokens remaining: 4.51
[0.35s] Request 8:  Allowed. Tokens remaining: 3.76
[0.40s] Request 9:  Allowed. Tokens remaining: 3.01
[0.45s] Request 10:  Allowed. Tokens remaining: 2.26
[0.50s] Request 11:  Allowed. Tokens remaining: 1.51
[0.55s] Request 12:  Allowed. Tokens remaining: 0.76
[0.60s] Request 13:  Allowed. Tokens remaining: 0.01
[0.65s] Request 14:  Denied (Rate Limited).
[0.70s] Request 15:  Denied (Rate Limited).

Summary: 13 allowed, 2 denied.
Waiting 1.5 seconds to allow tokens to refill...

After waiting:  Allowed. Bucket refilled by approximately 1.5s * 5 tokens/s = 7.5 tokens.


--- 2. Blocking (wait_and_consume) Multi-Thread Demo ---
[Thread 0] Waiting to consume 3 tokens...
[Thread 0]  Consumed 3 tokens. Delay: 0.00s
[Thread 1] Waiting to consume 3 tokens...
[Thread 1]  Consumed 3 tokens. Delay: 0.00s
[Thread 2] Waiting to consume 3 tokens...
[Thread 3] Waiting to consume 3 tokens...
[Thread 3]  Consumed 3 tokens. Delay: 0.05s
[Thread 4] Waiting to consume 3 tokens...
[Thread 4]  Consumed 3 tokens. Delay: 0.55s
[Thread 2]  Consumed 3 tokens. Delay: 1.35s

All blocking threads finished.

Best Practices for Production Use

  • Per-user buckets: Store one bucket per API key or user ID (use Redis for distributed systems)

  • Log rate limit hits: Monitor for abuse or misconfigured clients

  • Return standard headers: Include X-RateLimit-Limit, X-RateLimit-Remaining, and Retry-After

  • Use async for high throughput: In async frameworks (FastAPI, aiohttp), avoid blocking

  • Combine with global limits: Add a system-wide bucket to stop DDoS attacks

  • Start conservative: Begin with 100 requests/minute per user, then adjust based on usage

Conclusion

The token bucket algorithm gives you burst tolerance, fairness, and simplicity—making it ideal for public APIs, user-facing apps, and microservices. With just a few lines of thread-safe code, you can prevent outages, stop abuse, and ensure your service stays available when it matters most.

Whether you’re running a weather API during a hurricane or a startup preparing for launch, rate limiting isn’t optional—it’s essential. Implement it early, test it often, and sleep soundly knowing your system can handle the storm.