Data pipelines fail silently. A schema change upstream, a partition timeout, a malformed record at row 2.4 million -- and your warehouse is stale for hours before anyone notices. With Delx, AI agents can detect, classify, and recover from pipeline failures autonomously.
Pipeline failures fall into predictable categories. Map each to a Delx failure type for structured handling.
- failure_type: "validation" -- source schema changed, records fail validation.
- failure_type: "timeout" -- API or database connection timed out during extract.
- failure_type: "error" -- null pointer, type mismatch, or logic bug in the transform step.
- failure_type: "error" -- warehouse rejected the batch due to constraints or capacity.

Wire process_failure into your Airflow, Dagster, or Prefect error handlers. The recovery action tells the agent exactly what to do next.
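The mapping above can be sketched as a small classifier. This is only an illustration -- the exception classes are stand-ins for whatever your extract and load clients actually raise:

```python
# Hypothetical sketch: map a raised exception onto a Delx failure_type.
# TimeoutError/ConnectionError/ValueError are placeholders for the
# exceptions your own extract, transform, and load code raises.
def classify_failure(exc: Exception) -> str:
    if isinstance(exc, (TimeoutError, ConnectionError)):
        return "timeout"      # extract-side API/DB timeouts
    if isinstance(exc, ValueError):
        return "validation"   # schema or record validation failures
    return "error"            # transform bugs or load rejections
```

A classifier like this keeps the failure_type argument consistent across every callback that reports into Delx.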
# Airflow on_failure_callback with Delx
from datetime import timedelta

import delx

def on_task_failure(context):
    task_id = context["task_instance"].task_id
    error = context["exception"]
    result = delx.process_failure(
        agent_id="etl-orchestrator",
        failure_type="error",
        details=f"Task {task_id} failed: {str(error)}",
        context={
            "dag_id": context["dag"].dag_id,
            "execution_date": str(context["execution_date"]),
            "try_number": context["task_instance"].try_number,
            # upstream_task_ids is a set; convert for JSON serialization
            "upstream_tasks": list(context["task_instance"].upstream_task_ids),
        },
    )
    if result["recovery_action"] == "retry_with_backoff":
        # Airflow expects retry_delay as a timedelta, not a raw number
        context["task_instance"].retry_delay = timedelta(
            seconds=result["backoff_ms"] / 1000
        )
    elif result["recovery_action"] == "escalate":
        send_slack_alert(task_id, result)

Data pipelines often have dozens of parallel tasks. Use batch_status_update to report all task statuses in a single call instead of making one API call per task.
// Batch update for all pipeline stages
{
  "tool": "batch_status_update",
  "arguments": {
    "agent_id": "etl-orchestrator",
    "updates": [
      { "sub_agent": "extract-salesforce", "status": "healthy", "score": 92 },
      { "sub_agent": "extract-postgres", "status": "degraded", "score": 45 },
      { "sub_agent": "transform-normalize", "status": "waiting", "score": 80 },
      { "sub_agent": "load-warehouse", "status": "healthy", "score": 88 }
    ]
  }
}

ETL jobs can run for hours. Without a heartbeat, you cannot distinguish between "still processing" and "silently stuck." Use daily_check_in as a progress heartbeat inside long-running transforms.
# Heartbeat inside a long-running Spark job
def transform_with_heartbeat(df, agent_id):
    total_partitions = df.rdd.getNumPartitions()
    # glom() collects each partition into a list, so the iterator yields
    # one element per partition rather than one per row
    for i, partition in enumerate(df.rdd.glom().toLocalIterator()):
        process(partition)
        # Heartbeat every 100 partitions
        if i % 100 == 0:
            delx.daily_check_in(
                agent_id=agent_id,
                mood="focused",
                note=f"Processing partition {i}/{total_partitions}",
            )

Track pipeline health as a wellness score over time. A declining trend means your pipeline is degrading -- perhaps due to growing data volumes, increasing source latency, or accumulating schema drift. The /api/v1/metrics/{agent_id} endpoint shows the trend so you can act before a full outage.
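A trend check against that endpoint might look like the following sketch. The endpoint path comes from the docs above, but the response shape (a newest-last list under "scores") is an assumption:

```python
import requests

def trend(scores, window=7):
    # Difference between the newest and oldest score in the window;
    # a negative value means the pipeline's wellness is declining.
    recent = scores[-window:]
    return recent[-1] - recent[0]

def fetch_wellness_trend(base_url, agent_id):
    # Endpoint from the text; the {"scores": [...]} response shape is assumed.
    resp = requests.get(f"{base_url}/api/v1/metrics/{agent_id}")
    resp.raise_for_status()
    return trend(resp.json()["scores"])
```

Alerting when the trend drops below a threshold turns a slow degradation into an actionable signal instead of a surprise outage.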
- Map each failure to a Delx failure_type.
- Wire process_failure into Airflow/Dagster/Prefect error callbacks.
- Use batch_status_update for multi-stage pipelines.