Advanced

Security & Compliance

Your AI gateway handles the most sensitive data in the organization — customer conversations, internal documents, proprietary code, and business logic embedded in prompts. This lesson builds the security infrastructure that prevents data leaks, enforces compliance, and creates the audit trail that regulators and security auditors require.

API Key Management

The gateway manages two types of keys: provider keys (OpenAI, Anthropic) stored in a vault, and gateway keys issued to each team. Applications never see provider keys:

import secrets
import hashlib
import time
from dataclasses import dataclass, field

@dataclass
class GatewayKey:
    key_id: str               # Stored, searchable (e.g., "key_a1b2c3d4")
    key_hash: str             # SHA-256 of the raw key (never store raw key)
    team_id: str
    app_name: str
    created_at: float
    expires_at: float         # Auto-expire in 90 days
    scopes: list[str]         # ["chat", "embeddings", "images"]
    allowed_models: list[str] # Empty = all models
    allowed_ips: list[str]    # Empty = all IPs
    is_active: bool = True


class KeyManager:
    """Issue, validate, and rotate gateway API keys."""

    def __init__(self, db):
        self.db = db

    def create_key(self, team_id: str, app_name: str,
                   scopes: list = None, ttl_days: int = 90) -> tuple[str, GatewayKey]:
        """Create a new gateway key. Raw key returned ONCE."""
        raw_key = f"gw-{secrets.token_urlsafe(32)}"
        key_record = GatewayKey(
            key_id=f"key_{secrets.token_hex(6)}",
            key_hash=hashlib.sha256(raw_key.encode()).hexdigest(),
            team_id=team_id,
            app_name=app_name,
            created_at=time.time(),
            expires_at=time.time() + (ttl_days * 86400),
            scopes=scopes or ["chat", "embeddings"],
            allowed_models=[],
            allowed_ips=[],
        )
        self.db.store(key_record)
        return raw_key, key_record

    def validate(self, raw_key: str, client_ip: str = None) -> GatewayKey:
        """Validate a key on every request. Returns None if invalid."""
        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
        record = self.db.find_by_hash(key_hash)
        if not record or not record.is_active:
            return None
        if time.time() > record.expires_at:
            return None
        if record.allowed_ips and client_ip not in record.allowed_ips:
            return None
        return record

    def rotate(self, key_id: str) -> tuple[str, GatewayKey]:
        """Rotate: create new key, expire old in 24h (grace period)."""
        old = self.db.find_by_id(key_id)
        new_raw, new_record = self.create_key(
            old.team_id, old.app_name, old.scopes
        )
        old.expires_at = time.time() + 86400  # 24h grace
        self.db.update(old)
        return new_raw, new_record
💡
Apply at work: Store provider API keys in AWS Secrets Manager or HashiCorp Vault, never in environment variables on the gateway host. Set gateway keys to expire every 90 days with automated rotation reminders via Slack. The 24-hour grace period during rotation prevents service disruption.

PII Filtering

The gateway is the last checkpoint before data reaches an external API. PII filtering catches sensitive data that application code missed:

import re
from dataclasses import dataclass
from enum import Enum

class PIIAction(Enum):
    REDACT = "redact"   # Replace with placeholder
    BLOCK = "block"     # Reject entire request
    HASH = "hash"       # Replace with one-way hash
    WARN = "warn"       # Allow but log warning

@dataclass
class PIIRule:
    name: str
    pattern: str
    action: PIIAction
    replacement: str = "[REDACTED]"

class PIIFilter:
    """Detect and handle PII in LLM requests."""

    RULES = [
        PIIRule("ssn", r'\b\d{3}-\d{2}-\d{4}\b', PIIAction.BLOCK),
        PIIRule("credit_card", r'\b(?:\d{4}[-\s]?){3}\d{4}\b', PIIAction.BLOCK),
        PIIRule("email", r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b',
                PIIAction.REDACT, "[EMAIL]"),
        PIIRule("phone", r'\b(?:\+1[-.]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b',
                PIIAction.REDACT, "[PHONE]"),
        PIIRule("aws_key", r'AKIA[0-9A-Z]{16}', PIIAction.BLOCK),
        PIIRule("api_key", r'(?:sk-|pk_live_|rk_live_)[a-zA-Z0-9]{20,}',
                PIIAction.BLOCK),
        PIIRule("ip_address", r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
                PIIAction.REDACT, "[IP]"),
    ]

    def __init__(self, extra_rules: list[PIIRule] = None):
        all_rules = self.RULES + (extra_rules or [])
        self._compiled = [(rule, re.compile(rule.pattern, re.I)) for rule in all_rules]

    def scan_and_scrub(self, body: dict) -> dict:
        """Scan all message content, redact or block as configured."""
        findings = []
        should_block = False

        for msg in body.get("messages", []):
            content = msg.get("content", "")
            if isinstance(content, str):
                scrubbed, msg_findings = self._process_text(content)
                msg["content"] = scrubbed
                findings.extend(msg_findings)
            elif isinstance(content, list):
                for part in content:
                    if part.get("type") == "text":
                        scrubbed, msg_findings = self._process_text(part["text"])
                        part["text"] = scrubbed
                        findings.extend(msg_findings)

        should_block = any(f["action"] == "block" for f in findings)
        body["_pii"] = {
            "findings": findings,
            "blocked": should_block,
            "scrubbed": len([f for f in findings if f["action"] == "redact"]) > 0,
        }
        return body

    def _process_text(self, text: str) -> tuple[str, list]:
        findings = []
        for rule, regex in self._compiled:
            matches = list(regex.finditer(text))
            for match in reversed(matches):
                findings.append({
                    "type": rule.name,
                    "action": rule.action.value,
                    "length": len(match.group()),
                })
                if rule.action == PIIAction.REDACT:
                    text = text[:match.start()] + rule.replacement + text[match.end():]
                elif rule.action == PIIAction.HASH:
                    hashed = hashlib.sha256(match.group().encode()).hexdigest()[:12]
                    text = text[:match.start()] + f"[{hashed}]" + text[match.end():]
        return text, findings

Audit Logging

Every gateway request generates an audit record. For SOC2 and HIPAA, audit logs must be immutable, timestamped, and retained for at least one year:

import json
import uuid
from datetime import datetime, timezone

@dataclass
class AuditRecord:
    """Immutable audit record for every gateway interaction."""
    audit_id: str
    timestamp: str             # ISO 8601 UTC
    event_type: str            # "llm_request", "key_created", "pii_blocked"
    team_id: str
    user_id: str
    app_name: str
    source_ip: str
    model: str
    provider: str
    input_tokens: int
    output_tokens: int
    cost_usd: float
    latency_ms: float
    status: str                # "success", "blocked", "rate_limited", "error"
    pii_detected: list[str]    # ["email", "phone"] or []
    was_cached: bool
    # Never log prompt content for sensitive workloads
    # Hash the prompt for correlation without exposing content
    prompt_hash: str


class AuditLogger:
    """Append-only audit logging for compliance."""

    def __init__(self, writers: list):
        """writers: list of output destinations (S3, CloudWatch, DB)."""
        self.writers = writers

    def log(self, **kwargs) -> AuditRecord:
        record = AuditRecord(
            audit_id=str(uuid.uuid4()),
            timestamp=datetime.now(timezone.utc).isoformat(),
            **kwargs
        )
        for writer in self.writers:
            writer.write(record)
        return record


# Production setup: write to multiple destinations
# - S3 with Object Lock (immutable, tamper-proof, cheap long-term storage)
# - CloudWatch/Datadog (real-time alerting and dashboards)
# - PostgreSQL (complex queries for investigations)

class S3AuditWriter:
    def write(self, record):
        key = f"audit/{record.timestamp[:10]}/{record.audit_id}.json"
        # s3.put_object(Bucket="audit-logs", Key=key,
        #               Body=json.dumps(record.__dict__),
        #               ObjectLockMode="COMPLIANCE", ObjectLockRetainUntilDate=...)

class PostgresAuditWriter:
    def write(self, record):
        # INSERT INTO audit_log (...) VALUES (...) - append only, no UPDATE/DELETE
        pass

Data Residency Routing

Route requests to provider endpoints in specific geographic regions based on where the data originates or where it must stay:

# Data residency rules
RESIDENCY_RULES = {
    "eu_users": {
        "condition": "user_region in ['EU', 'EEA', 'UK']",
        "allowed_providers": ["anthropic-eu", "azure-eu-west"],
        "reason": "GDPR: EU user data must stay in EU"
    },
    "healthcare": {
        "condition": "data_classification == 'PHI'",
        "allowed_providers": ["azure-us-hipaa"],  # BAA required
        "reason": "HIPAA: PHI can only go to BAA-covered providers"
    },
    "confidential": {
        "condition": "data_classification == 'confidential'",
        "allowed_providers": ["local-llm"],  # On-premise only
        "reason": "Company policy: confidential data stays on-premise"
    },
}

def apply_residency_rules(request: dict, user_metadata: dict) -> list[str]:
    """Filter providers based on data residency requirements."""
    allowed = None
    for rule_name, rule in RESIDENCY_RULES.items():
        if eval_condition(rule["condition"], user_metadata):
            rule_providers = set(rule["allowed_providers"])
            allowed = rule_providers if allowed is None else allowed & rule_providers
    return list(allowed) if allowed else None  # None = no restriction

Compliance Patterns Reference

RequirementSOC2HIPAAGateway Component
Access control CC6.1 164.312(a) Gateway keys with scopes, IP allowlists, key rotation
Audit trails CC7.2 164.312(b) Immutable audit log in S3 with Object Lock retention
Data minimization CC6.5 164.502(b) PII filter redacts unnecessary sensitive data before API call
Encryption in transit CC6.7 164.312(e) TLS 1.3 on all connections, mTLS for internal services
Encryption at rest CC6.7 164.312(a)(2)(iv) AES-256 for cache, logs, and configuration data
BAA with vendors N/A 164.308(b) Only route PHI to BAA-covered providers (Azure OpenAI)
📝
Production reality: Data residency is the primary reason enterprises build custom gateways. LiteLLM and Portkey support basic PII filtering, but most compliance teams require custom rules for their industry. Healthcare organizations should start with Azure OpenAI (HIPAA BAA available) and self-hosted models for the most sensitive data.

Key Takeaways

  • Applications get gateway keys, never provider keys. Store provider keys in a secrets vault, rotate gateway keys every 90 days.
  • PII filtering at the gateway catches data that application-level filtering misses. Block SSNs, credit cards, and API keys. Redact emails and phone numbers.
  • Audit logs must be immutable (S3 Object Lock), timestamped, and retained for 1+ year for SOC2 and HIPAA compliance.
  • Data residency routing is non-negotiable for GDPR and HIPAA. Route based on user region and data classification to compliant endpoints.
  • For HIPAA, only route PHI to providers with signed Business Associate Agreements (Azure OpenAI). Standard OpenAI and Anthropic APIs do not have BAAs.

What Is Next

In the next lesson, we will build caching and performance — exact match caching, semantic caching to catch paraphrased queries, response streaming through the gateway, and latency optimizations that reduce costs by 30-60%.