Intermediate

Rate Limiting

Implement per-user, per-team, and token-based rate limits with Redis sliding windows.

Rate Limiter

# src/limiter.py
import redis.asyncio as redis
import time
import json
from typing import Optional

class RateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.limits = {}

    def set_limit(self, key_prefix, requests_per_minute=60,
                  tokens_per_minute=100000):
        self.limits[key_prefix] = {
            "rpm": requests_per_minute,
            "tpm": tokens_per_minute,
        }

    async def check(self, user_id: str, team_id: Optional[str] = None,
                    estimated_tokens: int = 0) -> dict:
        """Check if request is within rate limits."""
        now = time.time()
        window = 60  # 1-minute window
        results = {}

        # Check user limit
        user_key = f"ratelimit:user:{user_id}"
        user_ok = await self._check_window(
            user_key, now, window,
            self.limits.get("user", {}).get("rpm", 60))
        results["user"] = user_ok

        # Check team limit
        if team_id:
            team_key = f"ratelimit:team:{team_id}"
            team_ok = await self._check_window(
                team_key, now, window,
                self.limits.get("team", {}).get("rpm", 300))
            results["team"] = team_ok

        # Check token limit
        token_key = f"ratelimit:tokens:{user_id}"
        token_ok = await self._check_tokens(
            token_key, now, window, estimated_tokens,
            self.limits.get("user", {}).get("tpm", 100000))
        results["tokens"] = token_ok

        allowed = all(r["allowed"] for r in results.values())
        return {"allowed": allowed, "details": results}

    async def _check_window(self, key, now, window, limit):
        pipe = self.redis.pipeline()
        pipe.zremrangebyscore(key, 0, now - window)
        pipe.zadd(key, {str(now): now})
        pipe.zcard(key)
        pipe.expire(key, window * 2)
        _, _, count, _ = await pipe.execute()

        return {
            "allowed": count <= limit,
            "current": count,
            "limit": limit,
            "remaining": max(0, limit - count),
        }

    async def _check_tokens(self, key, now, window,
                            tokens, limit):
        current = await self.redis.get(key)
        current = int(current) if current else 0
        new_total = current + tokens

        if new_total > limit:
            return {"allowed": False, "current": current,
                    "limit": limit, "remaining": max(0, limit-current)}

        await self.redis.incrby(key, tokens)
        await self.redis.expire(key, window)
        return {"allowed": True, "current": new_total,
                "limit": limit, "remaining": limit - new_total}

    async def get_usage(self, user_id):
        now = time.time()
        user_key = f"ratelimit:user:{user_id}"
        count = await self.redis.zcount(user_key, now - 60, now)
        token_key = f"ratelimit:tokens:{user_id}"
        tokens = await self.redis.get(token_key)
        return {"requests_last_minute": count,
                "tokens_last_minute": int(tokens) if tokens else 0}
📦
Sliding window: Uses Redis sorted sets to track request timestamps. Old entries are removed automatically. This gives more accurate rate limiting than fixed windows.