AI Automation & Agents  

AI Agents in Practice: Experimentation & A/B Rollout Orchestrator (Prompts + Code)

Introduction

Once agents touch production user flows, the hardest problem is changing things safely. Feature prompts, retrieval policies, ranking models, and pricing algorithms all need controlled trials, guardrails, and rapid rollback. This article delivers an Experimentation & A/B Rollout Orchestrator Agent that proposes experiments, registers them, allocates traffic, watches guardrails, and rolls forward/back only with receipts (experiment IDs, run IDs, rollout change IDs). It treats experiments as contracts, not ad-hoc flags.


The Use Case

Product and data teams run many concurrent trials: copy variants, prompt policies, search rankers, recommender models, pricing heuristics. Without discipline, results get biased (peeking), guardrails get violated (latency, safety), and rollbacks are messy. The agent encodes your experimentation policy, integrates with your flag provider and metrics store, and turns “let’s test this” into a typed, auditable change that can be promoted or reverted in seconds.


Prompt Contract (agent interface)

# file: contracts/ab_orchestrator_v1.yaml
role: "ABOrchestratorAgent"
scope: >
  Create, run, and decide A/B/n experiments with pre-registered metrics and guardrails.
  Ask once if critical fields are missing (treatment spec, primary metric, MDE, duration cap).
  Never assert success without a receipt (exp_id, run_id, change_id).
inputs:
  domain: enum["prompts","policies","ranking","pricing","ui"]
  treatment_spec: object         # structured definition of variant(s)
  primary_metric: string         # e.g., "signup_conv", "CTR", "CSAT"
  guardrails:                    # hard stops; direction = side the metric must stay on
    - {name: string, threshold: number, direction: 'enum["<",">"]'}   # e.g., latency_p95 < 2.5
  mde: number                    # minimum detectable effect (absolute, e.g., percentage points)
  max_duration_days: integer
  max_concurrent: integer
output:
  type: object
  required: [summary, registration, allocation, monitoring, decision, citations, tool_proposals]
  properties:
    summary: {type: string, maxWords: 100}
    registration:
      type: object
      required: [exp_id, primary_metric, metrics, hypothesis]
    allocation:
      type: object
      required: [arms, traffic_split, exposure_rules]
    monitoring:
      type: object
      required: [run_id, dashboards, checkpoints]
    decision:
      type: object
      required: [state, rationale, receipts]
      properties:
        state: {type: string, enum: ["running","guardrail_stop","promote","rollback","inconclusive"]}
        rationale: {type: string}
        receipts: {type: array, items: string}
    citations: {type: array, items: string}     # policy/rule ids
    tool_proposals:
      type: array
      items:
        type: object
        required: [name, args, preconditions, idempotency_key]
        properties:
          name:
            type: string
            enum: [RegisterExperiment, CreateFlag, StartRun, AllocateTraffic,
                   CheckMetrics, StopRun, PromoteVariant, RollbackVariant,
                   SnapshotDashboard, OpenTicket]
          args: {type: object}
          preconditions: {type: string}
          idempotency_key: {type: string}
policy_id: "exp_policy.v5"
citation_rule: "Minimal-span references to primary metric, guardrail thresholds, and policy clauses."

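Before handing this contract to a planner, it pays to reject malformed responses early. The sketch below assumes the YAML above is saved at contracts/ab_orchestrator_v1.yaml and that PyYAML is installed; check_response and its two checks are illustrative, and a production version would validate the full schema.

# contract_check.py (illustrative guard; validate the full spec in production)
from typing import List
import yaml   # PyYAML, assumed available

def check_response(resp: dict, contract_path: str = "contracts/ab_orchestrator_v1.yaml") -> List[str]:
    """Return a list of contract violations for a planner response (empty list = OK)."""
    with open(contract_path) as f:
        contract = yaml.safe_load(f)
    problems = []
    # Top-level required keys come straight from output.required in the contract.
    for key in contract["output"]["required"]:
        if key not in resp:
            problems.append(f"missing required field: {key}")
    # decision.state must be one of the enumerated values.
    allowed = contract["output"]["properties"]["decision"]["properties"]["state"]["enum"]
    if resp.get("decision", {}).get("state") not in allowed:
        problems.append(f"decision.state must be one of {allowed}")
    return problems
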
Tool Interfaces (typed, with receipts)

# tools.py
from pydantic import BaseModel
from typing import List, Dict, Optional

class RegisterExperimentArgs(BaseModel):
    domain: str
    primary_metric: str
    metrics: List[str]
    hypothesis: str
    mde: float
    max_duration_days: int

class CreateFlagArgs(BaseModel):
    key: str
    variants: List[str]      # ["control","treatmentA","treatmentB"]

class StartRunArgs(BaseModel):
    exp_id: str
    population: str          # "all_users","new_users","us_only", etc.
    run_name: str

class AllocateTrafficArgs(BaseModel):
    flag_key: str
    split: Dict[str, int]    # {"control": 50, "treatmentA": 50}
    exposure_rules: List[str]

class CheckMetricsArgs(BaseModel):
    run_id: str
    primary_metric: str
    guardrails: List[Dict]   # [{name, threshold, direction}]
    lookback_hours: int

class StopRunArgs(BaseModel):
    run_id: str
    reason: str

class PromoteVariantArgs(BaseModel):
    flag_key: str
    variant: str

class RollbackVariantArgs(BaseModel):
    flag_key: str
    to_variant: str

class SnapshotDashboardArgs(BaseModel):
    run_id: str

class OpenTicketArgs(BaseModel):
    title: str
    description: str
    severity: str
    owners: List[str]

class ToolReceipt(BaseModel):
    tool: str
    ok: bool
    ref: str                # exp_id, run_id, change_id, dashboard_snapshot_id
    message: str = ""
    data: Optional[Dict] = None

# adapters.py (demo logic; wire to your flag service, metrics store, and BI)
from tools import *
import uuid, random

def register_experiment(a: RegisterExperimentArgs) -> ToolReceipt:
    return ToolReceipt(tool="RegisterExperiment", ok=True, ref=f"EXP-{uuid.uuid4().hex[:8]}",
                       data={"primary_metric": a.primary_metric, "metrics": a.metrics, "mde": a.mde})

def create_flag(a: CreateFlagArgs) -> ToolReceipt:
    return ToolReceipt(tool="CreateFlag", ok=True, ref=f"FLAG-{a.key}", message="Flag created")

def start_run(a: StartRunArgs) -> ToolReceipt:
    return ToolReceipt(tool="StartRun", ok=True, ref=f"RUN-{uuid.uuid4().hex[:8]}", message="Run started")

def allocate_traffic(a: AllocateTrafficArgs) -> ToolReceipt:
    return ToolReceipt(tool="AllocateTraffic", ok=True, ref=f"ALLOC-{a.flag_key}", data={"split": a.split})

def check_metrics(a: CheckMetricsArgs) -> ToolReceipt:
    # toy metrics & guardrails
    delta = round(random.uniform(-0.04, 0.06), 3)  # treatment - control
    guardrail_breaches = []
    for g in a.guardrails:
        # Toy observation near (mostly under) the threshold so breaches occur occasionally.
        obs = g["threshold"] * random.uniform(0.7, 1.05)
        # direction is the side the metric must stay on ("<" = stay below threshold),
        # so a breach is the observation landing on the wrong side.
        breached = (obs >= g["threshold"]) if g["direction"] == "<" else (obs <= g["threshold"])
        if breached:
            guardrail_breaches.append({"name": g["name"], "observed": round(obs, 3)})
    state = "ok" if not guardrail_breaches else "breach"
    return ToolReceipt(tool="CheckMetrics", ok=True, ref=f"MET-{a.run_id}",
                       data={"delta": delta, "guardrails": guardrail_breaches, "state": state})

def stop_run(a: StopRunArgs) -> ToolReceipt:
    return ToolReceipt(tool="StopRun", ok=True, ref=f"STOP-{a.run_id}", message=a.reason)

def promote_variant(a: PromoteVariantArgs) -> ToolReceipt:
    return ToolReceipt(tool="PromoteVariant", ok=True, ref=f"PROM-{a.variant}", message="Variant promoted")

def rollback_variant(a: RollbackVariantArgs) -> ToolReceipt:
    return ToolReceipt(tool="RollbackVariant", ok=True, ref=f"RB-{a.to_variant}", message="Rolled back")

def snapshot_dashboard(a: SnapshotDashboardArgs) -> ToolReceipt:
    return ToolReceipt(tool="SnapshotDashboard", ok=True, ref=f"DASH-{a.run_id}", message="Dashboard snapshotted")

def open_ticket(a: OpenTicketArgs) -> ToolReceipt:
    return ToolReceipt(tool="OpenTicket", ok=True, ref="EXP-INC-1024", message="Experiment incident opened")

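The agent loop in the next section calls these adapters directly to keep the flow easy to read. If you would rather have a single choke point between proposals and execution, a small registry does the job; the DISPATCH table and execute helper below are an optional, illustrative sketch, not something the loop requires.

# dispatch.py (optional helper; names here are illustrative)
from tools import *
from adapters import *

# One row per tool: the Pydantic args model it expects and the adapter that executes it.
DISPATCH = {
    "RegisterExperiment": (RegisterExperimentArgs, register_experiment),
    "CreateFlag":         (CreateFlagArgs, create_flag),
    "StartRun":           (StartRunArgs, start_run),
    "AllocateTraffic":    (AllocateTrafficArgs, allocate_traffic),
    "CheckMetrics":       (CheckMetricsArgs, check_metrics),
    "StopRun":            (StopRunArgs, stop_run),
    "PromoteVariant":     (PromoteVariantArgs, promote_variant),
    "RollbackVariant":    (RollbackVariantArgs, rollback_variant),
    "SnapshotDashboard":  (SnapshotDashboardArgs, snapshot_dashboard),
    "OpenTicket":         (OpenTicketArgs, open_ticket),
}

def execute(proposal: dict) -> ToolReceipt:
    """Validate a proposal's args against its model, then run the matching adapter."""
    if proposal["name"] not in DISPATCH:
        return ToolReceipt(tool=proposal["name"], ok=False, ref="blocked", message="Tool not allowed")
    model, fn = DISPATCH[proposal["name"]]
    return fn(model(**proposal["args"]))   # Pydantic raises if the args don't match the schema
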
Agent Loop (proposal → verification → execution → receipts)

# agent_ab_orchestrator.py
import uuid
from typing import Any, Dict, List
from tools import *
from adapters import *

ALLOWED = {"RegisterExperiment","CreateFlag","StartRun","AllocateTraffic",
           "CheckMetrics","StopRun","PromoteVariant","RollbackVariant",
           "SnapshotDashboard","OpenTicket"}

def new_idem(): return f"idem-{uuid.uuid4()}"

def verify(p: Dict[str,Any]) -> str:
    need = {"name","args","preconditions","idempotency_key"}
    if not need.issubset(p): return "Missing fields"
    if p["name"] not in ALLOWED: return "Tool not allowed"
    return ""

# --- Planner (replace with your LLM honoring the contract) ---
def plan(req: Dict[str,Any]) -> Dict[str,Any]:
    flag_key = f"{req['domain']}.exp"
    return {
      "summary": f"A/B orchestrated for {req['domain']} on {req['primary_metric']} with MDE {req['mde']}.",
      "registration": {},
      "allocation": {},
      "monitoring": {},
      "decision": {"state":"running","rationale":"initialization","receipts":[]},
      "citations": ["exp_policy.v5", f"guardrails:{','.join([g['name'] for g in req['guardrails']])}"],
      "tool_proposals": [
        {"name":"RegisterExperiment","args":{"domain":req["domain"],
                                             "primary_metric":req["primary_metric"],
                                             "metrics":[req["primary_metric"],"latency_p95","defect_rate"],
                                             "hypothesis":"Treatment improves primary metric without guardrail breaches.",
                                             "mde":req["mde"], "max_duration_days":req["max_duration_days"]},
         "preconditions":"Register metrics & hypothesis.","idempotency_key": new_idem()},
        {"name":"CreateFlag","args":{"key":flag_key,"variants":["control","treatmentA"]},
         "preconditions":"Expose variants via flag.","idempotency_key": new_idem()},
        {"name":"StartRun","args":{"exp_id":"", "population":"all_users","run_name":"main"},
         "preconditions":"Launch run.","idempotency_key": new_idem()},
        {"name":"AllocateTraffic","args":{"flag_key":flag_key,"split":{"control":50,"treatmentA":50},
                                          "exposure_rules":["logged_in==true"]},
         "preconditions":"Even split to start.","idempotency_key": new_idem()},
        {"name":"SnapshotDashboard","args":{"run_id":""},
         "preconditions":"Create monitoring view.","idempotency_key": new_idem()},
        {"name":"CheckMetrics","args":{"run_id":"", "primary_metric":req["primary_metric"],
                                       "guardrails":req["guardrails"], "lookback_hours":24},
         "preconditions":"Daily checkpoint.","idempotency_key": new_idem()}
      ]
    }

def handle(req: Dict[str,Any]) -> str:
    p = plan(req); receipts: List[ToolReceipt] = []

    # 1) Register → Flag → Run
    for tp in p["tool_proposals"][:2]:
        err = verify(tp)
        if err:
            receipts.append(ToolReceipt(tool=tp["name"], ok=False, ref="blocked", message=err))
        elif tp["name"] == "RegisterExperiment":
            receipts.append(register_experiment(RegisterExperimentArgs(**tp["args"])))
        else:
            receipts.append(create_flag(CreateFlagArgs(**tp["args"])))
    exp_id = receipts[0].ref if receipts and receipts[0].tool == "RegisterExperiment" else "EXP-UNK"

    # Start run with exp_id
    sr = start_run(StartRunArgs(exp_id=exp_id, population="all_users", run_name="main")); receipts.append(sr)
    run_id = sr.ref

    # Allocation & dashboard snapshot
    receipts.append(allocate_traffic(AllocateTrafficArgs(flag_key=f"{req['domain']}.exp", split={"control":50,"treatmentA":50}, exposure_rules=["logged_in==true"])))
    receipts.append(snapshot_dashboard(SnapshotDashboardArgs(run_id=run_id)))

    # 2) Monitoring checkpoint → decision
    cm = check_metrics(CheckMetricsArgs(run_id=run_id, primary_metric=req["primary_metric"], guardrails=req["guardrails"], lookback_hours=24)); receipts.append(cm)
    decision_state, rationale, decision_receipts = "running", "", []
    if cm.data["state"] == "breach":
        decision_state, rationale = "guardrail_stop", f"Breach: {[g['name'] for g in cm.data['guardrails']]}"
        st = stop_run(StopRunArgs(run_id=run_id, reason=rationale)); receipts.append(st); decision_receipts.append(st.ref)
        rb = rollback_variant(RollbackVariantArgs(flag_key=f"{req['domain']}.exp", to_variant="control")); receipts.append(rb); decision_receipts.append(rb.ref)
        ot = open_ticket(OpenTicketArgs(title="Guardrail stop", description=rationale, severity="high", owners=["exp@product","oncall@data"])); receipts.append(ot); decision_receipts.append(ot.ref)
    elif cm.data["delta"] >= req["mde"]:
        decision_state, rationale = "promote", f"Effect {cm.data['delta']} ≥ MDE {req['mde']}"
        pr = promote_variant(PromoteVariantArgs(flag_key=f"{req['domain']}.exp", variant="treatmentA")); receipts.append(pr); decision_receipts.append(pr.ref)
        st = stop_run(StopRunArgs(run_id=run_id, reason="Promoted")); receipts.append(st); decision_receipts.append(st.ref)
    else:
        decision_state, rationale = "inconclusive", f"Effect {cm.data['delta']} < MDE {req['mde']} (continue or extend)"

    # 3) Render
    lines = [p["summary"], ""]
    lines.append(f"Experiment: {exp_id} | Run: {run_id}")
    lines.append(f"Allocation: control=50%, treatmentA=50% | Exposure: logged_in==true")
    lines.append(f"Checkpoint: Δ{req['primary_metric']}={cm.data['delta']} | Guardrails: {'ok' if cm.data['state']=='ok' else 'breach'}")
    lines.append(f"\nDecision: {decision_state} — {rationale}")
    if decision_receipts:
        lines.append("Receipts:")
        for r in decision_receipts: lines.append(f"- {r}")
    lines.append("\nCitations: " + ", ".join(p["citations"]))
    return "\n".join(lines)

if __name__ == "__main__":
    example = {
      "domain":"prompts",
      "treatment_spec":{"policy_bundle":"gscp-12.v4","prompt":"new-assistant-v7"},
      "primary_metric":"CSAT",
      "guardrails":[{"name":"defect_rate","threshold":0.02,"direction":"<"},
                    {"name":"latency_p95","threshold":2.5,"direction":"<"}],
      "mde":0.01,
      "max_duration_days":14,
      "max_concurrent":10
    }
    print(handle(example))

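One caveat on the promotion branch above: comparing a single checkpoint's delta against the MDE is only sound once the run has collected enough exposures to detect that MDE. A back-of-the-envelope sketch, assuming a conversion-style primary metric and a standard two-proportion z-test; min_sample_size_per_arm is an illustrative helper, not part of the agent.

# power_check.py (rule-of-thumb gate before trusting the promote/inconclusive decision)
import math
from statistics import NormalDist

def min_sample_size_per_arm(baseline_rate: float, mde_abs: float,
                            alpha: float = 0.05, power: float = 0.80) -> int:
    """Approximate per-arm sample size to detect an absolute lift of `mde_abs`
    on a conversion-style metric (two-proportion z-test, two-sided alpha)."""
    z_alpha = NormalDist().inv_cdf(1 - alpha / 2)   # ≈ 1.96 for alpha=0.05
    z_beta = NormalDist().inv_cdf(power)            # ≈ 0.84 for power=0.80
    p_bar = baseline_rate + mde_abs / 2             # average rate across the two arms
    variance = 2 * p_bar * (1 - p_bar)
    return math.ceil((z_alpha + z_beta) ** 2 * variance / mde_abs ** 2)

# Example: detecting a 1pp lift on a 20% baseline needs roughly 25,600 users per arm.
# min_sample_size_per_arm(baseline_rate=0.20, mde_abs=0.01)
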
Design notes that keep you out of trouble

  • Pre-register everything. Metrics, guardrails, hypothesis, and MDE must be registered before allocation to avoid p-hacking.

  • One source of truth. The agent never computes metrics; it reads from your production metrics store (or experiment platform) and cites metric names/IDs.

  • Guardrail-first decisions. If any safety/latency guardrail breaches, the agent stops + rolls back immediately before debating lift.

  • Receipts > logs. Promotion and rollback produce flag change IDs; snapshots produce dashboard IDs; decisions are not “done” without them.

  • Golden traces. Keep a small suite of canonical experiments (e.g., “prompt vX vs vY”) that the agent can simulate in CI; a test sketch follows this list.

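To make the golden-traces note concrete, here is a seeded CI test that replays one canonical request through the demo loop and asserts the output carries receipts and a contract-legal decision; pytest and the request values are assumptions for illustration.

# test_golden_traces.py (CI sketch; assumes pytest and the demo adapters above; request values are illustrative)
import random
from agent_ab_orchestrator import handle

GOLDEN_REQUEST = {
    "domain": "prompts",
    "treatment_spec": {"prompt": "assistant-vX-vs-vY"},
    "primary_metric": "CSAT",
    "guardrails": [{"name": "defect_rate", "threshold": 0.02, "direction": "<"}],
    "mde": 0.01,
    "max_duration_days": 14,
    "max_concurrent": 10,
}

def test_golden_prompt_experiment_produces_receipts():
    random.seed(7)                       # pin the toy metrics so the trace is reproducible
    out = handle(GOLDEN_REQUEST)
    assert "Experiment: EXP-" in out     # registration receipt rendered
    assert "Run: RUN-" in out            # run receipt rendered
    assert "Decision:" in out
    # The decision must land in one of the states the contract allows.
    assert any(s in out for s in ["guardrail_stop", "promote", "rollback", "inconclusive", "running"])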

Common failure modes (and fixes)

  • Peeking & the garden of forking paths → Lock analysis windows and require pre-registered decision rules; the agent should refuse ad-hoc splits mid-run.

  • Crossover/exposure leaks → Use user-level bucketing with sticky assignments; codify exposure rules in the allocation tool (see the first sketch after this list).

  • Latent guardrail breaches → Run hourly/daily checkpoints and set hard stops.

  • Rollbacks that half-apply → Treat promotions/rollbacks as atomic flag changes with idempotency keys and verify the post-state before claiming success (see the second sketch after this list).
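
For the crossover/exposure-leak item above, deterministic user-keyed hashing gives sticky assignments without storing per-user state. A minimal sketch: assign_variant is an illustrative helper, and salting by exp_id keeps bucketing independent across concurrent experiments.

# bucketing.py (sticky, user-level assignment; illustrative helper, not tied to any flag provider)
import hashlib
from typing import Dict

def assign_variant(user_id: str, exp_id: str, split: Dict[str, int]) -> str:
    """Deterministically map a user to a variant: same user + experiment always lands in the
    same arm, and salting by exp_id de-correlates assignments across concurrent experiments."""
    digest = hashlib.sha256(f"{exp_id}:{user_id}".encode()).hexdigest()
    bucket = int(digest[:8], 16) % 100               # stable bucket in [0, 100)
    cumulative = 0
    for variant, pct in split.items():               # e.g., {"control": 50, "treatmentA": 50}
        cumulative += pct
        if bucket < cumulative:
            return variant
    return "control"                                 # fallback if the split doesn't sum to 100

# assign_variant("user-123", "EXP-ab12cd34", {"control": 50, "treatmentA": 50}) is stable across calls.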

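And for half-applied rollbacks: submit the change keyed by the idempotency key, read the post-state back from the provider, and only then emit a receipt. The flag_service client below is a placeholder for your real flag provider's SDK; set_default and get_default are assumed method names, not a specific vendor API.

# atomic_flag_change.py (sketch; flag_service stands in for your real flag provider client)
from tools import ToolReceipt

def apply_flag_change(flag_service, flag_key: str, target_variant: str, idempotency_key: str) -> ToolReceipt:
    """Submit the change with an idempotency key, then verify the post-state before claiming success."""
    flag_service.set_default(flag_key, target_variant, idempotency_key=idempotency_key)
    observed = flag_service.get_default(flag_key)    # read back from the provider, not from our request
    if observed != target_variant:
        return ToolReceipt(tool="PromoteVariant", ok=False, ref=f"CHG-{idempotency_key}",
                           message=f"Post-state mismatch: expected {target_variant}, saw {observed}")
    return ToolReceipt(tool="PromoteVariant", ok=True, ref=f"CHG-{idempotency_key}",
                       message=f"{flag_key} now serves {target_variant}")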

Conclusion

With an Experimentation & A/B Rollout Orchestrator Agent, teams move faster and safer: every test is a contract, every action returns a receipt, and promotion/rollback become one-click, auditable operations. Tie this into your earlier agents (data quality, cost/perf, privacy) and you’ll ship AI-powered features with the operational rigor your business demands.