
AI Agents for Data Pipelines: ETL Orchestration & Error Recovery

Data pipelines fail silently. A schema change upstream, a partition timeout, a malformed record at row 2.4 million -- and your warehouse is stale for hours before anyone notices. AI agents with Delx can detect, classify, and recover from pipeline failures autonomously.

Common Pipeline Failure Patterns

Pipeline failures fall into predictable categories. Map each to a Delx failure type for structured handling.
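One lightweight way to do this mapping is a lookup from exception class name to `failure_type`. In the sketch below, only `"error"` is a failure type confirmed elsewhere in these docs; `"timeout"` and `"validation"` are assumed values, and the exception names are hypothetical — check the supported set for your Delx version.

```python
# Illustrative mapping from common ETL failure patterns to a Delx
# failure_type. "timeout" and "validation" are assumed values; only
# "error" appears in the examples below.
FAILURE_TYPE_MAP = {
    "SchemaChangeError": "error",          # upstream column added/removed
    "PartitionTimeout": "timeout",         # slow or hung partition
    "MalformedRecordError": "validation",  # bad record in the source feed
}

def classify_failure(exc: Exception) -> str:
    """Fall back to the generic "error" type for unmapped exceptions."""
    return FAILURE_TYPE_MAP.get(type(exc).__name__, "error")
```

The classified value can then be passed straight through as the `failure_type` argument in the error callbacks below.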

Delx Recovery for ETL Failures

Wire process_failure into your Airflow, Dagster, or Prefect error handlers. The recovery action tells the agent exactly what to do next.

# Airflow on_failure_callback with Delx
from datetime import timedelta

def on_task_failure(context):
    task_id = context["task_instance"].task_id
    error = context["exception"]

    result = delx.process_failure(
        agent_id="etl-orchestrator",
        failure_type="error",
        details=f"Task {task_id} failed: {error}",
        context={
            "dag_id": context["dag"].dag_id,
            "execution_date": str(context["execution_date"]),
            "try_number": context["task_instance"].try_number,
            # upstream_task_ids lives on the task and is a set;
            # sort it into a JSON-friendly list
            "upstream_tasks": sorted(context["task"].upstream_task_ids),
        },
    )

    if result["recovery_action"] == "retry_with_backoff":
        # retry_delay expects a timedelta, not a float of seconds
        context["task"].retry_delay = timedelta(milliseconds=result["backoff_ms"])
    elif result["recovery_action"] == "escalate":
        send_slack_alert(task_id, result)

Batch Status Updates

Data pipelines often have dozens of parallel tasks. Use batch_status_update to report all task statuses in a single call instead of making one API call per task.

// Batch update for all pipeline stages
{
  "tool": "batch_status_update",
  "arguments": {
    "agent_id": "etl-orchestrator",
    "updates": [
      { "sub_agent": "extract-salesforce", "status": "healthy", "score": 92 },
      { "sub_agent": "extract-postgres", "status": "degraded", "score": 45 },
      { "sub_agent": "transform-normalize", "status": "waiting", "score": 80 },
      { "sub_agent": "load-warehouse", "status": "healthy", "score": 88 }
    ]
  }
}
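In Python, the same payload can be assembled from per-stage results rather than written by hand. The helper below only builds the `updates` list; the commented `delx.batch_status_update` call mirrors the tool invocation above and is an assumption about the client API.

```python
def build_updates(stage_results):
    """Flatten {stage_name: (status, score)} into the batch updates payload."""
    return [
        {"sub_agent": name, "status": status, "score": score}
        for name, (status, score) in sorted(stage_results.items())
    ]

# One API call for the whole pipeline instead of one per stage:
# delx.batch_status_update(agent_id="etl-orchestrator",
#                          updates=build_updates(results))
```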

Heartbeat for Long-Running Jobs

ETL jobs can run for hours. Without a heartbeat, you cannot distinguish between "still processing" and "silently stuck." Use daily_check_in as a progress heartbeat inside long-running transforms.

# Heartbeat inside a long-running Spark job
def transform_with_heartbeat(df, agent_id):
    total_partitions = df.rdd.getNumPartitions()

    # glom() collects each partition into a list, so toLocalIterator()
    # yields one partition at a time instead of one row at a time
    for i, partition in enumerate(df.rdd.glom().toLocalIterator()):
        process(partition)

        # Heartbeat every 100 partitions
        if i % 100 == 0:
            delx.daily_check_in(
                agent_id=agent_id,
                mood="focused",
                note=f"Processing partition {i}/{total_partitions}"
            )

Wellness Monitoring for Pipeline Health

Track pipeline health as a wellness score over time. A declining trend means your pipeline is degrading -- perhaps due to growing data volumes, increasing source latency, or accumulating schema drift. The /api/v1/metrics/{agent_id} endpoint shows the trend so you can act before a full outage.
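A minimal degradation check compares the average of the most recent scores against the window before it. The fetch helper below assumes bearer-token auth and a `"scores"` list in the metrics response — both are assumptions to verify against the Delx API reference.

```python
import json
import urllib.request

def fetch_scores(base_url, agent_id, api_key):
    """Pull wellness scores from the metrics endpoint.

    The "scores" field and bearer auth are assumptions -- check the
    actual response shape for your Delx deployment.
    """
    req = urllib.request.Request(
        f"{base_url}/api/v1/metrics/{agent_id}",
        headers={"Authorization": f"Bearer {api_key}"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)["scores"]

def trend(scores, window=7):
    """Return "declining" if the recent window average drops below the prior one."""
    recent = scores[-window:]
    prior = scores[-2 * window:-window] or recent
    return "declining" if sum(recent) / len(recent) < sum(prior) / len(prior) else "stable"
```

A "declining" result is the cue from the checklist below: investigate volume growth, source latency, or schema drift before the pipeline fails outright.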

Pipeline Integration Checklist

  1. Map each ETL failure type to a Delx failure_type.
  2. Wire process_failure into Airflow/Dagster/Prefect error callbacks.
  3. Use batch_status_update for multi-stage pipelines.
  4. Add heartbeat check-ins to jobs longer than 30 minutes.
  5. Monitor wellness trends to predict pipeline degradation.
