Intermediate
Rate Limiting
Implement per-user, per-team, and token-based rate limits with Redis sliding windows.
Rate Limiter
# src/limiter.py
import redis.asyncio as redis
import time
import json
from typing import Optional
class RateLimiter:
def __init__(self, redis_client):
self.redis = redis_client
self.limits = {}
def set_limit(self, key_prefix, requests_per_minute=60,
tokens_per_minute=100000):
self.limits[key_prefix] = {
"rpm": requests_per_minute,
"tpm": tokens_per_minute,
}
async def check(self, user_id: str, team_id: Optional[str] = None,
estimated_tokens: int = 0) -> dict:
"""Check if request is within rate limits."""
now = time.time()
window = 60 # 1-minute window
results = {}
# Check user limit
user_key = f"ratelimit:user:{user_id}"
user_ok = await self._check_window(
user_key, now, window,
self.limits.get("user", {}).get("rpm", 60))
results["user"] = user_ok
# Check team limit
if team_id:
team_key = f"ratelimit:team:{team_id}"
team_ok = await self._check_window(
team_key, now, window,
self.limits.get("team", {}).get("rpm", 300))
results["team"] = team_ok
# Check token limit
token_key = f"ratelimit:tokens:{user_id}"
token_ok = await self._check_tokens(
token_key, now, window, estimated_tokens,
self.limits.get("user", {}).get("tpm", 100000))
results["tokens"] = token_ok
allowed = all(r["allowed"] for r in results.values())
return {"allowed": allowed, "details": results}
async def _check_window(self, key, now, window, limit):
pipe = self.redis.pipeline()
pipe.zremrangebyscore(key, 0, now - window)
pipe.zadd(key, {str(now): now})
pipe.zcard(key)
pipe.expire(key, window * 2)
_, _, count, _ = await pipe.execute()
return {
"allowed": count <= limit,
"current": count,
"limit": limit,
"remaining": max(0, limit - count),
}
async def _check_tokens(self, key, now, window,
tokens, limit):
current = await self.redis.get(key)
current = int(current) if current else 0
new_total = current + tokens
if new_total > limit:
return {"allowed": False, "current": current,
"limit": limit, "remaining": max(0, limit-current)}
await self.redis.incrby(key, tokens)
await self.redis.expire(key, window)
return {"allowed": True, "current": new_total,
"limit": limit, "remaining": limit - new_total}
async def get_usage(self, user_id):
now = time.time()
user_key = f"ratelimit:user:{user_id}"
count = await self.redis.zcount(user_key, now - 60, now)
token_key = f"ratelimit:tokens:{user_id}"
tokens = await self.redis.get(token_key)
return {"requests_last_minute": count,
"tokens_last_minute": int(tokens) if tokens else 0}
Sliding window: Uses Redis sorted sets to track request timestamps. Old entries are removed automatically. This gives more accurate rate limiting than fixed windows.
Lilly Tech Systems