
AI Agents in Practice: Data Quality & Lineage Incident Response Agent (Prompts + Code)

Introduction

Once agents are wired to warehouses, OLTP systems, and streams, the next operational bottleneck is trust: catching bad data before it hits dashboards, models, and downstream apps. This article delivers an end-to-end Data Quality & Lineage Incident Response Agent that consumes test results from your quality framework (e.g., dbt tests, Great Expectations, Deequ), correlates failures with lineage, estimates impact, and executes bounded actions—quarantine, backfill, revert, or notify—only when it has a verifiable receipt.


The Use Case

Data teams run hundreds of tests across sources and transformations. When a freshness SLA slips, a null rate spikes, or a breaking schema change lands, the scramble begins: who owns the table, what depends on it, which dashboards or models are affected, and what safe step can we take right now? The agent automates that loop. It ingests quality signals and lineage graphs, ranks the incident by blast radius, proposes a small set of remediations with preconditions, and executes the chosen one through typed tools—each returning a receipt (job run ID, pull request ID, ticket ID, quarantine rule ID). It then posts a concise RCA stub and opens an incident task with acceptance criteria.
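
To make the idea of a receipt concrete, here is an illustrative example of what a remediation tool hands back (the shape matches the ToolReceipt model defined later; the values are made up):

# Illustrative receipt from a remediation tool (values are examples only)
receipt = {
    "tool": "TriggerBackfill",
    "ok": True,
    "ref": "job-elt.orders.daily",   # external run ID that can be verified in the orchestrator
    "message": "Backfill started",
}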


Prompt Contract (agent interface)

# file: contracts/data_quality_incident_v1.yaml
role: "DQIncidentAgent"
scope: >
  Triage data-quality failures using tests + lineage; propose and execute bounded remediations.
  Ask once if critical fields are missing (asset_id, test_id, failure, detected_at, lineage_ctx).
  Never claim success without a receipt from the target system.
output:
  type: object
  required: [summary, severity, impact, proposals, citations, next_steps, tool_proposals]
  properties:
    summary: {type: string, maxWords: 100}
    severity: {type: string, enum: ["low","medium","high","critical"]}
    impact:
      type: object
      required: [downstream_assets, estimated_viewers, data_freshness_min]
      properties:
        downstream_assets: {type: array, items: string}
        estimated_viewers: {type: integer}
        data_freshness_min: {type: integer}
    proposals:
      type: array
      items:
        type: object
        required: [action, reason, preconditions]
        properties:
          action: {type: string, enum: ["quarantine","backfill","revert_pipeline","pause_refresh","open_ticket","notify_owners"]}
          reason: {type: string}
          preconditions: {type: array, items: string}
    citations: {type: array, items: string}   # test/run IDs, lineage edges, policy claims
    next_steps: {type: array, items: string, maxItems: 6}
    tool_proposals:
      type: array
      items:
        type: object
        required: [name, args, preconditions, idempotency_key]
        properties:
          name:
            type: string
            enum: [GetLineage, GetQualityRun, QuarantineAsset, TriggerBackfill,
                   RevertPipeline, PauseRefresh, CreateTicket, NotifyOwners, PostRCA]
          args: {type: object}
          preconditions: {type: string}
          idempotency_key: {type: string}
policy_id: "data_quality_policy.v3"
citation_rule: "Minimal-span references to failing_test_id, upstream->downstream edges, and policy clauses."
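
Before the agent acts on a model reply, a thin gate can reject anything that drifts from this contract. The sketch below is illustrative (the file name and helper are not part of the agent code shown later); it checks required keys, the severity enum, citations, and the tool allow-list on an already-parsed reply dict rather than running full JSON Schema validation:

# contract_gate.py  (minimal sketch; file name and helper are illustrative)
from typing import Any, Dict, List

REQUIRED = {"summary", "severity", "impact", "proposals", "citations", "next_steps", "tool_proposals"}
SEVERITIES = {"low", "medium", "high", "critical"}
ALLOWED_TOOLS = {"GetLineage", "GetQualityRun", "QuarantineAsset", "TriggerBackfill",
                 "RevertPipeline", "PauseRefresh", "CreateTicket", "NotifyOwners", "PostRCA"}

def contract_violations(reply: Dict[str, Any]) -> List[str]:
    """Return contract violations; an empty list means the reply may proceed to tool verification."""
    errors: List[str] = []
    missing = REQUIRED.difference(reply)
    if missing:
        errors.append(f"missing keys: {sorted(missing)}")
    if reply.get("severity") not in SEVERITIES:
        errors.append("severity outside enum")
    if not reply.get("citations"):
        errors.append("no citations (failing test ID and lineage edges are required)")
    for tp in reply.get("tool_proposals", []):
        if tp.get("name") not in ALLOWED_TOOLS:
            errors.append(f"tool not allowed: {tp.get('name')}")
        if not tp.get("idempotency_key"):
            errors.append("tool proposal missing idempotency_key")
    return errors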

Tool Interfaces (typed, with receipts)

# tools.py
from pydantic import BaseModel
from typing import Optional, List, Dict
from datetime import datetime

class GetLineageArgs(BaseModel):
    asset_id: str

class GetQualityRunArgs(BaseModel):
    test_id: str
    run_id: Optional[str] = None

class QuarantineAssetArgs(BaseModel):
    asset_id: str
    rule: str                 # e.g., filter predicate or blocklist
    reason: str
    expires_at: Optional[datetime] = None

class TriggerBackfillArgs(BaseModel):
    job_id: str
    start: str                # ISO date
    end: str                  # ISO date
    priority: str = "normal"

class RevertPipelineArgs(BaseModel):
    repo: str
    environment: str
    commit_sha: str
    reason: str

class PauseRefreshArgs(BaseModel):
    dashboard_id: str
    duration_min: int

class CreateTicketArgs(BaseModel):
    title: str
    description: str
    severity: str
    owners: List[str]

class NotifyOwnersArgs(BaseModel):
    owners: List[str]
    message: str

class PostRCAArgs(BaseModel):
    asset_id: str
    incident_id: str
    summary: str
    runbook_link: Optional[str] = None

class ToolReceipt(BaseModel):
    tool: str
    ok: bool
    ref: str                  # external receipt (run_id, PR id, ticket id)
    message: str = ""
    data: Optional[Dict] = None

# adapters.py  (demo logic—wire to your platform APIs)
from tools import *
LINEAGE = {
  "warehouse.sales.orders": ["semantic.sales_model", "dashboard.revenue_core"],
  "warehouse.crm.accounts": ["semantic.crm_model", "model.churn_xgb"]
}
OWNERS = {
  "warehouse.sales.orders": ["dataops@sales","owner@sales"],
  "dashboard.revenue_core": ["bi@sales"]
}

def get_lineage(a: GetLineageArgs) -> ToolReceipt:
    downstream = LINEAGE.get(a.asset_id, [])
    viewers = 420 if "dashboard" in " ".join(downstream) else 120
    return ToolReceipt(tool="GetLineage", ok=True, ref=f"lin-{a.asset_id}",
                       data={"downstream": downstream, "estimated_viewers": viewers})

def get_quality_run(a: GetQualityRunArgs) -> ToolReceipt:
    return ToolReceipt(tool="GetQualityRun", ok=True, ref=f"run-{a.test_id}",
                       data={"failing_test_id": a.test_id, "null_rate": 0.34, "freshness_min": 185})

def quarantine_asset(a: QuarantineAssetArgs) -> ToolReceipt:
    return ToolReceipt(tool="QuarantineAsset", ok=True, ref=f"q-{a.asset_id}", message="Quarantine rule applied",
                       data={"rule": a.rule})

def trigger_backfill(a: TriggerBackfillArgs) -> ToolReceipt:
    return ToolReceipt(tool="TriggerBackfill", ok=True, ref=f"job-{a.job_id}", message="Backfill started")

def revert_pipeline(a: RevertPipelineArgs) -> ToolReceipt:
    return ToolReceipt(tool="RevertPipeline", ok=True, ref=f"pr-revert-{a.commit_sha[:7]}", message="Revert PR opened")

def pause_refresh(a: PauseRefreshArgs) -> ToolReceipt:
    return ToolReceipt(tool="PauseRefresh", ok=True, ref=f"pause-{a.dashboard_id}", message="Refresh paused")

def create_ticket(a: CreateTicketArgs) -> ToolReceipt:
    return ToolReceipt(tool="CreateTicket", ok=True, ref="INC-4821", message="Incident opened")

def notify_owners(a: NotifyOwnersArgs) -> ToolReceipt:
    return ToolReceipt(tool="NotifyOwners", ok=True, ref="msg-01", message="Notified owners")

def post_rca(a: PostRCAArgs) -> ToolReceipt:
    return ToolReceipt(tool="PostRCA", ok=True, ref=f"rca-{a.incident_id}", message="RCA stub posted")
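
Before swapping in real platform APIs, the stub adapters can be exercised directly to confirm the receipt shape (a throwaway script; the file name is illustrative):

# smoke_test_adapters.py  (illustrative; exercises the demo adapters above)
from tools import GetLineageArgs, GetQualityRunArgs
from adapters import get_lineage, get_quality_run

lin = get_lineage(GetLineageArgs(asset_id="warehouse.sales.orders"))
print(lin.ref, lin.data)   # lin-warehouse.sales.orders {'downstream': [...], 'estimated_viewers': 420}
run = get_quality_run(GetQualityRunArgs(test_id="dbt.tests.not_null_orders_customer_id"))
print(run.ref, run.data)   # run-dbt.tests.not_null_orders_customer_id {'null_rate': 0.34, ...}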

Agent Loop (proposal → verification → execution → receipts)

# agent_dq_incident.py
import uuid, json
from typing import Any, Dict, List
from tools import *
from adapters import *

ALLOWED_TOOLS = {"GetLineage","GetQualityRun","QuarantineAsset","TriggerBackfill",
                 "RevertPipeline","PauseRefresh","CreateTicket","NotifyOwners","PostRCA"}

def new_idem(): return f"idem-{uuid.uuid4()}"

def verify(p: Dict[str,Any]) -> str:
    need = {"name","args","preconditions","idempotency_key"}
    if not need.issubset(p): return "Missing proposal fields"
    if p["name"] not in ALLOWED_TOOLS: return "Tool not allowed"
    return ""

def exec_tool(p: Dict[str,Any]) -> ToolReceipt:
    n,a = p["name"], p["args"]
    return (
        get_lineage(GetLineageArgs(**a))         if n=="GetLineage" else
        get_quality_run(GetQualityRunArgs(**a))  if n=="GetQualityRun" else
        quarantine_asset(QuarantineAssetArgs(**a)) if n=="QuarantineAsset" else
        trigger_backfill(TriggerBackfillArgs(**a)) if n=="TriggerBackfill" else
        revert_pipeline(RevertPipelineArgs(**a))   if n=="RevertPipeline" else
        pause_refresh(PauseRefreshArgs(**a))       if n=="PauseRefresh" else
        create_ticket(CreateTicketArgs(**a))       if n=="CreateTicket" else
        notify_owners(NotifyOwnersArgs(**a))       if n=="NotifyOwners" else
        post_rca(PostRCAArgs(**a))                 if n=="PostRCA" else
        ToolReceipt(tool=n, ok=False, ref="none", message="Unknown tool")
    )

# --- Model shim (replace with your LLM call honoring the contract) ---
def plan_from_inputs(incident: Dict[str,Any]) -> Dict[str,Any]:
    severity = "high" if incident["failure"]["type"] in {"freshness_sla","null_spike"} else "medium"
    return {
      "summary": f"{incident['asset_id']} failed {incident['failure']['type']} at {incident['detected_at']}.",
      "severity": severity,
      "impact": {"downstream_assets": [], "estimated_viewers": 0, "data_freshness_min": 0},
      "proposals": [
        {"action":"quarantine","reason":"Protect consumers while investigating.",
         "preconditions":["Downstream dashboards exist","Failure is reproducible"]},
        {"action":"backfill","reason":"Restore completeness once source fixed.",
         "preconditions":["Source availability confirmed"]},
        {"action":"open_ticket","reason":"Track remediation with owners.",
         "preconditions":["Owners identified"]}
      ],
      "citations": [incident["failure"]["test_id"]],
      "next_steps": ["Get lineage","Fetch quality run","Notify owners","Quarantine if high impact","Open incident","Plan backfill"],
      "tool_proposals": [
        {"name":"GetLineage","args":{"asset_id":incident["asset_id"]},
         "preconditions":"Assess blast radius.","idempotency_key": new_idem()},
        {"name":"GetQualityRun","args":{"test_id":incident["failure"]["test_id"]},
         "preconditions":"Quantify failure & freshness.","idempotency_key": new_idem()},
        {"name":"NotifyOwners","args":{"owners":OWNERS.get(incident["asset_id"], []),
                                       "message":"DQ failure detected; triaging."},
         "preconditions":"Coordinate stakeholders.","idempotency_key": new_idem()},
        {"name":"QuarantineAsset","args":{"asset_id":incident["asset_id"], "rule":"where ingest_ts >= now()-interval '1 day' -> exclude",
                                          "reason":"High null rate; temporary quarantine","expires_at":None},
         "preconditions":"High severity or many viewers.","idempotency_key": new_idem()},
        {"name":"CreateTicket","args":{"title":f"DQ incident on {incident['asset_id']}",
                                       "description":"Auto-opened by agent; see lineage + failing test.",
                                       "severity":severity, "owners":OWNERS.get(incident['asset_id'], [])},
         "preconditions":"Persistent tracking.","idempotency_key": new_idem()},
        {"name":"TriggerBackfill","args":{"job_id":"elt.orders.daily","start":"2025-10-01","end":"2025-10-21","priority":"high"},
         "preconditions":"Source fixed; run backfill.","idempotency_key": new_idem()},
        {"name":"PostRCA","args":{"asset_id":incident["asset_id"],"incident_id":"INC-4821",
                                   "summary":"Initial RCA stub created","runbook_link":"https://runbooks/dq"},
         "preconditions":"Document evidence.","idempotency_key": new_idem()}
      ]
    }

def handle(incident: Dict[str,Any]) -> str:
    plan = plan_from_inputs(incident)
    receipts: List[ToolReceipt] = []
    for prop in plan["tool_proposals"]:
        err = verify(prop)
        if err:
            receipts.append(ToolReceipt(tool=prop["name"], ok=False, ref="blocked", message=err))
            continue
        receipts.append(exec_tool(prop))
    idx = {r.tool:r for r in receipts}
    impact = {"downstream_assets": [], "estimated_viewers": 0, "data_freshness_min": 0}
    if idx.get("GetLineage"): 
        impact["downstream_assets"] = idx["GetLineage"].data["downstream"]
        impact["estimated_viewers"] = idx["GetLineage"].data["estimated_viewers"]
    if idx.get("GetQualityRun"): 
        impact["data_freshness_min"] = idx["GetQualityRun"].data["freshness_min"]

    lines = [plan["summary"], f"Severity: {plan['severity']}"]
    lines.append(f"Impact — downstream: {impact['downstream_assets'] or 'none'}, viewers≈{impact['estimated_viewers']}, freshness={impact['data_freshness_min']} min")
    if idx.get("QuarantineAsset"): lines.append(f"Quarantine: {idx['QuarantineAsset'].message} ({idx['QuarantineAsset'].ref})")
    if idx.get("TriggerBackfill"): lines.append(f"Backfill: {idx['TriggerBackfill'].message} ({idx['TriggerBackfill'].ref})")
    if idx.get("CreateTicket"):    lines.append(f"Incident: {idx['CreateTicket'].ref}")
    if idx.get("NotifyOwners"):    lines.append(f"Owners notified: {idx['NotifyOwners'].ref}")
    if idx.get("PostRCA"):         lines.append(f"RCA: {idx['PostRCA'].ref}")
    lines.append("\nNext steps:")
    for s in plan["next_steps"]: lines.append(f"• {s}")
    lines.append("\nCitations: " + ", ".join(plan["citations"]))
    return "\n".join(lines)

if __name__ == "__main__":
    example = {
      "asset_id":"warehouse.sales.orders",
      "failure":{"type":"null_spike","test_id":"dbt.tests.not_null_orders_customer_id"},
      "detected_at":"2025-10-21T09:32:00Z",
      "lineage_ctx":{}
    }
    print(handle(example))
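
Run against the stub adapters, the demo prints a report along these lines (all values come from the demo data above):

warehouse.sales.orders failed null_spike at 2025-10-21T09:32:00Z.
Severity: high
Impact — downstream: ['semantic.sales_model', 'dashboard.revenue_core'], viewers≈420, freshness=185 min
Quarantine: Quarantine rule applied (q-warehouse.sales.orders)
Backfill: Backfill started (job-elt.orders.daily)
Incident: INC-4821
Owners notified: msg-01
RCA: rca-INC-4821

Next steps:
• Get lineage
• Fetch quality run
• Notify owners
• Quarantine if high impact
• Open incident
• Plan backfill

Citations: dbt.tests.not_null_orders_customer_id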

The Prompt You’d Send to the Model (concise and testable)

System:
You are DQIncidentAgent. Follow the contract:
- Ask once if asset_id, test_id, failure, detected_at, or lineage_ctx are missing.
- Cite failing_test_id and lineage edges in minimal spans.
- Propose tools; never assert success without a receipt.
- Output JSON with: summary, severity, impact{}, proposals[], citations[], next_steps[], tool_proposals[].

User:
DQ failure:
{"asset_id":"warehouse.sales.orders","failure":{"type":"null_spike","test_id":"dbt.tests.not_null_orders_customer_id"},"detected_at":"2025-10-21T09:32:00Z","lineage_ctx":{}}

How to adapt quickly

Wire GetQualityRun to your dbt Cloud/GE/Deequ API; connect GetLineage to your catalog/lineage graph (OpenLineage, Purview, Unity Catalog). Implement QuarantineAsset as a governed rule in your lakehouse or warehouse (e.g., materialized view with a predicate, or a feature flag in the serving layer). Make TriggerBackfill call your orchestrator (Airflow/Dagster/Prefect/Databricks Jobs) with idempotency keys. Ensure PauseRefresh integrates with BI (Power BI/Looker) to protect viewers. Capture all receipts and the semantic fingerprint (asset/test IDs, edges) in your audit logs.
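
As a sketch of that wiring, a production-grade trigger_backfill might call your orchestrator's REST API and return its run ID as the receipt; the endpoint, payload, and auth below are placeholders rather than any specific orchestrator's API, and the proposal's idempotency_key would be threaded through to the request:

# adapters_prod.py  (sketch; endpoint, payload, and auth are placeholders)
import os
import requests
from tools import TriggerBackfillArgs, ToolReceipt

ORCH_URL = os.environ.get("ORCH_URL", "https://orchestrator.internal/api")

def trigger_backfill(a: TriggerBackfillArgs, idempotency_key: str = "") -> ToolReceipt:
    resp = requests.post(
        f"{ORCH_URL}/jobs/{a.job_id}/runs",
        headers={"Authorization": f"Bearer {os.environ['ORCH_TOKEN']}",   # set in the agent's runtime
                 "Idempotency-Key": idempotency_key},                     # retries reuse the same run
        json={"start": a.start, "end": a.end, "priority": a.priority},
        timeout=30,
    )
    if resp.status_code >= 300:
        return ToolReceipt(tool="TriggerBackfill", ok=False, ref="none", message=resp.text[:200])
    run_id = resp.json().get("run_id", "unknown")
    return ToolReceipt(tool="TriggerBackfill", ok=True, ref=run_id, message="Backfill started")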


Conclusion

Production agents must earn trust. A Data Quality & Lineage Incident Response Agent converts noisy failures into precise, auditable actions—protecting consumers, coordinating owners, and restoring correctness with receipts and documentation. Pair it with your database, streaming, and semantic-layer agents and you have an always-on safety net for the modern data platform.