Security & Compliance
Your AI gateway handles the most sensitive data in the organization — customer conversations, internal documents, proprietary code, and business logic embedded in prompts. This lesson builds the security infrastructure that prevents data leaks, enforces compliance, and creates the audit trail that regulators and security auditors require.
API Key Management
The gateway manages two types of keys: provider keys (OpenAI, Anthropic) stored in a vault, and gateway keys issued to each team. Applications never see provider keys:
import secrets
import hashlib
import time
from dataclasses import dataclass, field
@dataclass
class GatewayKey:
    """Stored metadata for one gateway API key.

    The raw key string is not a field of this record; only its hash is kept.
    """

    # Stored, searchable identifier (e.g., "key_a1b2c3d4").
    key_id: str
    # SHA-256 of the raw key (the raw key itself is never stored).
    key_hash: str
    team_id: str
    app_name: str
    created_at: float
    # Expiry time; keys auto-expire in 90 days.
    expires_at: float
    # Example: ["chat", "embeddings", "images"].
    scopes: list[str]
    # Empty list = all models.
    allowed_models: list[str]
    # Empty list = all IPs.
    allowed_ips: list[str]
    is_active: bool = True
class KeyManager:
    """Issue, validate, and rotate gateway API keys.

    The ``db`` object given to the constructor must provide ``store(record)``,
    ``find_by_hash(key_hash)``, ``find_by_id(key_id)`` and ``update(record)``;
    those are the only methods this class calls on it.
    """

    # Seconds in one day, used for TTL and grace-period arithmetic.
    _SECONDS_PER_DAY = 86400

    def __init__(self, db):
        self.db = db

    @staticmethod
    def _hash_key(raw_key: str) -> str:
        """Return the hex SHA-256 digest under which a raw key is stored."""
        return hashlib.sha256(raw_key.encode()).hexdigest()

    def create_key(self, team_id: str, app_name: str,
                   scopes: list = None, ttl_days: int = 90, *,
                   allowed_models: list = None,
                   allowed_ips: list = None) -> "tuple[str, GatewayKey]":
        """Create a new gateway key. Raw key returned ONCE.

        Args:
            team_id: Team the key is issued to.
            app_name: Application the key is issued to.
            scopes: Scopes granted; ``None`` or empty means
                ``["chat", "embeddings"]``.
            ttl_days: Lifetime of the key in days.
            allowed_models: Optional model allowlist (empty = all models).
            allowed_ips: Optional IP allowlist (empty = all IPs).

        Returns:
            ``(raw_key, record)``. Only the SHA-256 hash of ``raw_key`` is
            put in the record that is passed to ``db.store``.
        """
        raw_key = f"gw-{secrets.token_urlsafe(32)}"
        now = time.time()  # one reading so created_at/expires_at agree
        key_record = GatewayKey(
            key_id=f"key_{secrets.token_hex(6)}",
            key_hash=self._hash_key(raw_key),
            team_id=team_id,
            app_name=app_name,
            created_at=now,
            expires_at=now + (ttl_days * self._SECONDS_PER_DAY),
            # Copy caller-supplied lists so records never share list objects.
            scopes=list(scopes) if scopes else ["chat", "embeddings"],
            allowed_models=list(allowed_models or []),
            allowed_ips=list(allowed_ips or []),
        )
        self.db.store(key_record)
        return raw_key, key_record

    def validate(self, raw_key: str, client_ip: str = None) -> "GatewayKey | None":
        """Validate a key on every request. Returns None if invalid.

        A key is rejected when it is missing or not a string, unknown to the
        database, inactive, past ``expires_at``, or when the record has an IP
        allowlist that does not contain ``client_ip``.
        """
        # A missing/non-string credential is simply invalid, not a crash.
        if not raw_key or not isinstance(raw_key, str):
            return None
        record = self.db.find_by_hash(self._hash_key(raw_key))
        if not record or not record.is_active:
            return None
        if time.time() > record.expires_at:
            return None
        if record.allowed_ips and client_ip not in record.allowed_ips:
            return None
        return record

    def rotate(self, key_id: str) -> "tuple[str, GatewayKey]":
        """Rotate: create new key, expire old in 24h (grace period).

        The replacement key inherits the old key's team, app, scopes, model
        allowlist and IP allowlist, so rotation never widens access.

        Raises:
            KeyError: if ``db.find_by_id`` returns ``None`` for ``key_id``.
        """
        old = self.db.find_by_id(key_id)
        if old is None:
            raise KeyError(f"unknown gateway key id: {key_id!r}")
        new_raw, new_record = self.create_key(
            old.team_id, old.app_name, old.scopes,
            allowed_models=old.allowed_models,
            allowed_ips=old.allowed_ips,
        )
        # The grace period may only shorten the old key's life: a key that
        # expires sooner (or has already expired) must not be extended.
        grace_deadline = time.time() + self._SECONDS_PER_DAY
        old.expires_at = min(old.expires_at, grace_deadline)
        self.db.update(old)
        return new_raw, new_record
PII Filtering
The gateway is the last checkpoint before data reaches an external API. PII filtering catches sensitive data that application code missed:
import re
from dataclasses import dataclass
from enum import Enum
class PIIAction(Enum):
    """What to do with text that matches a PII rule."""

    # Swap the match for a placeholder.
    REDACT = "redact"
    # Reject the entire request.
    BLOCK = "block"
    # Swap the match for a one-way hash.
    HASH = "hash"
    # Allow the request but log a warning.
    WARN = "warn"
@dataclass
class PIIRule:
    """A named detection pattern and the action taken when it matches."""

    # Label for the kind of data this rule detects.
    name: str
    # Pattern source string for the rule.
    pattern: str
    # What to do with a match.
    action: PIIAction
    # Placeholder text; defaults to a generic marker.
    replacement: str = "[REDACTED]"
class PIIFilter:
    """Detect and handle PII in LLM requests.

    Every rule pattern is compiled case-insensitively. ``scan_and_scrub``
    applies the rules to each text fragment found under ``body["messages"]``
    and records the outcome under ``body["_pii"]``.
    """

    RULES = [
        PIIRule("ssn", r'\b\d{3}-\d{2}-\d{4}\b', PIIAction.BLOCK),
        PIIRule("credit_card", r'\b(?:\d{4}[-\s]?){3}\d{4}\b', PIIAction.BLOCK),
        # TLD class is [A-Za-z]: inside [...] a "|" is a literal pipe, not
        # alternation. No upper bound on TLD length, so addresses on long
        # TLDs (e.g. ".photography") are redacted too.
        PIIRule("email", r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
                PIIAction.REDACT, "[EMAIL]"),
        PIIRule("phone", r'\b(?:\+1[-.]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b',
                PIIAction.REDACT, "[PHONE]"),
        PIIRule("aws_key", r'AKIA[0-9A-Z]{16}', PIIAction.BLOCK),
        PIIRule("api_key", r'(?:sk-|pk_live_|rk_live_)[a-zA-Z0-9]{20,}',
                PIIAction.BLOCK),
        PIIRule("ip_address", r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
                PIIAction.REDACT, "[IP]"),
    ]

    def __init__(self, extra_rules: list[PIIRule] = None):
        """Compile the built-in rules followed by any ``extra_rules``."""
        all_rules = self.RULES + (extra_rules or [])
        self._compiled = [(rule, re.compile(rule.pattern, re.I)) for rule in all_rules]

    def scan_and_scrub(self, body: dict) -> dict:
        """Scan all message content, redact or block as configured.

        ``body`` is modified in place and also returned. String contents and
        the ``"text"`` of ``{"type": "text"}`` parts are rewritten; any other
        content part is left untouched.

        Returns:
            ``body`` with a ``"_pii"`` entry holding ``findings`` (a list of
            dicts with ``type``, ``action`` and ``length``), ``blocked`` (any
            finding has the block action) and ``scrubbed`` (any text was
            rewritten, by redaction or by hashing).
        """
        findings = []
        for msg in body.get("messages") or []:
            content = msg.get("content", "")
            if isinstance(content, str):
                msg["content"], found = self._process_text(content)
                findings.extend(found)
            elif isinstance(content, list):
                for part in content:
                    # Skip non-text parts and malformed entries instead of
                    # failing the whole scan on them.
                    if not isinstance(part, dict) or part.get("type") != "text":
                        continue
                    part_text = part.get("text")
                    if not isinstance(part_text, str):
                        continue
                    part["text"], found = self._process_text(part_text)
                    findings.extend(found)

        actions_seen = {finding["action"] for finding in findings}
        # Both REDACT and HASH rewrite the text, so both count as scrubbing.
        rewriting_actions = {PIIAction.REDACT.value, PIIAction.HASH.value}
        body["_pii"] = {
            "findings": findings,
            "blocked": PIIAction.BLOCK.value in actions_seen,
            "scrubbed": bool(actions_seen & rewriting_actions),
        }
        return body

    def _process_text(self, text: str) -> tuple[str, list]:
        """Apply every compiled rule to ``text``.

        Returns:
            ``(new_text, findings)``. Findings record the rule name, action
            and match length only, never the matched text itself.
        """
        findings = []
        for rule, regex in self._compiled:
            # Walk matches right-to-left so replacing one match does not
            # shift the offsets of the matches still to be handled.
            for match in reversed(list(regex.finditer(text))):
                matched = match.group()
                findings.append({
                    "type": rule.name,
                    "action": rule.action.value,
                    "length": len(matched),
                })
                if rule.action == PIIAction.REDACT:
                    substitute = rule.replacement
                elif rule.action == PIIAction.HASH:
                    # NOTE(review): an unsalted, truncated SHA-256 of a
                    # low-entropy value can be brute-forced; confirm this is
                    # acceptable for the data being hashed.
                    digest = hashlib.sha256(matched.encode()).hexdigest()[:12]
                    substitute = f"[{digest}]"
                else:
                    # BLOCK and WARN only record the finding.
                    continue
                text = text[:match.start()] + substitute + text[match.end():]
        return text, findings
Audit Logging
Every gateway request generates an audit record. For SOC2 and HIPAA, audit logs must be immutable, timestamped, and retained for at least one year:
import json
import uuid
from datetime import datetime, timezone
@dataclass(frozen=True)
class AuditRecord:
    """Immutable audit record for every gateway interaction.

    The dataclass is frozen, so assigning to a field after construction
    raises ``dataclasses.FrozenInstanceError`` (an ``AttributeError``).
    """

    audit_id: str
    timestamp: str  # ISO 8601 UTC
    event_type: str  # "llm_request", "key_created", "pii_blocked"
    team_id: str
    user_id: str
    app_name: str
    source_ip: str
    model: str
    provider: str
    input_tokens: int
    output_tokens: int
    cost_usd: float
    latency_ms: float
    status: str  # "success", "blocked", "rate_limited", "error"
    pii_detected: list[str]  # ["email", "phone"] or []
    was_cached: bool
    # Never log prompt content for sensitive workloads.
    # Hash the prompt for correlation without exposing content.
    prompt_hash: str
class AuditLogger:
    """Append-only audit logging for compliance."""

    def __init__(self, writers: list):
        """writers: list of output destinations (S3, CloudWatch, DB).

        Each writer must expose a ``write(record)`` method.
        """
        self.writers = writers

    def log(self, **kwargs) -> "AuditRecord":
        """Build an audit record and send it to every writer.

        ``audit_id`` (a random UUID4 string) and ``timestamp`` (current UTC
        time in ISO 8601) are generated here; all remaining ``AuditRecord``
        fields are taken from ``kwargs``.

        Every writer is attempted even if an earlier one fails, so an outage
        in one destination cannot keep the record out of the others.

        Returns:
            The record that was written.

        Raises:
            Exception: the first error raised by a writer, re-raised after
                all writers have been attempted.
        """
        record = AuditRecord(
            audit_id=str(uuid.uuid4()),
            timestamp=datetime.now(timezone.utc).isoformat(),
            **kwargs,
        )
        first_error = None
        for writer in self.writers:
            try:
                writer.write(record)
            except Exception as exc:
                # Remember the failure but keep delivering to the rest.
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error
        return record
# Production setup: write to multiple destinations
# - S3 with Object Lock (immutable, tamper-proof, cheap long-term storage)
# - CloudWatch/Datadog (real-time alerting and dashboards)
# - PostgreSQL (complex queries for investigations)
class S3AuditWriter:
    """Audit destination that derives a per-record object key.

    The upload itself is only sketched in a comment; as written, ``write``
    builds the key and returns ``None``.
    """

    def write(self, record):
        """Compute the object key for ``record``; nothing is uploaded yet."""
        # First 10 characters of the timestamp -- presumably the YYYY-MM-DD
        # part of an ISO 8601 string; confirm against the record producer.
        day_prefix = record.timestamp[:10]
        object_key = f"audit/{day_prefix}/{record.audit_id}.json"
        # s3.put_object(Bucket="audit-logs", Key=object_key,
        #               Body=json.dumps(record.__dict__),
        #               ObjectLockMode="COMPLIANCE", ObjectLockRetainUntilDate=...)
class PostgresAuditWriter:
    """Placeholder audit destination for PostgreSQL; writing is not implemented."""

    def write(self, record):
        """Accept ``record`` and do nothing (stub).

        Intended statement:
        INSERT INTO audit_log (...) VALUES (...) - append only, no UPDATE/DELETE
        """
        return None
Data Residency Routing
Route requests to provider endpoints in specific geographic regions based on where the data originates or where it must stay:
# Data residency rules, keyed by rule name. Each rule carries a "condition"
# expression string, the providers allowed when that condition holds, and a
# human-readable reason.
RESIDENCY_RULES = {
    "eu_users": dict(
        condition="user_region in ['EU', 'EEA', 'UK']",
        allowed_providers=["anthropic-eu", "azure-eu-west"],
        reason="GDPR: EU user data must stay in EU",
    ),
    "healthcare": dict(
        condition="data_classification == 'PHI'",
        # BAA required
        allowed_providers=["azure-us-hipaa"],
        reason="HIPAA: PHI can only go to BAA-covered providers",
    ),
    "confidential": dict(
        condition="data_classification == 'confidential'",
        # On-premise only
        allowed_providers=["local-llm"],
        reason="Company policy: confidential data stays on-premise",
    ),
}
def apply_residency_rules(request: dict, user_metadata: dict) -> "list[str] | None":
    """Filter providers based on data residency requirements.

    Every rule in ``RESIDENCY_RULES`` whose condition holds (as decided by
    ``eval_condition``) contributes its ``allowed_providers``; the result is
    the intersection across all matching rules.

    Args:
        request: The gateway request. Not consulted by this function.
        user_metadata: Passed to ``eval_condition`` together with each rule's
            condition string.

    Returns:
        ``None`` when no rule matched (no restriction). Otherwise a sorted
        list of the providers every matching rule permits. The list is EMPTY
        when matching rules conflict (no provider satisfies all of them);
        callers must treat that as "no compliant provider" and refuse to
        route, never as "no restriction".
    """
    allowed = None
    for rule in RESIDENCY_RULES.values():
        # NOTE(review): eval_condition is defined outside this block. If it
        # uses eval() on the condition string, make sure neither the rule
        # text nor user_metadata can carry untrusted code.
        if not eval_condition(rule["condition"], user_metadata):
            continue
        rule_providers = set(rule["allowed_providers"])
        allowed = rule_providers if allowed is None else allowed & rule_providers

    # Distinguish "no rule matched" (None) from "rules matched but their
    # intersection is empty" (empty set). Testing truthiness here would make
    # conflicting rules fail open and lift every restriction.
    if allowed is None:
        return None
    return sorted(allowed)
Compliance Patterns Reference
| Requirement | SOC2 | HIPAA | Gateway Component |
|---|---|---|---|
| Access control | CC6.1 | 164.312(a) | Gateway keys with scopes, IP allowlists, key rotation |
| Audit trails | CC7.2 | 164.312(b) | Immutable audit log in S3 with Object Lock retention |
| Data minimization | CC6.5 | 164.502(b) | PII filter redacts unnecessary sensitive data before API call |
| Encryption in transit | CC6.7 | 164.312(e) | TLS 1.3 on all connections, mTLS for internal services |
| Encryption at rest | CC6.7 | 164.312(a)(2)(iv) | AES-256 for cache, logs, and configuration data |
| BAA with vendors | N/A | 164.308(b) | Only route PHI to BAA-covered providers (Azure OpenAI) |
Key Takeaways
- Applications get gateway keys, never provider keys. Store provider keys in a secrets vault, and rotate gateway keys every 90 days.
- PII filtering at the gateway catches data that application-level filtering misses. Block SSNs, credit cards, and API keys. Redact emails and phone numbers.
- Audit logs must be immutable (S3 Object Lock), timestamped, and retained for 1+ year for SOC2 and HIPAA compliance.
- Data residency routing is non-negotiable for GDPR and HIPAA. Route based on user region and data classification to compliant endpoints.
- For HIPAA, only route PHI to providers with signed Business Associate Agreements (Azure OpenAI). Standard OpenAI and Anthropic APIs do not have BAAs.
What Is Next
In the next lesson, we will build the caching and performance layer: exact-match caching, semantic caching to catch paraphrased queries, response streaming through the gateway, and latency optimizations. Together, these reduce costs by 30-60%.