AI Automation & Agents  

AI Agents in Practice: IT Access Review & Deprovisioning Agent (Prompts + Code)

Introduction

This pattern implements an IT Access Review & Deprovisioning Agent. It continuously evaluates user/application access against least-privilege policy, detects stale or risky entitlements, requests approvals for exceptions, and executes revocations with receipts from IAM/IdP systems. It never asserts success without verifiable evidence (ticket IDs, change IDs, directory audit logs), giving security, IT, and audit teams a single, reliable loop.


The Use Case

Organizations accumulate access: role changes, transfers, contractors, and abandoned service accounts. Quarterly access reviews are manual and error-prone. The agent assembles entitlements from IdP/IAM/SaaS, correlates with HR status and role catalogs, flags violations (inactive, offboarded, toxic combinations, orphaned accounts), and proposes actions: keep, downgrade, or revoke. For sensitive scopes (e.g., production admin), it requests manager or app-owner approval before execution. All facts cite policy claims; all writes return receipts.


Prompt Contract (agent interface)

# file: contracts/it_access_review_v1.yaml
role: "ITAccessReviewAgent"
scope: >
  Evaluate user/app entitlements against least-privilege policy; propose downgrades or revocations.
  Ask once if critical fields are missing (user_id, employment_status, role, entitlements[]).
  Propose tool calls; never assert success without an IAM receipt.
output:
  type: object
  required: [summary, decision, user, findings, citations, next_steps, tool_proposals]
  properties:
    summary: {type: string, maxWords: 90}
    decision: {type: string, enum: ["revoke","downgrade","keep","need_approval","need_more_info"]}
    user:
      type: object
      required: [user_id, role, employment_status]
      properties:
        user_id: {type: string}
        role: {type: string}
        employment_status: {type: string, enum: ["active","terminated","contract_ended","on_leave","transferred"]}
    findings:
      type: array
      items:
        type: object
        required: [entitlement, status, reason]
        properties:
          entitlement: {type: string}
          status: {type: string, enum: ["ok","stale","orphaned","toxic_combo","excessive"]}
          reason: {type: string}
    citations: {type: array, items: {type: string}}
    next_steps: {type: array, items: {type: string}, maxItems: 6}
    tool_proposals:
      type: array
      items:
        type: object
        required: [name, args, preconditions, idempotency_key]
        properties:
          name: {type: string, enum: [FetchEntitlements, CheckPolicy, RequestApproval, RevokeAccess, DowngradeAccess, CreateTicket]}
          args: {type: object}
          preconditions: {type: string}
          idempotency_key: {type: string}
policy_id: "iam_policy.v6"
citation_rule: "1–2 minimal-span claim_ids per factual sentence"
decoding:
  narrative: {top_p: 0.9, temperature: 0.7}
  bullets:   {top_p: 0.82, temperature: 0.45}

Example claims (context given to the model)

[
  {"claim_id":"policy:least_privilege","text":"Users must retain only entitlements necessary for their current role.",
   "effective_date":"2025-02-01","source_id":"doc:iam_policy_v6","span":"only entitlements necessary"},
  {"claim_id":"policy:offboarding:deadline","text":"Accounts for terminated users must be disabled within 24 hours.",
   "effective_date":"2025-02-01","source_id":"doc:iam_policy_v6","span":"disabled within 24 hours"},
  {"claim_id":"policy:toxic:prod_admin_plus_billing","text":"Production admin and billing admin must not be combined.",
   "effective_date":"2025-02-01","source_id":"doc:iam_policy_v6","span":"must not be combined"},
  {"claim_id":"policy:review:quarterly","text":"All privileged roles require a quarterly attestation by the app owner.",
   "effective_date":"2025-02-01","source_id":"doc:iam_policy_v6","span":"quarterly attestation by the app owner"}
]

Tool Interfaces (typed, with receipts)

# tools.py
from pydantic import BaseModel
from typing import List, Optional, Dict

class FetchEntitlementsArgs(BaseModel):
    user_id: str

class CheckPolicyArgs(BaseModel):
    user_id: str
    role: str
    employment_status: str
    entitlements: List[str]

class RequestApprovalArgs(BaseModel):
    user_id: str
    entitlement: str
    approver_id: str
    reason: str

class RevokeAccessArgs(BaseModel):
    user_id: str
    entitlement: str

class DowngradeAccessArgs(BaseModel):
    user_id: str
    entitlement: str
    target_role: str

class CreateTicketArgs(BaseModel):
    user_id: str
    action: str
    details: Dict[str, str]

class ToolReceipt(BaseModel):
    tool: str
    ok: bool
    ref: str
    message: str = ""
    data: Optional[Dict] = None
# adapters.py  (demo logic)
from tools import *

DIRECTORY = {
  "u-101": {"status":"terminated", "role":"Engineer", "ents":["okta:prod_admin","gsuite:billing_admin","github:write_all"]},
  "u-202": {"status":"active", "role":"Finance", "ents":["okta:read_only","gsuite:billing_admin"]}
}

def fetch_entitlements(a: FetchEntitlementsArgs) -> ToolReceipt:
    user = DIRECTORY.get(a.user_id)
    if not user:
        return ToolReceipt(tool="FetchEntitlements", ok=False, ref="user-missing", message="User not found")
    return ToolReceipt(tool="FetchEntitlements", ok=True, ref=f"ents-{a.user_id}",
                       data={"role": user["role"], "employment_status": user["status"], "entitlements": user["ents"]})

def check_policy(a: CheckPolicyArgs) -> ToolReceipt:
    findings = []
    for e in a.entitlements:
        if a.employment_status in {"terminated","contract_ended"}:
            findings.append({"entitlement": e, "status":"orphaned", "reason":"User terminated; revoke within 24h"})
        elif e == "okta:prod_admin" and "gsuite:billing_admin" in a.entitlements:
            findings.append({"entitlement": e, "status":"toxic_combo", "reason":"Prod admin + billing admin forbidden"})
        elif a.role != "Finance" and e == "gsuite:billing_admin":
            findings.append({"entitlement": e, "status":"excessive", "reason":"Not required for role"})
        else:
            findings.append({"entitlement": e, "status":"ok", "reason":""})
    decision = "revoke" if any(f["status"] in {"orphaned","toxic_combo"} for f in findings) else "downgrade"
    return ToolReceipt(tool="CheckPolicy", ok=True, ref=f"policy-{a.user_id}", data={"findings": findings, "decision": decision})

def request_approval(a: RequestApprovalArgs) -> ToolReceipt:
    return ToolReceipt(tool="RequestApproval", ok=True, ref=f"appr-{a.user_id}-{a.entitlement}",
                       message=f"Approval requested from {a.approver_id}")

def revoke_access(a: RevokeAccessArgs) -> ToolReceipt:
    return ToolReceipt(tool="RevokeAccess", ok=True, ref=f"revoke-{a.user_id}-{a.entitlement}",
                       message=f"Revoked {a.entitlement}")

def downgrade_access(a: DowngradeAccessArgs) -> ToolReceipt:
    return ToolReceipt(tool="DowngradeAccess", ok=True, ref=f"dgr-{a.user_id}-{a.entitlement}",
                       message=f"Downgraded {a.entitlement} to {a.target_role}")

def create_ticket(a: CreateTicketArgs) -> ToolReceipt:
    return ToolReceipt(tool="CreateTicket", ok=True, ref=f"ticket-{a.user_id}", data=a.details)

Agent Loop (proposal → verification → execution → receipts)

# agent_it_access_review.py
import uuid, json
from typing import Any, Dict, List
from tools import *
from adapters import *

ALLOWED_TOOLS = {"FetchEntitlements","CheckPolicy","RequestApproval","RevokeAccess","DowngradeAccess","CreateTicket"}

def new_idem(): return f"idem-{uuid.uuid4()}"

def verify_proposal(p: Dict[str, Any]) -> str:
    req = {"name","args","preconditions","idempotency_key"}
    if not req.issubset(p): return "Missing proposal fields"
    if p["name"] not in ALLOWED_TOOLS: return "Tool not allowed"
    if p["name"] in {"RevokeAccess","DowngradeAccess"} and "entitlement" not in p["args"]:
        return "Entitlement required"
    return ""

def execute(p: Dict[str, Any]) -> ToolReceipt:
    n, a = p["name"], p["args"]
    if n=="FetchEntitlements":  return fetch_entitlements(FetchEntitlementsArgs(**a))
    if n=="CheckPolicy":        return check_policy(CheckPolicyArgs(**a))
    if n=="RequestApproval":    return request_approval(RequestApprovalArgs(**a))
    if n=="RevokeAccess":       return revoke_access(RevokeAccessArgs(**a))
    if n=="DowngradeAccess":    return downgrade_access(DowngradeAccessArgs(**a))
    if n=="CreateTicket":       return create_ticket(CreateTicketArgs(**a))
    return ToolReceipt(tool=n, ok=False, ref="none", message="Unknown tool")

# --- Model shim returning a plan per contract (replace with your LLM call) ---
def call_model(contract_yaml: str, claims: List[Dict[str,Any]], user_id: str) -> Dict[str,Any]:
    # The real model would use tool outputs; here we produce an illustrative plan
    return {
      "summary": f"Access review for {user_id}: revoke orphaned/toxic, downgrade excessive, ticket for audit.",
      "decision": "revoke",
      "user": {"user_id": user_id, "role": "Engineer", "employment_status": "terminated"},
      "findings": [],
      "citations": ["policy:least_privilege","policy:offboarding:deadline","policy:toxic:prod_admin_plus_billing","policy:review:quarterly"],
      "next_steps": ["Fetch entitlements","Check policy & determine actions","Request approvals for privileged scopes","Execute revocations/downgrades","Create audit ticket"],
      "tool_proposals": [
        {"name":"FetchEntitlements","args":{"user_id":user_id},
         "preconditions":"Get current entitlements from IdP/IAM.","idempotency_key": new_idem()},
        {"name":"CheckPolicy","args":{"user_id":user_id,"role":"Engineer","employment_status":"terminated",
                                      "entitlements":["okta:prod_admin","gsuite:billing_admin","github:write_all"]},
         "preconditions":"Compare against policy and role catalog.","idempotency_key": new_idem()},
        {"name":"RevokeAccess","args":{"user_id":user_id,"entitlement":"okta:prod_admin"},
         "preconditions":"Terminate user: revoke within 24h.","idempotency_key": new_idem()},
        {"name":"RevokeAccess","args":{"user_id":user_id,"entitlement":"gsuite:billing_admin"},
         "preconditions":"Toxic combo with prod admin; revoke.","idempotency_key": new_idem()},
        {"name":"DowngradeAccess","args":{"user_id":user_id,"entitlement":"github:write_all","target_role":"github:read_only"},
         "preconditions":"Excessive for terminated user; or remove if policy demands.","idempotency_key": new_idem()},
        {"name":"CreateTicket","args":{"user_id":user_id,"action":"access_cleanup",
                                       "details":{"change_ids":"revoke-refs","reason":"offboarding+toxic_combo"}},
         "preconditions":"Audit trail for SOX/ISO evidence.","idempotency_key": new_idem()}
      ]
    }

def render_response(plan: Dict[str,Any], receipts: List[ToolReceipt]) -> str:
    idx = {r.tool:r for r in receipts}
    lines = [plan["summary"], ""]
    lines.append(f"Decision: {plan['decision']}")
    if idx.get("CheckPolicy"):
        for f in idx["CheckPolicy"].data["findings"]:
            lines.append(f"- {f['entitlement']}: {f['status']} — {f['reason']}")
    if idx.get("RevokeAccess"):
        lines.append(f"\nRevocation: {idx['RevokeAccess'].message} ({idx['RevokeAccess'].ref})")
    if idx.get("DowngradeAccess"):
        lines.append(f"Downgrade: {idx['DowngradeAccess'].message} ({idx['DowngradeAccess'].ref})")
    if idx.get("CreateTicket"):
        lines.append(f"Audit ticket: {idx['CreateTicket'].ref}")
    lines.append("\nNext steps:")
    for s in plan["next_steps"]:
        lines.append(f"• {s}")
    lines.append("\nCitations: " + ", ".join(plan["citations"]))
    return "\n".join(lines)

def handle(user_id: str) -> str:
    contract = open("contracts/it_access_review_v1.yaml").read()
    claims: List[Dict[str,Any]] = []  # load iam policy claims
    plan = call_model(contract, claims, user_id)
    receipts: List[ToolReceipt] = []
    for prop in plan["tool_proposals"]:
        reason = verify_proposal(prop)
        if reason:
            receipts.append(ToolReceipt(tool=prop["name"], ok=False, ref="blocked", message=reason)); continue
        r = execute(prop)
        receipts.append(r)
        if not r.ok and prop["name"] in {"RevokeAccess","DowngradeAccess"}:
            break
    return render_response(plan, receipts)

if __name__ == "__main__":
    print(handle("u-101"))

The Prompt You’d Send to the Model (concise and testable)

System:
You are ITAccessReviewAgent. Follow the contract:
- Ask once if user_id, employment_status, role, or entitlements[] are missing.
- Cite 1–2 claim_ids per factual sentence using provided claims.
- Propose tools; never assert success without an IAM/IdP receipt.
- Output JSON with keys: summary, decision, user{}, findings[], citations[], next_steps[], tool_proposals[].

Claims (eligible only):
[ ... JSON array of IAM policy claims like above ... ]

User:
Please review and remediate access for user "u-101" (terminated).

How to adapt quickly

Wire FetchEntitlements to your IdP/IAM (e.g., Okta/Azure AD) and app APIs; load role catalogs and ownership maps for approvals. Enforce idempotency on revoke/downgrade operations and store change receipts (who/what/when). Implement golden traces (sample users: terminated, transferred, privileged) and run them in CI. Keep minimal-span citations on every factual sentence and no implied writes—actions only count with receipts. Add a feature flag for policy bundles so you can canary new toxic-combination rules and rollback safely.