Introduction
After wiring agents to warehouses, OLTP systems, and streams, the next operational bottleneck is trust: catching bad data before it hits dashboards, models, and downstream apps. This article builds an end-to-end Data Quality & Lineage Incident Response Agent that consumes test results from your quality framework (e.g., dbt tests, Great Expectations, Deequ), correlates failures with lineage, estimates impact, and executes bounded actions (quarantine, backfill, revert, or notify), reporting success only when the target system returns a verifiable receipt.
The Use Case
Data teams run hundreds of tests across sources and transformations. When a freshness SLA slips, a null rate spikes, or a schema breaks, the scramble begins: who owns the table, what depends on it, which dashboards or models are affected, and what safe step should we take now? The agent automates that loop. It ingests quality signals and lineage graphs, ranks the incident by blast radius, proposes a small set of remediations with preconditions, and executes the chosen one through typed tools, each of which returns a receipt (job run ID, pull request ID, ticket ID, quarantine rule ID). It then posts a concise RCA stub and opens an incident task with acceptance criteria.
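The ranking step can be sketched as a breadth-first walk over the lineage graph. This is a minimal illustration rather than the article's implementation: the `EDGES` and `VIEWERS` dictionaries, the function names, and the scoring rule (viewer count first, then number of downstream assets) are assumptions made for the example.

```python
from collections import deque
from typing import Dict, List, Set, Tuple

# Hypothetical lineage edges (upstream -> direct downstream) and viewer counts.
EDGES: Dict[str, List[str]] = {
    "warehouse.sales.orders": ["semantic.sales_model"],
    "semantic.sales_model": ["dashboard.revenue_core"],
    "warehouse.crm.accounts": ["semantic.crm_model"],
}
VIEWERS: Dict[str, int] = {"dashboard.revenue_core": 420}


def blast_radius(asset_id: str) -> Tuple[List[str], int]:
    """Return all transitive downstream assets and their summed viewer count."""
    seen: Set[str] = set()
    order: List[str] = []
    queue = deque(EDGES.get(asset_id, []))
    while queue:
        node = queue.popleft()
        if node in seen:
            continue  # guard against diamonds and cycles in the graph
        seen.add(node)
        order.append(node)
        queue.extend(EDGES.get(node, []))
    return order, sum(VIEWERS.get(n, 0) for n in order)


def rank_incidents(asset_ids: List[str]) -> List[str]:
    """Sort failing assets so the largest blast radius comes first."""
    def score(asset_id: str) -> Tuple[int, int]:
        downstream, viewers = blast_radius(asset_id)
        return (viewers, len(downstream))
    return sorted(asset_ids, key=score, reverse=True)
```

With these sample edges, a failure on `warehouse.sales.orders` outranks one on `warehouse.crm.accounts` because it reaches a dashboard with viewers.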
Prompt Contract (agent interface)
# file: contracts/data_quality_incident_v1.yaml
role: "DQIncidentAgent"
scope: >
  Triage data-quality failures using tests + lineage; propose and execute bounded remediations.
  Ask once if critical fields are missing (asset_id, test_id, failure, detected_at, lineage_ctx).
  Never claim success without a receipt from the target system.
output:
  type: object
  required: [summary, severity, impact, proposals, citations, next_steps, tool_proposals]
  properties:
    summary: {type: string, maxWords: 100}
    severity: {type: string, enum: ["low","medium","high","critical"]}
    impact:
      type: object
      required: [downstream_assets, estimated_viewers, data_freshness_min]
      properties:
        downstream_assets: {type: array, items: string}
        estimated_viewers: {type: integer}
        data_freshness_min: {type: integer}
    proposals:
      type: array
      items:
        type: object
        required: [action, reason, preconditions]
        properties:
          action: {type: string, enum: ["quarantine","backfill","revert_pipeline","pause_refresh","open_ticket","notify_owners"]}
          reason: {type: string}
          preconditions: {type: array, items: string}
    citations: {type: array, items: string} # test/run IDs, lineage edges, policy claims
    next_steps: {type: array, items: string, maxItems: 6}
    tool_proposals:
      type: array
      items:
        type: object
        required: [name, args, preconditions, idempotency_key]
        properties:
          name:
            type: string
            enum: [GetLineage, GetQualityRun, QuarantineAsset, TriggerBackfill,
                   RevertPipeline, PauseRefresh, CreateTicket, NotifyOwners, PostRCA]
          args: {type: object}
          preconditions: {type: string}
          idempotency_key: {type: string}
policy_id: "data_quality_policy.v3"
citation_rule: "Minimal-span references to failing_test_id, upstream->downstream edges, and policy clauses."
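A contract like this is only useful if replies are checked against it before any tool proposal is considered. The sketch below validates the required fields, the enums, and the two size limits using the standard library only. The `check_output` function and its error strings are hypothetical, and a full JSON Schema validator could replace it.

```python
from typing import Any, Dict, List

REQUIRED = ["summary", "severity", "impact", "proposals",
            "citations", "next_steps", "tool_proposals"]
SEVERITIES = {"low", "medium", "high", "critical"}
ACTIONS = {"quarantine", "backfill", "revert_pipeline",
           "pause_refresh", "open_ticket", "notify_owners"}


def check_output(out: Dict[str, Any]) -> List[str]:
    """Return a list of contract violations; an empty list means the reply passes."""
    errors = [f"missing field: {k}" for k in REQUIRED if k not in out]
    if errors:
        return errors  # skip deeper checks until the top-level shape is right
    if out["severity"] not in SEVERITIES:
        errors.append(f"bad severity: {out['severity']}")
    if len(out["summary"].split()) > 100:
        errors.append("summary exceeds 100 words")
    if len(out["next_steps"]) > 6:
        errors.append("more than 6 next_steps")
    for i, proposal in enumerate(out["proposals"]):
        if proposal.get("action") not in ACTIONS:
            errors.append(f"proposals[{i}]: bad action {proposal.get('action')!r}")
    return errors
```

A reply that fails this check can be sent back to the model with the error list instead of being executed.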
Tool Interfaces (typed, with receipts)
# tools.py
from pydantic import BaseModel
from typing import Optional, List, Dict
from datetime import datetime

class GetLineageArgs(BaseModel):
    asset_id: str

class GetQualityRunArgs(BaseModel):
    test_id: str
    run_id: Optional[str] = None

class QuarantineAssetArgs(BaseModel):
    asset_id: str
    rule: str  # e.g., filter predicate or blocklist
    reason: str
    # Explicit default: Pydantic v2 treats Optional[...] without a default as required.
    expires_at: Optional[datetime] = None

class TriggerBackfillArgs(BaseModel):
    job_id: str
    start: str  # ISO date
    end: str  # ISO date
    priority: str = "normal"

class RevertPipelineArgs(BaseModel):
    repo: str
    environment: str
    commit_sha: str
    reason: str

class PauseRefreshArgs(BaseModel):
    dashboard_id: str
    duration_min: int

class CreateTicketArgs(BaseModel):
    title: str
    description: str
    severity: str
    owners: List[str]

class NotifyOwnersArgs(BaseModel):
    owners: List[str]
    message: str

class PostRCAArgs(BaseModel):
    asset_id: str
    incident_id: str
    summary: str
    runbook_link: Optional[str] = None

class ToolReceipt(BaseModel):
    tool: str
    ok: bool
    ref: str  # external receipt (run_id, PR id, ticket id)
    message: str = ""
    data: Optional[Dict] = None
# adapters.py (demo logic; wire to your platform APIs)
from tools import *

LINEAGE = {
    "warehouse.sales.orders": ["semantic.sales_model", "dashboard.revenue_core"],
    "warehouse.crm.accounts": ["semantic.crm_model", "model.churn_xgb"]
}
OWNERS = {
    "warehouse.sales.orders": ["dataops@sales","owner@sales"],
    "dashboard.revenue_core": ["bi@sales"]
}

def get_lineage(a: GetLineageArgs) -> ToolReceipt:
    downstream = LINEAGE.get(a.asset_id, [])
    viewers = 420 if "dashboard" in " ".join(downstream) else 120
    return ToolReceipt(tool="GetLineage", ok=True, ref=f"lin-{a.asset_id}",
                       data={"downstream": downstream, "estimated_viewers": viewers})

def get_quality_run(a: GetQualityRunArgs) -> ToolReceipt:
    return ToolReceipt(tool="GetQualityRun", ok=True, ref=f"run-{a.test_id}",
                       data={"failing_test_id": a.test_id, "null_rate": 0.34, "freshness_min": 185})

def quarantine_asset(a: QuarantineAssetArgs) -> ToolReceipt:
    return ToolReceipt(tool="QuarantineAsset", ok=True, ref=f"q-{a.asset_id}", message="Quarantine rule applied",
                       data={"rule": a.rule})

def trigger_backfill(a: TriggerBackfillArgs) -> ToolReceipt:
    return ToolReceipt(tool="TriggerBackfill", ok=True, ref=f"job-{a.job_id}", message="Backfill started")

def revert_pipeline(a: RevertPipelineArgs) -> ToolReceipt:
    return ToolReceipt(tool="RevertPipeline", ok=True, ref=f"pr-revert-{a.commit_sha[:7]}", message="Revert PR opened")

def pause_refresh(a: PauseRefreshArgs) -> ToolReceipt:
    return ToolReceipt(tool="PauseRefresh", ok=True, ref=f"pause-{a.dashboard_id}", message="Refresh paused")

def create_ticket(a: CreateTicketArgs) -> ToolReceipt:
    return ToolReceipt(tool="CreateTicket", ok=True, ref="INC-4821", message="Incident opened")

def notify_owners(a: NotifyOwnersArgs) -> ToolReceipt:
    return ToolReceipt(tool="NotifyOwners", ok=True, ref="msg-01", message="Notified owners")

def post_rca(a: PostRCAArgs) -> ToolReceipt:
    return ToolReceipt(tool="PostRCA", ok=True, ref=f"rca-{a.incident_id}", message="RCA stub posted")
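The contract requires an `idempotency_key` on every tool proposal, but the demo adapters do not use it. One way to honor it is to remember the receipt for each key and replay it on retries. The sketch below is an assumption about how that could look: `Receipt` is a simplified stand-in for `ToolReceipt` so the example runs on its own, and `IdempotentExecutor` with its in-memory dictionary is hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class Receipt:
    """Simplified stand-in for ToolReceipt so the example is self-contained."""
    tool: str
    ok: bool
    ref: str


class IdempotentExecutor:
    """Run each idempotency key at most once; replay the stored receipt afterwards."""

    def __init__(self) -> None:
        self._seen: Dict[str, Receipt] = {}

    def run(self, key: str, call: Callable[[], Receipt]) -> Receipt:
        if key in self._seen:
            return self._seen[key]  # retry: no second side effect
        receipt = call()
        if receipt.ok:
            self._seen[key] = receipt  # only successful calls are remembered
        return receipt


# Usage: a retried backfill proposal does not start a second job.
calls: List[int] = []


def fake_backfill() -> Receipt:
    calls.append(1)
    return Receipt(tool="TriggerBackfill", ok=True, ref=f"job-{len(calls)}")


executor = IdempotentExecutor()
first = executor.run("idem-123", fake_backfill)
second = executor.run("idem-123", fake_backfill)  # same receipt, no new call
```

In production the key store would need to be durable, and where the target system accepts idempotency keys itself, passing the key through is the safer option.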
Agent Loop (proposal → verification → execution → receipts)
# agent_dq_incident.py
import uuid, json
from typing import Any, Dict, List
from tools import *
from adapters import *

ALLOWED_TOOLS = {"GetLineage","GetQualityRun","QuarantineAsset","TriggerBackfill",
                 "RevertPipeline","PauseRefresh","CreateTicket","NotifyOwners","PostRCA"}

def new_idem():
    return f"idem-{uuid.uuid4()}"

def verify(p: Dict[str,Any]) -> str:
    need = {"name","args","preconditions","idempotency_key"}
    if not need.issubset(p):
        return "Missing proposal fields"
    if p["name"] not in ALLOWED_TOOLS:
        return "Tool not allowed"
    return ""

def exec_tool(p: Dict[str,Any]) -> ToolReceipt:
    n,a = p["name"], p["args"]
    return (
        get_lineage(GetLineageArgs(**a)) if n=="GetLineage" else
        get_quality_run(GetQualityRunArgs(**a)) if n=="GetQualityRun" else
        quarantine_asset(QuarantineAssetArgs(**a)) if n=="QuarantineAsset" else
        trigger_backfill(TriggerBackfillArgs(**a)) if n=="TriggerBackfill" else
        revert_pipeline(RevertPipelineArgs(**a)) if n=="RevertPipeline" else
        pause_refresh(PauseRefreshArgs(**a)) if n=="PauseRefresh" else
        create_ticket(CreateTicketArgs(**a)) if n=="CreateTicket" else
        notify_owners(NotifyOwnersArgs(**a)) if n=="NotifyOwners" else
        post_rca(PostRCAArgs(**a)) if n=="PostRCA" else
        ToolReceipt(tool=n, ok=False, ref="none", message="Unknown tool")
    )
# --- Model shim (replace with your LLM call honoring the contract) ---
def plan_from_inputs(incident: Dict[str,Any]) -> Dict[str,Any]:
    severity = "high" if incident["failure"]["type"] in {"freshness_sla","null_spike"} else "medium"
    return {
        "summary": f"{incident['asset_id']} failed {incident['failure']['type']} at {incident['detected_at']}.",
        "severity": severity,
        "impact": {"downstream_assets": [], "estimated_viewers": 0, "data_freshness_min": 0},
        "proposals": [
            {"action":"quarantine","reason":"Protect consumers while investigating.",
             "preconditions":["Downstream dashboards exist","Failure is reproducible"]},
            {"action":"backfill","reason":"Restore completeness once source fixed.",
             "preconditions":["Source availability confirmed"]},
            {"action":"open_ticket","reason":"Track remediation with owners.",
             "preconditions":["Owners identified"]}
        ],
        "citations": [incident["failure"]["test_id"]],
        "next_steps": ["Get lineage","Fetch quality run","Notify owners","Quarantine if high impact","Open incident","Plan backfill"],
        "tool_proposals": [
            {"name":"GetLineage","args":{"asset_id":incident["asset_id"]},
             "preconditions":"Assess blast radius.","idempotency_key": new_idem()},
            {"name":"GetQualityRun","args":{"test_id":incident["failure"]["test_id"]},
             "preconditions":"Quantify failure & freshness.","idempotency_key": new_idem()},
            {"name":"NotifyOwners","args":{"owners":OWNERS.get(incident["asset_id"], []),
                                           "message":"DQ failure detected; triaging."},
             "preconditions":"Coordinate stakeholders.","idempotency_key": new_idem()},
            {"name":"QuarantineAsset","args":{"asset_id":incident["asset_id"], "rule":"where ingest_ts >= now()-interval '1 day' -> exclude",
                                              "reason":"High null rate; temporary quarantine","expires_at":None},
             "preconditions":"High severity or many viewers.","idempotency_key": new_idem()},
            {"name":"CreateTicket","args":{"title":f"DQ incident on {incident['asset_id']}",
                                           "description":"Auto-opened by agent; see lineage + failing test.",
                                           "severity":severity, "owners":OWNERS.get(incident['asset_id'], [])},
             "preconditions":"Persistent tracking.","idempotency_key": new_idem()},
            {"name":"TriggerBackfill","args":{"job_id":"elt.orders.daily","start":"2025-10-01","end":"2025-10-21","priority":"high"},
             "preconditions":"Source fixed; run backfill.","idempotency_key": new_idem()},
            {"name":"PostRCA","args":{"asset_id":incident["asset_id"],"incident_id":"INC-4821",
                                      "summary":"Initial RCA stub created","runbook_link":"https://runbooks/dq"},
             "preconditions":"Document evidence.","idempotency_key": new_idem()}
        ]
    }
def handle(incident: Dict[str,Any]) -> str:
    plan = plan_from_inputs(incident)
    receipts: List[ToolReceipt] = []
    for prop in plan["tool_proposals"]:
        err = verify(prop)
        if err:
            # Blocked proposals are recorded but never executed; "name" may be the missing field.
            receipts.append(ToolReceipt(tool=prop.get("name", "unknown"), ok=False, ref="blocked", message=err))
            continue
        receipts.append(exec_tool(prop))

    # Index successful receipts only, so the summary never reports an action without a receipt.
    idx = {r.tool: r for r in receipts if r.ok}
    impact = {"downstream_assets": [], "estimated_viewers": 0, "data_freshness_min": 0}
    if idx.get("GetLineage"):
        impact["downstream_assets"] = idx["GetLineage"].data["downstream"]
        impact["estimated_viewers"] = idx["GetLineage"].data["estimated_viewers"]
    if idx.get("GetQualityRun"):
        impact["data_freshness_min"] = idx["GetQualityRun"].data["freshness_min"]

    lines = [plan["summary"], f"Severity: {plan['severity']}"]
    lines.append(f"Impact: downstream={impact['downstream_assets'] or 'none'}, viewers≈{impact['estimated_viewers']}, freshness={impact['data_freshness_min']} min")
    if idx.get("QuarantineAsset"):
        lines.append(f"Quarantine: {idx['QuarantineAsset'].message} ({idx['QuarantineAsset'].ref})")
    if idx.get("TriggerBackfill"):
        lines.append(f"Backfill: {idx['TriggerBackfill'].message} ({idx['TriggerBackfill'].ref})")
    if idx.get("CreateTicket"):
        lines.append(f"Incident: {idx['CreateTicket'].ref}")
    if idx.get("NotifyOwners"):
        lines.append(f"Owners notified: {idx['NotifyOwners'].ref}")
    if idx.get("PostRCA"):
        lines.append(f"RCA: {idx['PostRCA'].ref}")
    lines.append("\nNext steps:")
    for s in plan["next_steps"]:
        lines.append(f"• {s}")
    lines.append("\nCitations: " + ", ".join(plan["citations"]))
    return "\n".join(lines)
if __name__ == "__main__":
    example = {
        "asset_id":"warehouse.sales.orders",
        "failure":{"type":"null_spike","test_id":"dbt.tests.not_null_orders_customer_id"},
        "detected_at":"2025-10-21T09:32:00Z",
        "lineage_ctx":{}
    }
    print(handle(example))
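The demo loop executes every verified proposal, including QuarantineAsset, whose stated precondition is "High severity or many viewers." A simple way to enforce that kind of precondition is to gate mutating tools on the severity and on the viewer estimate returned by the read-only calls. The sketch below is one possible shape: the `NEEDS_HIGH_IMPACT` set, the `VIEWER_THRESHOLD` value, and the `gate` function are assumptions for illustration, not part of the article's code.

```python
from typing import Dict, List

# Hypothetical policy: mutating tools that require a high-impact incident.
NEEDS_HIGH_IMPACT = {"QuarantineAsset", "PauseRefresh", "RevertPipeline"}
VIEWER_THRESHOLD = 100  # assumed cut-off; tune to your own policy


def gate(tool_names: List[str], severity: str, estimated_viewers: int) -> Dict[str, List[str]]:
    """Split proposed tools into those allowed to run and those held for review."""
    high_impact = severity in {"high", "critical"} or estimated_viewers >= VIEWER_THRESHOLD
    allowed: List[str] = []
    held: List[str] = []
    for name in tool_names:
        if name in NEEDS_HIGH_IMPACT and not high_impact:
            held.append(name)  # precondition not met: leave for a human
        else:
            allowed.append(name)
    return {"allowed": allowed, "held": held}
```

In the loop, this check would run after GetLineage and GetQualityRun have returned receipts, so the decision rests on measured impact and not on the model's own estimate.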
The Prompt You’d Send to the Model (concise and testable)
System:
You are DQIncidentAgent. Follow the contract:
- Ask once if asset_id, test_id, failure, detected_at, or lineage_ctx are missing.
- Cite failing_test_id and lineage edges in minimal spans.
- Propose tools; never assert success without a receipt.
- Output JSON with: summary, severity, impact{}, proposals[], citations[], next_steps[], tool_proposals[].
User:
DQ failure:
{"asset_id":"warehouse.sales.orders","failure":{"type":"null_spike","test_id":"dbt.tests.not_null_orders_customer_id"},"detected_at":"2025-10-21T09:32:00Z","lineage_ctx":{}}
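The "ask once" rule in the system prompt can also be enforced before the model is called, so incomplete incidents never reach the planner. The sketch below assumes the incident shape used in this article, where `test_id` is nested under `failure`. The function names and the wording of the question are hypothetical.

```python
from typing import Any, Dict, List, Optional

TOP_LEVEL = ("asset_id", "failure", "detected_at", "lineage_ctx")


def missing_fields(incident: Dict[str, Any]) -> List[str]:
    """List the critical fields that are absent from the incident payload."""
    missing = [k for k in TOP_LEVEL if k not in incident]
    failure = incident.get("failure")
    # test_id lives inside the failure object in this article's payload.
    if not isinstance(failure, dict) or "test_id" not in failure:
        missing.append("test_id")
    return missing


def clarifying_question(incident: Dict[str, Any]) -> Optional[str]:
    """Return a single question covering every gap, or None if nothing is missing."""
    missing = missing_fields(incident)
    if not missing:
        return None
    return "Please provide: " + ", ".join(missing)
```

An empty `lineage_ctx` counts as present here, since the agent can fetch lineage itself through GetLineage.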
How to adapt quickly
Wire GetQualityRun to your dbt Cloud/GE/Deequ API; connect GetLineage to your catalog/lineage graph (OpenLineage, Purview, Unity Catalog). Implement QuarantineAsset as a governed rule in your lakehouse or warehouse (e.g., materialized view with a predicate, or a feature flag in the serving layer). Make TriggerBackfill call your orchestrator (Airflow/Dagster/Prefect/Databricks Jobs) with idempotency keys. Ensure PauseRefresh integrates with BI (Power BI/Looker) to protect viewers. Capture all receipts and the semantic fingerprint (asset/test IDs, edges) in your audit logs.
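For the audit-log step, one reading of "semantic fingerprint" is a stable hash over the fields that identify an incident. The sketch below takes that reading as an assumption: it hashes the asset ID, test ID, and lineage edges with SHA-256 and emits one JSON line per incident. The `fingerprint` and `audit_record` functions and the record layout are hypothetical.

```python
import hashlib
import json
from typing import Any, Dict, List, Tuple


def fingerprint(asset_id: str, test_id: str, edges: List[Tuple[str, str]]) -> str:
    """Stable SHA-256 over the incident's identifying fields."""
    payload = {
        "asset_id": asset_id,
        "test_id": test_id,
        # Sort edges so the hash does not depend on traversal order.
        "edges": sorted([list(edge) for edge in edges]),
    }
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def audit_record(fp: str, receipts: List[Dict[str, Any]]) -> str:
    """One JSON line per incident, ready to append to an audit log."""
    return json.dumps({"fingerprint": fp, "receipts": receipts}, sort_keys=True)
```

Because the edges are sorted before hashing, two detections of the same failure on the same lineage produce the same fingerprint, which makes duplicate incidents easy to spot in the log.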
Conclusion
Production agents must earn trust. A Data Quality & Lineage Incident Response Agent converts noisy failures into precise, auditable actions—protecting consumers, coordinating owners, and restoring correctness with receipts and documentation. Pair it with your database, streaming, and semantic-layer agents and you have an always-on safety net for the modern data platform.