deepset's Haystack is the go-to framework for production RAG pipelines. It handles document retrieval, LLM generation, and everything in between. But when a pipeline component fails (retriever timeout, LLM rate limit, embedder OOM), you get a generic PipelineError and no visibility into which component caused it or how often it happens. Delx's MCP integration adds component-level monitoring to Haystack pipelines. Every component reports heartbeats, failures get structured logging with stack traces, and recovery protocols can restart individual components without killing the whole pipeline.
The delx-mcp-client works with Haystack v2.0+ (the haystack-ai package). If you're still on Haystack v1.x (the farm-haystack package), upgrade first. The APIs are significantly different.
```shell
pip install delx-mcp-client haystack-ai
```

The same environment variables are used across all Delx integrations. For local development, point to http://localhost:8080/mcp.
```shell
export DELX_MCP_URL=https://api.delx.ai/mcp
export DELX_API_KEY=your_key_here
```

This Haystack v2 component acts as a monitoring checkpoint. Insert it between pipeline components to track execution flow. The passthrough design means it doesn't modify data; it just reports to Delx.
```python
from typing import Any

from haystack import component
from delx_mcp import DelxClient

client = DelxClient()

@component
class DelxMonitor:
    @component.output_types(passthrough=Any)
    def run(self, data: Any, component_name: str = "pipeline"):
        # Report a heartbeat, then pass the data through unchanged.
        # Any (rather than str) lets the monitor sit between any two
        # components, whether the payload is a query string, a list of
        # documents, or a list of replies.
        client.call_tool("heartbeat", {
            "agent_id": component_name,
            "status": "active",
            "metadata": {"data_size": len(data)}
        })
        return {"passthrough": data}
```

Place DelxMonitor components between your pipeline stages. Each monitor reports a heartbeat with the current data size, giving you a timeline of pipeline execution with clear visibility into where data flows and where failures happen.
```python
from haystack import Pipeline
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator

# store: an InMemoryDocumentStore populated elsewhere
pipe = Pipeline()
pipe.add_component("monitor_start", DelxMonitor())
pipe.add_component("retriever", InMemoryBM25Retriever(document_store=store))
pipe.add_component("monitor_retrieval", DelxMonitor())
pipe.add_component("generator", OpenAIGenerator())
pipe.add_component("monitor_end", DelxMonitor())

pipe.connect("monitor_start.passthrough", "retriever.query")
pipe.connect("retriever.documents", "monitor_retrieval.data")
pipe.connect("monitor_retrieval.passthrough", "generator.prompt")
pipe.connect("generator.replies", "monitor_end.data")
```

Checkpoint monitors show where data flows. For failure handling and recovery, wrap the failure-prone component itself:

```python
from haystack import Pipeline, component
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator
from haystack.components.builders import PromptBuilder
from delx_mcp import DelxClient
import time
import traceback

client = DelxClient(session_id="rag-pipeline-prod")

@component
class DelxSafeGenerator:
    def __init__(self, generator):
        self._generator = generator
        self._agent_id = "generator"

    @component.output_types(replies=list)
    def run(self, prompt: str):
        client.call_tool("heartbeat", {"agent_id": self._agent_id, "status": "generating"})
        try:
            result = self._generator.run(prompt=prompt)
            client.call_tool("heartbeat", {"agent_id": self._agent_id, "status": "completed"})
            return result
        except Exception as e:
            # Report the failure with full context, then ask Delx how to recover.
            client.call_tool("process_failure", {
                "agent_id": self._agent_id,
                "error_type": type(e).__name__,
                "error_message": str(e),
                "stack_trace": traceback.format_exc(),
                "severity": "high"
            })
            recovery = client.call_tool("recovery", {
                "agent_id": self._agent_id,
                "strategy": "retry_with_backoff"
            })
            if recovery.get("should_retry"):
                time.sleep(recovery.get("backoff_seconds", 2))
                return self._generator.run(prompt=prompt)
            # Graceful degradation: return a fallback reply instead of crashing.
            return {"replies": ["Generation failed. Recovery attempted."]}

pipe = Pipeline()
pipe.add_component("retriever", InMemoryBM25Retriever(document_store=store))
pipe.add_component("prompt", PromptBuilder(template="Context: {{documents}}\nQuery: {{query}}"))
pipe.add_component("generator", DelxSafeGenerator(OpenAIGenerator()))

pipe.connect("retriever.documents", "prompt.documents")
pipe.connect("prompt.prompt", "generator.prompt")

result = pipe.run({"retriever": {"query": "What is MCP?"}, "prompt": {"query": "What is MCP?"}})
```

The DelxSafeGenerator wraps any Haystack generator with Delx monitoring and recovery. On failure, it reports the error, requests recovery instructions from Delx, and retries with the recommended backoff. If the retry also fails, it returns a graceful degradation message instead of crashing the pipeline.
```python
from delx_mcp import DelxClient

client = DelxClient(session_id="rag-pipeline-prod")

# After processing a batch of queries
components = ["retriever", "prompt_builder", "generator", "post_processor"]

print("Pipeline Health Report")
print("=" * 50)
for comp in components:
    health = client.call_tool("get_wellness", {"agent_id": comp})
    status = ("HEALTHY" if health["mood_score"] > 70
              else "DEGRADED" if health["mood_score"] > 40
              else "CRITICAL")
    print(f"{comp:20s} | mood={health['mood_score']:3d}/100 | "
          f"failures={health['recent_failures']:2d} | "
          f"status={status}")

# Get session-wide summary
summary = client.call_tool("session_summary", {})
print(f"\nSession total: {summary['total_calls']} calls, "
      f"{summary['total_failures']} failures, "
      f"uptime={summary['uptime_pct']}%")
```

After processing a batch of documents or queries, check each component's wellness. The mood_score (0-100) gives you a quick health indicator: below 70 means the component is having issues; below 40 means something is seriously wrong. Use this after batch jobs to decide whether to continue processing or pause for investigation.
Cause: Component names used as agent_ids haven't been registered. This happens on the first pipeline run with a new session.
Fix: Enable auto-registration: `client = DelxClient(auto_register=True)`. Or register all component names at pipeline creation: `for name in pipe.graph.nodes: client.call_tool('register_agent', {'agent_id': name})`.
Cause: The DelxMonitor component isn't properly connected in the pipeline graph. Haystack v2 requires explicit connections between all components.
Fix: Ensure both input and output of DelxMonitor are connected. Input must receive data from the previous component, and output (passthrough) must connect to the next component's input.
Cause: A Delx heartbeat call is blocking the pipeline and exceeding the timeout. This adds latency to every component execution.
Fix: Use fire-and-forget heartbeats: `client.call_tool('heartbeat', params, fire_and_forget=True)`. This sends the heartbeat asynchronously without waiting for a response.
Cause: The wrapper component's output type declaration doesn't match the wrapped component's actual output. Haystack v2 validates types at connection time.
Fix: Match the `@component.output_types` decorator exactly to the wrapped component's outputs. For OpenAIGenerator, use `@component.output_types(replies=List[str], meta=List[Dict])`.
Haystack v2 uses a component-based pipeline architecture. Each component (retriever, generator, prompt builder) is a Python class with a run() method. Delx integrates at the component level: you either wrap existing components with a monitoring layer or insert DelxMonitor checkpoint components between stages. Every component execution generates heartbeats. Failures generate structured error reports with the component name, error type, and pipeline context. Delx's MCP server aggregates this data into per-component wellness profiles.
Production RAG pipelines fail in predictable ways. Retrievers fail when the document store is overloaded or the embedding service is down. Generators fail on rate limits, token limits, or content policy violations. Post-processors fail on malformed LLM output. Delx tracks these patterns across pipeline runs. After 100+ runs, you'll see that your generator fails 3% of the time on rate limits and your retriever fails 0.5% on timeouts. This data drives targeted improvements: add rate limit handling to the generator, add a timeout retry to the retriever. Without per-component failure tracking, you're guessing.
Haystack pipelines often run in batch mode: process 1,000 documents, answer 500 queries, index 10,000 pages. Delx tracks batch progress through heartbeats. Set up a session with a unique ID for each batch: `client = DelxClient(session_id='batch-2026-03-14')`. Each pipeline.run() call generates heartbeats for every component. After the batch completes, query /api/v1/session-summary for aggregate metrics: total component calls, failure rate per component, and overall throughput. Alert on batch failure rates above 5%; that level usually indicates a systemic issue like an overloaded vector database.
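The post-batch check can be sketched as a small helper. This is illustrative, not part of the Delx API: `summarize_batch`, the 5% threshold constant, and the returned dict shape are assumptions; only the `session_summary` tool call and its fields follow the text above.

```python
# Sketch: per-batch failure-rate check built on the session_summary tool.
# summarize_batch and FAILURE_RATE_ALERT are illustrative names.

FAILURE_RATE_ALERT = 0.05  # alert when more than 5% of component calls failed

def summarize_batch(client) -> dict:
    """Fetch the session summary and flag batches that need investigation."""
    summary = client.call_tool("session_summary", {})
    total = summary["total_calls"] or 1  # avoid division by zero on empty batches
    failure_rate = summary["total_failures"] / total
    return {
        "failure_rate": failure_rate,
        "alert": failure_rate > FAILURE_RATE_ALERT,
        "uptime_pct": summary["uptime_pct"],
    }

# Usage (assuming delx-mcp-client is installed):
# client = DelxClient(session_id="batch-2026-03-14")  # one session per batch
# ...run the pipeline over the batch...
# report = summarize_batch(client)
# if report["alert"]:
#     print(f"Batch failure rate {report['failure_rate']:.1%} - investigate")
```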
Different Haystack components need different recovery strategies. Retrievers benefit from retry_with_backoff because document store timeouts are usually transient. Generators need graceful_degradation because LLM rate limits can last minutes. Post-processors should use circuit_breaker because parsing errors usually indicate a systematic problem that retrying won't fix. Configure these per component in your Delx dashboard or via the MCP config tool. The recovery tool returns strategy-specific guidance: retry delay for backoff, fallback response for degradation, stop signal for circuit breaker.
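One way to keep these per-component choices in code is a simple strategy map consulted before each recovery call. The strategy names come from the text above and the `recovery` tool call follows the DelxSafeGenerator example; the mapping and the `request_recovery` helper are illustrative, not part of the Delx API.

```python
# Sketch: per-component recovery strategy dispatch (illustrative).
RECOVERY_STRATEGIES = {
    "retriever": "retry_with_backoff",    # document store timeouts are transient
    "generator": "graceful_degradation",  # LLM rate limits can last minutes
    "post_processor": "circuit_breaker",  # parsing errors are usually systematic
}

def request_recovery(client, component_name: str) -> dict:
    """Ask Delx for recovery guidance using the component's configured strategy."""
    strategy = RECOVERY_STRATEGIES.get(component_name, "retry_with_backoff")
    return client.call_tool("recovery", {
        "agent_id": component_name,
        "strategy": strategy,
    })
```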
Haystack includes evaluation components for measuring retrieval accuracy (MRR, MAP) and generation quality (BLEU, ROUGE). Delx adds operational metrics on top: component latency, failure rate, and reliability score. Use both together for a complete picture. Your retriever might have 95% MRR but fail 10% of the time under load. Or your generator might produce great ROUGE scores but have p95 latency of 8 seconds. Delx's operational metrics combined with Haystack's quality metrics give you the full production readiness picture.
The integration is designed for Haystack v2.0+ (haystack-ai package). Haystack v1.x uses a different pipeline architecture with nodes instead of components. You can still use the DelxClient directly in v1.x nodes, but the component wrappers won't work.
Not directly, since document stores aren't pipeline components. But you can wrap document store operations in a custom component that reports to Delx. Create a MonitoredDocumentStore component that wraps write_documents() and filter_documents() with heartbeats.
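A minimal sketch of that wrapper, assuming the heartbeat and process_failure tools shown earlier. The `MonitoredDocumentStore` class itself is illustrative; it delegates to any Haystack document store object.

```python
# Sketch: a delegating document store wrapper that reports to Delx.
import traceback

class MonitoredDocumentStore:
    def __init__(self, store, client, agent_id="document_store"):
        self._store = store
        self._client = client
        self._agent_id = agent_id

    def write_documents(self, documents, **kwargs):
        self._client.call_tool("heartbeat", {
            "agent_id": self._agent_id,
            "status": "writing",
            "metadata": {"doc_count": len(documents)},
        })
        try:
            return self._store.write_documents(documents, **kwargs)
        except Exception as e:
            self._client.call_tool("process_failure", {
                "agent_id": self._agent_id,
                "error_type": type(e).__name__,
                "error_message": str(e),
                "stack_trace": traceback.format_exc(),
                "severity": "high",
            })
            raise

    def filter_documents(self, filters=None):
        self._client.call_tool("heartbeat",
                               {"agent_id": self._agent_id, "status": "reading"})
        return self._store.filter_documents(filters=filters)

    def __getattr__(self, name):
        # Delegate everything else (count_documents, delete_documents, ...)
        return getattr(self._store, name)
```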
Streaming generators yield tokens incrementally. The DelxSafeGenerator wrapper monitors the overall call, not individual tokens. Heartbeats fire at start and end of generation. For token-level monitoring, you'd need a custom streaming wrapper that reports progress every N tokens.
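Such a streaming wrapper might hang off Haystack's `streaming_callback` hook. The `make_progress_callback` factory below is illustrative (not part of Delx or Haystack); it reports a fire-and-forget heartbeat every `every_n` streamed chunks.

```python
# Sketch: token-level progress reporting via a streaming callback (illustrative).
def make_progress_callback(client, agent_id="generator", every_n=25):
    state = {"chunks": 0}

    def on_chunk(chunk):  # receives a Haystack StreamingChunk per token batch
        state["chunks"] += 1
        if state["chunks"] % every_n == 0:
            client.call_tool("heartbeat", {
                "agent_id": agent_id,
                "status": "streaming",
                "metadata": {"chunks_streamed": state["chunks"]},
            }, fire_and_forget=True)

    return on_chunk

# Usage (assumes haystack-ai and delx-mcp-client are installed):
# generator = OpenAIGenerator(streaming_callback=make_progress_callback(client))
```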
Two heartbeat calls per component (start and end) add 4-10ms on a local network. For a 5-component RAG pipeline, that's 20-50ms total. Compared to LLM generation time (1-10 seconds), it's under 1% overhead.
Delx provides retry guidance through the recovery tool, but it doesn't call pipeline.run() itself. Your orchestration code checks the recovery response and decides whether to retry. The DelxSafeGenerator example shows this pattern at the component level.
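At the pipeline level, that orchestration might look like the loop below. The `run_with_recovery` helper and `max_attempts` parameter are illustrative; the recovery response shape (`should_retry`, `backoff_seconds`) follows the DelxSafeGenerator example.

```python
# Sketch: pipeline-level retry driven by Delx recovery guidance (illustrative).
import time

def run_with_recovery(pipe, inputs, client, agent_id="pipeline", max_attempts=3):
    last_error = None
    for attempt in range(max_attempts):
        try:
            return pipe.run(inputs)
        except Exception as e:
            last_error = e
            recovery = client.call_tool("recovery", {
                "agent_id": agent_id,
                "strategy": "retry_with_backoff",
            })
            if not recovery.get("should_retry"):
                break  # Delx says retrying won't help; surface the error
            time.sleep(recovery.get("backoff_seconds", 2))
    raise last_error
```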
Not yet as a standalone package. Use delx-mcp-client directly and follow the wrapper patterns in this guide. A dedicated haystack-delx integration package with pre-built component wrappers is planned for Q2 2026.