AI Agents in Practice: Warehouse Cost & Performance Optimization Agent (Prompts + Code)

Introduction

After wiring agents to OLTP databases, streams/CDC, and data-quality workflows, the next operational pressure is cost and performance of the analytics warehouse/lakehouse (Snowflake, BigQuery, Redshift, Databricks SQL, Fabric). This article delivers a practical Cost & Performance Optimization Agent that monitors spend and latency, diagnoses query patterns, proposes bounded remediations (clustering/partitioning tweaks, materializations, slot/warehouse rightsizing, result-cache policies), and executes only with receipts (change tickets, job run IDs, PR IDs).


The Use Case

Analytic platforms excel at elasticity, but without discipline costs rise and SLAs drift. Human reviews such as monthly cost meetings lag reality, and ad-hoc fixes don’t stick. The agent continuously ingests billing/usage telemetry and query traces, correlates spikes with models/dashboards, ranks optimization opportunities by estimated savings and risk, and proposes safe, reversible actions behind feature flags. No success claim is made without a downstream receipt.
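
One way to make the ranking step concrete is a risk-discounted score. The sketch below is an assumption, not a scoring rule the article prescribes: the `RISK_WEIGHT` values and the `score` and `rank_proposals` helpers are hypothetical, and the sample proposals mirror the demo plan used later in this article.

```python
from typing import Dict, List

# Hypothetical risk discounts; tune these against your own change-failure history.
RISK_WEIGHT = {"low": 1.0, "medium": 0.6, "high": 0.3}

def score(proposal: Dict) -> float:
    """Risk-discounted estimated saving in USD (higher is better)."""
    return proposal["est_saving_usd"] * RISK_WEIGHT[proposal["risk"]]

def rank_proposals(proposals: List[Dict]) -> List[Dict]:
    """Return proposals sorted by descending risk-discounted saving."""
    return sorted(proposals, key=score, reverse=True)

proposals = [
    {"action": "materialize", "target": "model.sales_orders_daily",
     "est_saving_usd": 2400, "risk": "low"},
    {"action": "rightsizing", "target": "wh.etl_large",
     "est_saving_usd": 6000, "risk": "medium"},
    {"action": "auto_suspend", "target": "wh.etl_large",
     "est_saving_usd": 1200, "risk": "low"},
]

ranked = rank_proposals(proposals)
for p in ranked:
    print(f"{p['action']:<13} {p['target']:<26} score={score(p):.0f}")
```

With these weights the medium-risk rightsizing still ranks first because its discounted saving (3600) exceeds the low-risk materialization (2400). A steeper discount would flip that order, which is the point of keeping the weights explicit.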


Prompt Contract (agent interface)

# file: contracts/warehouse_cost_perf_v1.yaml
role: "CostPerfAgent"
scope: >
  Monitor warehouse/lakehouse cost and latency; propose and execute bounded optimizations.
  Ask once if critical fields are missing (platform, project/account, telemetry windows).
  Never assert success without a receipt (ticket id, PR id, job id).
inputs:
  platform: enum["snowflake","bigquery","redshift","databricks","fabric"]
  lookback_days: integer
  sla_ms_p95: integer
  budget_monthly_usd: number
output:
  type: object
  required: [summary, hotspots, proposals, impact_model, citations, next_steps, tool_proposals]
  properties:
    summary: {type: string, maxWords: 100}
    hotspots:
      type: array
      items:
        type: object
        required: [owner, object, type, cost_usd, p95_ms, calls_7d]
        properties:
          owner: {type: string}
          object: {type: string}          # table/view/query_tag/model/dashboard
          type: {type: string}            # "table","model","dashboard","job","warehouse"
          cost_usd: {type: number}
          p95_ms: {type: number}
          calls_7d: {type: integer}
    proposals:
      type: array
      items:
        type: object
        required: [action, target, est_saving_usd, est_p95_ms_delta, risk, preconditions]
        properties:
          action: {type: string, enum: ["cluster","partition","materialize","result_cache","job_rewrite","rightsizing","schedule_shift","auto_suspend","slot_quota"]}
          target: {type: string}
          est_saving_usd: {type: number}
          est_p95_ms_delta: {type: number}
          risk: {type: string, enum: ["low","medium","high"]}
          preconditions: {type: array, items: string}
    impact_model:
      type: object
      required: [current_month_usd, projected_month_usd, projected_p95_ms]
      properties:
        current_month_usd: {type: number}
        projected_month_usd: {type: number}
        projected_p95_ms: {type: number}
    citations: {type: array, items: string}   # query ids, job ids, object names
    next_steps: {type: array, items: string, maxItems: 6}
    tool_proposals:
      type: array
      items:
        type: object
        required: [name, args, preconditions, idempotency_key]
        properties:
          name:
            type: string
            enum: [IngestUsage, IngestQueryLog, DetectHotspots, Simulate, OpenTicket,
                   ApplyRightsizing, SetAutoSuspend, CreateMaterialization, UpdatePartitioning,
                   EnableResultCache, SubmitPR]
          args: {type: object}
          preconditions: {type: string}
          idempotency_key: {type: string}
policy_id: "warehouse_cost_policy.v2"
citation_rule: "Minimal-span references to object names and query/job ids."
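
Before acting on a model response, it may help to check it against the contract's required fields and enums. The following is a minimal stdlib sketch, not a full schema validator: `contract_errors` is a hypothetical helper, and the key lists are copied by hand from the contract above, so they would need to be kept in sync with it.

```python
from typing import Any, Dict, List

# Required keys and enums copied from the contract above.
TOP_LEVEL = ["summary", "hotspots", "proposals", "impact_model",
             "citations", "next_steps", "tool_proposals"]
PROPOSAL_KEYS = ["action", "target", "est_saving_usd",
                 "est_p95_ms_delta", "risk", "preconditions"]
ACTIONS = {"cluster", "partition", "materialize", "result_cache", "job_rewrite",
           "rightsizing", "schedule_shift", "auto_suspend", "slot_quota"}
RISKS = {"low", "medium", "high"}

def contract_errors(out: Dict[str, Any]) -> List[str]:
    """Collect human-readable violations; an empty list means these checks passed."""
    errs = [f"missing top-level field: {k}" for k in TOP_LEVEL if k not in out]
    if len(str(out.get("summary", "")).split()) > 100:
        errs.append("summary exceeds 100 words")
    if len(out.get("next_steps", [])) > 6:
        errs.append("next_steps has more than 6 items")
    for i, p in enumerate(out.get("proposals", [])):
        missing = [k for k in PROPOSAL_KEYS if k not in p]
        if missing:
            errs.append(f"proposals[{i}] missing fields: {missing}")
        if "action" in p and p["action"] not in ACTIONS:
            errs.append(f"proposals[{i}] has unknown action {p['action']!r}")
        if "risk" in p and p["risk"] not in RISKS:
            errs.append(f"proposals[{i}] has unknown risk {p['risk']!r}")
    return errs

# A deliberately incomplete response to show the kind of errors reported.
bad = {"summary": "ok", "proposals": [{"action": "reindex", "risk": "low"}]}
for e in contract_errors(bad):
    print(e)
```

Rejecting a response that fails these checks, rather than repairing it silently, keeps the contract testable.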

Tool Interfaces (typed, with receipts)

# tools.py
from pydantic import BaseModel
from typing import Optional, Dict, List

class IngestUsageArgs(BaseModel):
    platform: str
    lookback_days: int

class IngestQueryLogArgs(BaseModel):
    platform: str
    lookback_days: int
    tag_filter: Optional[str] = None

class DetectHotspotsArgs(BaseModel):
    usage: Dict
    queries: Dict
    sla_ms_p95: int

class SimulateArgs(BaseModel):
    proposal: Dict   # {action,target,params}

class OpenTicketArgs(BaseModel):
    title: str
    description: str
    owners: List[str]
    severity: str

class ApplyRightsizingArgs(BaseModel):
    target: str      # e.g., "wh.etl_large"
    size: str        # "S","M","L","XL" or slots
    autosuspend_sec: int

class SetAutoSuspendArgs(BaseModel):
    target: str
    autosuspend_sec: int

class CreateMaterializationArgs(BaseModel):
    model: str
    schedule: str    # cron or window
    ttl_hours: int

class UpdatePartitioningArgs(BaseModel):
    table: str
    partition_by: str

class EnableResultCacheArgs(BaseModel):
    scope: str       # "project"|"schema"|"query_tag"
    enabled: bool

class SubmitPRArgs(BaseModel):
    repo: str
    branch: str
    message: str
    diff: str

class ToolReceipt(BaseModel):
    tool: str
    ok: bool
    ref: str          # ticket id, job id, pr id
    message: str = ""
    data: Optional[Dict] = None

# adapters.py  (demo logic; wire to platform SDKs/APIs in production)
from tools import *

def ingest_usage(a: IngestUsageArgs) -> ToolReceipt:
    return ToolReceipt(tool="IngestUsage", ok=True, ref=f"usage-{a.lookback_days}", data={"total_usd": 48231.17})
def ingest_query_log(a: IngestQueryLogArgs) -> ToolReceipt:
    return ToolReceipt(tool="IngestQueryLog", ok=True, ref=f"qlog-{a.lookback_days}", data={"queries": [{"id":"q-991","p95_ms":4200,"cost_usd":612,"object":"model.sales_orders_daily","tag":"dash:revenue_core","calls_7d":1800}]})
def detect_hotspots(a: DetectHotspotsArgs) -> ToolReceipt:
    hs = [{"owner":"bi@sales","object":"model.sales_orders_daily","type":"model","cost_usd":3200,"p95_ms":4200,"calls_7d":1800},
          {"owner":"etl@data","object":"wh.etl_large","type":"warehouse","cost_usd":19000,"p95_ms":1800,"calls_7d":400}]
    return ToolReceipt(tool="DetectHotspots", ok=True, ref="hot-1", data={"hotspots": hs})
def simulate(a: SimulateArgs) -> ToolReceipt:
    # toy simulator
    p = a.proposal
    if p["action"]=="materialize":
        return ToolReceipt(tool="Simulate", ok=True, ref="sim-1", data={"saving_usd": 2400, "p95_ms_delta": -2200})
    if p["action"]=="rightsizing":
        return ToolReceipt(tool="Simulate", ok=True, ref="sim-2", data={"saving_usd": 6000, "p95_ms_delta": +80})
    return ToolReceipt(tool="Simulate", ok=True, ref="sim-0", data={"saving_usd": 0, "p95_ms_delta": 0})
def open_ticket(a: OpenTicketArgs) -> ToolReceipt:
    return ToolReceipt(tool="OpenTicket", ok=True, ref="COST-1742", message="Optimization ticket opened")
def apply_rightsizing(a: ApplyRightsizingArgs) -> ToolReceipt:
    return ToolReceipt(tool="ApplyRightsizing", ok=True, ref=f"chg-{a.target}", message="Warehouse resized")
def set_autosuspend(a: SetAutoSuspendArgs) -> ToolReceipt:
    return ToolReceipt(tool="SetAutoSuspend", ok=True, ref=f"sus-{a.target}", message="Auto-suspend set")
def create_materialization(a: CreateMaterializationArgs) -> ToolReceipt:
    return ToolReceipt(tool="CreateMaterialization", ok=True, ref=f"job-mat-{a.model}", message="Materialization scheduled")
def update_partitioning(a: UpdatePartitioningArgs) -> ToolReceipt:
    return ToolReceipt(tool="UpdatePartitioning", ok=True, ref=f"ddl-{a.table}", message="Partitioning updated")
def enable_result_cache(a: EnableResultCacheArgs) -> ToolReceipt:
    return ToolReceipt(tool="EnableResultCache", ok=True, ref=f"rc-{a.scope}", message="Result cache toggled")
def submit_pr(a: SubmitPRArgs) -> ToolReceipt:
    return ToolReceipt(tool="SubmitPR", ok=True, ref="PR-983", message="PR submitted")
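
To illustrate what a production adapter might do for Snowflake, the sketch below renders the forward statement and its rollback for a rightsizing change, so the action stays reversible. It only builds SQL strings and executes nothing. The `rightsizing_sql` helper, the size-code mapping, and the previous size and auto-suspend values in the usage are assumptions for illustration. The article's `wh.` prefix is treated as a label, not part of the warehouse name.

```python
import re
from typing import Tuple

# Assumed mapping from the article's size codes to Snowflake size keywords.
SIZE_KEYWORD = {"S": "SMALL", "M": "MEDIUM", "L": "LARGE", "XL": "XLARGE"}
_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def rightsizing_sql(target: str, size: str, autosuspend_sec: int,
                    prev_size: str, prev_autosuspend_sec: int) -> Tuple[str, str]:
    """Return (apply_sql, rollback_sql) for a Snowflake warehouse resize."""
    name = target.split(".", 1)[-1]  # drop the "wh." label used in this article
    if not _IDENT.fullmatch(name):
        raise ValueError(f"unsafe warehouse name: {name!r}")
    for sz in (size, prev_size):
        if sz not in SIZE_KEYWORD:
            raise ValueError(f"unknown size code: {sz!r}")
    for sec in (autosuspend_sec, prev_autosuspend_sec):
        if sec < 0:
            raise ValueError("auto-suspend seconds must be non-negative")

    def stmt(sz: str, sec: int) -> str:
        return (f"ALTER WAREHOUSE {name} SET "
                f"WAREHOUSE_SIZE = {SIZE_KEYWORD[sz]} AUTO_SUSPEND = {sec}")

    return stmt(size, autosuspend_sec), stmt(prev_size, prev_autosuspend_sec)

# Hypothetical previous state: size L with a 600-second auto-suspend.
apply_sql, rollback_sql = rightsizing_sql(
    "wh.etl_large", "M", 60, prev_size="L", prev_autosuspend_sec=600)
print(apply_sql)
print(rollback_sql)
```

Storing the rollback statement alongside the receipt means a later revert does not depend on anyone remembering the prior configuration.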

Agent Loop (proposal → verification → execution → receipts)

# agent_cost_perf.py
import uuid
from typing import Any, Dict, List
from tools import *
from adapters import *

ALLOWED = {"IngestUsage","IngestQueryLog","DetectHotspots","Simulate","OpenTicket",
           "ApplyRightsizing","SetAutoSuspend","CreateMaterialization","UpdatePartitioning",
           "EnableResultCache","SubmitPR"}

def new_idem(): return f"idem-{uuid.uuid4()}"

def verify(p: Dict[str,Any]) -> str:
    need = {"name","args","preconditions","idempotency_key"}
    if not need.issubset(p): return "Missing proposal fields"
    if p["name"] not in ALLOWED: return "Tool not allowed"
    return ""

# Dispatch table: tool name -> (args model, adapter function)
TOOLS = {
    "IngestUsage":           (IngestUsageArgs, ingest_usage),
    "IngestQueryLog":        (IngestQueryLogArgs, ingest_query_log),
    "DetectHotspots":        (DetectHotspotsArgs, detect_hotspots),
    "Simulate":              (SimulateArgs, simulate),
    "OpenTicket":            (OpenTicketArgs, open_ticket),
    "ApplyRightsizing":      (ApplyRightsizingArgs, apply_rightsizing),
    "SetAutoSuspend":        (SetAutoSuspendArgs, set_autosuspend),
    "CreateMaterialization": (CreateMaterializationArgs, create_materialization),
    "UpdatePartitioning":    (UpdatePartitioningArgs, update_partitioning),
    "EnableResultCache":     (EnableResultCacheArgs, enable_result_cache),
    "SubmitPR":              (SubmitPRArgs, submit_pr),
}

def run(p: Dict[str,Any]) -> ToolReceipt:
    n, a = p["name"], p["args"]
    if n not in TOOLS:
        return ToolReceipt(tool=n, ok=False, ref="none", message="Unknown tool")
    args_model, adapter = TOOLS[n]
    return adapter(args_model(**a))   # the args model validates types before the adapter runs

# --- Model shim (replace with your LLM honoring the contract) ---
def plan(req: Dict[str,Any]) -> Dict[str,Any]:
    return {
      "summary": f"Monitoring {req['platform']} for {req['lookback_days']}d; propose actions to meet p95 ≤ {req['sla_ms_p95']}ms and budget ${req['budget_monthly_usd']:.0f}.",
      "hotspots": [],
      "proposals": [
        {"action":"materialize","target":"model.sales_orders_daily","est_saving_usd":2400,"est_p95_ms_delta":-2200,"risk":"low",
         "preconditions":["Downstream dashboards use identical filters"]},
        {"action":"rightsizing","target":"wh.etl_large","est_saving_usd":6000,"est_p95_ms_delta":80,"risk":"medium",
         "preconditions":["Peak concurrency ≤ 8","No SLA violations during batch"]},
        {"action":"auto_suspend","target":"wh.etl_large","est_saving_usd":1200,"est_p95_ms_delta":0,"risk":"low",
         "preconditions":["Idle windows > 5 min between jobs"]}
      ],
      "impact_model": {"current_month_usd": 52000, "projected_month_usd": 42400, "projected_p95_ms": 1800},
      "citations": ["q-991","model.sales_orders_daily","wh.etl_large"],
      "next_steps": ["Ingest usage & queries","Detect hotspots","Simulate proposals","Open ticket + PR","Apply low-risk changes first"],
      "tool_proposals": [
        {"name":"IngestUsage","args":{"platform":req["platform"],"lookback_days":req["lookback_days"]},
         "preconditions":"Usage window present.","idempotency_key": new_idem()},
        {"name":"IngestQueryLog","args":{"platform":req["platform"],"lookback_days":req["lookback_days"],"tag_filter":None},
         "preconditions":"Query log accessible.","idempotency_key": new_idem()},
        {"name":"DetectHotspots","args":{"usage":{},"queries":{},"sla_ms_p95":req["sla_ms_p95"]},
         "preconditions":"Correlate cost vs p95.","idempotency_key": new_idem()},
        {"name":"Simulate","args":{"proposal":{"action":"materialize","target":"model.sales_orders_daily","params":{"ttl":"24h"}}},
         "preconditions":"Estimate savings/latency.","idempotency_key": new_idem()},
        {"name":"Simulate","args":{"proposal":{"action":"rightsizing","target":"wh.etl_large","params":{"size_to":"M","autosuspend":60}}},
         "preconditions":"Estimate savings/latency.","idempotency_key": new_idem()},
        {"name":"OpenTicket","args":{"title":"Cost/Perf optimization rollout","description":"See attached plan and sims","owners":["bi@sales","data@platform"],"severity":"medium"},
         "preconditions":"Change tracking.","idempotency_key": new_idem()},
        {"name":"CreateMaterialization","args":{"model":"model.sales_orders_daily","schedule":"hourly","ttl_hours":24},
         "preconditions":"Low-risk, reversible.","idempotency_key": new_idem()},
        {"name":"ApplyRightsizing","args":{"target":"wh.etl_large","size":"M","autosuspend_sec":60},
         "preconditions":"Batch safe window.","idempotency_key": new_idem()},
        {"name":"SetAutoSuspend","args":{"target":"wh.etl_large","autosuspend_sec":60},
         "preconditions":"Idle gaps exist.","idempotency_key": new_idem()},
        {"name":"SubmitPR","args":{"repo":"analytics/infra","branch":"feat/cost-perf-ops","message":"Add materialization + autosuspend","diff":"..."},
         "preconditions":"Infra as code.","idempotency_key": new_idem()}
      ]
    }

def handle(req: Dict[str,Any]) -> str:
    p = plan(req)
    receipts: List[ToolReceipt] = []
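    for tp in p["tool_proposals"]:
        err = verify(tp)
        if err:
            # verify() can fail because "name" itself is missing, so never index it directly
            receipts.append(ToolReceipt(tool=tp.get("name", "unknown"), ok=False, ref="blocked", message=err))
            continue
        receipts.append(run(tp))
    # Index successful receipts only, so blocked or failed calls are never reported as applied.
    # Tools called more than once (e.g., Simulate) keep their last receipt.
    idx = {r.tool: r for r in receipts if r.ok}
    hot = idx.get("DetectHotspots")
    hs = (hot.data or {}).get("hotspots", []) if hot else []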
    lines = [p["summary"], ""]
    if hs:
        lines.append("Hotspots:")
        for h in hs:
            lines.append(f"- {h['object']} ({h['type']}), owner {h['owner']}: ${h['cost_usd']:.0f}, p95={h['p95_ms']}ms, calls_7d={h['calls_7d']}")
    lines.append("\nProposals:")
    for pr in p["proposals"]:
        lines.append(f"- {pr['action']} → {pr['target']} (risk {pr['risk']}, est save ${pr['est_saving_usd']:.0f}, Δp95 {pr['est_p95_ms_delta']}ms)")
    if idx.get("CreateMaterialization"): lines.append(f"\nMaterialization: {idx['CreateMaterialization'].message} ({idx['CreateMaterialization'].ref})")
    if idx.get("ApplyRightsizing"):      lines.append(f"Rightsizing: {idx['ApplyRightsizing'].message} ({idx['ApplyRightsizing'].ref})")
    if idx.get("SetAutoSuspend"):        lines.append(f"Auto-suspend: {idx['SetAutoSuspend'].message} ({idx['SetAutoSuspend'].ref})")
    if idx.get("OpenTicket"):            lines.append(f"Change ticket: {idx['OpenTicket'].ref}")
    if idx.get("SubmitPR"):              lines.append(f"Infrastructure PR: {idx['SubmitPR'].ref}")
    im = p["impact_model"]
    lines.append(f"\nProjected impact — spend ${im['projected_month_usd']:.0f} (was ${im['current_month_usd']:.0f}), p95≈{im['projected_p95_ms']}ms")
    lines.append("\nNext steps:")
    for s in p["next_steps"]: lines.append(f"• {s}")
    lines.append("\nCitations: " + ", ".join(p["citations"]))
    return "\n".join(lines)

if __name__ == "__main__":
    example_req = {"platform":"snowflake","lookback_days":14,"sla_ms_p95":2000,"budget_monthly_usd":45000}
    print(handle(example_req))
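
The demo loop runs every verified proposal in order, and `new_idem()` produces a fresh key on each plan, so nothing yet stops a re-plan from applying the same change twice or from applying a change whose simulation looks bad. The sketch below shows one possible guard under stated assumptions: `stable_key`, `simulation_gate`, and `run_once` are hypothetical helpers, receipts are plain dicts for brevity, and `fake_tool` stands in for the article's `run()`.

```python
import hashlib
import json
from typing import Callable, Dict, Optional

def stable_key(name: str, args: Dict) -> str:
    """Deterministic idempotency key: same tool and same args give the same key."""
    payload = json.dumps({"name": name, "args": args}, sort_keys=True)
    return "idem-" + hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]

def simulation_gate(sim: Optional[Dict], baseline_p95_ms: float, sla_ms_p95: float) -> str:
    """Return "" when a mutating change may proceed, else the reason to block it."""
    if sim is None:
        return "no simulation receipt for this target"
    if sim["saving_usd"] <= 0:
        return "simulation shows no saving"
    if baseline_p95_ms + sim["p95_ms_delta"] > sla_ms_p95:
        return "projected p95 would breach the SLA"
    return ""

def run_once(proposal: Dict, run_tool: Callable[[Dict], Dict], seen: Dict[str, Dict]) -> Dict:
    """Execute a proposal at most once per key; replays return the stored receipt."""
    key = proposal["idempotency_key"]
    if key not in seen:
        seen[key] = run_tool(proposal)
    return seen[key]

calls = []
def fake_tool(proposal: Dict) -> Dict:
    # Stand-in for run(): records the call and returns a receipt-like dict.
    calls.append(proposal["name"])
    return {"tool": proposal["name"], "ok": True, "ref": f"chg-{len(calls)}"}

args = {"target": "wh.etl_large", "size": "M", "autosuspend_sec": 60}
prop = {"name": "ApplyRightsizing", "args": args,
        "idempotency_key": stable_key("ApplyRightsizing", args)}
seen: Dict[str, Dict] = {}

# Simulation data and baseline taken from the demo adapters (sim-2, wh.etl_large hotspot).
sim = {"saving_usd": 6000, "p95_ms_delta": 80}
reason = simulation_gate(sim, baseline_p95_ms=1800, sla_ms_p95=2000)
if not reason:
    first = run_once(prop, fake_tool, seen)
    again = run_once(prop, fake_tool, seen)
    print(first["ref"], again["ref"], len(calls))  # chg-1 chg-1 1
```

In a real deployment the `seen` map would need to live in durable storage so that replays after a restart are still deduplicated.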

The Prompt You’d Send to the Model (concise and testable)

System:
You are CostPerfAgent. Follow the contract:
- Ask once if platform, lookback_days, sla_ms_p95, or budget_monthly_usd are missing.
- Cite minimal spans for query/object IDs.
- Propose tools; never assert success without receipts.
- Output JSON with: summary, hotspots[], proposals[], impact_model{}, citations[], next_steps[], tool_proposals[].

User:
Optimize our Snowflake costs while keeping p95 ≤ 2000ms, 14-day lookback, budget $45k.
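
The "ask once" rule can also be enforced outside the model, before any prompt is sent. This is a small sketch under that assumption; `clarifying_question` is a hypothetical helper, and the required field names come from the contract's inputs.

```python
from typing import Any, Dict, Optional

REQUIRED = ("platform", "lookback_days", "sla_ms_p95", "budget_monthly_usd")

def clarifying_question(req: Dict[str, Any]) -> Optional[str]:
    """Return one question covering every missing field, or None if the request is complete."""
    missing = [k for k in REQUIRED if req.get(k) is None]
    if not missing:
        return None
    return "Before I start, please provide: " + ", ".join(missing) + "."

print(clarifying_question({"platform": "snowflake", "sla_ms_p95": 2000}))
# Before I start, please provide: lookback_days, budget_monthly_usd.
```

Batching all missing fields into a single question is what keeps the interaction to one round trip.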

Rollout playbook (what works in production)

Treat every change as infrastructure-as-code behind a feature flag: run a canary on a small workspace or schema, measure p95 latency and cache hit rate, then promote. Keep golden traces of expensive queries and dashboards so CI can catch regressions. Record a receipt for each change (ticket, PR, or job ID) together with its semantic fingerprint (object names, query tags). Reassess proposals weekly, and automatically revert changes that miss their projected savings or breach SLAs.
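
The weekly revert rule can be written down as a small decision function. The sketch below is illustrative only: `ChangeOutcome`, `revert_reason`, and the `min_realization` threshold of 50% are assumptions, and the realized savings and observed latencies in the usage are made-up numbers, not measurements.

```python
from dataclasses import dataclass

@dataclass
class ChangeOutcome:
    receipt_ref: str              # ticket, PR, or job ID recorded when the change was applied
    projected_saving_usd: float
    realized_saving_usd: float
    observed_p95_ms: float

def revert_reason(o: ChangeOutcome, sla_ms_p95: float, min_realization: float = 0.5) -> str:
    """Return "" to keep the change, else the reason to revert it."""
    if o.observed_p95_ms > sla_ms_p95:
        return f"{o.receipt_ref}: p95 {o.observed_p95_ms:.0f}ms breaches SLA {sla_ms_p95:.0f}ms"
    if o.realized_saving_usd < min_realization * o.projected_saving_usd:
        return (f"{o.receipt_ref}: realized ${o.realized_saving_usd:.0f} "
                f"of projected ${o.projected_saving_usd:.0f}")
    return ""

# Hypothetical one-week outcomes for the three demo changes.
outcomes = [
    ChangeOutcome("job-mat-model.sales_orders_daily", 2400, 2100, 1900),
    ChangeOutcome("chg-wh.etl_large", 6000, 5800, 2300),
    ChangeOutcome("sus-wh.etl_large", 1200, 300, 1800),
]
for o in outcomes:
    print(revert_reason(o, sla_ms_p95=2000) or f"{o.receipt_ref}: keep")
```

Checking the SLA before the savings means a change that saves money but breaches latency is still reverted.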


Conclusion

A Cost & Performance Optimization Agent turns warehouse tuning from sporadic heroics into a continuous, auditable loop. With typed tools, receipts, and simulations, you can cut spend while improving user-perceived latency, and do it safely across Snowflake, BigQuery, Redshift, Databricks, or Fabric.