Rate Limiting & Quota Management
Without rate limiting, one team's batch job can exhaust your entire organization's API quota in minutes. This lesson covers how to build a multi-level rate limiting system with Redis that enforces organization-wide, per-team, and per-app limits using both request counts and token budgets.
Why Token-Based Limits Matter
Traditional API rate limiting counts requests per unit of time (per second or per minute). For AI APIs this is insufficient, because a single request can consume 100 tokens or 100,000 tokens:
| Request Type | Tokens Used | Cost per Request (GPT-4o, approx.) | Is 60 RPM OK? |
|---|---|---|---|
| Short chat reply | ~500 | $0.004 | Yes: 60 RPM is fine ($0.24/min) |
| Document summarization | ~30,000 | $0.15 | Expensive: 60 RPM = $9/min = $540/hr |
| Code generation with context | ~100,000 | $0.50 | Expensive: 60 RPM = $30/min = $1,800/hr |
You need both request-per-minute (RPM) limits AND tokens-per-minute (TPM) limits.
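To see why a request limit on its own is not enough, the short sketch below multiplies the table's per-request figures by a fixed 60 RPM. The token and cost values are the approximate ones from the table, not current list prices, and `per_minute_load` is a hypothetical helper used only for illustration.

```python
# Approximate per-request figures copied from the table above
WORKLOADS = {
    "short_chat": {"tokens": 500, "cost_usd": 0.004},
    "summarization": {"tokens": 30_000, "cost_usd": 0.15},
    "codegen_with_context": {"tokens": 100_000, "cost_usd": 0.50},
}


def per_minute_load(rpm: int) -> dict:
    """Tokens per minute and dollars per hour that a fixed request rate produces."""
    return {
        name: {
            "tpm": rpm * w["tokens"],
            "usd_per_hour": round(rpm * w["cost_usd"] * 60, 2),
        }
        for name, w in WORKLOADS.items()
    }


for name, load in per_minute_load(60).items():
    print(f"{name:22} {load['tpm']:>9,} TPM  ${load['usd_per_hour']:,.2f}/hr")
```

At the same 60 RPM, short chat replies generate 30K TPM, but summarization generates 1.8M TPM. That is close to the entire 2M org-wide TPM budget in the configuration below, even though the request rate is identical.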
Multi-Level Rate Limiting Architecture
Production gateways enforce limits at three levels simultaneously:
```python
# Rate limit hierarchy (all levels checked on every request)
RATE_LIMITS = {
    # Level 1: Organization-wide (protects provider rate limits)
    "org": {
        "rpm": 10000,      # 10K requests/min across all teams
        "tpm": 2_000_000,  # 2M tokens/min org-wide
    },
    # Level 2: Per-team (fair allocation)
    "teams": {
        "frontend": {"rpm": 2000, "tpm": 500_000, "daily_budget_usd": 200},
        "backend":  {"rpm": 3000, "tpm": 800_000, "daily_budget_usd": 500},
        "data-sci": {"rpm": 1000, "tpm": 1_000_000, "daily_budget_usd": 1000},
        "internal": {"rpm": 500, "tpm": 200_000, "daily_budget_usd": 50},
    },
    # Level 3: Per-app/key (prevents runaway scripts)
    "app_defaults": {
        "rpm": 100,      # Single app: 100 RPM
        "tpm": 100_000,  # Single app: 100K TPM
    },
}
```
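The team limits do not have to add up to the org limit. The check below reports how much of each org-level limit has been promised to teams. It uses a trimmed copy of the `RATE_LIMITS` dictionary above, and `allocation_report` is a hypothetical helper, not part of any library.

```python
# Trimmed copy of RATE_LIMITS (only the fields this check reads)
RATE_LIMITS = {
    "org": {"rpm": 10000, "tpm": 2_000_000},
    "teams": {
        "frontend": {"rpm": 2000, "tpm": 500_000},
        "backend": {"rpm": 3000, "tpm": 800_000},
        "data-sci": {"rpm": 1000, "tpm": 1_000_000},
        "internal": {"rpm": 500, "tpm": 200_000},
    },
}


def allocation_report(limits: dict) -> dict:
    """Sum the per-team limits and compare them with the org-wide limits."""
    report = {}
    for metric in ("rpm", "tpm"):
        allocated = sum(team[metric] for team in limits["teams"].values())
        org_limit = limits["org"][metric]
        report[metric] = {
            "allocated": allocated,
            "org_limit": org_limit,
            # > 1.0 means teams were promised more than the org can serve at once
            "oversubscription": allocated / org_limit,
        }
    return report


print(allocation_report(RATE_LIMITS))
```

With these numbers, team TPM allocations total 2.5M against a 2M org limit (1.25x oversubscribed). The org-level check is therefore what stops all teams from bursting at the same time. Team RPM allocations add up to only 65% of the org RPM limit.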
Production Rate Limiter with Redis
Here is a sliding window rate limiter built on Redis sorted sets. It tracks both RPM and TPM limits and uses a Lua script so that each check-and-increment runs atomically on the Redis server:
```python
import time
import uuid
from dataclasses import dataclass

import redis.asyncio as redis


@dataclass
class RateLimitResult:
    allowed: bool
    remaining_rpm: int
    remaining_tpm: int
    retry_after_seconds: float = 0.0
    limit_type: str = ""  # level that rejected, e.g. "team_tpm"; "" if allowed


class TokenAwareRateLimiter:
    """
    Sliding window rate limiter with both RPM and TPM tracking.
    Uses Redis for distributed state across gateway instances.
    """

    # Lua script for an atomic check-and-increment across ALL levels.
    # Each sorted-set member is "<request_id>:<cost>", scored by timestamp, so
    # usage is the sum of the costs: 1 per request for RPM keys, estimated
    # tokens for TPM keys. Nothing is recorded unless every level has room,
    # so a rejection at one level never consumes quota at another.
    # Note: summing members costs O(entries in window) per check.
    SLIDING_WINDOW_SCRIPT = """
    local window_ms = tonumber(ARGV[1])
    local now_ms = tonumber(ARGV[2])
    local request_id = ARGV[3]
    local result = {1}

    -- Pass 1: check every level; for key i, ARGV[2+2i] = limit, ARGV[3+2i] = cost
    for i, key in ipairs(KEYS) do
        local limit = tonumber(ARGV[2 + 2 * i])
        local cost = tonumber(ARGV[3 + 2 * i])
        -- Remove entries outside the window
        redis.call('ZREMRANGEBYSCORE', key, '-inf', now_ms - window_ms)
        -- Sum the cost encoded in each member
        local current = 0
        for _, member in ipairs(redis.call('ZRANGE', key, 0, -1)) do
            current = current + tonumber(string.match(member, ':(%d+)$'))
        end
        if current + cost > limit then
            -- Over limit: ms until the oldest entry leaves the window
            -- (a lower bound for TPM, where one expiry may not free enough)
            local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
            local retry_ms = 0
            if #oldest > 0 then
                retry_ms = tonumber(oldest[2]) + window_ms - now_ms
            end
            -- Integers only: Redis truncates fractional Lua numbers
            return {0, i, retry_ms}
        end
        result[i + 1] = limit - current - cost
    end

    -- Pass 2: every level has room, so record the request everywhere
    for i, key in ipairs(KEYS) do
        redis.call('ZADD', key, now_ms, request_id .. ':' .. ARGV[3 + 2 * i])
        redis.call('PEXPIRE', key, window_ms)
    end
    return result
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        # register_script uses EVALSHA and reloads the script after a
        # NOSCRIPT error (e.g. after a Redis restart or failover)
        self._script = self.redis.register_script(self.SLIDING_WINDOW_SCRIPT)

    @staticmethod
    def _key(kind: str, org_id: str, *parts: str) -> str:
        # "{org_id}" is a Redis Cluster hash tag: all of an org's keys share
        # one slot, which a multi-key Lua script requires
        return ":".join(["rl", kind, "{" + org_id + "}", *parts])

    async def check_and_consume(
        self,
        app_id: str,
        team_id: str,
        org_id: str,
        estimated_tokens: int,
        limits: dict,
    ) -> RateLimitResult:
        """
        Check rate limits at all three levels (org, team, app) in one atomic
        script. Quota is consumed only if every level allows the request.
        """
        now_ms = int(time.time() * 1000)
        window_ms = 60_000  # 1-minute sliding window
        team, app = limits["teams"][team_id], limits["app_defaults"]

        # (key, limit, cost, limit_type), alternating RPM and TPM per level
        checks = [
            (self._key("rpm", org_id), limits["org"]["rpm"], 1, "org_rpm"),
            (self._key("tpm", org_id), limits["org"]["tpm"], estimated_tokens, "org_tpm"),
            (self._key("rpm", org_id, team_id), team["rpm"], 1, "team_rpm"),
            (self._key("tpm", org_id, team_id), team["tpm"], estimated_tokens, "team_tpm"),
            (self._key("rpm", org_id, team_id, app_id), app["rpm"], 1, "app_rpm"),
            (self._key("tpm", org_id, team_id, app_id), app["tpm"], estimated_tokens, "app_tpm"),
        ]
        # A unique member per request avoids collisions within the same millisecond
        args = [window_ms, now_ms, uuid.uuid4().hex]
        for _, limit, cost, _ in checks:
            args += [limit, cost]

        result = await self._script(keys=[c[0] for c in checks], args=args)

        if not result[0]:
            _, failed_index, retry_ms = result  # failed_index is 1-based (Lua)
            return RateLimitResult(
                allowed=False,
                remaining_rpm=0,
                remaining_tpm=0,
                retry_after_seconds=int(retry_ms) / 1000,
                limit_type=checks[int(failed_index) - 1][3],
            )

        remaining = [int(r) for r in result[1:]]
        return RateLimitResult(
            allowed=True,
            # Tightest headroom across levels, tracked separately per metric
            remaining_rpm=min(remaining[0::2]),
            remaining_tpm=min(remaining[1::2]),
        )

    async def get_usage(self, org_id: str, *parts: str) -> dict:
        """Current usage at one level (org, org+team, or org+team+app) for dashboards."""
        now_ms = int(time.time() * 1000)
        window_start = now_ms - 60_000
        pipe = self.redis.pipeline()
        pipe.zcount(self._key("rpm", org_id, *parts), window_start, now_ms)
        pipe.zrangebyscore(self._key("tpm", org_id, *parts), window_start, now_ms)
        rpm_count, tpm_members = await pipe.execute()
        # TPM members are b"<request_id>:<tokens>"; sum the token part
        current_tpm = sum(int(m.rsplit(b":", 1)[1]) for m in tpm_members)
        return {"current_rpm": rpm_count, "current_tpm": current_tpm}
```
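For unit tests or local development without Redis, the same sliding-window-log technique can be sketched in memory. `InMemorySlidingWindow` is a hypothetical, single-process class, not part of any library. It keeps a deque of `(timestamp_ms, cost)` entries per key and checks every level before recording anything, so a request rejected at one level consumes no quota at the others.

```python
import time
from collections import deque
from typing import Dict, List, Optional, Tuple


class InMemorySlidingWindow:
    """Single-process sliding-window log. State lives in this process only,
    so it cannot coordinate limits across gateway instances the way Redis can."""

    def __init__(self, window_ms: int = 60_000):
        self.window_ms = window_ms
        self.logs: Dict[str, deque] = {}  # key -> deque of (timestamp_ms, cost)

    def _usage(self, key: str, now_ms: int) -> int:
        log = self.logs.setdefault(key, deque())
        # Evict entries that have slid out of the window (oldest first)
        while log and log[0][0] <= now_ms - self.window_ms:
            log.popleft()
        return sum(cost for _, cost in log)

    def check_and_consume(
        self, checks: List[Tuple[str, int, int]], now_ms: Optional[int] = None
    ) -> Tuple[bool, Optional[str], float]:
        """checks: (key, limit, cost) per level. Returns (allowed, failed_key, retry_after_s)."""
        if now_ms is None:
            now_ms = int(time.time() * 1000)
        # Pass 1: check every level before recording anything
        for key, limit, cost in checks:
            if self._usage(key, now_ms) + cost > limit:
                log = self.logs[key]
                retry_s = (log[0][0] + self.window_ms - now_ms) / 1000 if log else 0.0
                return False, key, retry_s
        # Pass 2: all levels have room, so record the request in each
        for key, _, cost in checks:
            self.logs[key].append((now_ms, cost))
        return True, None, 0.0
```

For example, with an org TPM limit of 1,000 and two 600-token requests one second apart, the second call returns `(False, "org:tpm", 59.0)`. The app-level log still holds only the first request. Once the first entry ages out at the 60-second mark, the next request is allowed again.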
Token Estimation
You need to estimate the token count before sending the request so that TPM limits can be checked up front. The estimator below runs locally with tiktoken, with no call to a tokenizer API, and falls back to a character-based estimate:
```python
import tiktoken

# Encoders are cached per model; the first load for each model is the slow part
_encoders: dict = {}

TOKENS_PER_MESSAGE = 3  # approximate chat-format overhead per message
IMAGE_TOKENS = 765      # a 1024x1024 high-detail image on GPT-4o-class models


def _get_encoder(model: str):
    if model not in _encoders:
        try:
            _encoders[model] = tiktoken.encoding_for_model(model)
        except KeyError:
            # Unknown model name: assume the GPT-4o encoding
            _encoders[model] = tiktoken.get_encoding("o200k_base")
    return _encoders[model]


def estimate_tokens(body: dict, model: str = "gpt-4o") -> int:
    """
    Estimate total tokens (input + expected output) for rate limiting.
    Uses tiktoken for accuracy, falls back to a character-based estimate.
    """
    max_tokens = body.get("max_tokens") or 4096  # also covers an explicit None
    try:
        enc = _get_encoder(model)
        # Count input tokens from messages
        input_tokens = 0
        for msg in body.get("messages", []):
            input_tokens += TOKENS_PER_MESSAGE
            content = msg.get("content") or ""
            if isinstance(content, str):
                # disallowed_special=() encodes text like "<|endoftext|>" as plain text
                input_tokens += len(enc.encode(content, disallowed_special=()))
            elif isinstance(content, list):
                # Multi-part (vision) messages: text parts plus image_url parts
                for part in content:
                    if part.get("type") == "text":
                        input_tokens += len(enc.encode(part.get("text", ""), disallowed_special=()))
                    elif part.get("type") == "image_url":
                        input_tokens += IMAGE_TOKENS
        # Reserve half of min(max_tokens, max(input_tokens, 500)) for output
        estimated_output = min(max_tokens, max(input_tokens, 500)) // 2
        return input_tokens + estimated_output
    except Exception:
        # Fallback: ~4 characters per token for English text
        return len(str(body)) // 4 + max_tokens // 2
```
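The output-token part of the estimate is a heuristic, not a measurement. Pulling it out as a standalone function makes it easier to see how much it reserves for typical requests. `estimate_output_tokens` is a hypothetical helper that uses the same arithmetic as `estimate_tokens` above.

```python
from typing import Optional


def estimate_output_tokens(input_tokens: int, max_tokens: Optional[int] = None) -> int:
    """Half of min(max_tokens, max(input_tokens, 500)), with max_tokens defaulting to 4096."""
    max_tokens = max_tokens or 4096
    return min(max_tokens, max(input_tokens, 500)) // 2


for inp, cap in [(200, None), (3_000, 4_096), (10_000, None), (30_000, 1_000)]:
    print(inp, cap, estimate_output_tokens(inp, cap))
```

A short prompt reserves 250 output tokens. A 10,000-token prompt with the default cap reserves 2,048, and a 30,000-token summarization capped at 1,000 output tokens reserves 500. The reservation never exceeds half of `max_tokens`. If real outputs routinely exceed it, the TPM limits are being enforced against an underestimate.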
Burst Handling and Priority Queues
When a team hits its rate limit, you have three options: reject the request, queue it, or borrow unused capacity from the organization pool:
```python
from enum import Enum


class BurstPolicy(Enum):
    REJECT = "reject"  # Return 429 immediately
    QUEUE = "queue"    # Queue and process when capacity is available
    BORROW = "borrow"  # Borrow from org-level unused capacity


# request_queue, TEAM_PRIORITIES, rate_limiter, audit_log and execute_request
# are application-specific components assumed to exist elsewhere in the gateway.
async def handle_rate_limited_request(
    request: dict,
    org_id: str,
    team_id: str,
    result: RateLimitResult,
    policy: BurstPolicy = BurstPolicy.QUEUE,
) -> dict:
    """Handle a request that exceeded rate limits."""
    if policy == BurstPolicy.REJECT:
        # The HTTP layer should return this with status 429
        return {
            "error": {
                "type": "rate_limit_exceeded",
                "message": f"Rate limit exceeded: {result.limit_type}",
                "retry_after": result.retry_after_seconds,
            }
        }

    if policy == BurstPolicy.QUEUE:
        # Add to priority queue (higher-priority teams are processed first)
        await request_queue.enqueue(
            request,
            priority=TEAM_PRIORITIES.get(team_id, 5),
            max_wait_seconds=30,
        )
        # A worker drains the queue as capacity becomes available
        return await request_queue.wait_for_result(request["id"], timeout=60)

    # BurstPolicy.BORROW: check whether other teams leave org capacity unused
    org_usage = await rate_limiter.get_usage(org_id)
    available = RATE_LIMITS["org"]["tpm"] - org_usage["current_tpm"]
    # Borrowing cannot help if an org-level limit was the one that was hit
    if not result.limit_type.startswith("org_") and available >= request["estimated_tokens"]:
        # Borrow from the org pool and log it for chargeback.
        # This is check-then-act, so concurrent borrowers can overshoot slightly.
        await audit_log.record_burst(team_id, request["estimated_tokens"])
        return await execute_request(request)
    # Org pool is also full: reject
    return {"error": {"type": "rate_limit_exceeded", "message": "Org capacity exhausted"}}
```
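The QUEUE policy relies on a `request_queue` that serves higher-priority teams first. Below is a minimal in-process sketch built on `asyncio.PriorityQueue`. It assumes that lower numbers mean higher priority, because the snippet above does not say which direction `TEAM_PRIORITIES` uses. `TeamPriorityQueue` is a hypothetical name, and the sketch covers only ordering, not `max_wait_seconds` or result delivery.

```python
import asyncio
import itertools


class TeamPriorityQueue:
    """In-process priority queue: lower priority number is served first,
    FIFO among requests with equal priority."""

    def __init__(self):
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
        # Monotonic counter as a tie-breaker: keeps arrival order for equal
        # priorities and prevents Python from ever comparing two request dicts
        self._seq = itertools.count()

    async def enqueue(self, request: dict, priority: int) -> None:
        await self._queue.put((priority, next(self._seq), request))

    async def next_request(self) -> dict:
        _, _, request = await self._queue.get()
        return request
```

The counter matters because queue entries are compared as tuples. Without it, two requests with the same priority would fall through to comparing dicts, which raises `TypeError`. A gateway with several instances would need a shared queue (for example in Redis) rather than this per-process one.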
Quota Allocation Strategies
Static Allocation
Each team gets a fixed quota (e.g., 500K TPM). Simple to understand and budget. Downside: unused capacity is wasted when one team is idle while another needs more.
Proportional Allocation
Quotas proportional to team size or budget contribution. A 50-person team gets 5x the quota of a 10-person team. Fair but does not account for actual usage patterns.
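A proportional split is a single division per team. The sketch below assumes allocation by headcount, and the team names are hypothetical. Floor division keeps the sum at or below the pool.

```python
from typing import Dict


def proportional_quotas(pool_tpm: int, headcount: Dict[str, int]) -> Dict[str, int]:
    """Split an org-wide TPM pool in proportion to team size."""
    total_people = sum(headcount.values())
    return {team: pool_tpm * n // total_people for team, n in headcount.items()}


print(proportional_quotas(1_200_000, {"platform": 50, "research": 10}))
# {'platform': 1000000, 'research': 200000}
```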
Dynamic Rebalancing
Redistribute unused quotas in real time. If Team A uses only 20% of its quota, the unused 80% can flow to Team B. This requires continuous monitoring and can be unpredictable for teams that need guaranteed capacity.
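One rebalancing pass can be sketched as follows. This is a simple policy chosen only for illustration, and `rebalance` is a hypothetical helper. Teams below their quota are cut back to their current usage, and the freed capacity goes to saturated teams in proportion to their base quotas. A real system would typically leave lenders some headroom.

```python
from typing import Dict


def rebalance(quotas: Dict[str, int], usage: Dict[str, int]) -> Dict[str, int]:
    """Lend unused quota from under-using teams to saturated teams.

    A lender whose traffic grows after this pass must wait for the next
    rebalance, which is the predictability cost of this strategy.
    """
    spare = {t: q - usage.get(t, 0) for t, q in quotas.items()}
    pool = sum(s for s in spare.values() if s > 0)
    saturated = [t for t, s in spare.items() if s <= 0]
    saturated_base = sum(quotas[t] for t in saturated)
    if not saturated_base:
        return dict(quotas)  # nobody needs extra capacity
    new = {}
    for team, quota in quotas.items():
        if team in saturated:
            new[team] = quota + pool * quota // saturated_base
        else:
            new[team] = usage.get(team, 0)
    return new


print(rebalance({"team_a": 500_000, "team_b": 500_000},
                {"team_a": 100_000, "team_b": 500_000}))
# {'team_a': 100000, 'team_b': 900000}
```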
Priority Tiers
Guaranteed base quota plus burst capacity. Team gets 200K TPM guaranteed, can burst to 500K TPM when org capacity is available. Best balance of predictability and utilization.
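A priority tier reduces to an effective limit that depends on how busy the org is. The sketch below uses the 200K guaranteed / 500K burst example together with the 2M org TPM limit from the configuration earlier. `effective_tpm_limit` is a hypothetical helper.

```python
def effective_tpm_limit(guaranteed: int, burst_ceiling: int,
                        org_limit: int, org_usage: int) -> int:
    """Guaranteed base is always available; above it, a team may use spare
    org capacity, up to its burst ceiling."""
    org_spare = max(0, org_limit - org_usage)
    # max() keeps the guarantee even if burst_ceiling is misconfigured below it
    return max(guaranteed, min(burst_ceiling, guaranteed + org_spare))


for org_usage in (1_000_000, 1_900_000, 2_100_000):
    print(org_usage, effective_tpm_limit(200_000, 500_000, 2_000_000, org_usage))
```

When the org is half busy, the team can burst to its full 500K. With only 100K spare it gets 300K, and when the org is over its limit it falls back to the guaranteed 200K.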
Key Takeaways
- AI APIs need both RPM and TPM limits — one large request can cost more than 100 small ones.
- Use Redis sorted sets with sliding windows for distributed rate limiting across gateway instances.
- Enforce limits at three levels: organization (provider protection), team (fair allocation), and app (runaway prevention).
- Estimate tokens before sending requests using tiktoken, with a character-based fallback.
- Start with static quotas at 2x expected usage, monitor for two weeks, then adjust based on real patterns.
What Is Next
In the next lesson, we will build cost control and budgeting — tracking real-time spend per request, setting department budgets with alerts, and implementing chargeback models so each team pays for what they use.