Introduction
Once agents touch production user flows, the hardest problem is changing things safely. Feature prompts, retrieval policies, ranking models, and pricing algorithms all need controlled trials, guardrails, and rapid rollback. This article delivers an Experimentation & A/B Rollout Orchestrator Agent that proposes experiments, registers them, allocates traffic, watches guardrails, and rolls forward/back only with receipts (experiment IDs, run IDs, rollout change IDs). It treats experiments as contracts, not ad-hoc flags.
The Use Case
Product and data teams run many concurrent trials: copy variants, prompt policies, search rankers, recommender models, pricing heuristics. Without discipline, results get biased (peeking), guardrails get violated (latency, safety), and rollbacks are messy. The agent encodes your experimentation policy, integrates with your flag provider and metrics store, and turns “let’s test this” into a typed, auditable change that can be promoted or reverted in seconds.
Prompt Contract (agent interface)
# file: contracts/ab_orchestrator_v1.yaml
role: "ABOrchestratorAgent"
scope: >
  Create, run, and decide A/B/n experiments with pre-registered metrics and guardrails.
  Ask once if critical fields are missing (treatment spec, primary metric, MDE, duration cap).
  Never assert success without a receipt (exp_id, run_id, change_id).
inputs:
  domain: enum["prompts","policies","ranking","pricing","ui"]
  treatment_spec: object        # structured definition of variant(s)
  primary_metric: string        # e.g., "signup_conv", "CTR", "CSAT"
  guardrails:                   # hard stops
    - name: string
      threshold: number
      direction: enum["<",">"]
  mde: number                   # minimum detectable effect in pct points or %
  max_duration_days: integer
  max_concurrent: integer
output:
  type: object
  required: [summary, registration, allocation, monitoring, decision, citations, tool_proposals]
  properties:
    summary: {type: string, maxWords: 100}
    registration:
      type: object
      required: [exp_id, primary_metric, metrics, hypothesis]
    allocation:
      type: object
      required: [arms, traffic_split, exposure_rules]
    monitoring:
      type: object
      required: [run_id, dashboards, checkpoints]
    decision:
      type: object
      required: [state, rationale, receipts]
      properties:
        state: {type: string, enum: ["running","guardrail_stop","promote","rollback","inconclusive"]}
        rationale: {type: string}
        receipts: {type: array, items: string}
    citations: {type: array, items: string}   # policy/rule ids
    tool_proposals:
      type: array
      items:
        type: object
        required: [name, args, preconditions, idempotency_key]
        properties:
          name:
            type: string
            enum: [RegisterExperiment, CreateFlag, StartRun, AllocateTraffic,
                   CheckMetrics, StopRun, PromoteVariant, RollbackVariant,
                   SnapshotDashboard, OpenTicket]
          args: {type: object}
          preconditions: {type: string}
          idempotency_key: {type: string}
policy_id: "exp_policy.v5"
citation_rule: "Minimal-span references to primary metric, guardrail thresholds, and policy clauses."
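Before executing anything, the orchestrator should reject planner output that violates this contract. A minimal validation sketch, assuming PyYAML and the contract path above; swap in whatever schema validator your stack already uses.
# validate_response.py: a minimal sketch, assuming PyYAML and the contract file above.
# The point is to reject malformed planner output before any tool proposal runs.
import yaml

def validate_response(resp: dict, contract_path: str = "contracts/ab_orchestrator_v1.yaml") -> list:
    with open(contract_path) as f:
        contract = yaml.safe_load(f)
    errors = []
    # Top-level required fields come straight from the contract's output schema.
    for key in contract["output"]["required"]:
        if key not in resp:
            errors.append(f"missing field: {key}")
    # The decision state must be one of the enumerated states.
    allowed_states = contract["output"]["properties"]["decision"]["properties"]["state"]["enum"]
    state = resp.get("decision", {}).get("state")
    if state not in allowed_states:
        errors.append(f"invalid decision.state: {state}")
    # Every tool proposal must be fully specified for the executor.
    for tp in resp.get("tool_proposals", []):
        if not {"name", "args", "preconditions", "idempotency_key"} <= set(tp):
            errors.append(f"incomplete tool proposal: {tp.get('name', '?')}")
    return errors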
Tool Interfaces (typed, with receipts)
# tools.py
from pydantic import BaseModel
from typing import List, Dict, Optional

class RegisterExperimentArgs(BaseModel):
    domain: str
    primary_metric: str
    metrics: List[str]
    hypothesis: str
    mde: float
    max_duration_days: int

class CreateFlagArgs(BaseModel):
    key: str
    variants: List[str]          # ["control","treatmentA","treatmentB"]

class StartRunArgs(BaseModel):
    exp_id: str
    population: str              # "all_users","new_users","us_only", etc.
    run_name: str

class AllocateTrafficArgs(BaseModel):
    flag_key: str
    split: Dict[str, int]        # {"control": 50, "treatmentA": 50}
    exposure_rules: List[str]

class CheckMetricsArgs(BaseModel):
    run_id: str
    primary_metric: str
    guardrails: List[Dict]       # [{name, threshold, direction}]
    lookback_hours: int

class StopRunArgs(BaseModel):
    run_id: str
    reason: str

class PromoteVariantArgs(BaseModel):
    flag_key: str
    variant: str

class RollbackVariantArgs(BaseModel):
    flag_key: str
    to_variant: str

class SnapshotDashboardArgs(BaseModel):
    run_id: str

class OpenTicketArgs(BaseModel):
    title: str
    description: str
    severity: str
    owners: List[str]

class ToolReceipt(BaseModel):
    tool: str
    ok: bool
    ref: str                     # exp_id, run_id, change_id, dashboard_snapshot_id
    message: str = ""
    data: Optional[Dict] = None
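These typed models are also a cheap place to enforce invariants before a proposal ever reaches the flag service. A hedged sketch, assuming pydantic v2; StrictAllocateTrafficArgs is an illustrative extension, not part of the contract above.
# split_guard.py: a hedged sketch (assumes pydantic v2); StrictAllocateTrafficArgs is
# an illustrative extension of the model defined in tools.py.
from typing import Dict
from pydantic import field_validator
from tools import AllocateTrafficArgs

class StrictAllocateTrafficArgs(AllocateTrafficArgs):
    @field_validator("split")
    @classmethod
    def split_sums_to_100(cls, v: Dict[str, int]) -> Dict[str, int]:
        # Reject malformed splits at parse time, before a flag change is proposed.
        if sum(v.values()) != 100:
            raise ValueError(f"traffic split must sum to 100, got {sum(v.values())}")
        return v

# StrictAllocateTrafficArgs(flag_key="prompts.exp", split={"control": 60, "treatmentA": 30},
#                           exposure_rules=[]) raises a ValidationError: the split sums to 90.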
# adapters.py (demo logic; wire to your flag service, metrics store, and BI)
from tools import *
import uuid, random

def register_experiment(a: RegisterExperimentArgs) -> ToolReceipt:
    return ToolReceipt(tool="RegisterExperiment", ok=True, ref=f"EXP-{uuid.uuid4().hex[:8]}",
                       data={"primary_metric": a.primary_metric, "metrics": a.metrics, "mde": a.mde})

def create_flag(a: CreateFlagArgs) -> ToolReceipt:
    return ToolReceipt(tool="CreateFlag", ok=True, ref=f"FLAG-{a.key}", message="Flag created")

def start_run(a: StartRunArgs) -> ToolReceipt:
    return ToolReceipt(tool="StartRun", ok=True, ref=f"RUN-{uuid.uuid4().hex[:8]}", message="Run started")

def allocate_traffic(a: AllocateTrafficArgs) -> ToolReceipt:
    return ToolReceipt(tool="AllocateTraffic", ok=True, ref=f"ALLOC-{a.flag_key}", data={"split": a.split})

def check_metrics(a: CheckMetricsArgs) -> ToolReceipt:
    # toy metrics & guardrails: `direction` is the side the metric must stay on,
    # so a "<" guardrail breaches once the observed value reaches the threshold
    delta = round(random.uniform(-0.04, 0.06), 3)  # treatment - control
    guardrail_breaches = []
    for g in a.guardrails:
        obs = random.uniform(0.0, g["threshold"] * 1.2)  # toy observation drawn near the threshold
        breached = (obs >= g["threshold"]) if g["direction"] == "<" else (obs <= g["threshold"])
        if breached:
            guardrail_breaches.append({"name": g["name"], "observed": round(obs, 3)})
    state = "ok" if not guardrail_breaches else "breach"
    return ToolReceipt(tool="CheckMetrics", ok=True, ref=f"MET-{a.run_id}",
                       data={"delta": delta, "guardrails": guardrail_breaches, "state": state})

def stop_run(a: StopRunArgs) -> ToolReceipt:
    return ToolReceipt(tool="StopRun", ok=True, ref=f"STOP-{a.run_id}", message=a.reason)

def promote_variant(a: PromoteVariantArgs) -> ToolReceipt:
    return ToolReceipt(tool="PromoteVariant", ok=True, ref=f"PROM-{a.variant}", message="Variant promoted")

def rollback_variant(a: RollbackVariantArgs) -> ToolReceipt:
    return ToolReceipt(tool="RollbackVariant", ok=True, ref=f"RB-{a.to_variant}", message="Rolled back")

def snapshot_dashboard(a: SnapshotDashboardArgs) -> ToolReceipt:
    return ToolReceipt(tool="SnapshotDashboard", ok=True, ref=f"DASH-{a.run_id}", message="Dashboard snapshotted")

def open_ticket(a: OpenTicketArgs) -> ToolReceipt:
    return ToolReceipt(tool="OpenTicket", ok=True, ref="EXP-INC-1024", message="Experiment incident opened")
Agent Loop (proposal → verification → execution → receipts)
# agent_ab_orchestrator.py
import uuid
from typing import Any, Dict, List
from tools import *
from adapters import *

ALLOWED = {"RegisterExperiment","CreateFlag","StartRun","AllocateTraffic",
           "CheckMetrics","StopRun","PromoteVariant","RollbackVariant",
           "SnapshotDashboard","OpenTicket"}

def new_idem(): return f"idem-{uuid.uuid4()}"

def verify(p: Dict[str, Any]) -> str:
    need = {"name","args","preconditions","idempotency_key"}
    if not need.issubset(p): return "Missing fields"
    if p["name"] not in ALLOWED: return "Tool not allowed"
    return ""

# --- Planner (replace with your LLM honoring the contract) ---
def plan(req: Dict[str, Any]) -> Dict[str, Any]:
    flag_key = f"{req['domain']}.exp"
    return {
        "summary": f"A/B orchestrated for {req['domain']} on {req['primary_metric']} with MDE {req['mde']}.",
        "registration": {},
        "allocation": {},
        "monitoring": {},
        "decision": {"state": "running", "rationale": "initialization", "receipts": []},
        "citations": ["exp_policy.v5", f"guardrails:{','.join([g['name'] for g in req['guardrails']])}"],
        "tool_proposals": [
            {"name": "RegisterExperiment",
             "args": {"domain": req["domain"],
                      "primary_metric": req["primary_metric"],
                      "metrics": [req["primary_metric"], "latency_p95", "defect_rate"],
                      "hypothesis": "Treatment improves primary metric without guardrail breaches.",
                      "mde": req["mde"], "max_duration_days": req["max_duration_days"]},
             "preconditions": "Register metrics & hypothesis.", "idempotency_key": new_idem()},
            {"name": "CreateFlag", "args": {"key": flag_key, "variants": ["control", "treatmentA"]},
             "preconditions": "Expose variants via flag.", "idempotency_key": new_idem()},
            {"name": "StartRun", "args": {"exp_id": "", "population": "all_users", "run_name": "main"},
             "preconditions": "Launch run.", "idempotency_key": new_idem()},
            {"name": "AllocateTraffic", "args": {"flag_key": flag_key, "split": {"control": 50, "treatmentA": 50},
                                                 "exposure_rules": ["logged_in==true"]},
             "preconditions": "Even split to start.", "idempotency_key": new_idem()},
            {"name": "SnapshotDashboard", "args": {"run_id": ""},
             "preconditions": "Create monitoring view.", "idempotency_key": new_idem()},
            {"name": "CheckMetrics", "args": {"run_id": "", "primary_metric": req["primary_metric"],
                                              "guardrails": req["guardrails"], "lookback_hours": 24},
             "preconditions": "Daily checkpoint.", "idempotency_key": new_idem()}
        ]
    }

def handle(req: Dict[str, Any]) -> str:
    p = plan(req)
    receipts: List[ToolReceipt] = []
    flag_key = f"{req['domain']}.exp"
    # 1) Register → Flag → Run
    for tp in p["tool_proposals"][:2]:
        err = verify(tp)
        if err:
            receipts.append(ToolReceipt(tool=tp["name"], ok=False, ref="blocked", message=err))
        elif tp["name"] == "RegisterExperiment":
            receipts.append(register_experiment(RegisterExperimentArgs(**tp["args"])))
        else:
            receipts.append(create_flag(CreateFlagArgs(**tp["args"])))
    exp_id = receipts[0].ref if receipts and receipts[0].tool == "RegisterExperiment" else "EXP-UNK"
    # Start run with exp_id
    sr = start_run(StartRunArgs(exp_id=exp_id, population="all_users", run_name="main")); receipts.append(sr)
    run_id = sr.ref
    # Allocation & dashboard snapshot
    receipts.append(allocate_traffic(AllocateTrafficArgs(flag_key=flag_key,
                                                         split={"control": 50, "treatmentA": 50},
                                                         exposure_rules=["logged_in==true"])))
    receipts.append(snapshot_dashboard(SnapshotDashboardArgs(run_id=run_id)))
    # 2) Monitoring checkpoint → decision
    cm = check_metrics(CheckMetricsArgs(run_id=run_id, primary_metric=req["primary_metric"],
                                        guardrails=req["guardrails"], lookback_hours=24)); receipts.append(cm)
    decision_state, rationale, decision_receipts = "running", "", []
    if cm.data["state"] == "breach":
        decision_state, rationale = "guardrail_stop", f"Breach: {[g['name'] for g in cm.data['guardrails']]}"
        st = stop_run(StopRunArgs(run_id=run_id, reason=rationale)); receipts.append(st); decision_receipts.append(st.ref)
        rb = rollback_variant(RollbackVariantArgs(flag_key=flag_key, to_variant="control")); receipts.append(rb); decision_receipts.append(rb.ref)
        ot = open_ticket(OpenTicketArgs(title="Guardrail stop", description=rationale, severity="high",
                                        owners=["exp@product", "oncall@data"])); receipts.append(ot); decision_receipts.append(ot.ref)
    elif cm.data["delta"] >= req["mde"]:
        decision_state, rationale = "promote", f"Effect {cm.data['delta']} ≥ MDE {req['mde']}"
        pr = promote_variant(PromoteVariantArgs(flag_key=flag_key, variant="treatmentA")); receipts.append(pr); decision_receipts.append(pr.ref)
        st = stop_run(StopRunArgs(run_id=run_id, reason="Promoted")); receipts.append(st); decision_receipts.append(st.ref)
    else:
        decision_state, rationale = "inconclusive", f"Effect {cm.data['delta']} < MDE {req['mde']} (continue or extend)"
    # 3) Render
    lines = [p["summary"], ""]
    lines.append(f"Experiment: {exp_id} | Run: {run_id}")
    lines.append("Allocation: control=50%, treatmentA=50% | Exposure: logged_in==true")
    lines.append(f"Checkpoint: Δ{req['primary_metric']}={cm.data['delta']} | Guardrails: {'ok' if cm.data['state'] == 'ok' else 'breach'}")
    lines.append(f"\nDecision: {decision_state} — {rationale}")
    if decision_receipts:
        lines.append("Receipts:")
        for r in decision_receipts: lines.append(f"- {r}")
    lines.append("\nCitations: " + ", ".join(p["citations"]))
    return "\n".join(lines)

if __name__ == "__main__":
    example = {
        "domain": "prompts",
        "treatment_spec": {"policy_bundle": "gscp-12.v4", "prompt": "new-assistant-v7"},
        "primary_metric": "CSAT",
        "guardrails": [{"name": "defect_rate", "threshold": 0.02, "direction": "<"},
                       {"name": "latency_p95", "threshold": 2.5, "direction": "<"}],
        "mde": 0.01,
        "max_duration_days": 14,
        "max_concurrent": 10
    }
    print(handle(example))
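The demo makes a single checkpoint and decides immediately. In practice the CheckMetrics step recurs until the run promotes, stops, or hits its duration cap. A minimal sketch of that loop, assuming the demo adapters above; in production this would be a scheduled job rather than an in-process loop.
# checkpoint_loop.py: a hedged sketch of recurring checkpoints, assuming the demo
# adapters above; a real deployment would schedule this (cron, Airflow), not loop here.
from tools import CheckMetricsArgs, StopRunArgs
from adapters import check_metrics, stop_run

def run_checkpoints(run_id: str, req: dict):
    for day in range(req["max_duration_days"]):      # one checkpoint per day, capped by the request
        cm = check_metrics(CheckMetricsArgs(run_id=run_id, primary_metric=req["primary_metric"],
                                            guardrails=req["guardrails"], lookback_hours=24))
        if cm.data["state"] == "breach":
            return "guardrail_stop", cm              # caller stops the run and rolls back
        if cm.data["delta"] >= req["mde"]:
            return "promote", cm                     # caller promotes, then stops the run
    # Duration cap reached without a clear win or a breach.
    stop_run(StopRunArgs(run_id=run_id, reason="Duration cap reached"))
    return "inconclusive", None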
Design notes that keep you out of trouble
Pre-register everything. Metrics, guardrails, hypothesis, and MDE must be registered before allocation to avoid p-hacking.
One source of truth. The agent never computes metrics; it reads from your production metrics store (or experiment platform) and cites metric names/IDs.
Guardrail-first decisions. If any safety/latency guardrail breaches, the agent stops + rolls back immediately before debating lift.
Receipts > logs. Promotion and rollback produce flag change IDs; snapshots produce dashboard IDs; decisions are not “done” without them.
Golden traces. Keep a small suite of canonical experiments (e.g., “prompt vX vs vY”) that the agent can simulate in CI; a sketch of such a test follows this list.
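A hedged sketch of such a golden-trace check, assuming pytest and the demo handle() above; seeding the RNG keeps the toy checkpoint deterministic, and the assertions verify structure and receipts rather than specific lift numbers.
# test_golden_traces.py: assumes pytest plus the demo modules above; the assertions
# check structure and receipts, not a particular lift.
import random
from agent_ab_orchestrator import handle

GOLDEN_PROMPT_EXPERIMENT = {
    "domain": "prompts",
    "treatment_spec": {"prompt": "new-assistant-v7"},
    "primary_metric": "CSAT",
    "guardrails": [{"name": "defect_rate", "threshold": 0.02, "direction": "<"}],
    "mde": 0.01,
    "max_duration_days": 14,
    "max_concurrent": 10,
}

def test_golden_prompt_experiment_emits_receipts():
    random.seed(7)                      # make the toy checkpoint deterministic
    out = handle(GOLDEN_PROMPT_EXPERIMENT)
    assert "Experiment: EXP-" in out    # registration receipt rendered
    assert "Run: RUN-" in out           # run receipt rendered
    assert "Decision:" in out           # a decision state is always rendered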
Common failure modes (and fixes)
Peeking & garden-of-forking paths → Lock analysis windows and require pre-registered decision rules; the agent should refuse ad-hoc splits mid-run.
Crossover/exposure leaks → Use user-level bucketing with sticky assignments; codify exposure rules in the allocation tool (a bucketing sketch follows this list).
Latent guardrail breaches → Run hourly/daily checkpoints and set hard stops.
Rollbacks that half-apply → Treat promotions/rollbacks as atomic flag changes with idempotency keys and verify the post-state before claiming success (sketch below).
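For the crossover point above, a minimal sketch of sticky user-level bucketing: hashing the user ID with a per-experiment salt keeps assignments stable across sessions without storing any state.
# bucketing.py: a minimal sketch of sticky user-level assignment; the experiment ID
# acts as the salt, so the same user can land in different arms of different experiments.
import hashlib

def assign_variant(user_id: str, exp_id: str, split: dict) -> str:
    # Deterministically map a user to a variant according to the traffic split.
    digest = hashlib.sha256(f"{exp_id}:{user_id}".encode()).hexdigest()
    bucket = int(digest[:8], 16) % 100          # stable bucket in [0, 100)
    cumulative = 0
    for variant, pct in split.items():
        cumulative += pct
        if bucket < cumulative:
            return variant
    return "control"                            # fallback if the split sums to less than 100

# assign_variant("user-42", "EXP-ab12cd34", {"control": 50, "treatmentA": 50})
# returns the same arm for this user every time within this experiment.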
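And for half-applied rollbacks, a hedged sketch of verify-before-claim: the change is only reported as done when a read-back of the flag state matches the intended variant. The read_flag_state helper is hypothetical; wire it to your flag provider's read API.
# verified_rollback.py: a sketch; read_flag_state is a hypothetical helper you would
# implement against your flag provider.
from tools import RollbackVariantArgs, ToolReceipt
from adapters import rollback_variant

def read_flag_state(flag_key: str) -> str:
    # Hypothetical: query the flag provider for the variant currently being served.
    raise NotImplementedError

def rollback_and_verify(flag_key: str, to_variant: str) -> ToolReceipt:
    receipt = rollback_variant(RollbackVariantArgs(flag_key=flag_key, to_variant=to_variant))
    if not receipt.ok:
        return receipt
    served = read_flag_state(flag_key)          # post-state check before claiming success
    if served != to_variant:
        return ToolReceipt(tool="RollbackVariant", ok=False, ref=receipt.ref,
                           message=f"Post-state mismatch: serving {served}, expected {to_variant}")
    return receipt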
Conclusion
With an Experimentation & A/B Rollout Orchestrator Agent, teams move faster and more safely: every test is a contract, every action returns a receipt, and promotion and rollback become one-click, auditable operations. Tie this into your earlier agents (data quality, cost/perf, privacy) and you’ll ship AI-powered features with the operational rigor your business demands.