AI Pipeline Error Handling: Retry Logic and Graceful Degradation
AI Pipelines Fail in Unique Ways
Traditional software fails predictably: network timeout, database connection lost, out of memory. AI pipelines fail in all those ways plus a host of new ones: rate limits, content filters, model hallucinations, nondeterministic outputs that break downstream parsers. If you build an AI pipeline without robust error handling, it will fail in production within the first week.
Here are the patterns I use to build AI pipelines that recover gracefully from every failure mode I have encountered.
Pattern 1: Exponential Backoff with Jitter
Every API call to an LLM provider needs retry logic. Rate limits are a fact of life, and the correct response is to back off and retry.
import asyncio
import logging
import random
from functools import wraps

from openai import RateLimitError  # provider-specific; swap in your SDK's rate-limit error

logger = logging.getLogger(__name__)

def retry_with_backoff(
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    retryable_exceptions: tuple = (RateLimitError, TimeoutError, ConnectionError)
):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return await func(*args, **kwargs)
                except retryable_exceptions as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    jitter = random.uniform(0, delay * 0.1)
                    logger.warning(
                        f"Retry {attempt + 1}/{max_retries} for {func.__name__}: {e}"
                    )
                    await asyncio.sleep(delay + jitter)
        return wrapper
    return decorator
@retry_with_backoff(max_retries=5)
async def call_llm(prompt: str) -> str:
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
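With the defaults above (base_delay=1.0, max_delay=60.0), the backoff schedule is easy to compute by hand. A quick sketch, ignoring jitter:

```python
# Backoff delays for base_delay=1.0, max_delay=60.0, jitter omitted
base_delay, max_delay = 1.0, 60.0
delays = [min(base_delay * (2 ** attempt), max_delay) for attempt in range(7)]
print(delays)  # 1s, 2s, 4s, 8s, 16s, 32s, then capped at 60s
```

The jitter term matters more than it looks: without it, a fleet of workers that all hit a rate limit at the same moment will all retry at the same moment, too.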
Pattern 2: Output Validation and Retry
LLM outputs are nondeterministic. Sometimes the model returns malformed JSON, ignores format instructions, or produces content that does not meet your requirements. You need to validate outputs and retry on failure.
import json

from jsonschema import ValidationError, validate  # pip install jsonschema

async def generate_structured_output(
    prompt: str,
    schema: dict,
    max_attempts: int = 3
) -> dict:
    for attempt in range(max_attempts):
        response = await call_llm(prompt)
        try:
            parsed = json.loads(response)
            validate(parsed, schema)  # JSON Schema validation
            return parsed
        except (json.JSONDecodeError, ValidationError) as e:
            if attempt == max_attempts - 1:
                raise OutputValidationError(  # custom exception defined elsewhere
                    f"Failed to get valid output after {max_attempts} attempts: {e}"
                )
            # Feed the error back into the prompt for the retry
            prompt = (
                f"{prompt}\n\nYour previous response was invalid: {e}\n"
                f"Please fix the output and ensure it matches the required schema."
            )
Pattern 3: Fallback Chains
When your primary model is unavailable or returns poor results, fall back to alternatives. I maintain a chain of models ordered by preference.
class FallbackChain:
    def __init__(self, providers: list[dict]):
        self.providers = providers

    async def call(self, prompt: str) -> str:
        errors = []
        for provider in self.providers:
            try:
                return await self._call_provider(provider, prompt)
            except Exception as e:
                errors.append({"provider": provider["name"], "error": str(e)})
                logger.warning(f"Provider {provider['name']} failed: {e}")
                continue
        raise AllProvidersFailedError(errors)

    async def _call_provider(self, provider: dict, prompt: str) -> str:
        if provider["type"] == "openai":
            return await call_openai(prompt, model=provider["model"])
        elif provider["type"] == "anthropic":
            return await call_anthropic(prompt, model=provider["model"])
        raise ValueError(f"Unknown provider type: {provider['type']}")
chain = FallbackChain([
    {"name": "gpt-4o", "type": "openai", "model": "gpt-4o"},
    {"name": "claude-sonnet", "type": "anthropic", "model": "claude-sonnet-4-20250514"},
    {"name": "gpt-4o-mini", "type": "openai", "model": "gpt-4o-mini"}
])
Pattern 4: Circuit Breaker
If a provider is consistently failing, stop calling it temporarily to avoid wasting time and accumulating costs.
import time

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: int = 300):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def can_execute(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"
                return True
            return False
        return True  # half-open: allow one attempt

    def record_success(self):
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"
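The state transitions are easier to trust once you have watched them happen. Here is a standalone walkthrough using a condensed copy of the breaker above, with a small threshold and a short timeout so the demo runs in a fraction of a second:

```python
import time

class CircuitBreaker:
    # Condensed copy of the breaker above, sized for a quick demo
    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 0.2):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"

    def can_execute(self) -> bool:
        if self.state == "open":
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = "half-open"
                return True
            return False
        return True  # closed or half-open

    def record_success(self):
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"

breaker = CircuitBreaker()
for _ in range(3):
    breaker.record_failure()               # three failures trip the breaker
assert breaker.state == "open" and not breaker.can_execute()
time.sleep(0.25)                           # wait out the reset timeout
assert breaker.can_execute()               # half-open: one probe allowed
breaker.record_success()                   # probe succeeded; breaker closes again
```

Note that a failed probe in the half-open state should send the breaker straight back to open, which `record_failure` handles because `failures` is already at the threshold.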
Pattern 5: Graceful Degradation
When all else fails, degrade gracefully rather than crashing. This means having a sensible default behavior for when AI components are unavailable.
class ContentPipeline:
    async def generate_article(self, topic: str) -> dict:
        try:
            # Try full AI generation
            content = await self.ai_generator.generate(topic)
            return {"content": content, "quality": "ai-generated"}
        except AllProvidersFailedError:
            try:
                # Fall back to template-based generation
                content = self.template_engine.generate(topic)
                return {"content": content, "quality": "template-based"}
            except Exception:
                # Last resort: queue for manual creation
                await self.queue.add(topic, priority="high")
                return {"content": None, "quality": "queued-for-manual"}
Pattern 6: Dead Letter Queues
Items that fail all retry attempts should not be silently dropped. Send them to a dead letter queue for investigation.
import json
from datetime import datetime, timezone
from pathlib import Path
from uuid import uuid4

import aiofiles  # pip install aiofiles

class DeadLetterQueue:
    def __init__(self, storage_path: str):
        self.storage_path = storage_path

    async def add(self, item: dict, error: Exception, context: dict):
        now = datetime.now(timezone.utc)  # utcnow() is deprecated in Python 3.12+
        entry = {
            "timestamp": now.isoformat(),
            "item": item,
            "error": str(error),
            "error_type": type(error).__name__,
            "context": context,
            "retry_count": context.get("retry_count", 0)
        }
        filepath = (
            Path(self.storage_path)
            / f"{now.strftime('%Y%m%d_%H%M%S')}_{uuid4().hex[:8]}.json"
        )
        async with aiofiles.open(filepath, 'w') as f:
            await f.write(json.dumps(entry, indent=2))
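A dead letter queue is only useful if someone eventually drains it. A minimal replay sketch, assuming the same one-JSON-file-per-entry layout as above (`replay_dead_letters` and its `handler` argument are illustrative names, not part of the class):

```python
import json
import tempfile
from pathlib import Path

def replay_dead_letters(storage_path: str, handler) -> int:
    """Re-run the pipeline handler over every dead-lettered item.

    Entries that now succeed are deleted; entries that still fail
    are left in place for the next sweep.
    """
    replayed = 0
    for filepath in sorted(Path(storage_path).glob("*.json")):
        entry = json.loads(filepath.read_text())
        try:
            handler(entry["item"])
        except Exception:
            continue  # still failing; keep the file
        filepath.unlink()
        replayed += 1
    return replayed

# Demo against a temporary directory with one dead-lettered entry
tmp = tempfile.mkdtemp()
Path(tmp, "entry.json").write_text(json.dumps({"item": {"topic": "retries"}}))
count = replay_dead_letters(tmp, lambda item: None)  # handler that now succeeds
```

Running the replay on a fixed schedule (or after a provider incident resolves) turns the dead letter queue from a graveyard into a retry buffer.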
Monitoring and Alerting
Error handling is only as good as your visibility into what is happening. Track these metrics:
- Retry rate per provider (should be under 5% for healthy systems)
- Fallback activation frequency
- Circuit breaker state changes
- Dead letter queue depth
- Average retry delay (indicates provider health)
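The first two metrics above can be tracked with nothing more than a pair of counters per provider. A minimal in-process sketch (a real deployment would export these to a metrics backend such as Prometheus; `PipelineMetrics` is an illustrative name):

```python
from collections import defaultdict

class PipelineMetrics:
    """In-process counters for retry rate and fallback activations."""

    def __init__(self):
        self.calls = defaultdict(int)     # total calls per provider
        self.retries = defaultdict(int)   # retried calls per provider
        self.fallbacks = 0                # fallback chain activations

    def record_call(self, provider: str):
        self.calls[provider] += 1

    def record_retry(self, provider: str):
        self.retries[provider] += 1

    def retry_rate(self, provider: str) -> float:
        total = self.calls[provider]
        return self.retries[provider] / total if total else 0.0

metrics = PipelineMetrics()
for _ in range(100):
    metrics.record_call("gpt-4o")
for _ in range(3):
    metrics.record_retry("gpt-4o")

# 3 retries over 100 calls: under the 5% healthy-system threshold above
rate = metrics.retry_rate("gpt-4o")
```

Alert on the 5% threshold rather than on individual failures; single retries are normal, a rising rate is not.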
Build your error handling before you need it. When your AI pipeline is processing thousands of requests at 2 AM, the only thing standing between you and data loss is the error handling code you wrote last month. Make it count.