Introduction
After wiring agents to OLTP databases, streams/CDC, and data-quality workflows, the next operational pressure is cost and performance of the analytics warehouse/lakehouse (Snowflake, BigQuery, Redshift, Databricks SQL, Fabric). This article delivers a practical Cost & Performance Optimization Agent that monitors spend and latency, diagnoses query patterns, proposes bounded remediations (clustering/partitioning tweaks, materializations, slot/warehouse rightsizing, result-cache policies), and executes only with receipts (change tickets, job run IDs, PR IDs).
The Use Case
Analytics platforms excel at elasticity, but without discipline costs rise and SLAs drift. Human reviews (monthly cost meetings) lag reality, and ad-hoc fixes don't stick. The agent continuously ingests billing/usage telemetry and query traces, correlates spikes with specific models and dashboards, ranks optimization opportunities by estimated dollar savings against risk, and proposes safe, reversible actions behind feature flags. No success claim is made without a downstream receipt.
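How that ranking works is worth pinning down. A minimal sketch, assuming a triage heuristic of our own design (the risk discounts and latency penalty are illustrative choices, not any platform's API):

# rank_opportunities.py — illustrative triage heuristic (weights are assumptions)
from typing import Dict, List

RISK_DISCOUNT = {"low": 1.0, "medium": 0.6, "high": 0.3}  # assumed risk weights

def priority(opp: Dict) -> float:
    """Expected monthly savings, discounted by risk and penalized for p95 regressions."""
    saving = opp["est_saving_usd"] * RISK_DISCOUNT[opp["risk"]]
    latency_penalty = max(opp["est_p95_ms_delta"], 0) * 0.5  # only regressions count
    return saving - latency_penalty

def rank(opps: List[Dict]) -> List[Dict]:
    return sorted(opps, key=priority, reverse=True)

The opportunity dicts reuse the proposal shape defined in the contract below, so the same objects flow from detection through ranking to simulation.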
Prompt Contract (agent interface)
# file: contracts/warehouse_cost_perf_v1.yaml
role: "CostPerfAgent"
scope: >
  Monitor warehouse/lakehouse cost and latency; propose and execute bounded optimizations.
  Ask once if critical fields are missing (platform, project/account, telemetry windows).
  Never assert success without a receipt (ticket id, PR id, job id).
inputs:
  platform: enum["snowflake","bigquery","redshift","databricks","fabric"]
  lookback_days: integer
  sla_ms_p95: integer
  budget_monthly_usd: number
output:
  type: object
  required: [summary, hotspots, proposals, impact_model, citations, next_steps, tool_proposals]
  properties:
    summary: {type: string, maxWords: 100}
    hotspots:
      type: array
      items:
        type: object
        required: [owner, object, type, cost_usd, p95_ms, calls_7d]
        properties:
          owner: {type: string}
          object: {type: string}   # table/view/query_tag/model/dashboard
          type: {type: string}     # "table","model","dashboard","job","warehouse"
          cost_usd: {type: number}
          p95_ms: {type: number}
          calls_7d: {type: integer}
    proposals:
      type: array
      items:
        type: object
        required: [action, target, est_saving_usd, est_p95_ms_delta, risk, preconditions]
        properties:
          action: {type: string, enum: ["cluster","partition","materialize","result_cache","job_rewrite","rightsizing","schedule_shift","auto_suspend","slot_quota"]}
          target: {type: string}
          est_saving_usd: {type: number}
          est_p95_ms_delta: {type: number}
          risk: {type: string, enum: ["low","medium","high"]}
          preconditions: {type: array, items: {type: string}}
    impact_model:
      type: object
      required: [current_month_usd, projected_month_usd, projected_p95_ms]
      properties:
        current_month_usd: {type: number}
        projected_month_usd: {type: number}
        projected_p95_ms: {type: number}
    citations: {type: array, items: {type: string}}   # query ids, job ids, object names
    next_steps: {type: array, items: {type: string}, maxItems: 6}
    tool_proposals:
      type: array
      items:
        type: object
        required: [name, args, preconditions, idempotency_key]
        properties:
          name:
            type: string
            enum: [IngestUsage, IngestQueryLog, DetectHotspots, Simulate, OpenTicket,
                   ApplyRightsizing, SetAutoSuspend, CreateMaterialization, UpdatePartitioning,
                   EnableResultCache, SubmitPR]
          args: {type: object}
          preconditions: {type: string}
          idempotency_key: {type: string}
policy_id: "warehouse_cost_policy.v2"
citation_rule: "Minimal-span references to object names and query/job ids."
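A contract is only protective if the agent's output is checked against it before any tool runs. A minimal sketch of that structural gate, hand-rolled rather than a full JSON Schema validator (the field names mirror the contract above):

# validate_output.py — minimal structural check for warehouse_cost_perf_v1 (sketch)
REQUIRED = {"summary", "hotspots", "proposals", "impact_model",
            "citations", "next_steps", "tool_proposals"}
ACTIONS = {"cluster", "partition", "materialize", "result_cache", "job_rewrite",
           "rightsizing", "schedule_shift", "auto_suspend", "slot_quota"}

def check_output(out: dict) -> list:
    """Return a list of violations; empty means the payload is structurally valid."""
    errors = [f"missing field: {k}" for k in REQUIRED - out.keys()]
    for p in out.get("proposals", []):
        if p.get("action") not in ACTIONS:
            errors.append(f"unknown action: {p.get('action')}")
        if p.get("risk") not in {"low", "medium", "high"}:
            errors.append(f"bad risk: {p.get('risk')}")
    if len(out.get("next_steps", [])) > 6:
        errors.append("next_steps exceeds maxItems: 6")
    return errors

The agent loop later in this article applies a similar per-proposal check in verify(); this one covers the whole payload.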
Tool Interfaces (typed, with receipts)
# tools.py
from typing import Dict, List, Optional

from pydantic import BaseModel

class IngestUsageArgs(BaseModel):
    platform: str
    lookback_days: int

class IngestQueryLogArgs(BaseModel):
    platform: str
    lookback_days: int
    tag_filter: Optional[str] = None

class DetectHotspotsArgs(BaseModel):
    usage: Dict
    queries: Dict
    sla_ms_p95: int

class SimulateArgs(BaseModel):
    proposal: Dict  # {action, target, params}

class OpenTicketArgs(BaseModel):
    title: str
    description: str
    owners: List[str]
    severity: str

class ApplyRightsizingArgs(BaseModel):
    target: str           # e.g., "wh.etl_large"
    size: str             # "S","M","L","XL" or slots
    autosuspend_sec: int

class SetAutoSuspendArgs(BaseModel):
    target: str
    autosuspend_sec: int

class CreateMaterializationArgs(BaseModel):
    model: str
    schedule: str   # cron or window
    ttl_hours: int

class UpdatePartitioningArgs(BaseModel):
    table: str
    partition_by: str

class EnableResultCacheArgs(BaseModel):
    scope: str      # "project" | "schema" | "query_tag"
    enabled: bool

class SubmitPRArgs(BaseModel):
    repo: str
    branch: str
    message: str
    diff: str

class ToolReceipt(BaseModel):
    tool: str
    ok: bool
    ref: str        # ticket id, job id, pr id
    message: str = ""
    data: Optional[Dict] = None
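Because every argument set is a Pydantic model, malformed tool calls fail before they reach an adapter. A quick usage sketch:

# Typed args reject malformed proposals before any side effect occurs.
from pydantic import ValidationError
from tools import IngestUsageArgs

try:
    IngestUsageArgs(platform="snowflake")  # lookback_days is missing
except ValidationError as e:
    print("blocked:", e.errors()[0]["loc"])  # ('lookback_days',)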
# adapters.py (demo logic; wire to platform SDKs/APIs in production)
from tools import *

def ingest_usage(a: IngestUsageArgs) -> ToolReceipt:
    return ToolReceipt(tool="IngestUsage", ok=True, ref=f"usage-{a.lookback_days}",
                       data={"total_usd": 48231.17})

def ingest_query_log(a: IngestQueryLogArgs) -> ToolReceipt:
    return ToolReceipt(tool="IngestQueryLog", ok=True, ref=f"qlog-{a.lookback_days}",
                       data={"queries": [{"id": "q-991", "p95_ms": 4200, "cost_usd": 612,
                                          "object": "model.sales_orders_daily",
                                          "tag": "dash:revenue_core", "calls_7d": 1800}]})

def detect_hotspots(a: DetectHotspotsArgs) -> ToolReceipt:
    hs = [{"owner": "bi@sales", "object": "model.sales_orders_daily", "type": "model",
           "cost_usd": 3200, "p95_ms": 4200, "calls_7d": 1800},
          {"owner": "etl@data", "object": "wh.etl_large", "type": "warehouse",
           "cost_usd": 19000, "p95_ms": 1800, "calls_7d": 400}]
    return ToolReceipt(tool="DetectHotspots", ok=True, ref="hot-1", data={"hotspots": hs})

def simulate(a: SimulateArgs) -> ToolReceipt:
    # toy simulator
    p = a.proposal
    if p["action"] == "materialize":
        return ToolReceipt(tool="Simulate", ok=True, ref="sim-1",
                           data={"saving_usd": 2400, "p95_ms_delta": -2200})
    if p["action"] == "rightsizing":
        return ToolReceipt(tool="Simulate", ok=True, ref="sim-2",
                           data={"saving_usd": 6000, "p95_ms_delta": +80})
    return ToolReceipt(tool="Simulate", ok=True, ref="sim-0",
                       data={"saving_usd": 0, "p95_ms_delta": 0})

def open_ticket(a: OpenTicketArgs) -> ToolReceipt:
    return ToolReceipt(tool="OpenTicket", ok=True, ref="COST-1742", message="Optimization ticket opened")

def apply_rightsizing(a: ApplyRightsizingArgs) -> ToolReceipt:
    return ToolReceipt(tool="ApplyRightsizing", ok=True, ref=f"chg-{a.target}", message="Warehouse resized")

def set_autosuspend(a: SetAutoSuspendArgs) -> ToolReceipt:
    return ToolReceipt(tool="SetAutoSuspend", ok=True, ref=f"sus-{a.target}", message="Auto-suspend set")

def create_materialization(a: CreateMaterializationArgs) -> ToolReceipt:
    return ToolReceipt(tool="CreateMaterialization", ok=True, ref=f"job-mat-{a.model}", message="Materialization scheduled")

def update_partitioning(a: UpdatePartitioningArgs) -> ToolReceipt:
    return ToolReceipt(tool="UpdatePartitioning", ok=True, ref=f"ddl-{a.table}", message="Partitioning updated")

def enable_result_cache(a: EnableResultCacheArgs) -> ToolReceipt:
    return ToolReceipt(tool="EnableResultCache", ok=True, ref=f"rc-{a.scope}", message="Result cache toggled")

def submit_pr(a: SubmitPRArgs) -> ToolReceipt:
    return ToolReceipt(tool="SubmitPR", ok=True, ref="PR-983", message="PR submitted")
Agent Loop (proposal → verification → execution → receipts)
# agent_cost_perf.py
import uuid
from typing import Any, Dict, List

from tools import *
from adapters import *

# Dispatch table: tool name -> (adapter, typed-args model). ALLOWED is derived
# from it, so the verification gate and the executor cannot drift apart.
DISPATCH = {
    "IngestUsage": (ingest_usage, IngestUsageArgs),
    "IngestQueryLog": (ingest_query_log, IngestQueryLogArgs),
    "DetectHotspots": (detect_hotspots, DetectHotspotsArgs),
    "Simulate": (simulate, SimulateArgs),
    "OpenTicket": (open_ticket, OpenTicketArgs),
    "ApplyRightsizing": (apply_rightsizing, ApplyRightsizingArgs),
    "SetAutoSuspend": (set_autosuspend, SetAutoSuspendArgs),
    "CreateMaterialization": (create_materialization, CreateMaterializationArgs),
    "UpdatePartitioning": (update_partitioning, UpdatePartitioningArgs),
    "EnableResultCache": (enable_result_cache, EnableResultCacheArgs),
    "SubmitPR": (submit_pr, SubmitPRArgs),
}
ALLOWED = set(DISPATCH)

def new_idem() -> str:
    return f"idem-{uuid.uuid4()}"

def verify(p: Dict[str, Any]) -> str:
    """Gate every proposal; return an error string, or '' if it may run."""
    need = {"name", "args", "preconditions", "idempotency_key"}
    if not need.issubset(p):
        return "Missing proposal fields"
    if p["name"] not in ALLOWED:
        return "Tool not allowed"
    return ""

def run(p: Dict[str, Any]) -> ToolReceipt:
    entry = DISPATCH.get(p["name"])
    if entry is None:
        return ToolReceipt(tool=p["name"], ok=False, ref="none", message="Unknown tool")
    fn, args_model = entry
    return fn(args_model(**p["args"]))

# --- Model shim (replace with your LLM honoring the contract) ---
def plan(req: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "summary": (f"Monitoring {req['platform']} for {req['lookback_days']}d; propose actions "
                    f"to meet p95 ≤ {req['sla_ms_p95']}ms and budget ${req['budget_monthly_usd']:.0f}."),
        "hotspots": [],
        "proposals": [
            {"action": "materialize", "target": "model.sales_orders_daily",
             "est_saving_usd": 2400, "est_p95_ms_delta": -2200, "risk": "low",
             "preconditions": ["Downstream dashboards use identical filters"]},
            {"action": "rightsizing", "target": "wh.etl_large",
             "est_saving_usd": 6000, "est_p95_ms_delta": 80, "risk": "medium",
             "preconditions": ["Peak concurrency ≤ 8", "No SLA violations during batch"]},
            {"action": "auto_suspend", "target": "wh.etl_large",
             "est_saving_usd": 1200, "est_p95_ms_delta": 0, "risk": "low",
             "preconditions": ["Idle windows > 5 min between jobs"]},
        ],
        "impact_model": {"current_month_usd": 52000, "projected_month_usd": 42400,
                         "projected_p95_ms": 1800},
        "citations": ["q-991", "model.sales_orders_daily", "wh.etl_large"],
        "next_steps": ["Ingest usage & queries", "Detect hotspots", "Simulate proposals",
                       "Open ticket + PR", "Apply low-risk changes first"],
        "tool_proposals": [
            {"name": "IngestUsage",
             "args": {"platform": req["platform"], "lookback_days": req["lookback_days"]},
             "preconditions": "Usage window present.", "idempotency_key": new_idem()},
            {"name": "IngestQueryLog",
             "args": {"platform": req["platform"], "lookback_days": req["lookback_days"], "tag_filter": None},
             "preconditions": "Query log accessible.", "idempotency_key": new_idem()},
            {"name": "DetectHotspots",
             "args": {"usage": {}, "queries": {}, "sla_ms_p95": req["sla_ms_p95"]},
             "preconditions": "Correlate cost vs p95.", "idempotency_key": new_idem()},
            {"name": "Simulate",
             "args": {"proposal": {"action": "materialize", "target": "model.sales_orders_daily", "params": {"ttl": "24h"}}},
             "preconditions": "Estimate savings/latency.", "idempotency_key": new_idem()},
            {"name": "Simulate",
             "args": {"proposal": {"action": "rightsizing", "target": "wh.etl_large", "params": {"size_to": "M", "autosuspend": 60}}},
             "preconditions": "Estimate savings/latency.", "idempotency_key": new_idem()},
            {"name": "OpenTicket",
             "args": {"title": "Cost/Perf optimization rollout", "description": "See attached plan and sims",
                      "owners": ["bi@sales", "data@platform"], "severity": "medium"},
             "preconditions": "Change tracking.", "idempotency_key": new_idem()},
            {"name": "CreateMaterialization",
             "args": {"model": "model.sales_orders_daily", "schedule": "hourly", "ttl_hours": 24},
             "preconditions": "Low-risk, reversible.", "idempotency_key": new_idem()},
            {"name": "ApplyRightsizing",
             "args": {"target": "wh.etl_large", "size": "M", "autosuspend_sec": 60},
             "preconditions": "Batch safe window.", "idempotency_key": new_idem()},
            {"name": "SetAutoSuspend",
             "args": {"target": "wh.etl_large", "autosuspend_sec": 60},
             "preconditions": "Idle gaps exist.", "idempotency_key": new_idem()},
            {"name": "SubmitPR",
             "args": {"repo": "analytics/infra", "branch": "feat/cost-perf-ops",
                      "message": "Add materialization + autosuspend", "diff": "..."},
             "preconditions": "Infra as code.", "idempotency_key": new_idem()},
        ],
    }

def handle(req: Dict[str, Any]) -> str:
    p = plan(req)
    receipts: List[ToolReceipt] = []
    for tp in p["tool_proposals"]:
        err = verify(tp)
        if err:
            receipts.append(ToolReceipt(tool=tp["name"], ok=False, ref="blocked", message=err))
            continue
        receipts.append(run(tp))
    idx = {r.tool: r for r in receipts}  # last receipt per tool wins (e.g., the two Simulates)
    det = idx.get("DetectHotspots")
    hs = det.data["hotspots"] if det and det.ok and det.data else []
    lines = [p["summary"], ""]
    if hs:
        lines.append("Hotspots:")
        for h in hs:
            lines.append(f"- {h['object']} ({h['type']}), owner {h['owner']}: "
                         f"${h['cost_usd']:.0f}/7d, p95={h['p95_ms']}ms, calls={h['calls_7d']}")
    lines.append("\nProposals:")
    for pr in p["proposals"]:
        lines.append(f"- {pr['action']} → {pr['target']} (risk {pr['risk']}, "
                     f"est save ${pr['est_saving_usd']:.0f}, Δp95 {pr['est_p95_ms_delta']}ms)")
    if idx.get("CreateMaterialization"):
        lines.append(f"\nMaterialization: {idx['CreateMaterialization'].message} ({idx['CreateMaterialization'].ref})")
    if idx.get("ApplyRightsizing"):
        lines.append(f"Rightsizing: {idx['ApplyRightsizing'].message} ({idx['ApplyRightsizing'].ref})")
    if idx.get("SetAutoSuspend"):
        lines.append(f"Auto-suspend: {idx['SetAutoSuspend'].message} ({idx['SetAutoSuspend'].ref})")
    if idx.get("OpenTicket"):
        lines.append(f"Change ticket: {idx['OpenTicket'].ref}")
    if idx.get("SubmitPR"):
        lines.append(f"Infrastructure PR: {idx['SubmitPR'].ref}")
    im = p["impact_model"]
    lines.append(f"\nProjected impact — spend ${im['projected_month_usd']:.0f} "
                 f"(was ${im['current_month_usd']:.0f}), p95≈{im['projected_p95_ms']}ms")
    lines.append("\nNext steps:")
    for s in p["next_steps"]:
        lines.append(f"• {s}")
    lines.append("\nCitations: " + ", ".join(p["citations"]))
    return "\n".join(lines)

if __name__ == "__main__":
    example_req = {"platform": "snowflake", "lookback_days": 14,
                   "sla_ms_p95": 2000, "budget_monthly_usd": 45000}
    print(handle(example_req))
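A few pytest-style checks pin the verification gate's behavior, so a model cannot smuggle an unlisted tool through the loop (this exercises only the demo code above):

# test_agent_gate.py — guardrail tests for the proposal gate (sketch)
from agent_cost_perf import new_idem, verify

def test_blocks_unknown_tool():
    bad = {"name": "DropTable", "args": {}, "preconditions": "none",
           "idempotency_key": new_idem()}
    assert verify(bad) == "Tool not allowed"

def test_blocks_missing_fields():
    assert verify({"name": "IngestUsage"}) == "Missing proposal fields"

def test_accepts_well_formed_proposal():
    ok = {"name": "IngestUsage",
          "args": {"platform": "snowflake", "lookback_days": 14},
          "preconditions": "Usage window present.", "idempotency_key": new_idem()}
    assert verify(ok) == ""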
The Prompt You’d Send to the Model (concise and testable)
System:
You are CostPerfAgent. Follow the contract:
- Ask once if platform, lookback_days, sla_ms_p95, or budget_monthly_usd are missing.
- Cite minimal spans for query/object IDs.
- Propose tools; never assert success without receipts.
- Output JSON with: summary, hotspots[], proposals[], impact_model{}, citations[], next_steps[], tool_proposals[].
User:
Optimize our Snowflake costs while keeping p95 ≤ 2000ms, 14-day lookback, budget $45k.
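Wiring this prompt up needs surprisingly little glue: send the contract as the system message, parse the reply as JSON, and gate it before acting. A sketch, where call_llm is a placeholder for whatever provider client you use and check_output is the structural gate sketched after the contract:

# run_prompt.py — prompt-to-plan harness (call_llm is a placeholder, not a real client)
import json
from validate_output import check_output  # the structural gate from earlier

SYSTEM = open("contracts/warehouse_cost_perf_v1.yaml").read()  # embed the contract verbatim
USER = ("Optimize our Snowflake costs while keeping p95 ≤ 2000ms, "
        "14-day lookback, budget $45k.")

def call_llm(system: str, user: str) -> str:
    raise NotImplementedError("swap in your provider's chat-completion call")

raw = call_llm(SYSTEM, USER)
plan = json.loads(raw)            # reject non-JSON outright
violations = check_output(plan)   # then gate against the contract
if violations:
    raise ValueError(f"contract violations: {violations}")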
Rollout playbook (what works in production)
Treat every change as infrastructure-as-code behind a feature flag: run a canary on a small workspace or schema, measure p95 and cache hit rate, then promote. Keep golden traces of expensive queries and dashboards in CI to guard against regressions. Record a receipt for each change (ticket, PR, job) together with its semantic fingerprint (object names, query tags). Reassess proposals weekly, and automatically revert any change that does not deliver its projected savings or that breaches the SLA.
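A sketch of that automatic revert check, assuming each applied change is persisted with its projection and receipt (the measured inputs come from whatever telemetry pipeline you already ingest; the thresholds are assumptions to tune per platform):

# revert_guard.py — post-change audit (thresholds are assumptions)
def should_revert(change: dict, measured_saving_usd: float,
                  measured_p95_ms: float, sla_ms_p95: int,
                  savings_tolerance: float = 0.5) -> bool:
    """Revert if realized savings fall below half the projection or the SLA is breached."""
    underdelivered = measured_saving_usd < change["est_saving_usd"] * savings_tolerance
    sla_breached = measured_p95_ms > sla_ms_p95
    return underdelivered or sla_breached

# Example: a change projected to save $6,000 delivered $2,100 -> revert.
assert should_revert({"est_saving_usd": 6000}, 2100, 1700, 2000) is True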
Conclusion
A Cost & Performance Optimization Agent turns warehouse tuning from sporadic heroics into a continuous, auditable loop. With typed tools, receipts, and simulations, you can cut spend while improving user-perceived latency—and do it safely across Snowflake, BigQuery, Redshift, Databricks, or Fabric.