4 min read

AI Pipeline Error Handling: Retry Logic and Graceful Degradation

Tags: error handling, AI pipeline, production reliability, Python, resilience

AI Pipelines Fail in Unique Ways

Traditional software fails predictably: network timeout, database connection lost, out of memory. AI pipelines fail in all those ways plus a host of new ones: rate limits, content filters, model hallucinations, nondeterministic outputs that break downstream parsers. If you build an AI pipeline without robust error handling, it will fail in production within the first week.

Here are the patterns I use to build AI pipelines that recover gracefully from every failure mode I have encountered.

Pattern 1: Exponential Backoff with Jitter

Every API call to an LLM provider needs retry logic. Rate limits are a fact of life, and the correct response is to back off and retry.

import asyncio
import logging
import random
from functools import wraps

from openai import RateLimitError  # or your provider SDK's equivalent

logger = logging.getLogger(__name__)

def retry_with_backoff(
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    retryable_exceptions: tuple = (RateLimitError, TimeoutError, ConnectionError)
):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except retryable_exceptions as e:
                    if attempt == max_retries - 1:
                        raise
                    # Exponential backoff, capped at max_delay, plus up to 10%
                    # jitter so concurrent clients do not retry in lockstep
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    jitter = random.uniform(0, delay * 0.1)
                    logger.warning(
                        f"Retry {attempt + 1}/{max_retries} for {func.__name__}: {e}"
                    )
                    await asyncio.sleep(delay + jitter)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=5)
async def call_llm(prompt: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
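With the defaults above, the delay schedule grows predictably. Here is the sequence the formula produces (jitter omitted so the numbers are deterministic):

```python
# Delay schedule from the backoff formula above: base_delay * 2**attempt,
# capped at max_delay. Jitter (up to 10% extra) is left out here.
base_delay, max_delay = 1.0, 60.0
delays = [min(base_delay * (2 ** attempt), max_delay) for attempt in range(7)]
print(delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

Note how the cap kicks in at attempt 6: without it, the delay would be 64 seconds and keep doubling.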

Pattern 2: Output Validation and Retry

LLM outputs are nondeterministic. Sometimes the model returns malformed JSON, ignores format instructions, or produces content that does not meet your requirements. You need to validate outputs and retry on failure.

import json

from jsonschema import validate, ValidationError

class OutputValidationError(Exception):
    pass

async def generate_structured_output(
    prompt: str,
    schema: dict,
    max_attempts: int = 3
) -> dict:
    for attempt in range(max_attempts):
        response = await call_llm(prompt)

        try:
            parsed = json.loads(response)
            validate(parsed, schema)  # JSON Schema validation
            return parsed
        except (json.JSONDecodeError, ValidationError) as e:
            if attempt == max_attempts - 1:
                raise OutputValidationError(
                    f"Failed to get valid output after {max_attempts} attempts: {e}"
                )
            # Feed the validation error back into the prompt for the retry
            prompt = (
                f"{prompt}\n\nYour previous response was invalid: {e}\n"
                f"Please fix the output and ensure it matches the required schema."
            )

Pattern 3: Fallback Chains

When your primary model is unavailable or returns poor results, fall back to alternatives. I maintain a chain of models ordered by preference.

class FallbackChain:
    def __init__(self, providers: list[dict]):
        self.providers = providers
    
    async def call(self, prompt: str) -> str:
        errors = []
        
        for provider in self.providers:
            try:
                return await self._call_provider(provider, prompt)
            except Exception as e:
                errors.append({"provider": provider["name"], "error": str(e)})
                logger.warning(f"Provider {provider['name']} failed: {e}")
                continue
        
        raise AllProvidersFailedError(errors)
    
    async def _call_provider(self, provider: dict, prompt: str) -> str:
        if provider["type"] == "openai":
            return await call_openai(prompt, model=provider["model"])
        elif provider["type"] == "anthropic":
            return await call_anthropic(prompt, model=provider["model"])
        raise ValueError(f"Unknown provider type: {provider['type']}")

chain = FallbackChain([
    {"name": "gpt-4o", "type": "openai", "model": "gpt-4o"},
    {"name": "claude-sonnet", "type": "anthropic", "model": "claude-sonnet-4-20250514"},
    {"name": "gpt-4o-mini", "type": "openai", "model": "gpt-4o-mini"}
])
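The fallback loop itself is easy to exercise without real API keys. This standalone sketch uses two hypothetical stub providers (`flaky_provider`, `healthy_provider`) in place of actual SDK calls; the loop is the same shape as `FallbackChain.call` above:

```python
import asyncio

# Hypothetical stubs standing in for real provider clients.
async def flaky_provider(prompt: str) -> str:
    raise ConnectionError("provider down")

async def healthy_provider(prompt: str) -> str:
    return f"answer to: {prompt}"

async def call_with_fallback(prompt: str, providers: list) -> str:
    errors = []
    for name, fn in providers:
        try:
            return await fn(prompt)
        except Exception as e:
            errors.append((name, str(e)))  # remember why each one failed
    raise RuntimeError(f"all providers failed: {errors}")

result = asyncio.run(call_with_fallback(
    "ping", [("flaky", flaky_provider), ("healthy", healthy_provider)]
))
print(result)  # answer to: ping
```

The first provider's failure is recorded and the chain moves on; the caller only sees an error if every provider fails.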

Pattern 4: Circuit Breaker

If a provider is consistently failing, stop calling it temporarily to avoid wasting time and accumulating costs.

import time

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: int = 300):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open
    
    def can_execute(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"
                return True
            return False
        return True  # half-open: allow one attempt
    
    def record_success(self):
        self.failures = 0
        self.state = "closed"
    
    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"
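The state machine is worth tracing once. This demo repeats the class so it runs standalone, with a one-second `reset_timeout` so the open-to-half-open transition is observable:

```python
import time

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: int = 300):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def can_execute(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"
                return True
            return False
        return True  # half-open: allow one attempt

    def record_success(self):
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"

breaker = CircuitBreaker(failure_threshold=3, reset_timeout=1)
for _ in range(3):
    breaker.record_failure()
print(breaker.state)          # open: the provider is skipped
print(breaker.can_execute())  # False until the timeout elapses
time.sleep(1.1)
print(breaker.can_execute())  # True: breaker moves to half-open
breaker.record_success()      # the trial call succeeded
print(breaker.state)          # closed: normal operation resumes
```

In a pipeline, `can_execute` is checked before each provider call (skipping open providers, typically in the fallback chain) and `record_success`/`record_failure` are called on the result.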

Pattern 5: Graceful Degradation

When all else fails, degrade gracefully rather than crashing. This means having a sensible default behavior for when AI components are unavailable.

class ContentPipeline:
    async def generate_article(self, topic: str) -> dict:
        try:
            # Try full AI generation
            content = await self.ai_generator.generate(topic)
            return {"content": content, "quality": "ai-generated"}
        except AllProvidersFailedError:
            try:
                # Fall back to template-based generation
                content = self.template_engine.generate(topic)
                return {"content": content, "quality": "template-based"}
            except Exception:
                # Last resort: queue for manual creation
                await self.queue.add(topic, priority="high")
                return {"content": None, "quality": "queued-for-manual"}

Pattern 6: Dead Letter Queues

Items that fail all retry attempts should not be silently dropped. Send them to a dead letter queue for investigation.

import json
from datetime import datetime
from pathlib import Path
from uuid import uuid4

import aiofiles

class DeadLetterQueue:
    def __init__(self, storage_path: str):
        self.storage_path = storage_path
    
    async def add(self, item: dict, error: Exception, context: dict):
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "item": item,
            "error": str(error),
            "error_type": type(error).__name__,
            "context": context,
            "retry_count": context.get("retry_count", 0)
        }
        
        filename = f"{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}_{uuid4().hex[:8]}.json"
        filepath = Path(self.storage_path) / filename
        async with aiofiles.open(filepath, "w") as f:
            await f.write(json.dumps(entry, indent=2))
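A dead letter queue is only useful if someone reads it. Here is a sketch of a triage helper that groups entries by error type so recurring failures stand out; the demo writes synthetic entries to a temp directory, since the `add` method above is what produces them in practice:

```python
import json
import tempfile
from collections import Counter
from pathlib import Path

def summarize_dlq(storage_path: str) -> Counter:
    # Count DLQ entries by error_type to surface recurring failures
    counts = Counter()
    for entry_file in Path(storage_path).glob("*.json"):
        entry = json.loads(entry_file.read_text())
        counts[entry["error_type"]] += 1
    return counts

# Demo with synthetic entries in a temporary directory.
with tempfile.TemporaryDirectory() as d:
    for i, err in enumerate(["RateLimitError", "RateLimitError", "TimeoutError"]):
        (Path(d) / f"entry_{i}.json").write_text(
            json.dumps({"error_type": err, "item": {}, "context": {}})
        )
    summary = summarize_dlq(d)
    print(summary.most_common())  # [('RateLimitError', 2), ('TimeoutError', 1)]
```

Running this daily (or alerting on queue depth) turns the DLQ from a graveyard into an early-warning signal.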

Monitoring and Alerting

Error handling is only as good as your visibility into what is happening. Track these metrics:

  • Retry rate per provider (should be under 5% for healthy systems)
  • Fallback activation frequency
  • Circuit breaker state changes
  • Dead letter queue depth
  • Average retry delay (indicates provider health)
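A minimal sketch of tracking the first metric, retry rate per provider (the counter class and 5% threshold here are illustrative, not a specific library's API):

```python
from collections import Counter

class PipelineMetrics:
    """Per-provider call and retry counters; enough to compute retry rate."""

    def __init__(self):
        self.calls = Counter()
        self.retries = Counter()

    def record_call(self, provider: str):
        self.calls[provider] += 1

    def record_retry(self, provider: str):
        self.retries[provider] += 1

    def retry_rate(self, provider: str) -> float:
        calls = self.calls[provider]
        return self.retries[provider] / calls if calls else 0.0

m = PipelineMetrics()
for _ in range(100):
    m.record_call("gpt-4o")
for _ in range(3):
    m.record_retry("gpt-4o")

rate = m.retry_rate("gpt-4o")
print(f"{rate:.1%}")  # 3.0% — under the 5% healthy threshold
```

In production you would export these counters to your metrics backend rather than keep them in memory, but the ratio is the same.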

Build your error handling before you need it. When your AI pipeline is processing thousands of requests at 2 AM, the only thing standing between you and data loss is the error handling code you wrote last month. Make it count.