
Rate Limiting with the Token Bucket Algorithm for API Gateway Protection in Python

Table of Contents

  • Introduction

  • What Is the Token Bucket Algorithm?

  • Real-World Scenario: Stopping a Credential-Stuffing Attack on a Banking API

  • How Token Bucket Protects Your API Gateway

  • Complete Python Implementation

  • Integration with a Flask API Gateway

  • Best Practices for Production Deployment

  • Conclusion

Introduction

Your API gateway is the front door to your entire digital ecosystem. If left unprotected, it becomes a magnet for bots, scrapers, and attackers. One of the most effective—and elegant—ways to defend it is the token bucket algorithm, a rate-limiting strategy that balances fairness, burst tolerance, and simplicity.

In this article, you’ll implement a production-ready token bucket rate limiter in Python and see how it stopped a live credential-stuffing attack on a major banking API—without blocking legitimate users.

What Is the Token Bucket Algorithm?

The token bucket models traffic control like a physical bucket that:

  • Fills with tokens at a steady rate (e.g., 5 tokens per second)

  • Holds up to a maximum number of tokens (e.g., 20), allowing short bursts

  • Requires one token per request

  • Rejects requests when the bucket is empty

Unlike rigid “fixed window” limiters, token bucket allows natural user bursts (like loading a dashboard with 5 API calls at once) while still preventing abuse over time.
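The refill arithmetic behind those rules fits in two lines. Here is a quick sketch using the example numbers above (5 tokens per second, capacity 20); the variable names are purely illustrative:

```python
REFILL_RATE = 5.0   # tokens added per second
CAPACITY = 20.0     # maximum tokens the bucket holds (burst size)

tokens = 0.0        # the bucket was just drained by a burst
elapsed = 3.0       # seconds since the last refill
tokens = min(CAPACITY, tokens + elapsed * REFILL_RATE)
print(tokens)       # 15.0 -- three seconds of refill at 5 tokens/s

elapsed = 60.0      # a long idle stretch...
tokens = min(CAPACITY, tokens + elapsed * REFILL_RATE)
print(tokens)       # 20.0 -- capped at CAPACITY, so saved-up bursts stay bounded
```

The `min()` cap is what separates token bucket from a simple credit counter: idle clients earn a bounded burst allowance, never an unbounded one.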

Real-World Scenario: Stopping a Credential-Stuffing Attack on a Banking API

At 2 a.m., a European digital bank’s /login endpoint suddenly saw 10,000 requests per minute from thousands of IPs—classic signs of a credential-stuffing attack, where hackers test stolen username/password pairs.

Without rate limiting:

  • Authentication servers maxed out CPU

  • Real customers couldn’t log in

  • Fraud detection systems were overwhelmed

With a token bucket limiter on the API gateway:

  • Each IP allowed 5 login attempts per minute, with a burst of 10

  • Attack traffic was blocked instantly after the burst

  • Legitimate users (who rarely log in more than once per session) saw zero impact

  • Security team received alerts and blocked the attacker IPs within minutes


The bank stayed online, customers stayed safe, and the attack fizzled out by dawn.
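In token bucket terms, the bank's per-IP policy maps directly onto the algorithm's two parameters (numbers from the scenario above; variable names are just for illustration):

```python
BURST = 10            # bucket capacity: up to 10 back-to-back login attempts
REFILL_RATE = 5 / 60  # 5 attempts per minute, expressed as tokens per second

# A bot hammering /login gets its first 10 requests through (the burst),
# then one attempt roughly every 12 seconds -- useless for credential stuffing.
seconds_per_attempt = 1 / REFILL_RATE
print(round(seconds_per_attempt, 2))  # 12.0
```

This is why legitimate users never noticed: a human logging in once or twice never drains a 10-token bucket, while a bot testing thousands of stolen credentials is throttled to a trickle.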

How Token Bucket Protects Your API Gateway

In an API gateway, you typically apply rate limiting per client, identified by:

  • API key

  • IP address

  • User ID (for authenticated requests)

The token bucket runs before your business logic:

  1. Incoming request arrives

  2. Gateway identifies the client

  3. Checks the client’s token bucket

  4. If tokens available → forward request

  5. If not → return 429 Too Many Requests

This stops abuse at the edge, saving CPU, database connections, and downstream services.

Complete Python Implementation

import time
import threading
from collections import defaultdict
from typing import Dict

# --- TokenBucket: one bucket per client ---
class TokenBucket:
    def __init__(self, max_tokens: int, refill_rate: float):
        self.max_tokens = max_tokens      # bucket capacity (burst size)
        self.refill_rate = refill_rate    # tokens added per second
        self.tokens = max_tokens
        # time.monotonic() is immune to wall-clock adjustments (NTP, DST),
        # which could otherwise make the elapsed time negative.
        self.last_refill = time.monotonic()
        # This lock guards the token count against concurrent consume() calls.
        self.lock = threading.Lock()

    def _refill(self):
        # Lazy refill: compute how many tokens accrued since the last check,
        # capped at the bucket's capacity. Called with self.lock already held.
        now = time.monotonic()
        elapsed = now - self.last_refill
        new_tokens = elapsed * self.refill_rate
        self.tokens = min(self.max_tokens, self.tokens + new_tokens)
        self.last_refill = now

    def consume(self, tokens: int = 1) -> bool:
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

# --- RateLimiter: one TokenBucket per client key ---
class RateLimiter:
    def __init__(self, max_tokens: int, refill_rate: float):
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        # The buckets dictionary stores the TokenBucket instance for each unique key.
        # defaultdict creates a new TokenBucket with the configured rates when a key is accessed for the first time.
        self.buckets: Dict[str, TokenBucket] = defaultdict(
            lambda: TokenBucket(max_tokens, refill_rate)
        )
        # Global lock to safely access and modify the buckets dictionary itself.
        self.lock = threading.Lock() 

    def is_allowed(self, key: str, tokens: int = 1) -> bool:
        # We need the global lock only to safely retrieve or create the bucket.
        with self.lock:
            bucket = self.buckets[key]
        
        # Once the bucket is retrieved, we use the bucket's internal lock for consumption.
        return bucket.consume(tokens)

# ----------------------------------------------------------------------
# DEMONSTRATION CODE
# ----------------------------------------------------------------------

def simulate_user_requests(limiter: RateLimiter, user_id: str, num_requests: int):
    """Simulates a rapid burst of requests for a single user."""
    print(f"\n--- Simulating requests for {user_id} ---")
    allowed_count = 0
    denied_count = 0
    start_time = time.time()

    for i in range(1, num_requests + 1):
        if limiter.is_allowed(user_id, tokens=1):
            allowed_count += 1
            print(f"[{time.time() - start_time:.2f}s] {user_id}: Request {i} - Allowed.")
        else:
            denied_count += 1
            print(f"[{time.time() - start_time:.2f}s] {user_id}: Request {i} - Denied (Rate Limited).")
        
        # Keep the loop fast to demonstrate the burst/limit
        time.sleep(0.01)

    print(f"\n{user_id} Summary: {allowed_count} allowed, {denied_count} denied.")


def main():
    # Configuration: Max burst 5 tokens, refilling at 2 tokens per second
    MAX_TOKENS = 5
    REFILL_RATE = 2.0
    
    print(f"Rate Limiter Config: Max Burst={MAX_TOKENS}, Refill Rate={REFILL_RATE} tokens/sec.")
    limiter = RateLimiter(max_tokens=MAX_TOKENS, refill_rate=REFILL_RATE)
    
    # 1. Test User A: Burst requests (should hit the limit quickly)
    thread_A = threading.Thread(target=simulate_user_requests, args=(limiter, "user_A", 10))
    
    # 2. Test User B: Burst requests immediately after A starts (should have its own separate limit)
    # This demonstrates the independence of the per-key rate limits.
    thread_B = threading.Thread(target=simulate_user_requests, args=(limiter, "user_B", 10))
    
    thread_A.start()
    thread_B.start()
    
    thread_A.join()
    thread_B.join()
    
    print("\n--- Testing Refill (for user_A) ---")
    print("Waiting 3 seconds...")
    time.sleep(3) # Wait long enough to fully refill (3s * 2 tokens/s = 6 tokens > 5 max tokens)
    
    # 3. Test User A again after refill
    if limiter.is_allowed("user_A"):
        print("user_A: Allowed after 3 seconds. Bucket successfully refilled.")
    else:
        print("user_A: Denied. Refill failed.")

if __name__ == "__main__":
    main()

Output

Rate Limiter Config: Max Burst=5, Refill Rate=2.0 tokens/sec.

--- Simulating requests for user_A ---
[0.00s] user_A: Request 1 - Allowed.

--- Simulating requests for user_B ---
[0.00s] user_B: Request 1 - Allowed.
[0.01s] user_A: Request 2 - Allowed.
[0.01s] user_B: Request 2 - Allowed.
[0.02s] user_A: Request 3 - Allowed.
[0.02s] user_B: Request 3 - Allowed.
[0.03s] user_A: Request 4 - Allowed.
[0.03s] user_B: Request 4 - Allowed.
[0.04s] user_A: Request 5 - Allowed.
[0.04s] user_B: Request 5 - Allowed.
[0.05s] user_A: Request 6 - Denied (Rate Limited).
[0.05s] user_B: Request 6 - Denied (Rate Limited).
[0.06s] user_A: Request 7 - Denied (Rate Limited).
[0.06s] user_B: Request 7 - Denied (Rate Limited).
[0.07s] user_A: Request 8 - Denied (Rate Limited).
[0.07s] user_B: Request 8 - Denied (Rate Limited).
[0.08s] user_A: Request 9 - Denied (Rate Limited).
[0.08s] user_B: Request 9 - Denied (Rate Limited).
[0.09s] user_A: Request 10 - Denied (Rate Limited).
[0.09s] user_B: Request 10 - Denied (Rate Limited).

user_A Summary: 5 allowed, 5 denied.

user_B Summary: 5 allowed, 5 denied.

--- Testing Refill (for user_A) ---
Waiting 3 seconds...
user_A: Allowed after 3 seconds. Bucket successfully refilled.
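
Integration with a Flask API Gateway

Wiring the limiter into a gateway means running the token check before any view logic, exactly as in the five-step flow described earlier. The sketch below shows one way to do it with Flask's before_request hook. It is illustrative, not a drop-in: the /login route, the Retry-After value, and the per-IP policy (burst of 10, roughly 5 attempts per minute, matching the banking scenario) are assumptions, and compact versions of the two classes are repeated so the snippet runs on its own. Behind a reverse proxy, derive the client key from the X-Forwarded-For header rather than remote_addr.

```python
import threading
import time
from collections import defaultdict

from flask import Flask, jsonify, request

# Compact restatement of the TokenBucket/RateLimiter classes from above,
# so this snippet is self-contained.
class TokenBucket:
    def __init__(self, max_tokens: int, refill_rate: float):
        self.max_tokens, self.refill_rate = max_tokens, refill_rate
        self.tokens, self.last_refill = max_tokens, time.monotonic()
        self.lock = threading.Lock()

    def consume(self, tokens: int = 1) -> bool:
        with self.lock:
            now = time.monotonic()
            self.tokens = min(self.max_tokens,
                              self.tokens + (now - self.last_refill) * self.refill_rate)
            self.last_refill = now
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

class RateLimiter:
    def __init__(self, max_tokens: int, refill_rate: float):
        self.buckets = defaultdict(lambda: TokenBucket(max_tokens, refill_rate))
        self.lock = threading.Lock()

    def is_allowed(self, key: str) -> bool:
        with self.lock:
            bucket = self.buckets[key]
        return bucket.consume()

app = Flask(__name__)
# Per-IP login policy from the scenario: burst of 10, ~5 attempts/minute sustained.
login_limiter = RateLimiter(max_tokens=10, refill_rate=5 / 60)

@app.before_request
def rate_limit_login():
    # Behind a proxy, use the X-Forwarded-For header instead of remote_addr.
    client_ip = request.remote_addr or "unknown"
    if request.path == "/login" and not login_limiter.is_allowed(client_ip):
        response = jsonify(error="Too Many Requests")
        response.status_code = 429
        response.headers["Retry-After"] = "12"  # ~1 token every 12 s at this rate
        return response  # short-circuits the request before the view runs

@app.route("/login", methods=["POST"])
def login():
    # Real credential checking would happen here.
    return jsonify(status="ok")
```

Returning a response from before_request is what makes this an edge check: rate-limited requests never reach the view, the database, or the fraud-detection pipeline.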

Best Practices for Production Deployment

  • Use Redis for distributed systems: Store buckets in Redis with Lua scripts for atomic operations

  • Log and alert: Monitor 429 rates to detect attacks early

  • Return standard headers: Include X-RateLimit-Limit, X-RateLimit-Remaining, and Retry-After

  • Layer your defenses: Combine with IP reputation, CAPTCHA, and anomaly detection

  • Tune per endpoint: /login might allow 5/min, while /public/weather allows 100/min

  • Avoid memory leaks: Evict inactive buckets after 1 hour of inactivity (not shown for brevity)
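
The eviction point deserves a sketch, since a per-key limiter that never evicts grows one bucket per client for as long as the process lives. One minimal approach (class and method names are hypothetical, not part of the implementation above) stamps each bucket with its last-use time and sweeps stale entries periodically:

```python
import threading
import time

class EvictingRateLimiter:
    """Per-key token buckets that can evict entries idle longer than idle_ttl."""

    def __init__(self, max_tokens: int, refill_rate: float, idle_ttl: float = 3600.0):
        self.max_tokens, self.refill_rate = max_tokens, refill_rate
        self.idle_ttl = idle_ttl   # seconds of inactivity before eviction
        # key -> (tokens, last_refill, last_used)
        self.buckets: dict = {}
        self.lock = threading.Lock()

    def is_allowed(self, key: str) -> bool:
        now = time.monotonic()
        with self.lock:
            tokens, last_refill, _ = self.buckets.get(
                key, (self.max_tokens, now, now))
            # Lazy refill, capped at capacity (same math as the main class).
            tokens = min(self.max_tokens,
                         tokens + (now - last_refill) * self.refill_rate)
            allowed = tokens >= 1
            if allowed:
                tokens -= 1
            self.buckets[key] = (tokens, now, now)  # stamp last_used = now
            return allowed

    def evict_idle(self) -> int:
        """Drop buckets unused for longer than idle_ttl; run from a periodic job."""
        now = time.monotonic()
        with self.lock:
            stale = [k for k, (_, _, used) in self.buckets.items()
                     if now - used > self.idle_ttl]
            for k in stale:
                del self.buckets[k]
            return len(stale)
```

Calling evict_idle() from a background thread or scheduler every few minutes keeps memory proportional to the number of recently active clients rather than every client ever seen.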

Conclusion

Rate limiting isn’t just about performance—it’s a critical security control. The token bucket algorithm gives you the perfect blend of burst support, fairness, and simplicity, making it ideal for API gateways handling everything from banking logins to public weather data. By implementing it early—and tuning it wisely—you protect your systems, your users, and your reputation. In a world of relentless bots and automated attacks, your API gateway shouldn’t just be a door—it should be a smart, guarded gate.