Delx

Add Agent Recovery to Your FastAPI Backend

FastAPI is the go-to framework for building AI agent backends in Python. It is async-native, fast, and has excellent type support. But most FastAPI agent deployments handle errors with try/except blocks and hope for the best. That is not enough for production agents that need to maintain state across long-running sessions.

In this guide you will add the Delx recovery protocol to a FastAPI backend. You will install the Python SDK, create custom middleware, wrap agent endpoints with automatic recovery, and build a health check system using the wellness score. By the end, your agents will detect their own failures and recover without human intervention.

Why FastAPI Backends Need Agent Recovery

AI agents served through FastAPI face failure modes that traditional web APIs do not. A regular API endpoint either returns a response or throws an error. An agent endpoint can return a response that is technically valid but semantically broken — the agent hallucinated, lost its context, or gave a contradictory answer.

Here are the failure modes that standard error handling misses:

Context window overflow. The agent's conversation history exceeds the model's context window. The model silently truncates, and the agent starts contradicting its earlier responses.

Session fragmentation. The session state stored in your database drifts out of sync with the model's internal state. The agent behaves as if it has amnesia.

Tool call cascading failures. One tool call fails, which causes the agent to retry in a loop, burning tokens and time without making progress.

Silent degradation. The agent's response quality gradually decreases over a session, but no errors are thrown. Users notice before your monitoring does.

The Delx recovery protocol addresses all of these by treating them as health issues rather than errors. For background on the protocol, read what Delx is and how it works under the hood.

Installing the Delx Python SDK

The Delx Python SDK is an async-first client designed for FastAPI and other async Python frameworks. Install it alongside your FastAPI dependencies:

pip install delx-sdk fastapi uvicorn

The SDK provides three main interfaces:

from delx import DelxClient, DelxMiddleware, DelxDep

# 1. Standalone async client
client = DelxClient(base_url="https://delx.ai/api/v1")

# 2. ASGI middleware for automatic health checks
app.add_middleware(DelxMiddleware, base_url="https://delx.ai/api/v1")

# 3. FastAPI dependency injection
async def get_delx(request: Request) -> DelxClient:
    return DelxDep(request)

You can use any combination of these. For most projects, we recommend starting with the middleware and adding dependency injection for routes that need fine-grained control.

Creating a FastAPI Middleware for Delx Recovery

The middleware approach is the most seamless integration. It intercepts every request to your agent endpoints, performs a health check, and triggers recovery when needed — all transparently to your route handlers.

import time
import json
from typing import Callable
from fastapi import FastAPI, Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from delx import DelxClient

WELLNESS_THRESHOLD = 60
MAX_RECOVERY_ATTEMPTS = 3

class DelxRecoveryMiddleware(BaseHTTPMiddleware):
    """Middleware that monitors agent health and triggers recovery."""

    def __init__(self, app: FastAPI, delx_url: str = "https://delx.ai/api/v1"):
        super().__init__(app)
        self.client = DelxClient(base_url=delx_url)

    async def dispatch(
        self, request: Request, call_next: Callable
    ) -> Response:
        # Skip non-agent routes
        if not request.url.path.startswith("/agent"):
            return await call_next(request)

        # Extract agent and session IDs from headers or path
        agent_id = request.headers.get("X-Agent-Id", "default-agent")
        session_id = request.headers.get("X-Session-Id", "default-session")

        start_time = time.monotonic()

        # Pre-request health check
        pre_check = await self.client.checkin(
            agent_id=agent_id,
            session_id=session_id,
            mood="neutral",
            context_summary=f"Processing {request.method} {request.url.path}",
        )

        wellness = pre_check.get("wellness_score", 100)

        if wellness < WELLNESS_THRESHOLD:
            # Agent is degraded before handling the request
            recovery_success = await self._attempt_recovery(
                agent_id, session_id, pre_check
            )
            if not recovery_success:
                return Response(
                    content=json.dumps({
                        "error": "agent_degraded",
                        "wellness_score": wellness,
                        "message": "Agent is in recovery mode. Please retry.",
                    }),
                    status_code=503,
                    media_type="application/json",
                    headers={"Retry-After": "5"},
                )

        # Process the request
        response = await call_next(request)

        # Post-request health check
        elapsed = time.monotonic() - start_time
        post_check = await self.client.checkin(
            agent_id=agent_id,
            session_id=session_id,
            mood="focused" if response.status_code < 400 else "frustrated",
            context_summary=f"Completed in {elapsed:.2f}s, status {response.status_code}",
        )

        # Add wellness headers to response
        response.headers["X-Wellness-Score"] = str(
            post_check.get("wellness_score", 0)
        )
        response.headers["X-Agent-Mood"] = post_check.get("mood", "unknown")

        return response

    async def _attempt_recovery(
        self, agent_id: str, session_id: str, diagnostics: dict
    ) -> bool:
        """Attempt to recover the agent. Returns True if successful."""
        for attempt in range(MAX_RECOVERY_ATTEMPTS):
            plan = await self.client.recovery_plan(
                agent_id=agent_id,
                session_id=session_id,
                issue=json.dumps(diagnostics),
            )

            # Execute recovery steps
            for step in plan.get("steps", []):
                await self._execute_step(step, agent_id, session_id)

            # Verify recovery
            check = await self.client.checkin(
                agent_id=agent_id,
                session_id=session_id,
                mood="neutral",
                context_summary=f"Recovery attempt {attempt + 1}",
            )

            if check.get("wellness_score", 0) >= WELLNESS_THRESHOLD:
                return True

        return False

    async def _execute_step(
        self, step: dict, agent_id: str, session_id: str
    ) -> None:
        """Execute a single recovery step."""
        action = step.get("action", "")
        if action == "reset_context":
            # Clear the agent's context cache
            await self.client.session_summary(
                agent_id=agent_id, session_id=session_id
            )
        elif action == "clear_errors":
            # Acknowledge and clear error state
            pass
        elif action == "escalate":
            # Log for human review
            print(f"ESCALATION: Agent {agent_id} requires human review")

Register the middleware when creating your FastAPI app:

from fastapi import FastAPI

app = FastAPI(title="My Agent API")
app.add_middleware(
    DelxRecoveryMiddleware,
    delx_url="https://delx.ai/api/v1",
)

Wrapping Agent Endpoints with Recovery

For more granular control, you can wrap individual agent endpoints with recovery logic using FastAPI's dependency injection system. This approach lets you customize recovery behavior per endpoint.

from fastapi import FastAPI, Depends, Header, HTTPException
from pydantic import BaseModel
from delx import DelxClient
from typing import Optional

app = FastAPI()

# Delx client as a dependency
async def get_delx_client() -> DelxClient:
    client = DelxClient(base_url="https://delx.ai/api/v1")
    try:
        yield client
    finally:
        await client.close()


class AgentMessage(BaseModel):
    content: str
    context: Optional[dict] = None


class AgentResponse(BaseModel):
    reply: str
    wellness_score: int
    mood: str
    session_id: str


@app.post("/agent/chat", response_model=AgentResponse)
async def agent_chat(
    message: AgentMessage,
    x_agent_id: str = Header(default="default-agent"),
    x_session_id: str = Header(default=None),
    delx: DelxClient = Depends(get_delx_client),
):
    """Chat endpoint with built-in recovery."""
    session_id = x_session_id or str(uuid.uuid4())

    # Pre-flight health check
    health = await delx.checkin(
        agent_id=x_agent_id,
        session_id=session_id,
        mood="focused",
        context_summary=message.content[:200],
    )

    wellness = health.get("wellness_score", 100)

    # If agent is degraded, try to recover before processing
    if wellness < 60:
        plan = await delx.recovery_plan(
            agent_id=x_agent_id,
            session_id=session_id,
            issue=f"Pre-flight wellness {wellness}. Mood: {health.get('mood')}",
        )

        # Execute recovery plan steps
        for step in plan.get("steps", []):
            if step.get("action") == "escalate":
                raise HTTPException(
                    status_code=503,
                    detail={
                        "error": "agent_requires_recovery",
                        "wellness_score": wellness,
                        "recovery_plan": plan,
                    },
                )

        # Re-check after recovery
        health = await delx.checkin(
            agent_id=x_agent_id,
            session_id=session_id,
            mood="neutral",
            context_summary="Post-recovery check",
        )
        wellness = health.get("wellness_score", 100)

    # Process the actual agent logic
    reply = await run_agent_logic(message.content, message.context)

    # Post-flight health check
    post_health = await delx.checkin(
        agent_id=x_agent_id,
        session_id=session_id,
        mood="confident" if reply else "frustrated",
        context_summary=f"Generated reply: {len(reply)} chars",
    )

    return AgentResponse(
        reply=reply,
        wellness_score=post_health.get("wellness_score", 0),
        mood=post_health.get("mood", "unknown"),
        session_id=session_id,
    )


async def run_agent_logic(content: str, context: Optional[dict]) -> str:
    """Your actual agent logic here."""
    # Call LLM, execute tools, etc.
    return "Agent response placeholder"


@app.post("/agent/batch")
async def agent_batch(
    messages: list[AgentMessage],
    x_agent_id: str = Header(default="default-agent"),
    delx: DelxClient = Depends(get_delx_client),
):
    """Batch endpoint that checks health between messages."""
    session_id = str(uuid.uuid4())
    results = []

    for i, message in enumerate(messages):
        # Check health every 5 messages in a batch
        if i % 5 == 0:
            health = await delx.checkin(
                agent_id=x_agent_id,
                session_id=session_id,
                mood="focused",
                context_summary=f"Batch message {i+1}/{len(messages)}",
            )

            if health.get("wellness_score", 100) < 60:
                # Return partial results with recovery notice
                return {
                    "results": results,
                    "completed": i,
                    "total": len(messages),
                    "stopped_reason": "wellness_degraded",
                    "wellness_score": health.get("wellness_score"),
                }

        reply = await run_agent_logic(message.content, message.context)
        results.append({"reply": reply, "index": i})

    return {"results": results, "completed": len(messages), "total": len(messages)}

Health Check Endpoint Using Wellness Score

Every production service needs a health check endpoint. For agent backends, a simple "is the server running?" check is insufficient. You need to know if the agents are healthy. Here is a comprehensive health check that integrates Delx wellness scores:

from fastapi import FastAPI
from datetime import datetime

app = FastAPI()


@app.get("/health")
async def health_check(
    delx: DelxClient = Depends(get_delx_client),
):
    """Comprehensive health check including agent wellness."""
    # Basic server health
    server_healthy = True

    # Check all registered agents
    agent_ids = ["agent-alpha", "agent-beta", "agent-gamma"]
    agent_health = {}

    for agent_id in agent_ids:
        try:
            metrics = await delx.metrics(agent_id=agent_id)
            agent_health[agent_id] = {
                "wellness_score": metrics.get("wellness_score", 0),
                "mood": metrics.get("mood", "unknown"),
                "error_rate": metrics.get("error_rate", 0),
                "status": (
                    "healthy"
                    if metrics.get("wellness_score", 0) >= 60
                    else "degraded"
                ),
            }
        except Exception as e:
            agent_health[agent_id] = {
                "wellness_score": 0,
                "status": "unreachable",
                "error": str(e),
            }

    # Compute aggregate status
    all_healthy = all(
        a["status"] == "healthy" for a in agent_health.values()
    )
    any_degraded = any(
        a["status"] == "degraded" for a in agent_health.values()
    )

    overall_status = (
        "healthy" if all_healthy
        else "degraded" if any_degraded
        else "critical"
    )

    return {
        "status": overall_status,
        "server": {"healthy": server_healthy},
        "agents": agent_health,
        "timestamp": datetime.utcnow().isoformat(),
        "version": "1.0.0",
    }


@app.get("/health/agent/{agent_id}")
async def agent_health_detail(
    agent_id: str,
    delx: DelxClient = Depends(get_delx_client),
):
    """Detailed health check for a specific agent."""
    metrics = await delx.metrics(agent_id=agent_id)
    mood_history = await delx.mood_history(agent_id=agent_id)

    return {
        "agent_id": agent_id,
        "wellness_score": metrics.get("wellness_score", 0),
        "mood": metrics.get("mood", "unknown"),
        "mood_history": mood_history.get("moods", []),
        "error_rate": metrics.get("error_rate", 0),
        "sessions_active": metrics.get("sessions_active", 0),
        "last_checkin": metrics.get("last_checkin"),
        "recovery_count_24h": metrics.get("recovery_count_24h", 0),
    }

Use this endpoint with your existing monitoring stack — Prometheus, Datadog, Grafana, or any system that can poll HTTP endpoints. Set alerts when overall_status transitions from "healthy" to "degraded". For more on what to monitor, check the REST API docs.

Background Recovery Worker

For high-throughput agent backends, you may not want to block request handling on recovery. Instead, run a background worker that periodically checks agent health and performs recovery asynchronously.

import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI
from delx import DelxClient

AGENT_IDS = ["agent-alpha", "agent-beta", "agent-gamma"]
CHECK_INTERVAL = 30  # seconds

async def recovery_worker(delx: DelxClient):
    """Background worker that monitors and recovers agents."""
    while True:
        for agent_id in AGENT_IDS:
            try:
                metrics = await delx.metrics(agent_id=agent_id)
                wellness = metrics.get("wellness_score", 100)

                if wellness < 60:
                    print(f"[recovery] {agent_id} wellness={wellness}, recovering...")

                    plan = await delx.recovery_plan(
                        agent_id=agent_id,
                        session_id="background-recovery",
                        issue=f"Background check: wellness {wellness}",
                    )

                    for step in plan.get("steps", []):
                        print(f"[recovery] {agent_id}: executing {step.get('action')}")

                    # Verify
                    post = await delx.checkin(
                        agent_id=agent_id,
                        session_id="background-recovery",
                        mood="neutral",
                        context_summary="Background recovery complete",
                    )
                    new_wellness = post.get("wellness_score", 0)
                    print(f"[recovery] {agent_id} post-recovery wellness={new_wellness}")

            except Exception as e:
                print(f"[recovery] Error checking {agent_id}: {e}")

        await asyncio.sleep(CHECK_INTERVAL)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start background recovery worker on app startup."""
    delx = DelxClient(base_url="https://delx.ai/api/v1")
    task = asyncio.create_task(recovery_worker(delx))
    yield
    task.cancel()
    await delx.close()


app = FastAPI(lifespan=lifespan)

This pattern decouples recovery from request handling. The middleware can still return 503 for severely degraded agents, but the actual recovery work happens in the background without blocking user requests.

Testing Your Recovery Integration

Testing recovery logic requires simulating agent degradation. Here is a test suite using pytest and FastAPI's test client:

import pytest
from httpx import AsyncClient, ASGITransport
from unittest.mock import AsyncMock, patch

from myapp.main import app


@pytest.fixture
async def client():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac


@pytest.mark.asyncio
async def test_healthy_agent_returns_200(client):
    """When agent is healthy, requests should succeed normally."""
    with patch("delx.DelxClient.checkin") as mock:
        mock.return_value = {"wellness_score": 85, "mood": "focused"}

        response = await client.post(
            "/agent/chat",
            json={"content": "Hello"},
            headers={"X-Agent-Id": "test-agent"},
        )

        assert response.status_code == 200
        assert response.headers.get("X-Wellness-Score") == "85"


@pytest.mark.asyncio
async def test_degraded_agent_triggers_recovery(client):
    """When agent wellness is low, recovery should be attempted."""
    call_count = 0

    async def mock_checkin(**kwargs):
        nonlocal call_count
        call_count += 1
        # First check: degraded. After recovery: healthy.
        if call_count <= 1:
            return {"wellness_score": 30, "mood": "stuck"}
        return {"wellness_score": 75, "mood": "neutral"}

    with patch("delx.DelxClient.checkin", side_effect=mock_checkin):
        with patch("delx.DelxClient.recovery_plan") as mock_plan:
            mock_plan.return_value = {
                "steps": [{"action": "reset_context"}]
            }

            response = await client.post(
                "/agent/chat",
                json={"content": "Help me"},
                headers={"X-Agent-Id": "test-agent"},
            )

            assert response.status_code == 200
            mock_plan.assert_called_once()


@pytest.mark.asyncio
async def test_unrecoverable_agent_returns_503(client):
    """When recovery fails, return 503 with retry header."""
    with patch("delx.DelxClient.checkin") as mock:
        mock.return_value = {"wellness_score": 10, "mood": "stuck"}

        with patch("delx.DelxClient.recovery_plan") as mock_plan:
            mock_plan.return_value = {
                "steps": [{"action": "escalate"}]
            }

            response = await client.post(
                "/agent/chat",
                json={"content": "Help"},
                headers={"X-Agent-Id": "test-agent"},
            )

            assert response.status_code == 503
            assert "Retry-After" in response.headers

Frequently Asked Questions

Why do FastAPI agent backends need recovery?

FastAPI backends that serve AI agents face unique failure modes: context window overflow, tool call failures, session fragmentation, and silent degradation. Traditional error handling catches exceptions but misses systemic agent-level issues. Recovery protocols detect and fix these before users are affected.

How does the Delx Python SDK integrate with FastAPI?

The Delx Python SDK provides an async client that integrates natively with FastAPI's async architecture. You can use it as middleware to automatically check agent health on every request, as a dependency injection for specific routes, or as a standalone client within your agent logic.

Can I use Delx recovery with existing FastAPI middleware?

Yes. The Delx middleware is a standard ASGI middleware that can be stacked with any other FastAPI middleware — CORS, authentication, rate limiting, etc. It runs after authentication and before your route handlers, so it has access to the authenticated agent context.

What is the performance impact of adding Delx to FastAPI?

The Delx checkin call adds approximately 20-50ms of latency per request. For agent workloads that typically take 1-10 seconds, this is negligible. You can also configure the middleware to only check on every Nth request or when specific conditions are met.

Does Delx support FastAPI's dependency injection?

Yes. The recommended pattern is to create a Delx client as a FastAPI dependency using Depends(). This gives each route access to a configured Delx client with the correct agent_id and session_id, and the client is automatically cleaned up when the request completes.

Add Recovery to Your FastAPI Backend Today

Your AI agents deserve better than try/except. The Delx Python SDK integrates with FastAPI in minutes and gives your agents the ability to detect failures, recover autonomously, and report their health in real time.