The Pipeline Guardian
Data Engineers spend 40% of their time on "Root Cause Analysis" (RCA).
When a job fails at 3 AM, it's usually a standard error: OutOfMemory, SchemaMismatch, or NetworkTimeout.
We built an Autonomous Agent hosted on Databricks Apps that intercepts these failures, reads the logs, and fixes the pipeline without human intervention.
Self-Healing Runtime
Phase 1: The Trigger (Event Architecture)
Databricks Workflows support webhook notification destinations. Instead of emailing an engineer, the Job sends a JSON payload straight to our Agent.
// Webhook payload sent to https://my-agent.databricksapps.com/webhook
{
  "event_type": "JOB_RUN_FAILURE",
  "workspace_id": "1234567890",
  "run_id": "9941",
  "job_id": "5521",
  "error_type": "MAX_RETRIES_EXCEEDED"
}
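Attaching the webhook to a job is a single Jobs API call. Below is a minimal sketch with the databricks-sdk; it assumes a notification destination pointing at the Agent's /webhook endpoint has already been created in the workspace, and the destination ID is a placeholder.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import JobSettings, Webhook, WebhookNotifications

w = WorkspaceClient()

# ID of a webhook notification destination configured to POST to
# https://my-agent.databricksapps.com/webhook (placeholder value).
DESTINATION_ID = "replace-with-destination-id"

# Add an on_failure webhook to the existing job without touching its other settings.
w.jobs.update(
    job_id=5521,
    new_settings=JobSettings(
        webhook_notifications=WebhookNotifications(
            on_failure=[Webhook(id=DESTINATION_ID)]
        )
    ),
)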
The Listener (FastAPI)
The entry point is a simple API endpoint running on Databricks Apps.
from fastapi import FastAPI

app = FastAPI()

@app.post("/webhook")
async def handle_alert(payload: dict):
    # 1. Extract the failed run's ID from the webhook payload
    run_id = payload.get("run_id")
    # 2. Wake up the Agent (defined elsewhere in the app) to investigate the run
    agent.investigate(run_id)
    return {"status": "Investigation Started"}
Phase 2: The Cognitive Engine (Agentic Loop)
We don't just run a script; we run a ReAct Agent (Reasoning + Acting). The Agent observes the failure and decides which tools to use.
Live Demo: The Interface
Below is a live recording of the Agent in action during the last incident.
Scenario: The Agent detects a webhook event, confirms the OOM error, and auto-scales the cluster.
Tool Definitions (MCP)
We expose the databricks-sdk as safe, typed tools for the LLM.
from databricks.sdk import WorkspaceClient
from mcp.server.fastmcp import FastMCP  # reference MCP Python SDK

mcp = FastMCP("pipeline-guardian")

@mcp.tool()
def fetch_driver_logs(run_id: str) -> str:
    """Retrieves the last 100 lines of the specific run's driver log (stderr)."""
    w = WorkspaceClient()
    # The run output's `logs` field carries the driver log for the run
    logs = w.jobs.get_run_output(run_id=int(run_id)).logs or ""
    # Keep only the tail to stay within the LLM context window
    return "\n".join(logs.splitlines()[-100:])

@mcp.tool()
def update_job_cluster(job_id: str, memory_mb: int) -> str:
    """Resizes the job cluster configuration."""
    # Logic to patch the job settings (see the sketch below)
    return f"Job {job_id} updated to {memory_mb}MB"
The Reasoning Process
The LLM receives the webhook context and enters a thought loop:
Thought: The system reported a failure for Run 9941. I need to see why it failed.
Action: fetch_driver_logs(run_id="9941")
Observation: java.lang.OutOfMemoryError: Java heap space
Thought: It's an OOM. The standard procedure is to increase Executor RAM by 50%.
Action: update_job_cluster(job_id="5521", memory_mb=32000)
Result: Success.
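Under the hood, the loop that produces this trace is an ordinary tool-calling conversation. Below is a minimal sketch of such a loop; it assumes an OpenAI-compatible chat endpoint, a placeholder model name, and that the two tools above are also callable as plain Python functions (in our deployment the Agent reaches them through MCP instead).

import json
from openai import OpenAI

# JSON-schema descriptions of the two tools exposed to the model.
TOOL_SPECS = [
    {"type": "function", "function": {
        "name": "fetch_driver_logs",
        "description": "Get the driver log (stderr) of a failed run.",
        "parameters": {"type": "object",
                       "properties": {"run_id": {"type": "string"}},
                       "required": ["run_id"]}}},
    {"type": "function", "function": {
        "name": "update_job_cluster",
        "description": "Resize the failed job's cluster.",
        "parameters": {"type": "object",
                       "properties": {"job_id": {"type": "string"},
                                      "memory_mb": {"type": "integer"}},
                       "required": ["job_id", "memory_mb"]}}},
]
TOOLS = {"fetch_driver_logs": fetch_driver_logs,
         "update_job_cluster": update_job_cluster}

client = OpenAI()  # any OpenAI-compatible endpoint works here

def investigate(run_id: str, max_steps: int = 5) -> str:
    messages = [
        {"role": "system", "content": "You are a pipeline-repair agent. "
                                      "Diagnose the failed run and fix it with the available tools."},
        {"role": "user", "content": f"Run {run_id} failed with MAX_RETRIES_EXCEEDED."},
    ]
    for _ in range(max_steps):
        reply = client.chat.completions.create(
            model="gpt-4o",  # placeholder model name
            messages=messages,
            tools=TOOL_SPECS,
        )
        msg = reply.choices[0].message
        if not msg.tool_calls:          # no more actions: final summary
            return msg.content
        messages.append(msg)            # keep the Thought/Action in context
        for call in msg.tool_calls:     # execute each Action, feed back the Observation
            result = TOOLS[call.function.name](**json.loads(call.function.arguments))
            messages.append({"role": "tool", "tool_call_id": call.id,
                             "content": str(result)})
    return "Escalating: max steps reached without resolution."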
Phase 3: Deployment (Databricks Apps)
We deploy this as a Databricks App. This gives the Agent:
- Managed Identity: It runs as a Service Principal using the workspace's native authentication. No API keys needed.
- Serverless Compute: No VM management.
- Governance: All actions (Tools) are logged in Unity Catalog for audit (see the logging sketch after the config below).
# databricks.yml
app:
  name: "pipeline-guardian"
  command: ["uv", "run", "server.py"]
  resources:
    instance_profile: "agent-role"
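For the governance point above, one straightforward pattern is to have the Agent append every tool invocation to a Unity Catalog table via the SQL Statement Execution API. A minimal sketch; the warehouse ID and the ops.guardian.tool_audit table are placeholders and assumed to already exist:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.sql import StatementParameterListItem

w = WorkspaceClient()

def log_tool_call(tool_name: str, arguments: str, result: str) -> None:
    # The audit table is assumed to have columns
    # (ts TIMESTAMP, tool STRING, arguments STRING, result STRING).
    w.statement_execution.execute_statement(
        warehouse_id="replace-with-warehouse-id",
        statement=(
            "INSERT INTO ops.guardian.tool_audit "
            "VALUES (current_timestamp(), :tool, :arguments, :result)"
        ),
        parameters=[
            StatementParameterListItem(name="tool", value=tool_name),
            StatementParameterListItem(name="arguments", value=arguments),
            StatementParameterListItem(name="result", value=result),
        ],
    )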
Results
- MTTR (Mean Time To Recovery): Reduced from 45 mins (Human) to < 2 mins (Agent).
- Coverage: The Agent successfully auto-resolves 70% of transient errors (OOM, Spot Instance Loss).
- Human-in-the-Loop: For complex errors (Schema Drift), the Agent posts a summary to Slack asking for approval before altering tables (a minimal notifier sketch follows).
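A minimal version of that Slack hand-off, assuming a standard incoming-webhook URL stored in the app's environment (the variable name is illustrative):

import os
import requests

def request_approval(summary: str) -> None:
    # SLACK_WEBHOOK_URL points at an incoming webhook for the on-call channel.
    requests.post(
        os.environ["SLACK_WEBHOOK_URL"],
        json={"text": f":rotating_light: Pipeline Guardian needs approval:\n{summary}"},
        timeout=10,
    )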