Intermediate

Multi-Provider Routing

Route requests to OpenAI, Anthropic, and local models with automatic fallback chains.

Provider Router

# src/router.py
import openai
import anthropic
import asyncio
import time
from typing import Dict, List, Optional
from dataclasses import dataclass, field

@dataclass
class ProviderConfig:
    name: str
    api_key: str
    models: List[str]
    priority: int = 1
    healthy: bool = True
    last_error: Optional[str] = None
    error_count: int = 0
    max_errors: int = 3
    cooldown_until: float = 0

class ProviderRouter:
    def __init__(self):
        self.providers: Dict[str, ProviderConfig] = {}
        self.model_to_provider: Dict[str, str] = {}
        self.clients = {}

    def add_provider(self, config: ProviderConfig):
        self.providers[config.name] = config
        for model in config.models:
            self.model_to_provider[model] = config.name

        if config.name == "openai":
            self.clients["openai"] = openai.AsyncOpenAI(api_key=config.api_key)
        elif config.name == "anthropic":
            self.clients["anthropic"] = anthropic.AsyncAnthropic(api_key=config.api_key)

    def get_fallback_chain(self, model: str) -> List[str]:
        """Get ordered list of providers to try."""
        primary = self.model_to_provider.get(model)
        chain = []
        if primary and self._is_healthy(primary):
            chain.append(primary)
        for name, cfg in sorted(self.providers.items(),
                                key=lambda x: x[1].priority):
            if name not in chain and self._is_healthy(name):
                chain.append(name)
        return chain

    def _is_healthy(self, provider_name: str) -> bool:
        cfg = self.providers[provider_name]
        if time.time() < cfg.cooldown_until:
            return False
        return cfg.healthy

    async def route(self, model: str, messages: list,
                    **kwargs) -> dict:
        """Route request through fallback chain."""
        chain = self.get_fallback_chain(model)
        if not chain:
            raise Exception("No healthy providers available")

        last_error = None
        for provider_name in chain:
            try:
                result = await self._call_provider(
                    provider_name, model, messages, **kwargs)
                self._mark_healthy(provider_name)
                return result
            except Exception as e:
                last_error = e
                self._mark_error(provider_name, str(e))

        raise Exception(f"All providers failed. Last: {last_error}")

    async def _call_provider(self, provider_name, model,
                             messages, **kwargs):
        if provider_name == "openai":
            response = await self.clients["openai"].chat.completions.create(
                model=model, messages=messages, **kwargs)
            return {
                "provider": "openai",
                "model": model,
                "content": response.choices[0].message.content,
                "usage": {
                    "prompt_tokens": response.usage.prompt_tokens,
                    "completion_tokens": response.usage.completion_tokens,
                },
            }
        elif provider_name == "anthropic":
            response = await self.clients["anthropic"].messages.create(
                model=model, messages=messages,
                max_tokens=kwargs.get("max_tokens", 1024))
            return {
                "provider": "anthropic",
                "model": model,
                "content": response.content[0].text,
                "usage": {
                    "prompt_tokens": response.usage.input_tokens,
                    "completion_tokens": response.usage.output_tokens,
                },
            }

    def _mark_healthy(self, name):
        self.providers[name].healthy = True
        self.providers[name].error_count = 0

    def _mark_error(self, name, error):
        cfg = self.providers[name]
        cfg.error_count += 1
        cfg.last_error = error
        if cfg.error_count >= cfg.max_errors:
            cfg.healthy = False
            cfg.cooldown_until = time.time() + 60

    def get_status(self):
        return {name: {"healthy": cfg.healthy, "errors": cfg.error_count,
                       "models": cfg.models}
                for name, cfg in self.providers.items()}
💡
Fallback strategy: The router tries the primary provider first, then falls back to alternatives in priority order. After 3 consecutive errors, a provider enters a 60-second cooldown.