Self-Healing Pipelines

Building an autonomous 'Pipeline Guardian' Agent on Databricks Apps to detect failures, analyze stack traces, and auto-repair Jobs.

TIMESTAMP 2025-11-01
TYPE ARCHITECTURE
STATUS ● PUBLISHED

01 — Context

The following documentation covers the architectural decisions, trade-offs, and implementation details for this system.

AI Agent · Databricks Apps · Data Engineering · Automation

The Pipeline Guardian

Data Engineers spend 40% of their time on "Root Cause Analysis" (RCA). When a job fails at 3 AM, it's usually a standard error: OutOfMemory, SchemaMismatch, or NetworkTimeout.

We built an Autonomous Agent hosted on Databricks Apps that intercepts these failures, reads the logs, and fixes the pipeline without human intervention.

Live Architecture

Self-Healing Runtime

[Architecture diagram: Event Bus (webhook trigger) → Agent Runtime (context window, reasoning engine, tool selector) → Unity Catalog (logs & state) and Jobs API (compute control) → Incident resolution]

Phase 1: The Trigger (Event Architecture)

Databricks Workflows support webhook notifications on job events. Instead of emailing an engineer, the failing Job posts a JSON payload straight to our Agent.

// Webhook Payload sent to https://my-agent.databricksapps.com/webhook
{
  "event_type": "JOB_RUN_FAILURE",
  "workspace_id": "1234567890",
  "run_id": "9941",
  "job_id": "5521",
  "error_type": "MAX_RETRIES_EXCEEDED"
}
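
Wiring this up is a one-time change to the Job's settings. The sketch below uses the databricks-sdk and assumes a webhook notification destination pointing at the App's /webhook endpoint has already been created in the workspace; the destination ID is a placeholder, not the real configuration.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import JobSettings, Webhook, WebhookNotifications

w = WorkspaceClient()

# Attach the pre-created notification destination to the job so that every
# failed run POSTs the payload above to the Agent instead of paging a human.
w.jobs.update(
    job_id=5521,
    new_settings=JobSettings(
        webhook_notifications=WebhookNotifications(
            on_failure=[Webhook(id="guardian-destination-id")]  # placeholder ID
        )
    ),
)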

The Listener (FastAPI)

The entry point is a simple API endpoint running on Databricks Apps.

from fastapi import BackgroundTasks, FastAPI

from agent import agent  # the ReAct agent from Phase 2 (module path illustrative)

app = FastAPI()

@app.post("/webhook")
async def handle_alert(payload: dict, background_tasks: BackgroundTasks):
    # 1. Extract the failed run's ID from the webhook payload
    run_id = payload.get("run_id")

    # 2. Wake up the Agent without blocking the webhook response
    background_tasks.add_task(agent.investigate, run_id)

    return {"status": "Investigation Started"}

Phase 2: The Cognitive Engine (Agentic Loop)

We don't just run a script; we run a ReAct Agent (Reasoning + Acting). The Agent observes the failure and decides which tools to use.
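
A minimal sketch of that loop, assuming an OpenAI-compatible tool-calling model and direct Python access to the two tools defined under "Tool Definitions (MCP)" below; the model name, class name, and schemas here are illustrative assumptions, not the production implementation.

import json

from openai import OpenAI

client = OpenAI()  # any tool-calling chat model works here

# Python callables for the tools defined under "Tool Definitions (MCP)" below.
TOOLS = {
    "fetch_driver_logs": fetch_driver_logs,
    "update_job_cluster": update_job_cluster,
}

# JSON schemas describing the tools to the model.
TOOL_SCHEMAS = [
    {"type": "function", "function": {
        "name": "fetch_driver_logs",
        "description": "Fetch the failed run's driver log tail.",
        "parameters": {"type": "object",
                       "properties": {"run_id": {"type": "string"}},
                       "required": ["run_id"]}}},
    {"type": "function", "function": {
        "name": "update_job_cluster",
        "description": "Resize the job cluster.",
        "parameters": {"type": "object",
                       "properties": {"job_id": {"type": "string"},
                                      "memory_mb": {"type": "integer"}},
                       "required": ["job_id", "memory_mb"]}}},
]

class PipelineGuardian:
    def investigate(self, run_id: str, max_steps: int = 5) -> str | None:
        messages = [
            {"role": "system", "content": (
                "You are a pipeline repair agent. Diagnose the failed run "
                "and repair it using the available tools.")},
            {"role": "user", "content": f"Run {run_id} failed. Find out why and fix it."},
        ]
        for _ in range(max_steps):
            # Reason: the model decides whether to call a tool or stop.
            reply = client.chat.completions.create(
                model="gpt-4o", messages=messages, tools=TOOL_SCHEMAS
            ).choices[0].message
            if not reply.tool_calls:
                return reply.content  # final summary, no further actions
            messages.append(reply)
            # Act: run each requested tool and feed the observation back.
            for call in reply.tool_calls:
                args = json.loads(call.function.arguments)
                observation = TOOLS[call.function.name](**args)
                messages.append({"role": "tool",
                                 "tool_call_id": call.id,
                                 "content": observation})
        return None

agent = PipelineGuardian()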


Live Demo: The Interface

Below is a live recording of the Agent in action during the last incident.

Scenario: The Agent detects a webhook event, confirms the OOM error, and auto-scales the cluster.

[Recording: Pipeline Guardian agent log stream for Run ID #9941-A (Processing)]

Tool Definitions (MCP)

We expose the databricks-sdk as safe, typed tools for the LLM.

from databricks.sdk import WorkspaceClient
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("pipeline-guardian-tools")

@mcp.tool()
def fetch_driver_logs(run_id: str) -> str:
    """Retrieves the last 100 lines of the specific run's driver log (stderr)."""
    w = WorkspaceClient()
    # Fetch the run output and hand back only the log tail the LLM needs
    logs = w.jobs.get_run_output(int(run_id)).logs or ""
    return "\n".join(logs.splitlines()[-100:])

@mcp.tool()
def update_job_cluster(job_id: str, memory_mb: int) -> str:
    """Resizes the job cluster configuration."""
    # Logic to patch the job settings (e.g. via WorkspaceClient().jobs.update)
    return f"Job {job_id} updated to {memory_mb}MB"

The Reasoning Process

The LLM receives the webhook context and enters a thought loop:

Thought: The system reported a failure for Run 9941. I need to see why it failed.
Action: fetch_driver_logs(run_id="9941")
Observation: java.lang.OutOfMemoryError: Java heap space
Thought: It's an OOM. The standard procedure is to increase Executor RAM by 50%.
Action: update_job_cluster(job_id="5521", memory_mb=32000)
Result: Success.


Phase 3: Deployment (Databricks Apps)

We deploy this as a Databricks App. This gives the Agent:

  1. Managed Identity: It runs as a Service Principal using the workspace's native authentication. No API keys needed (see the sketch after the deployment config below).
  2. Serverless Compute: No VM management.
  3. Governance: All actions (Tools) are logged in Unity Catalog for audit.
# databricks.yml
app:
  name: "pipeline-guardian"
  command: ["uv", "run", "server.py"]
  resources:
    instance_profile: "agent-role"
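
The practical effect of item 1 above is that the SDK client inside the App needs no explicit credentials: the default databricks-sdk auth chain resolves the App's service principal. A minimal illustration, assuming that default chain:

from databricks.sdk import WorkspaceClient

# Inside the App, the default auth chain picks up the service principal
# automatically; no tokens or secrets live in the code or config.
w = WorkspaceClient()
print(w.current_user.me().user_name)  # the Guardian's service principal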

Results

  • MTTR (Mean Time To Recovery): Reduced from 45 mins (Human) to < 2 mins (Agent).
  • Coverage: The Agent successfully auto-resolves 70% of transient errors (OOM, Spot Instance Loss).
  • Human-in-the-Loop: For complex errors (Schema Drift), the Agent posts a summary to Slack asking for approval before altering tables (a minimal version of that gate is sketched below).
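
The approval gate can be as simple as posting the diagnosis to an incoming Slack webhook and pausing until a human responds. The sketch below assumes such a webhook URL is available to the App; the helper name and message format are illustrative, not the production code.

import requests

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/..."  # placeholder

def request_human_approval(job_id: str, diagnosis: str) -> None:
    """Post the Agent's diagnosis to Slack instead of acting autonomously."""
    requests.post(
        SLACK_WEBHOOK_URL,
        json={
            "text": (
                f":warning: Pipeline Guardian needs approval for job {job_id}\n"
                f"Diagnosis: {diagnosis}\n"
                "Approve to let the Agent apply the proposed fix."
            )
        },
        timeout=10,
    )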