Intermediate
Multi-Provider Routing
Route requests to OpenAI, Anthropic, and local models with automatic fallback chains.
Provider Router
# src/router.py
import openai
import anthropic
import asyncio
import time
from typing import Dict, List, Optional
from dataclasses import dataclass, field
@dataclass
class ProviderConfig:
    """Configuration and live health state for a single LLM provider."""

    name: str  # provider identifier, e.g. "openai" or "anthropic"
    api_key: str  # credential handed to the provider's SDK client
    models: List[str]  # model names this provider serves natively
    priority: int = 1  # lower value = tried earlier in the fallback chain
    healthy: bool = True  # cleared after max_errors consecutive failures
    last_error: Optional[str] = None  # message from the most recent failure
    error_count: int = 0  # consecutive failures; reset to 0 on success
    max_errors: int = 3  # failures tolerated before entering cooldown
    cooldown_until: float = 0  # epoch seconds; provider is skipped until then
class ProviderRouter:
    """Route chat requests across multiple LLM providers with fallback.

    The provider that natively serves the requested model is tried first,
    then the remaining healthy providers in ascending ``priority`` order.
    A provider that accumulates ``max_errors`` consecutive failures is
    benched for a 60-second cooldown, after which it becomes eligible again.
    """

    def __init__(self):
        # Registry of provider configs, keyed by provider name.
        self.providers: Dict[str, "ProviderConfig"] = {}
        # Maps each model name to the provider that serves it natively.
        self.model_to_provider: Dict[str, str] = {}
        # SDK client objects, keyed by provider name (only for hosted SDKs).
        self.clients: Dict[str, object] = {}

    def add_provider(self, config: "ProviderConfig") -> None:
        """Register a provider and instantiate its SDK client if known.

        Provider names other than "openai"/"anthropic" (e.g. local models)
        are registered without a client object.
        """
        self.providers[config.name] = config
        for model in config.models:
            self.model_to_provider[model] = config.name
        if config.name == "openai":
            self.clients["openai"] = openai.AsyncOpenAI(api_key=config.api_key)
        elif config.name == "anthropic":
            self.clients["anthropic"] = anthropic.AsyncAnthropic(
                api_key=config.api_key)

    def get_fallback_chain(self, model: str) -> List[str]:
        """Return providers to try: primary first, then by ascending priority.

        Unhealthy or cooling-down providers are skipped entirely.
        """
        chain: List[str] = []
        primary = self.model_to_provider.get(model)
        if primary and self._is_healthy(primary):
            chain.append(primary)
        for name, cfg in sorted(self.providers.items(),
                                key=lambda item: item[1].priority):
            if name not in chain and self._is_healthy(name):
                chain.append(name)
        return chain

    def _is_healthy(self, provider_name: str) -> bool:
        """Whether a provider is currently eligible to receive traffic."""
        cfg = self.providers[provider_name]
        if time.time() < cfg.cooldown_until:
            return False
        # Cooldown has lapsed: restore eligibility. Previously nothing reset
        # the flag after the cooldown expired, so a benched provider stayed
        # unhealthy forever and the "60-second cooldown" was a permanent ban.
        if not cfg.healthy and cfg.cooldown_until:
            cfg.healthy = True
            cfg.error_count = 0
            cfg.cooldown_until = 0
        return cfg.healthy

    async def route(self, model: str, messages: list, **kwargs) -> dict:
        """Route a request through the fallback chain.

        Returns the normalized response dict from the first provider that
        succeeds. Raises Exception when no provider is healthy or when all
        attempts fail; the last underlying error is chained as ``__cause__``.
        """
        chain = self.get_fallback_chain(model)
        if not chain:
            raise Exception("No healthy providers available")
        last_error: Optional[Exception] = None
        for provider_name in chain:
            try:
                result = await self._call_provider(
                    provider_name, model, messages, **kwargs)
            except Exception as e:
                last_error = e
                self._mark_error(provider_name, str(e))
            else:
                self._mark_healthy(provider_name)
                return result
        # Chain the cause so callers can inspect the real traceback.
        raise Exception(f"All providers failed. Last: {last_error}") from last_error

    async def _call_provider(self, provider_name, model, messages, **kwargs):
        """Dispatch to a provider's SDK and normalize the response shape."""
        if provider_name == "openai":
            response = await self.clients["openai"].chat.completions.create(
                model=model, messages=messages, **kwargs)
            return {
                "provider": "openai",
                "model": model,
                "content": response.choices[0].message.content,
                "usage": {
                    "prompt_tokens": response.usage.prompt_tokens,
                    "completion_tokens": response.usage.completion_tokens,
                },
            }
        if provider_name == "anthropic":
            # Anthropic requires max_tokens. Forward the caller's other
            # kwargs too — previously everything except max_tokens (e.g.
            # temperature) was silently dropped for this provider.
            params = dict(kwargs)
            params.setdefault("max_tokens", 1024)
            response = await self.clients["anthropic"].messages.create(
                model=model, messages=messages, **params)
            return {
                "provider": "anthropic",
                "model": model,
                "content": response.content[0].text,
                "usage": {
                    "prompt_tokens": response.usage.input_tokens,
                    "completion_tokens": response.usage.output_tokens,
                },
            }
        # Previously an unknown provider fell through and returned None,
        # which route() then marked healthy and reported as a success.
        raise ValueError(f"No client configured for provider: {provider_name}")

    def _mark_healthy(self, name: str) -> None:
        """Reset error tracking after a successful call."""
        cfg = self.providers[name]
        cfg.healthy = True
        cfg.error_count = 0
        cfg.cooldown_until = 0

    def _mark_error(self, name: str, error: str) -> None:
        """Record a failure; bench the provider once max_errors is reached."""
        cfg = self.providers[name]
        cfg.error_count += 1
        cfg.last_error = error
        if cfg.error_count >= cfg.max_errors:
            cfg.healthy = False
            cfg.cooldown_until = time.time() + 60  # 60-second cooldown

    def get_status(self) -> dict:
        """Snapshot of each provider's health, for monitoring endpoints."""
        return {
            name: {
                "healthy": cfg.healthy,
                "errors": cfg.error_count,
                "models": cfg.models,
            }
            for name, cfg in self.providers.items()
        }
Fallback strategy: The router tries the primary provider first, then falls back to alternatives in priority order. After 3 consecutive errors, a provider enters a 60-second cooldown.
Lilly Tech Systems