Table of Contents
Introduction
What Is the Token Bucket Algorithm?
Real-World Scenario: Stopping a Credential-Stuffing Attack on a Banking API
How Token Bucket Protects Your API Gateway
Complete, Error-Free Python Implementation
Integration with a Flask API Gateway
Best Practices for Production Deployment
Conclusion
Introduction
Your API gateway is the front door to your entire digital ecosystem. If left unprotected, it becomes a magnet for bots, scrapers, and attackers. One of the most effective—and elegant—ways to defend it is the token bucket algorithm, a rate-limiting strategy that balances fairness, burst tolerance, and simplicity.
In this article, you’ll implement a production-ready token bucket rate limiter in Python and see how it stopped a live credential-stuffing attack on a major banking API—without blocking legitimate users.
What Is the Token Bucket Algorithm?
The token bucket models traffic control like a physical bucket that:
Fills with tokens at a steady rate (e.g., 5 tokens per second)
Holds up to a maximum number of tokens (e.g., 20), allowing short bursts
Requires one token per request
Rejects requests when the bucket is empty
Unlike rigid “fixed window” limiters, token bucket allows natural user bursts (like loading a dashboard with 5 API calls at once) while still preventing abuse over time.
Real-World Scenario: Stopping a Credential-Stuffing Attack on a Banking API
At 2 a.m., a European digital bank’s /login
endpoint suddenly saw 10,000 requests per minute from thousands of IPs—classic signs of a credential-stuffing attack, where hackers test stolen username/password pairs.
Without rate limiting:
Authentication servers maxed out CPU
Real customers couldn’t log in
Fraud detection systems were overwhelmed
With a token bucket limiter on the API gateway:
Each IP allowed 5 login attempts per minute, with a burst of 10
Attack traffic was blocked instantly after the burst
Legitimate users (who rarely log in more than once per session) saw zero impact
Security team received alerts and blocked the attacker IPs within minutes
![PlantUML Diagram]()
The bank stayed online, customers stayed safe, and the attack fizzled out by dawn.
How Token Bucket Protects Your API Gateway
In an API gateway, you typically apply rate limiting per client, identified by:
The token bucket runs before your business logic:
Incoming request arrives
Gateway identifies the client
Checks the client’s token bucket
If tokens available → forward request
If not → return 429 Too Many Requests
This stops abuse at the edge, saving CPU, database connections, and downstream services.
Complete, Error-Free Python Implementation
import time
import threading
from collections import defaultdict
from typing import Dict, Any
# --- TokenBucket Class (as provided) ---
class TokenBucket:
def __init__(self, max_tokens: int, refill_rate: float):
self.max_tokens = max_tokens
self.refill_rate = refill_rate
self.tokens = max_tokens
self.last_refill = time.time()
# Note: In the single-bucket class, RLock is often used;
# here we'll keep it RLock for consistency if it were used with recursion.
self.lock = threading.RLock()
def _refill(self):
now = time.time()
elapsed = now - self.last_refill
new_tokens = elapsed * self.refill_rate
self.tokens = min(self.max_tokens, self.tokens + new_tokens)
self.last_refill = now
def consume(self, tokens: int = 1) -> bool:
with self.lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
# --- RateLimiter Class (as provided) ---
class RateLimiter:
def __init__(self, max_tokens: int, refill_rate: float):
self.max_tokens = max_tokens
self.refill_rate = refill_rate
# The buckets dictionary stores the TokenBucket instance for each unique key.
# defaultdict creates a new TokenBucket with the configured rates when a key is accessed for the first time.
self.buckets: Dict[str, TokenBucket] = defaultdict(
lambda: TokenBucket(max_tokens, refill_rate)
)
# Global lock to safely access and modify the buckets dictionary itself.
self.lock = threading.Lock()
def is_allowed(self, key: str, tokens: int = 1) -> bool:
# We need the global lock only to safely retrieve or create the bucket.
with self.lock:
bucket = self.buckets[key]
# Once the bucket is retrieved, we use the bucket's internal lock for consumption.
return bucket.consume(tokens)
# ----------------------------------------------------------------------
# DEMONSTRATION CODE
# ----------------------------------------------------------------------
def simulate_user_requests(limiter: RateLimiter, user_id: str, num_requests: int):
"""Simulates a rapid burst of requests for a single user."""
print(f"\n--- Simulating requests for {user_id} ---")
allowed_count = 0
denied_count = 0
start_time = time.time()
for i in range(1, num_requests + 1):
if limiter.is_allowed(user_id, tokens=1):
allowed_count += 1
print(f"[{time.time() - start_time:.2f}s] {user_id}: Request {i} - Allowed.")
else:
denied_count += 1
print(f"[{time.time() - start_time:.2f}s] {user_id}: Request {i} - Denied (Rate Limited).")
# Keep the loop fast to demonstrate the burst/limit
time.sleep(0.01)
print(f"\n{user_id} Summary: {allowed_count} allowed, {denied_count} denied.")
def main():
# Configuration: Max burst 5 tokens, refilling at 2 tokens per second
MAX_TOKENS = 5
REFILL_RATE = 2.0
print(f"Rate Limiter Config: Max Burst={MAX_TOKENS}, Refill Rate={REFILL_RATE} tokens/sec.")
limiter = RateLimiter(max_tokens=MAX_TOKENS, refill_rate=REFILL_RATE)
# 1. Test User A: Burst requests (should hit the limit quickly)
thread_A = threading.Thread(target=simulate_user_requests, args=(limiter, "user_A", 10))
# 2. Test User B: Burst requests immediately after A starts (should have its own separate limit)
# This demonstrates the independence of the per-key rate limits.
thread_B = threading.Thread(target=simulate_user_requests, args=(limiter, "user_B", 10))
thread_A.start()
thread_B.start()
thread_A.join()
thread_B.join()
print("\n--- Testing Refill (for user_A) ---")
print("Waiting 3 seconds...")
time.sleep(3) # Wait long enough to fully refill (3s * 2 tokens/s = 6 tokens > 5 max tokens)
# 3. Test User A again after refill
if limiter.is_allowed("user_A"):
print("user_A: Allowed after 3 seconds. Bucket successfully refilled.")
else:
print("user_A: Denied. Refill failed.")
if __name__ == "__main__":
main()
Output
Rate Limiter Config: Max Burst=5, Refill Rate=2.0 tokens/sec.
--- Simulating requests for user_A ---
[0.00s] user_A: Request 1 - Allowed.
--- Simulating requests for user_B ---
[0.00s] user_B: Request 1 - Allowed.
[0.01s] user_A: Request 2 - Allowed.
[0.01s] user_B: Request 2 - Allowed.
[0.02s] user_A: Request 3 - Allowed.
[0.02s] user_B: Request 3 - Allowed.
[0.03s] user_A: Request 4 - Allowed.
[0.03s] user_B: Request 4 - Allowed.
[0.04s] user_A: Request 5 - Allowed.
[0.04s] user_B: Request 5 - Allowed.
[0.05s] user_A: Request 6 - Denied (Rate Limited).
[0.05s] user_B: Request 6 - Denied (Rate Limited).
[0.06s] user_A: Request 7 - Denied (Rate Limited).
[0.06s] user_B: Request 7 - Denied (Rate Limited).
[0.07s] user_A: Request 8 - Denied (Rate Limited).
[0.07s] user_B: Request 8 - Denied (Rate Limited).
[0.08s] user_A: Request 9 - Denied (Rate Limited).
[0.08s] user_B: Request 9 - Denied (Rate Limited).
[0.09s] user_A: Request 10 - Denied (Rate Limited).
[0.09s] user_B: Request 10 - Denied (Rate Limited).
user_A Summary: 5 allowed, 5 denied.
user_B Summary: 5 allowed, 5 denied.
--- Testing Refill (for user_A) ---
Waiting 3 seconds...
user_A: Allowed after 3 seconds. Bucket successfully refilled.
Best Practices for Production Deployment
Use Redis for distributed systems: Store buckets in Redis with Lua scripts for atomic operations
Log and alert: Monitor 429
rates to detect attacks early
Return standard headers: Include X-RateLimit-Limit
, X-RateLimit-Remaining
, and Retry-After
Layer your defenses: Combine with IP reputation, CAPTCHA, and anomaly detection
Tune per endpoint: /login
might allow 5/min, while /public/weather
allows 100/min
Avoid memory leaks: Evict inactive buckets after 1 hour of inactivity (not shown for brevity)
Conclusion
Rate limiting isn’t just about performance—it’s a critical security control. The token bucket algorithm gives you the perfect blend of burst support, fairness, and simplicity, making it ideal for API gateways handling everything from banking logins to public weather data. By implementing it early—and tuning it wisely—you protect your systems, your users, and your reputation. In a world of relentless bots and automated attacks, your API gateway shouldn’t just be a door—it should be a smart, guarded gate.