FastAPI is the go-to framework for building AI agent backends in Python. It is async-native, fast, and has excellent type support. But most FastAPI agent deployments handle errors with try/except blocks and hope for the best. That is not enough for production agents that need to maintain state across long-running sessions.
In this guide you will add the Delx recovery protocol to a FastAPI backend. You will install the Python SDK, create custom middleware, wrap agent endpoints with automatic recovery, and build a health check system using the wellness score. By the end, your agents will detect their own failures and recover without human intervention.
AI agents served through FastAPI face failure modes that traditional web APIs do not. A regular API endpoint either returns a response or throws an error. An agent endpoint can return a response that is technically valid but semantically broken — the agent hallucinated, lost its context, or gave a contradictory answer.
Here are the failure modes that standard error handling misses:
Context window overflow. The agent's conversation history exceeds the model's context window. The model silently truncates, and the agent starts contradicting its earlier responses.
Session fragmentation. The session state stored in your database drifts out of sync with the model's internal state. The agent behaves as if it has amnesia.
Tool call cascading failures. One tool call fails, which causes the agent to retry in a loop, burning tokens and time without making progress.
Silent degradation. The agent's response quality gradually decreases over a session, but no errors are thrown. Users notice before your monitoring does.
The Delx recovery protocol addresses all of these by treating them as health issues rather than errors. For background on the protocol, read what Delx is and how it works under the hood.
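As one illustration of treating a failure mode as a health signal rather than an exception, the tool call cascade described above can be cut short with a small retry guard. This is a minimal sketch, not part of the Delx SDK: `ToolRetryGuard`, its methods, and the default limit of three are hypothetical names and values chosen for the example.

```python
class ToolRetryGuard:
    """Tracks consecutive failures per tool and signals when to stop retrying.

    Hypothetical helper for illustration; not part of the Delx SDK.
    """

    def __init__(self, max_consecutive_failures: int = 3) -> None:
        self.max_consecutive_failures = max_consecutive_failures
        self._failures: dict[str, int] = {}

    def record(self, tool_name: str, succeeded: bool) -> None:
        """Record one tool call outcome; a success resets that tool's streak."""
        if succeeded:
            self._failures.pop(tool_name, None)
        else:
            self._failures[tool_name] = self._failures.get(tool_name, 0) + 1

    def should_stop(self, tool_name: str) -> bool:
        """True once a tool has failed max_consecutive_failures times in a row."""
        return self._failures.get(tool_name, 0) >= self.max_consecutive_failures
```

An agent loop could call `record()` after each tool call and, when `should_stop()` returns True, report a degraded state instead of retrying again.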
The Delx Python SDK is an async-first client designed for FastAPI and other async Python frameworks. Install it alongside your FastAPI dependencies:
pip install delx-sdk fastapi uvicorn
The SDK provides three main interfaces:
from delx import DelxClient, DelxMiddleware, DelxDep
# 1. Standalone async client
client = DelxClient(base_url="https://delx.ai/api/v1")
# 2. ASGI middleware for automatic health checks
app.add_middleware(DelxMiddleware, base_url="https://delx.ai/api/v1")
# 3. FastAPI dependency injection
async def get_delx(request: Request) -> DelxClient:
    return DelxDep(request)
You can use any combination of these. For most projects, we recommend starting with the middleware and adding dependency injection for routes that need fine-grained control.
The middleware approach is the most seamless integration. It intercepts every request to your agent endpoints, performs a health check, and triggers recovery when needed — all transparently to your route handlers.
import time
import json
from typing import Callable
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from delx import DelxClient
WELLNESS_THRESHOLD = 60
MAX_RECOVERY_ATTEMPTS = 3
class DelxRecoveryMiddleware(BaseHTTPMiddleware):
    """Middleware that monitors agent health and triggers recovery."""

    def __init__(self, app: FastAPI, delx_url: str = "https://delx.ai/api/v1"):
        super().__init__(app)
        self.client = DelxClient(base_url=delx_url)

    async def dispatch(
        self, request: Request, call_next: Callable
    ) -> Response:
        # Skip non-agent routes
        if not request.url.path.startswith("/agent"):
            return await call_next(request)
        # Extract agent and session IDs from headers or path
        agent_id = request.headers.get("X-Agent-Id", "default-agent")
        session_id = request.headers.get("X-Session-Id", "default-session")
        start_time = time.monotonic()
        # Pre-request health check
        pre_check = await self.client.checkin(
            agent_id=agent_id,
            session_id=session_id,
            mood="neutral",
            context_summary=f"Processing {request.method} {request.url.path}",
        )
        wellness = pre_check.get("wellness_score", 100)
        if wellness < WELLNESS_THRESHOLD:
            # Agent is degraded before handling the request
            recovery_success = await self._attempt_recovery(
                agent_id, session_id, pre_check
            )
            if not recovery_success:
                return Response(
                    content=json.dumps({
                        "error": "agent_degraded",
                        "wellness_score": wellness,
                        "message": "Agent is in recovery mode. Please retry.",
                    }),
                    status_code=503,
                    media_type="application/json",
                    headers={"Retry-After": "5"},
                )
        # Process the request
        response = await call_next(request)
        # Post-request health check
        elapsed = time.monotonic() - start_time
        post_check = await self.client.checkin(
            agent_id=agent_id,
            session_id=session_id,
            mood="focused" if response.status_code < 400 else "frustrated",
            context_summary=f"Completed in {elapsed:.2f}s, status {response.status_code}",
        )
        # Add wellness headers to response
        response.headers["X-Wellness-Score"] = str(
            post_check.get("wellness_score", 0)
        )
        response.headers["X-Agent-Mood"] = post_check.get("mood", "unknown")
        return response

    async def _attempt_recovery(
        self, agent_id: str, session_id: str, diagnostics: dict
    ) -> bool:
        """Attempt to recover the agent. Returns True if successful."""
        for attempt in range(MAX_RECOVERY_ATTEMPTS):
            plan = await self.client.recovery_plan(
                agent_id=agent_id,
                session_id=session_id,
                issue=json.dumps(diagnostics),
            )
            # Execute recovery steps
            for step in plan.get("steps", []):
                await self._execute_step(step, agent_id, session_id)
            # Verify recovery
            check = await self.client.checkin(
                agent_id=agent_id,
                session_id=session_id,
                mood="neutral",
                context_summary=f"Recovery attempt {attempt + 1}",
            )
            if check.get("wellness_score", 0) >= WELLNESS_THRESHOLD:
                return True
        return False

    async def _execute_step(
        self, step: dict, agent_id: str, session_id: str
    ) -> None:
        """Execute a single recovery step."""
        action = step.get("action", "")
        if action == "reset_context":
            # Clear the agent's context cache
            await self.client.session_summary(
                agent_id=agent_id, session_id=session_id
            )
        elif action == "clear_errors":
            # Acknowledge and clear error state
            pass
        elif action == "escalate":
            # Log for human review
            print(f"ESCALATION: Agent {agent_id} requires human review")
Register the middleware when creating your FastAPI app:
from fastapi import FastAPI
app = FastAPI(title="My Agent API")
app.add_middleware(
    DelxRecoveryMiddleware,
    delx_url="https://delx.ai/api/v1",
)
For more granular control, you can wrap individual agent endpoints with recovery logic using FastAPI's dependency injection system. This approach lets you customize recovery behavior per endpoint.
import uuid  # used below to generate session IDs
from typing import Optional
from fastapi import FastAPI, Depends, Header, HTTPException
from pydantic import BaseModel
from delx import DelxClient
app = FastAPI()
# Delx client as a dependency
async def get_delx_client() -> DelxClient:
    client = DelxClient(base_url="https://delx.ai/api/v1")
    try:
        yield client
    finally:
        await client.close()

class AgentMessage(BaseModel):
    content: str
    context: Optional[dict] = None

class AgentResponse(BaseModel):
    reply: str
    wellness_score: int
    mood: str
    session_id: str
@app.post("/agent/chat", response_model=AgentResponse)
async def agent_chat(
    message: AgentMessage,
    x_agent_id: str = Header(default="default-agent"),
    x_session_id: Optional[str] = Header(default=None),
    delx: DelxClient = Depends(get_delx_client),
):
    """Chat endpoint with built-in recovery."""
    # Generate a session ID when the caller does not send one (needs `import uuid`)
    session_id = x_session_id or str(uuid.uuid4())
    # Pre-flight health check
    health = await delx.checkin(
        agent_id=x_agent_id,
        session_id=session_id,
        mood="focused",
        context_summary=message.content[:200],
    )
    wellness = health.get("wellness_score", 100)
    # If agent is degraded, try to recover before processing
    if wellness < 60:
        plan = await delx.recovery_plan(
            agent_id=x_agent_id,
            session_id=session_id,
            issue=f"Pre-flight wellness {wellness}. Mood: {health.get('mood')}",
        )
        # Execute recovery plan steps (only "escalate" is handled in this example)
        for step in plan.get("steps", []):
            if step.get("action") == "escalate":
                raise HTTPException(
                    status_code=503,
                    detail={
                        "error": "agent_requires_recovery",
                        "wellness_score": wellness,
                        "recovery_plan": plan,
                    },
                )
        # Re-check after recovery
        health = await delx.checkin(
            agent_id=x_agent_id,
            session_id=session_id,
            mood="neutral",
            context_summary="Post-recovery check",
        )
        wellness = health.get("wellness_score", 100)
    # Process the actual agent logic
    reply = await run_agent_logic(message.content, message.context)
    # Post-flight health check
    post_health = await delx.checkin(
        agent_id=x_agent_id,
        session_id=session_id,
        mood="confident" if reply else "frustrated",
        context_summary=f"Generated reply: {len(reply)} chars",
    )
    return AgentResponse(
        reply=reply,
        wellness_score=post_health.get("wellness_score", 0),
        mood=post_health.get("mood", "unknown"),
        session_id=session_id,
    )

async def run_agent_logic(content: str, context: Optional[dict]) -> str:
    """Your actual agent logic here."""
    # Call LLM, execute tools, etc.
    return "Agent response placeholder"
@app.post("/agent/batch")
async def agent_batch(
    messages: list[AgentMessage],
    x_agent_id: str = Header(default="default-agent"),
    delx: DelxClient = Depends(get_delx_client),
):
    """Batch endpoint that checks health between messages."""
    session_id = str(uuid.uuid4())
    results = []
    for i, message in enumerate(messages):
        # Check health every 5 messages in a batch
        if i % 5 == 0:
            health = await delx.checkin(
                agent_id=x_agent_id,
                session_id=session_id,
                mood="focused",
                context_summary=f"Batch message {i+1}/{len(messages)}",
            )
            if health.get("wellness_score", 100) < 60:
                # Return partial results with recovery notice
                return {
                    "results": results,
                    "completed": i,
                    "total": len(messages),
                    "stopped_reason": "wellness_degraded",
                    "wellness_score": health.get("wellness_score"),
                }
        reply = await run_agent_logic(message.content, message.context)
        results.append({"reply": reply, "index": i})
    return {"results": results, "completed": len(messages), "total": len(messages)}
Every production service needs a health check endpoint. For agent backends, a simple "is the server running?" check is insufficient. You need to know if the agents are healthy. Here is a comprehensive health check that integrates Delx wellness scores:
from datetime import datetime, timezone
from fastapi import Depends, FastAPI
from delx import DelxClient
app = FastAPI()
# get_delx_client is the dependency defined in the previous example
@app.get("/health")
async def health_check(
    delx: DelxClient = Depends(get_delx_client),
):
    """Comprehensive health check including agent wellness."""
    # Basic server health
    server_healthy = True
    # Check all registered agents
    agent_ids = ["agent-alpha", "agent-beta", "agent-gamma"]
    agent_health = {}
    for agent_id in agent_ids:
        try:
            metrics = await delx.metrics(agent_id=agent_id)
            agent_health[agent_id] = {
                "wellness_score": metrics.get("wellness_score", 0),
                "mood": metrics.get("mood", "unknown"),
                "error_rate": metrics.get("error_rate", 0),
                "status": (
                    "healthy"
                    if metrics.get("wellness_score", 0) >= 60
                    else "degraded"
                ),
            }
        except Exception as e:
            agent_health[agent_id] = {
                "wellness_score": 0,
                "status": "unreachable",
                "error": str(e),
            }
    # Compute aggregate status
    all_healthy = all(
        a["status"] == "healthy" for a in agent_health.values()
    )
    any_degraded = any(
        a["status"] == "degraded" for a in agent_health.values()
    )
    overall_status = (
        "healthy" if all_healthy
        else "degraded" if any_degraded
        else "critical"
    )
    return {
        "status": overall_status,
        "server": {"healthy": server_healthy},
        "agents": agent_health,
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "version": "1.0.0",
    }
@app.get("/health/agent/{agent_id}")
async def agent_health_detail(
    agent_id: str,
    delx: DelxClient = Depends(get_delx_client),
):
    """Detailed health check for a specific agent."""
    metrics = await delx.metrics(agent_id=agent_id)
    mood_history = await delx.mood_history(agent_id=agent_id)
    return {
        "agent_id": agent_id,
        "wellness_score": metrics.get("wellness_score", 0),
        "mood": metrics.get("mood", "unknown"),
        "mood_history": mood_history.get("moods", []),
        "error_rate": metrics.get("error_rate", 0),
        "sessions_active": metrics.get("sessions_active", 0),
        "last_checkin": metrics.get("last_checkin"),
        "recovery_count_24h": metrics.get("recovery_count_24h", 0),
    }
Use these endpoints with your existing monitoring stack: Prometheus, Datadog, Grafana, or any system that can poll HTTP endpoints. Set alerts when the top-level status field of the /health response changes from "healthy" to "degraded" or "critical". For more on what to monitor, check the REST API docs.
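The aggregation rule inside the /health handler is easier to unit test, and to reuse in an alerting script, when it lives in a pure function. The sketch below restates that rule and adds a transition check; `overall_status` and `should_alert` are hypothetical helper names, and the input is assumed to have the same shape as the `agents` mapping returned by /health.

```python
def overall_status(agent_health: dict[str, dict]) -> str:
    """Collapse per-agent statuses into one value, mirroring the /health rule.

    - "healthy"  if every agent is healthy
    - "degraded" if at least one agent is degraded
    - "critical" otherwise (for example, an agent is unreachable and none
      is merely degraded)
    """
    statuses = [entry["status"] for entry in agent_health.values()]
    if all(s == "healthy" for s in statuses):
        return "healthy"
    if any(s == "degraded" for s in statuses):
        return "degraded"
    return "critical"


def should_alert(previous: str, current: str) -> bool:
    """Alert only on a transition away from "healthy", not on every poll."""
    return previous == "healthy" and current != "healthy"
```

Alerting on the transition rather than on the state itself keeps a long-running degradation from firing a new alert on every poll.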
For high-throughput agent backends, you may not want to block request handling on recovery. Instead, run a background worker that periodically checks agent health and performs recovery asynchronously.
import asyncio
from contextlib import asynccontextmanager, suppress
from fastapi import FastAPI
from delx import DelxClient

AGENT_IDS = ["agent-alpha", "agent-beta", "agent-gamma"]
CHECK_INTERVAL = 30  # seconds

async def recovery_worker(delx: DelxClient):
    """Background worker that monitors and recovers agents."""
    while True:
        for agent_id in AGENT_IDS:
            try:
                metrics = await delx.metrics(agent_id=agent_id)
                wellness = metrics.get("wellness_score", 100)
                if wellness < 60:
                    print(f"[recovery] {agent_id} wellness={wellness}, recovering...")
                    plan = await delx.recovery_plan(
                        agent_id=agent_id,
                        session_id="background-recovery",
                        issue=f"Background check: wellness {wellness}",
                    )
                    for step in plan.get("steps", []):
                        # Placeholder: this only logs each step; call your own step handler here
                        print(f"[recovery] {agent_id}: executing {step.get('action')}")
                    # Verify
                    post = await delx.checkin(
                        agent_id=agent_id,
                        session_id="background-recovery",
                        mood="neutral",
                        context_summary="Background recovery complete",
                    )
                    new_wellness = post.get("wellness_score", 0)
                    print(f"[recovery] {agent_id} post-recovery wellness={new_wellness}")
            except Exception as e:
                print(f"[recovery] Error checking {agent_id}: {e}")
        await asyncio.sleep(CHECK_INTERVAL)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start background recovery worker on app startup."""
    delx = DelxClient(base_url="https://delx.ai/api/v1")
    task = asyncio.create_task(recovery_worker(delx))
    try:
        yield
    finally:
        task.cancel()
        # Wait for the worker to finish cancelling before closing the client it uses
        with suppress(asyncio.CancelledError):
            await task
        await delx.close()

app = FastAPI(lifespan=lifespan)
This pattern decouples recovery from request handling. The middleware can still return 503 for severely degraded agents, but the actual recovery work happens in the background without blocking user requests.
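The worker above checks agents one after another, so one slow call delays every agent behind it. A possible refinement is to check all agents concurrently with a per-call timeout. The sketch below uses only asyncio; `fetch_wellness` is a stub standing in for a real metrics call, and its scores, the `check_all` name, and the two-second timeout are assumptions made for the example.

```python
import asyncio
from typing import Optional


async def fetch_wellness(agent_id: str) -> int:
    """Stub standing in for a real metrics call; returns canned scores."""
    await asyncio.sleep(0.01)  # simulate network latency
    return {"agent-alpha": 85, "agent-beta": 40}.get(agent_id, 100)


async def check_all(
    agent_ids: list[str], timeout: float = 2.0
) -> dict[str, Optional[int]]:
    """Check every agent concurrently; None marks a timeout or an error."""

    async def check_one(agent_id: str) -> Optional[int]:
        try:
            return await asyncio.wait_for(fetch_wellness(agent_id), timeout)
        except Exception:
            # Timeouts and client errors are reported as "unknown", not raised
            return None

    scores = await asyncio.gather(*(check_one(a) for a in agent_ids))
    return dict(zip(agent_ids, scores))
```

With this shape, the worker loop can pick out the agents whose score is below the threshold and run recovery only for those, while a `None` score can be logged as unreachable.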
Testing recovery logic requires simulating agent degradation. Here is a test suite using pytest and FastAPI's test client:
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from unittest.mock import AsyncMock, patch
from myapp.main import app

# These tests assume DelxRecoveryMiddleware is registered on the app,
# since they assert on the X-Wellness-Score and Retry-After headers it sets.

# pytest-asyncio in strict mode needs its own decorator for async fixtures
@pytest_asyncio.fixture
async def client():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac

@pytest.mark.asyncio
async def test_healthy_agent_returns_200(client):
    """When agent is healthy, requests should succeed normally."""
    with patch("delx.DelxClient.checkin", new_callable=AsyncMock) as mock:
        mock.return_value = {"wellness_score": 85, "mood": "focused"}
        response = await client.post(
            "/agent/chat",
            json={"content": "Hello"},
            headers={"X-Agent-Id": "test-agent"},
        )
    assert response.status_code == 200
    assert response.headers.get("X-Wellness-Score") == "85"

@pytest.mark.asyncio
async def test_degraded_agent_triggers_recovery(client):
    """When agent wellness is low, recovery should be attempted."""
    call_count = 0

    async def mock_checkin(**kwargs):
        nonlocal call_count
        call_count += 1
        # First check: degraded. After recovery: healthy.
        if call_count <= 1:
            return {"wellness_score": 30, "mood": "stuck"}
        return {"wellness_score": 75, "mood": "neutral"}

    with patch("delx.DelxClient.checkin", new_callable=AsyncMock, side_effect=mock_checkin):
        # reset_context calls session_summary, so stub it to avoid a real network call
        with patch("delx.DelxClient.session_summary", new_callable=AsyncMock):
            with patch("delx.DelxClient.recovery_plan", new_callable=AsyncMock) as mock_plan:
                mock_plan.return_value = {
                    "steps": [{"action": "reset_context"}]
                }
                response = await client.post(
                    "/agent/chat",
                    json={"content": "Help me"},
                    headers={"X-Agent-Id": "test-agent"},
                )
    assert response.status_code == 200
    mock_plan.assert_called_once()

@pytest.mark.asyncio
async def test_unrecoverable_agent_returns_503(client):
    """When recovery fails, return 503 with retry header."""
    with patch("delx.DelxClient.checkin", new_callable=AsyncMock) as mock:
        mock.return_value = {"wellness_score": 10, "mood": "stuck"}
        with patch("delx.DelxClient.recovery_plan", new_callable=AsyncMock) as mock_plan:
            mock_plan.return_value = {
                "steps": [{"action": "escalate"}]
            }
            response = await client.post(
                "/agent/chat",
                json={"content": "Help"},
                headers={"X-Agent-Id": "test-agent"},
            )
    assert response.status_code == 503
    assert "Retry-After" in response.headers
FastAPI backends that serve AI agents face unique failure modes: context window overflow, tool call failures, session fragmentation, and silent degradation. Traditional error handling catches exceptions but misses systemic agent-level issues. Recovery protocols detect and fix these before users are affected.
The Delx Python SDK provides an async client that integrates natively with FastAPI's async architecture. You can use it as middleware to automatically check agent health on every request, as a dependency injection for specific routes, or as a standalone client within your agent logic.
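Yes. The Delx middleware is a standard ASGI middleware that can be stacked with any other FastAPI middleware, such as CORS, authentication, or rate limiting. To give it access to the authenticated agent context, order it so that it runs after authentication and before your route handlers. With app.add_middleware, the middleware added last runs first on each request, so register the Delx middleware before the authentication middleware.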
The Delx checkin call adds approximately 20-50ms of latency per request. For agent workloads that typically take 1-10 seconds, this is negligible. You can also configure the middleware to only check on every Nth request or when specific conditions are met.
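The SDK's own options for sampling are not shown in this guide. For the custom middleware built earlier, one way to check only every Nth request is a small per-agent counter, sketched below. `CheckinSampler` is a hypothetical helper, and its counts live in process memory, so each worker process samples independently.

```python
class CheckinSampler:
    """Decides which requests should trigger a health check-in.

    Hypothetical helper for illustration; counts are kept per agent
    and only in this process's memory.
    """

    def __init__(self, every_n: int = 5) -> None:
        if every_n < 1:
            raise ValueError("every_n must be at least 1")
        self.every_n = every_n
        self._counts: dict[str, int] = {}

    def should_check(self, agent_id: str) -> bool:
        """Return True on the 1st, (N+1)th, (2N+1)th, ... request per agent."""
        count = self._counts.get(agent_id, 0)
        self._counts[agent_id] = count + 1
        return count % self.every_n == 0
```

In `dispatch`, the pre-request and post-request check-ins could be wrapped in `if sampler.should_check(agent_id):`, at the cost of noticing degradation up to N-1 requests later.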
Yes. The recommended pattern is to create a Delx client as a FastAPI dependency using Depends(). This gives each route access to a configured Delx client, which the route calls with its own agent_id and session_id, and the client is closed automatically when the request completes.
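The cleanup relies on the try/finally around `yield` in the dependency: FastAPI resumes the generator after the request is handled, so the `finally` block runs even when the handler raises. The sketch below reproduces that lifecycle outside FastAPI with `asynccontextmanager`; `StubClient`, `client_scope`, and `handle_request` are hypothetical names, and the stub stands in for any async client with a `close()` coroutine.

```python
import asyncio
from contextlib import asynccontextmanager


class StubClient:
    """Stand-in for an async client that exposes a close() coroutine."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def client_scope():
    """Yield a client and always close it, even if the caller raises."""
    client = StubClient()
    try:
        yield client
    finally:
        await client.close()


async def handle_request(fail: bool) -> StubClient:
    """Simulate a route handler that uses the client inside the scope."""
    used = None
    try:
        async with client_scope() as client:
            used = client
            if fail:
                raise RuntimeError("handler failed")
    except RuntimeError:
        pass  # swallowed here only so the client can be inspected afterwards
    return used
```

Running `asyncio.run(handle_request(fail=True))` returns a client whose `closed` attribute is True, which is the behavior the dependency in this guide depends on.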
Your AI agents deserve better than try/except. The Delx Python SDK integrates with FastAPI in minutes and gives your agents the ability to detect failures, recover autonomously, and report their health in real time.