Intermediate

Rate Limiting & Quota Management

Without rate limiting, one team's batch job can exhaust your entire organization's API quota in minutes. This lesson covers how to build a multi-level rate limiting system with Redis that enforces per-organization, per-team, and per-app limits using both request counts and token budgets.

Why Token-Based Limits Matter

Traditional API rate limiting counts requests per second. For AI APIs, this is insufficient because one request can consume 100 tokens or 100,000 tokens:

| Request Type | Tokens Used | Cost (GPT-4o) | Requests/min OK? |
|---|---|---|---|
| Short chat reply | ~500 | $0.004 | Yes, 60 RPM is fine |
| Document summarization | ~30,000 | $0.15 | 60 RPM = $9/min = $540/hr |
| Code generation with context | ~100,000 | $0.50 | 60 RPM = $30/min = $1800/hr |

You need both request-per-minute (RPM) limits AND tokens-per-minute (TPM) limits.
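The arithmetic behind the table can be reproduced with a few lines. This is an illustrative sketch: the function name `projected_spend` and the dictionary of per-request costs are assumptions made for the example, with the cost figures copied from the table above.

```python
def projected_spend(rpm: int, cost_per_request_usd: float) -> dict:
    """Project spend if every request in a minute costs the same amount."""
    per_minute = rpm * cost_per_request_usd
    return {
        "per_minute_usd": round(per_minute, 2),
        "per_hour_usd": round(per_minute * 60, 2),
    }


# Approximate per-request costs, taken from the table above.
REQUEST_COSTS_USD = {
    "short_chat": 0.004,
    "summarization": 0.15,
    "code_generation": 0.50,
}

for name, cost in REQUEST_COSTS_USD.items():
    spend = projected_spend(60, cost)
    print(f"{name}: ${spend['per_minute_usd']}/min, ${spend['per_hour_usd']}/hr")
```

The same 60 RPM limit spans two orders of magnitude in hourly spend depending on request size, which is why a request count alone is not enough.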

Multi-Level Rate Limiting Architecture

Production gateways enforce limits at three levels simultaneously:

# Rate limit hierarchy (all checked on every request)
RATE_LIMITS = {
    # Level 1: Organization-wide (protects provider rate limits)
    "org": {
        "rpm": 10000,          # 10K requests/min across all teams
        "tpm": 2_000_000,      # 2M tokens/min org-wide
    },
    # Level 2: Per-team (fair allocation)
    "teams": {
        "frontend": {"rpm": 2000, "tpm": 500_000, "daily_budget_usd": 200},
        "backend":  {"rpm": 3000, "tpm": 800_000, "daily_budget_usd": 500},
        "data-sci": {"rpm": 1000, "tpm": 1_000_000, "daily_budget_usd": 1000},
        "internal": {"rpm": 500,  "tpm": 200_000, "daily_budget_usd": 50},
    },
    # Level 3: Per-app/key (prevents runaway scripts)
    "app_defaults": {
        "rpm": 100,            # Single app: 100 RPM
        "tpm": 100_000,        # Single app: 100K TPM
    }
}
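One property of this configuration is worth checking explicitly: the team TPM quotas add up to more than the org TPM limit, so the org-level check is what protects you when every team peaks at once. The sketch below computes that ratio; the helper name `oversubscription` is hypothetical, and the limits are repeated (without the budget fields) only to keep the example self-contained.

```python
RATE_LIMITS = {
    "org": {"rpm": 10_000, "tpm": 2_000_000},
    "teams": {
        "frontend": {"rpm": 2000, "tpm": 500_000},
        "backend": {"rpm": 3000, "tpm": 800_000},
        "data-sci": {"rpm": 1000, "tpm": 1_000_000},
        "internal": {"rpm": 500, "tpm": 200_000},
    },
}


def oversubscription(limits: dict) -> dict:
    """Return sum(team limits) / org limit for each metric."""
    ratios = {}
    for metric in ("rpm", "tpm"):
        team_total = sum(team[metric] for team in limits["teams"].values())
        ratios[metric] = team_total / limits["org"][metric]
    return ratios


ratios = oversubscription(RATE_LIMITS)
print(ratios)  # a ratio above 1.0 means team quotas are oversubscribed
```

A ratio above 1.0 is not necessarily a mistake, since teams rarely peak together, but it should be a deliberate choice.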

Production Rate Limiter with Redis

Here is a complete sliding window rate limiter using Redis. It supports both RPM and TPM limits with atomic operations:

import time
import uuid
from dataclasses import dataclass

import redis.asyncio as redis


@dataclass
class RateLimitResult:
    allowed: bool
    remaining_rpm: int
    remaining_tpm: int
    retry_after_seconds: float = 0.0
    limit_type: str = ""       # e.g. "org_rpm" or "team_tpm"; "" if allowed


class TokenAwareRateLimiter:
    """
    Sliding window rate limiter with both RPM and TPM tracking.
    Uses Redis for distributed state across gateway instances.
    """

    WINDOW_MS = 60_000  # 1-minute sliding window

    # Lua script: atomically check every level, then record the request.
    # Each sorted-set member is "<weight>:<request id>" with the timestamp as
    # its score; weight is 1 for RPM keys and the token count for TPM keys.
    # KEYS = one key per check. ARGV = window_ms, now_ms, request_id,
    # followed by one (limit, weight) pair per key.
    # On Redis Cluster, all keys must hash to the same slot.
    SLIDING_WINDOW_SCRIPT = """
    local window_ms = tonumber(ARGV[1])
    local now_ms = tonumber(ARGV[2])
    local request_id = ARGV[3]

    -- Pass 1: check every key before recording anything, so a rejected
    -- request does not consume quota at the levels that had room.
    local remaining = {}
    for i, key in ipairs(KEYS) do
        local limit = tonumber(ARGV[2 + 2 * i])
        local weight = tonumber(ARGV[3 + 2 * i])

        -- Remove entries outside the window
        redis.call('ZREMRANGEBYSCORE', key, 0, now_ms - window_ms)

        -- Sum the weights stored in the members (O(n) in the window size)
        local current = 0
        for _, member in ipairs(redis.call('ZRANGE', key, 0, -1)) do
            current = current + tonumber(string.match(member, '^(%d+):'))
        end

        if current + weight > limit then
            -- Over limit: report time until the oldest entry expires.
            -- Returned in whole milliseconds because Redis truncates
            -- Lua floats to integers.
            local oldest = redis.call('ZRANGE', key, 0, 0, 'WITHSCORES')
            local retry_after_ms = 0
            if #oldest > 0 then
                retry_after_ms = tonumber(oldest[2]) + window_ms - now_ms
            end
            return {0, i, retry_after_ms}
        end
        remaining[i] = limit - current - weight
    end

    -- Pass 2: every level has room, so record the request at all of them
    for i, key in ipairs(KEYS) do
        redis.call('ZADD', key, now_ms, ARGV[3 + 2 * i] .. ':' .. request_id)
        redis.call('PEXPIRE', key, window_ms)
    end
    return {1, 0, 0, unpack(remaining)}
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        # register_script caches the SHA and reloads the script if Redis
        # no longer has it (for example after a restart)
        self._script = self.redis.register_script(self.SLIDING_WINDOW_SCRIPT)

    async def check_and_consume(
        self,
        app_id: str,
        team_id: str,
        org_id: str,
        estimated_tokens: int,
        limits: dict
    ) -> RateLimitResult:
        """
        Check rate limits at all three levels (org, team, app) in one
        atomic script call. Nothing is consumed unless every level has room.
        """
        now_ms = int(time.time() * 1000)
        team = limits["teams"][team_id]
        app = limits["app_defaults"]

        # (key, limit, weight, limit_type), alternating RPM and TPM checks
        checks = [
            (f"rl:rpm:{org_id}", limits["org"]["rpm"], 1, "org_rpm"),
            (f"rl:tpm:{org_id}", limits["org"]["tpm"], estimated_tokens, "org_tpm"),
            (f"rl:rpm:{org_id}:{team_id}", team["rpm"], 1, "team_rpm"),
            (f"rl:tpm:{org_id}:{team_id}", team["tpm"], estimated_tokens, "team_tpm"),
            (f"rl:rpm:{org_id}:{team_id}:{app_id}", app["rpm"], 1, "app_rpm"),
            (f"rl:tpm:{org_id}:{team_id}:{app_id}", app["tpm"], estimated_tokens, "app_tpm"),
        ]

        keys = [key for key, _, _, _ in checks]
        # A client-generated ID keeps members unique even when several
        # requests arrive in the same millisecond
        args = [self.WINDOW_MS, now_ms, uuid.uuid4().hex]
        for _, limit, weight, _ in checks:
            args += [limit, weight]

        result = await self._script(keys=keys, args=args)
        allowed, failed_index, retry_after_ms = result[:3]

        if not allowed:
            return RateLimitResult(
                allowed=False,
                remaining_rpm=0,
                remaining_tpm=0,
                retry_after_seconds=int(retry_after_ms) / 1000,
                limit_type=checks[int(failed_index) - 1][3],
            )

        # Report the tightest remaining headroom across the three levels
        remaining = [int(r) for r in result[3:]]
        return RateLimitResult(
            allowed=True,
            remaining_rpm=min(remaining[0::2]),
            remaining_tpm=min(remaining[1::2]),
        )

    @staticmethod
    def _weight(member) -> int:
        """Extract the weight from a "<weight>:<request id>" member."""
        if isinstance(member, bytes):
            member = member.decode()
        return int(member.split(":", 1)[0])

    async def get_usage(self, key_prefix: str) -> dict:
        """Get current usage for monitoring dashboards."""
        now_ms = int(time.time() * 1000)
        # "(" makes the lower bound exclusive, matching the script's pruning
        window_start = f"({now_ms - self.WINDOW_MS}"

        rpm_key = f"rl:rpm:{key_prefix}"
        tpm_key = f"rl:tpm:{key_prefix}"

        pipe = self.redis.pipeline()
        pipe.zcount(rpm_key, window_start, now_ms)
        pipe.zrangebyscore(tpm_key, window_start, now_ms)
        rpm_count, tpm_members = await pipe.execute()

        # Token usage is the sum of the stored weights, not the entry count
        current_tpm = sum(self._weight(m) for m in tpm_members)
        return {"current_rpm": rpm_count, "current_tpm": current_tpm}
💡 Apply at work: Use Redis sorted sets with timestamps as scores for sliding window rate limiting. This avoids the boundary problem of fixed windows (where a burst at the window boundary can double your effective rate). The Lua script ensures atomicity even with multiple gateway instances.
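The boundary problem is easiest to see in a small in-memory simulation. The two classes below are simplified stand-ins written for this illustration (no Redis, single process, hypothetical names); they only demonstrate why a fixed window can admit twice the limit around a window boundary while a sliding window cannot.

```python
from collections import deque


class FixedWindowLimiter:
    """Counts requests per calendar-aligned window and resets at each boundary."""

    def __init__(self, limit: int, window_s: float):
        self.limit = limit
        self.window_s = window_s
        self.current_window = None
        self.count = 0

    def allow(self, now: float) -> bool:
        window = int(now // self.window_s)
        if window != self.current_window:
            self.current_window = window
            self.count = 0
        if self.count < self.limit:
            self.count += 1
            return True
        return False


class SlidingWindowLimiter:
    """Keeps the timestamps of accepted requests from the last window_s seconds."""

    def __init__(self, limit: int, window_s: float):
        self.limit = limit
        self.window_s = window_s
        self.events = deque()

    def allow(self, now: float) -> bool:
        # Drop timestamps that have fallen out of the window
        while self.events and self.events[0] <= now - self.window_s:
            self.events.popleft()
        if len(self.events) < self.limit:
            self.events.append(now)
            return True
        return False


def count_allowed(limiter, timestamps) -> int:
    return sum(limiter.allow(t) for t in timestamps)


# 100 requests just before the minute boundary and 100 just after it
burst = [59.9] * 100 + [60.1] * 100
fixed_allowed = count_allowed(FixedWindowLimiter(100, 60), burst)
sliding_allowed = count_allowed(SlidingWindowLimiter(100, 60), burst)
print(fixed_allowed, sliding_allowed)  # 200 100
```

With a limit of 100 per minute, the fixed window admits all 200 requests within 0.2 seconds, while the sliding window admits only the first 100.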

Token Estimation

You need to estimate the token count before sending a request so the TPM check can run first. Here is a fast estimator that tokenizes locally with tiktoken, so no network call is needed:

import tiktoken

# Encoders are loaded lazily on first use and cached per model
_encoders = {}

DEFAULT_MAX_TOKENS = 4096

def estimate_tokens(body: dict, model: str = "gpt-4o") -> int:
    """
    Estimate total tokens (input + expected output) for rate limiting.
    Uses tiktoken for accuracy, falls back to character-based estimate.
    """
    # An explicit null max_tokens is treated the same as a missing one
    max_tokens = body.get("max_tokens") or DEFAULT_MAX_TOKENS

    try:
        if model not in _encoders:
            _encoders[model] = tiktoken.encoding_for_model(model)
        enc = _encoders[model]

        # Count input tokens from messages. Per-message formatting
        # overhead (a few tokens each) is not counted.
        input_tokens = 0
        for msg in body.get("messages", []):
            content = msg.get("content", "")
            if isinstance(content, str):
                input_tokens += len(enc.encode(content))
            elif isinstance(content, list):
                # Vision messages with image_url
                for part in content:
                    if part.get("type") == "text":
                        input_tokens += len(enc.encode(part["text"]))
                    elif part.get("type") == "image_url":
                        input_tokens += 765  # ~avg for high-detail image
        # Estimate output tokens: half of max_tokens, capped at half of
        # max(input_tokens, 500) when that is smaller
        estimated_output = min(max_tokens, max(input_tokens, 500)) // 2

        return input_tokens + estimated_output

    except Exception:
        # Fallback (e.g. unknown model): ~4 chars per token for English
        text = str(body)
        return len(text) // 4 + max_tokens // 2
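The output heuristic in the estimator is compact enough to be easy to misread, so here it is isolated with a few worked inputs. The function name `estimated_output_tokens` is introduced only for this illustration; the expression is the one used in `estimate_tokens` above.

```python
def estimated_output_tokens(input_tokens: int, max_tokens: int = 4096) -> int:
    """Half of max_tokens, capped at half of max(input_tokens, 500)."""
    return min(max_tokens, max(input_tokens, 500)) // 2


cases = [
    (100, 4096),     # short prompt: the 500-token floor applies
    (3000, 4096),    # mid-size prompt: scales with the input
    (30000, 4096),   # long prompt: capped by max_tokens
    (2000, 1000),    # caller set a low max_tokens
]
for input_tokens, max_tokens in cases:
    print(input_tokens, max_tokens, estimated_output_tokens(input_tokens, max_tokens))
```

For these inputs the estimates are 250, 1500, 2048, and 500 output tokens respectively, so the heuristic never charges more than half of `max_tokens` and never less than 250 unless `max_tokens` itself is below 500.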

Burst Handling and Priority Queues

When a team hits their rate limit, you have three options: reject, queue, or borrow:

from enum import Enum

# Assumed to be defined elsewhere in the gateway: request_queue,
# TEAM_PRIORITIES, rate_limiter, audit_log, execute_request, RATE_LIMITS.

class BurstPolicy(Enum):
    REJECT = "reject"        # Return 429 immediately
    QUEUE = "queue"          # Queue and process when capacity available
    BORROW = "borrow"        # Borrow from org-level unused capacity

async def handle_rate_limited_request(
    request: dict,
    team_id: str,
    org_id: str,
    result: RateLimitResult,
    policy: BurstPolicy = BurstPolicy.QUEUE
) -> dict:
    """Handle a request that exceeded rate limits."""

    rejection = {
        "error": {
            "type": "rate_limit_exceeded",
            "message": f"Rate limit exceeded: {result.limit_type}",
            "retry_after": result.retry_after_seconds,
        }
    }

    if policy == BurstPolicy.REJECT:
        return rejection

    elif policy == BurstPolicy.QUEUE:
        # Add to priority queue (higher priority teams processed first)
        await request_queue.enqueue(
            request,
            priority=TEAM_PRIORITIES.get(team_id, 5),
            max_wait_seconds=30
        )
        # Worker processes queue as capacity becomes available
        return await request_queue.wait_for_result(request["id"], timeout=60)

    elif policy == BurstPolicy.BORROW:
        # Borrowing only helps when a team or app limit was hit; if the
        # org limit itself is exhausted there is nothing to borrow from
        if result.limit_type.startswith("org"):
            return rejection

        # Check if org has unused capacity from other teams.
        # get_usage() adds the "rl:tpm:" prefix itself, so pass the bare org ID.
        org_usage = await rate_limiter.get_usage(org_id)
        org_limit = RATE_LIMITS["org"]["tpm"]
        available = org_limit - org_usage["current_tpm"]

        if available >= request["estimated_tokens"]:
            # Borrow from org pool, log for chargeback
            await audit_log.record_burst(team_id, request["estimated_tokens"])
            return await execute_request(request)
        else:
            # Org pool also full, reject
            return rejection
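The QUEUE policy relies on a `request_queue` object that the lesson does not define. As a rough idea of the ordering it needs, here is a minimal in-memory sketch built on `heapq`. The class name, the team priority values, and the convention that a lower number is served first are all assumptions for this example; a production queue would also need timeouts and shared storage.

```python
import heapq
import itertools
from dataclasses import dataclass, field
from typing import Optional


@dataclass(order=True)
class QueuedRequest:
    priority: int                       # lower number = served first (assumed)
    sequence: int                       # tie-breaker: FIFO within a priority
    request_id: str = field(compare=False)


class PriorityRequestQueue:
    def __init__(self):
        self._heap = []
        self._counter = itertools.count()

    def enqueue(self, request_id: str, priority: int) -> None:
        item = QueuedRequest(priority, next(self._counter), request_id)
        heapq.heappush(self._heap, item)

    def dequeue(self) -> Optional[str]:
        if not self._heap:
            return None
        return heapq.heappop(self._heap).request_id


# Hypothetical priorities; teams not listed fall back to 5
TEAM_PRIORITIES = {"backend": 1, "frontend": 2}

queue = PriorityRequestQueue()
for request_id, team in [("a", "internal"), ("b", "backend"),
                         ("c", "frontend"), ("d", "backend")]:
    queue.enqueue(request_id, TEAM_PRIORITIES.get(team, 5))

order = [queue.dequeue() for _ in range(4)]
print(order)  # ['b', 'd', 'c', 'a']
```

The sequence counter matters: without it, two requests with equal priority would be compared by their remaining fields, and arrival order would not be preserved.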

Quota Allocation Strategies

Static Allocation

Each team gets a fixed quota (e.g., 500K TPM). Simple to understand and budget. Downside: unused capacity is wasted when one team is idle while another needs more.

Proportional Allocation

Quotas proportional to team size or budget contribution. A 50-person team gets 5x the quota of a 10-person team. Fair but does not account for actual usage patterns.
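Proportional allocation is a one-line calculation. The sketch below splits an org-wide TPM limit by headcount; the function name and the team names and sizes are made up for the example.

```python
def proportional_quotas(org_tpm: int, headcount: dict) -> dict:
    """Split org_tpm across teams in proportion to their headcount."""
    total_people = sum(headcount.values())
    return {
        team: org_tpm * people // total_people
        for team, people in headcount.items()
    }


# Hypothetical teams and sizes
quotas = proportional_quotas(
    2_000_000, {"platform": 50, "mobile": 10, "research": 40}
)
print(quotas)
```

Here the 50-person team receives 1,000,000 TPM, five times the 200,000 TPM of the 10-person team. Integer division can leave a few tokens unallocated when the split is not exact.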

Dynamic Rebalancing

Redistribute unused quota in real time. If Team A uses only 20% of its quota, the unused remainder can flow to Team B. This requires continuous usage monitoring and can be unpredictable for teams that need guaranteed capacity.
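One possible shape for a rebalancing step is sketched below. It is an assumption-laden illustration, not a prescribed algorithm: the function name, the rule that donors keep their current usage plus a 20% headroom, and the equal split among teams that are at their quota are all choices made for this example.

```python
def rebalance(quotas: dict, usage: dict, reserve_pct: int = 20) -> dict:
    """Move unused TPM from idle teams to teams that have reached their quota.

    Each donor keeps its current usage plus reserve_pct percent of its
    quota as headroom; the rest goes into a shared pool.
    """
    new_quotas = {}
    pool = 0
    saturated = []
    for team, quota in quotas.items():
        used = usage.get(team, 0)
        if used >= quota:
            saturated.append(team)
            new_quotas[team] = quota
        else:
            keep = min(quota, used + quota * reserve_pct // 100)
            new_quotas[team] = keep
            pool += quota - keep

    if not saturated:
        return dict(quotas)  # nobody needs more, leave quotas unchanged

    share = pool // len(saturated)
    for team in saturated:
        new_quotas[team] += share
    return new_quotas


rebalanced = rebalance(
    quotas={"team_a": 500_000, "team_b": 500_000},
    usage={"team_a": 100_000, "team_b": 500_000},
)
print(rebalanced)
```

Team A, at 20% utilization, keeps 200,000 TPM and Team B rises to 800,000 TPM, with the total unchanged. The headroom is what keeps Team A from being starved if its traffic picks up before the next rebalancing run.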

Priority Tiers

Guaranteed base quota plus burst capacity. A team gets 200K TPM guaranteed and can burst to 500K TPM when org capacity is available. This offers the best balance of predictability and utilization.
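A priority tier reduces to a small formula: the team's limit at any moment is whatever the org has left after other teams' usage, clamped between the guaranteed floor and the burst ceiling. The sketch below assumes that guaranteed quotas are always honored and that `others_usage` (TPM currently used by all other teams) is available from monitoring; both the function name and that input are hypothetical.

```python
def effective_tpm_limit(
    guaranteed: int,
    burst_cap: int,
    org_limit: int,
    others_usage: int,
) -> int:
    """TPM limit a team may use right now under a guaranteed-plus-burst tier."""
    # Capacity the rest of the org is not currently using
    org_available = max(0, org_limit - others_usage)
    # Burst up to the cap when there is room, never drop below the guarantee
    return max(guaranteed, min(burst_cap, org_available))


# Team tier from the text: 200K guaranteed, 500K burst, with a 2M org limit
for others in (1_000_000, 1_700_000, 1_900_000):
    print(others, effective_tpm_limit(200_000, 500_000, 2_000_000, others))
```

With other teams using 1.0M, 1.7M, and 1.9M TPM, the team's effective limit is 500K, 300K, and 200K respectively.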

📝 Production reality: Start with static allocation and add priority tiers later. Most organizations overestimate their rate limiting needs. Set initial limits generously (2x expected usage), monitor for 2 weeks, then tighten based on actual patterns.

Key Takeaways

  • AI APIs need both RPM and TPM limits — one large request can cost more than 100 small ones.
  • Use Redis sorted sets with sliding windows for distributed rate limiting across gateway instances.
  • Enforce limits at three levels: organization (provider protection), team (fair allocation), and app (runaway prevention).
  • Estimate tokens before sending requests using tiktoken, with a character-based fallback.
  • Start with static quotas at 2x expected usage, monitor for two weeks, then adjust based on real patterns.

What Is Next

In the next lesson, we will build cost control and budgeting — tracking real-time spend per request, setting department budgets with alerts, and implementing chargeback models so each team pays for what they use.