Introduction
This pattern implements an IT Access Review & Deprovisioning Agent. It continuously evaluates user/application access against least-privilege policy, detects stale or risky entitlements, requests approvals for exceptions, and executes revocations with receipts from IAM/IdP systems. It never asserts success without verifiable evidence (ticket IDs, change IDs, directory audit logs), giving security, IT, and audit teams a single, reliable loop.
The Use Case
Organizations accumulate access: role changes, transfers, contractors, and abandoned service accounts. Quarterly access reviews are manual and error-prone. The agent assembles entitlements from IdP/IAM/SaaS, correlates with HR status and role catalogs, flags violations (inactive, offboarded, toxic combinations, orphaned accounts), and proposes actions: keep, downgrade, or revoke. For sensitive scopes (e.g., production admin), it requests manager or app-owner approval before execution. All facts cite policy claims; all writes return receipts.
Prompt Contract (agent interface)
# file: contracts/it_access_review_v1.yaml
role: "ITAccessReviewAgent"
scope: >
Evaluate user/app entitlements against least-privilege policy; propose downgrades or revocations.
Ask once if critical fields are missing (user_id, employment_status, role, entitlements[]).
Propose tool calls; never assert success without an IAM receipt.
output:
type: object
required: [summary, decision, user, findings, citations, next_steps, tool_proposals]
properties:
summary: {type: string, maxWords: 90}
decision: {type: string, enum: ["revoke","downgrade","keep","need_approval","need_more_info"]}
user:
type: object
required: [user_id, role, employment_status]
properties:
user_id: {type: string}
role: {type: string}
employment_status: {type: string, enum: ["active","terminated","contract_ended","on_leave","transferred"]}
findings:
type: array
items:
type: object
required: [entitlement, status, reason]
properties:
entitlement: {type: string}
status: {type: string, enum: ["ok","stale","orphaned","toxic_combo","excessive"]}
reason: {type: string}
citations: {type: array, items: {type: string}}
next_steps: {type: array, items: {type: string}, maxItems: 6}
tool_proposals:
type: array
items:
type: object
required: [name, args, preconditions, idempotency_key]
properties:
name: {type: string, enum: [FetchEntitlements, CheckPolicy, RequestApproval, RevokeAccess, DowngradeAccess, CreateTicket]}
args: {type: object}
preconditions: {type: string}
idempotency_key: {type: string}
policy_id: "iam_policy.v6"
citation_rule: "1–2 minimal-span claim_ids per factual sentence"
decoding:
narrative: {top_p: 0.9, temperature: 0.7}
bullets: {top_p: 0.82, temperature: 0.45}
Example claims (context given to the model)
[
{"claim_id":"policy:least_privilege","text":"Users must retain only entitlements necessary for their current role.",
"effective_date":"2025-02-01","source_id":"doc:iam_policy_v6","span":"only entitlements necessary"},
{"claim_id":"policy:offboarding:deadline","text":"Accounts for terminated users must be disabled within 24 hours.",
"effective_date":"2025-02-01","source_id":"doc:iam_policy_v6","span":"disabled within 24 hours"},
{"claim_id":"policy:toxic:prod_admin_plus_billing","text":"Production admin and billing admin must not be combined.",
"effective_date":"2025-02-01","source_id":"doc:iam_policy_v6","span":"must not be combined"},
{"claim_id":"policy:review:quarterly","text":"All privileged roles require a quarterly attestation by the app owner.",
"effective_date":"2025-02-01","source_id":"doc:iam_policy_v6","span":"quarterly attestation by the app owner"}
]
Tool Interfaces (typed, with receipts)
# tools.py
from pydantic import BaseModel
from typing import List, Optional, Dict
class FetchEntitlementsArgs(BaseModel):
user_id: str
class CheckPolicyArgs(BaseModel):
user_id: str
role: str
employment_status: str
entitlements: List[str]
class RequestApprovalArgs(BaseModel):
user_id: str
entitlement: str
approver_id: str
reason: str
class RevokeAccessArgs(BaseModel):
user_id: str
entitlement: str
class DowngradeAccessArgs(BaseModel):
user_id: str
entitlement: str
target_role: str
class CreateTicketArgs(BaseModel):
user_id: str
action: str
details: Dict[str, str]
class ToolReceipt(BaseModel):
tool: str
ok: bool
ref: str
message: str = ""
data: Optional[Dict] = None
# adapters.py (demo logic)
from tools import *
DIRECTORY = {
"u-101": {"status":"terminated", "role":"Engineer", "ents":["okta:prod_admin","gsuite:billing_admin","github:write_all"]},
"u-202": {"status":"active", "role":"Finance", "ents":["okta:read_only","gsuite:billing_admin"]}
}
def fetch_entitlements(a: FetchEntitlementsArgs) -> ToolReceipt:
user = DIRECTORY.get(a.user_id)
if not user:
return ToolReceipt(tool="FetchEntitlements", ok=False, ref="user-missing", message="User not found")
return ToolReceipt(tool="FetchEntitlements", ok=True, ref=f"ents-{a.user_id}",
data={"role": user["role"], "employment_status": user["status"], "entitlements": user["ents"]})
def check_policy(a: CheckPolicyArgs) -> ToolReceipt:
findings = []
for e in a.entitlements:
if a.employment_status in {"terminated","contract_ended"}:
findings.append({"entitlement": e, "status":"orphaned", "reason":"User terminated; revoke within 24h"})
elif e == "okta:prod_admin" and "gsuite:billing_admin" in a.entitlements:
findings.append({"entitlement": e, "status":"toxic_combo", "reason":"Prod admin + billing admin forbidden"})
elif a.role != "Finance" and e == "gsuite:billing_admin":
findings.append({"entitlement": e, "status":"excessive", "reason":"Not required for role"})
else:
findings.append({"entitlement": e, "status":"ok", "reason":""})
decision = "revoke" if any(f["status"] in {"orphaned","toxic_combo"} for f in findings) else "downgrade"
return ToolReceipt(tool="CheckPolicy", ok=True, ref=f"policy-{a.user_id}", data={"findings": findings, "decision": decision})
def request_approval(a: RequestApprovalArgs) -> ToolReceipt:
return ToolReceipt(tool="RequestApproval", ok=True, ref=f"appr-{a.user_id}-{a.entitlement}",
message=f"Approval requested from {a.approver_id}")
def revoke_access(a: RevokeAccessArgs) -> ToolReceipt:
return ToolReceipt(tool="RevokeAccess", ok=True, ref=f"revoke-{a.user_id}-{a.entitlement}",
message=f"Revoked {a.entitlement}")
def downgrade_access(a: DowngradeAccessArgs) -> ToolReceipt:
return ToolReceipt(tool="DowngradeAccess", ok=True, ref=f"dgr-{a.user_id}-{a.entitlement}",
message=f"Downgraded {a.entitlement} to {a.target_role}")
def create_ticket(a: CreateTicketArgs) -> ToolReceipt:
return ToolReceipt(tool="CreateTicket", ok=True, ref=f"ticket-{a.user_id}", data=a.details)
Agent Loop (proposal → verification → execution → receipts)
# agent_it_access_review.py
import uuid, json
from typing import Any, Dict, List
from tools import *
from adapters import *
ALLOWED_TOOLS = {"FetchEntitlements","CheckPolicy","RequestApproval","RevokeAccess","DowngradeAccess","CreateTicket"}
def new_idem(): return f"idem-{uuid.uuid4()}"
def verify_proposal(p: Dict[str, Any]) -> str:
req = {"name","args","preconditions","idempotency_key"}
if not req.issubset(p): return "Missing proposal fields"
if p["name"] not in ALLOWED_TOOLS: return "Tool not allowed"
if p["name"] in {"RevokeAccess","DowngradeAccess"} and "entitlement" not in p["args"]:
return "Entitlement required"
return ""
def execute(p: Dict[str, Any]) -> ToolReceipt:
n, a = p["name"], p["args"]
if n=="FetchEntitlements": return fetch_entitlements(FetchEntitlementsArgs(**a))
if n=="CheckPolicy": return check_policy(CheckPolicyArgs(**a))
if n=="RequestApproval": return request_approval(RequestApprovalArgs(**a))
if n=="RevokeAccess": return revoke_access(RevokeAccessArgs(**a))
if n=="DowngradeAccess": return downgrade_access(DowngradeAccessArgs(**a))
if n=="CreateTicket": return create_ticket(CreateTicketArgs(**a))
return ToolReceipt(tool=n, ok=False, ref="none", message="Unknown tool")
# --- Model shim returning a plan per contract (replace with your LLM call) ---
def call_model(contract_yaml: str, claims: List[Dict[str,Any]], user_id: str) -> Dict[str,Any]:
# The real model would use tool outputs; here we produce an illustrative plan
return {
"summary": f"Access review for {user_id}: revoke orphaned/toxic, downgrade excessive, ticket for audit.",
"decision": "revoke",
"user": {"user_id": user_id, "role": "Engineer", "employment_status": "terminated"},
"findings": [],
"citations": ["policy:least_privilege","policy:offboarding:deadline","policy:toxic:prod_admin_plus_billing","policy:review:quarterly"],
"next_steps": ["Fetch entitlements","Check policy & determine actions","Request approvals for privileged scopes","Execute revocations/downgrades","Create audit ticket"],
"tool_proposals": [
{"name":"FetchEntitlements","args":{"user_id":user_id},
"preconditions":"Get current entitlements from IdP/IAM.","idempotency_key": new_idem()},
{"name":"CheckPolicy","args":{"user_id":user_id,"role":"Engineer","employment_status":"terminated",
"entitlements":["okta:prod_admin","gsuite:billing_admin","github:write_all"]},
"preconditions":"Compare against policy and role catalog.","idempotency_key": new_idem()},
{"name":"RevokeAccess","args":{"user_id":user_id,"entitlement":"okta:prod_admin"},
"preconditions":"Terminate user: revoke within 24h.","idempotency_key": new_idem()},
{"name":"RevokeAccess","args":{"user_id":user_id,"entitlement":"gsuite:billing_admin"},
"preconditions":"Toxic combo with prod admin; revoke.","idempotency_key": new_idem()},
{"name":"DowngradeAccess","args":{"user_id":user_id,"entitlement":"github:write_all","target_role":"github:read_only"},
"preconditions":"Excessive for terminated user; or remove if policy demands.","idempotency_key": new_idem()},
{"name":"CreateTicket","args":{"user_id":user_id,"action":"access_cleanup",
"details":{"change_ids":"revoke-refs","reason":"offboarding+toxic_combo"}},
"preconditions":"Audit trail for SOX/ISO evidence.","idempotency_key": new_idem()}
]
}
def render_response(plan: Dict[str,Any], receipts: List[ToolReceipt]) -> str:
idx = {r.tool:r for r in receipts}
lines = [plan["summary"], ""]
lines.append(f"Decision: {plan['decision']}")
if idx.get("CheckPolicy"):
for f in idx["CheckPolicy"].data["findings"]:
lines.append(f"- {f['entitlement']}: {f['status']} — {f['reason']}")
if idx.get("RevokeAccess"):
lines.append(f"\nRevocation: {idx['RevokeAccess'].message} ({idx['RevokeAccess'].ref})")
if idx.get("DowngradeAccess"):
lines.append(f"Downgrade: {idx['DowngradeAccess'].message} ({idx['DowngradeAccess'].ref})")
if idx.get("CreateTicket"):
lines.append(f"Audit ticket: {idx['CreateTicket'].ref}")
lines.append("\nNext steps:")
for s in plan["next_steps"]:
lines.append(f"• {s}")
lines.append("\nCitations: " + ", ".join(plan["citations"]))
return "\n".join(lines)
def handle(user_id: str) -> str:
contract = open("contracts/it_access_review_v1.yaml").read()
claims: List[Dict[str,Any]] = [] # load iam policy claims
plan = call_model(contract, claims, user_id)
receipts: List[ToolReceipt] = []
for prop in plan["tool_proposals"]:
reason = verify_proposal(prop)
if reason:
receipts.append(ToolReceipt(tool=prop["name"], ok=False, ref="blocked", message=reason)); continue
r = execute(prop)
receipts.append(r)
if not r.ok and prop["name"] in {"RevokeAccess","DowngradeAccess"}:
break
return render_response(plan, receipts)
if __name__ == "__main__":
print(handle("u-101"))
The Prompt You’d Send to the Model (concise and testable)
System:
You are ITAccessReviewAgent. Follow the contract:
- Ask once if user_id, employment_status, role, or entitlements[] are missing.
- Cite 1–2 claim_ids per factual sentence using provided claims.
- Propose tools; never assert success without an IAM/IdP receipt.
- Output JSON with keys: summary, decision, user{}, findings[], citations[], next_steps[], tool_proposals[].
Claims (eligible only):
[ ... JSON array of IAM policy claims like above ... ]
User:
Please review and remediate access for user "u-101" (terminated).
How to adapt quickly
Wire FetchEntitlements to your IdP/IAM (e.g., Okta/Azure AD) and app APIs; load role catalogs and ownership maps for approvals. Enforce idempotency on revoke/downgrade operations and store change receipts (who/what/when). Implement golden traces (sample users: terminated, transferred, privileged) and run them in CI. Keep minimal-span citations on every factual sentence and no implied writes—actions only count with receipts. Add a feature flag for policy bundles so you can canary new toxic-combination rules and rollback safely.