
Building an AI Control Plane in Python: A Minimal, Production-Shaped Blueprint (with Code)

What this is (and what it is not)

This is a practical, code-forward blueprint for a minimal AI control plane in Python. It focuses on the mechanisms that make agentic AI governable: request intake, policy decisions, entitlements, least-privilege tool access, stage gates, artifact versioning, and an append-only evidence ledger. The objective is to give you a runnable skeleton that looks like a real system, not a conceptual diagram.

It is not a full product, and it is intentionally not tied to one vendor’s stack. The patterns here apply whether you are using OpenAI, Azure, local models, or a hybrid. The control plane sits above the model layer and focuses on enforcement and evidence. You can expand this into a full enterprise platform by replacing the storage layer, hardening auth, adding distributed orchestration, and implementing richer policy evaluation.


Architecture you will implement

Core modules in this blueprint:

  • RunStore: persists runs and state transitions

  • PolicyEngine: returns machine-readable decisions

  • Entitlements: budgets and throttling

  • EvidenceLedger: append-only events

  • ToolProxy: the choke point for tool calls

  • Validators: quality gates at stage boundaries

  • ArtifactStore: versioned deliverables

The main design decision is where enforcement happens. The control plane is only real if policy and entitlements are enforced in code paths that cannot be bypassed. That is why tools are invoked through a proxy, why gates run at stage boundaries, and why evidence is appended as events rather than scattered logs. In production, you can distribute execution across workers, but enforcement and evidence should remain centralized and consistent.

We will assume FastAPI for HTTP and SQLite for simplicity. In production you would swap SQLite for Postgres or SQL Server and add proper authentication and authorization (authN/authZ). The code below keeps each concern isolated so those swaps are straightforward rather than invasive.


1) Data model primitives (SQLite)

The data model is the foundation for auditability and operability. If you do not standardize runs, artifacts, and evidence early, you will end up with fragmented logs that cannot be reconciled into an audit narrative or cost model. The schema here is intentionally small: Runs for lifecycle, Evidence for append-only events, and Artifacts for versioned outputs.

In production, you would likely add tables for node/stage state, tool call detail, approvals, and cost records. But even this minimal set gives you a usable foundation: every run has a stable identity, every major action emits evidence, and every output is stored as a versioned artifact with a content hash. Those three capabilities are what make the system governable.

# file: control_plane/db.py
from __future__ import annotations
import sqlite3
from contextlib import contextmanager
from pathlib import Path

DB_PATH = Path("control_plane.db")

@contextmanager
def db():
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("PRAGMA foreign_keys=ON;")
        yield conn
        conn.commit()
    finally:
        conn.close()

def init_db():
    with db() as conn:
        conn.executescript("""
        CREATE TABLE IF NOT EXISTS Runs(
            RunId TEXT PRIMARY KEY,
            TenantId TEXT NOT NULL,
            UserId TEXT NOT NULL,
            WorkflowKey TEXT NOT NULL,
            Environment TEXT NOT NULL,
            Status TEXT NOT NULL,
            RequestHash TEXT NOT NULL,
            CreatedAtUtc TEXT NOT NULL
        );

        CREATE TABLE IF NOT EXISTS Evidence(
            EvidenceId TEXT PRIMARY KEY,
            RunId TEXT NOT NULL,
            Kind TEXT NOT NULL,
            PayloadJson TEXT NOT NULL,
            CreatedAtUtc TEXT NOT NULL,
            FOREIGN KEY (RunId) REFERENCES Runs(RunId)
        );

        CREATE TABLE IF NOT EXISTS Artifacts(
            ArtifactId TEXT PRIMARY KEY,
            RunId TEXT NOT NULL,
            TypeKey TEXT NOT NULL,
            Version INTEGER NOT NULL,
            ContentHash TEXT NOT NULL,
            StoragePath TEXT NOT NULL,
            Classification TEXT NOT NULL,
            CreatedAtUtc TEXT NOT NULL,
            FOREIGN KEY (RunId) REFERENCES Runs(RunId)
        );

        CREATE INDEX IF NOT EXISTS IX_Evidence_RunId ON Evidence(RunId);
        CREATE INDEX IF NOT EXISTS IX_Artifacts_RunId ON Artifacts(RunId);
        """)

2) Canonical hashing utilities

Canonical hashing is a quiet but critical control-plane capability. It gives you stable identities for requests and artifacts so you can deduplicate, compare, and prove integrity. “Stable” is the key word: if the same input produces a different hash because of key ordering or formatting differences, you lose the ability to reason about repeatability and provenance.

In a mature implementation, you will use canonical hashes to drive caching, determine whether a deliverable changed materially, and create tamper-evident chains in your evidence ledger. Even in this minimal blueprint, hashing makes your run intake and artifact versioning far more robust, and it sets you up for audit narratives that can be validated technically rather than trusted socially.

# file: control_plane/hashing.py
from __future__ import annotations
import hashlib
import json
from typing import Any

def canonical_json(obj: Any) -> str:
    return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)

def sha256_text(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def sha256_obj(obj: Any) -> str:
    return sha256_text(canonical_json(obj))
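A quick way to see why canonicalization matters: two dictionaries with the same content but different key order serialize to the same canonical string, so they hash identically, while naive `json.dumps` output differs. The two functions are copied from hashing.py (with `sha256_text` inlined) so the snippet runs on its own.

```python
import hashlib
import json
from typing import Any

def canonical_json(obj: Any) -> str:
    # Sorted keys and compact separators remove ordering and whitespace differences.
    return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)

def sha256_obj(obj: Any) -> str:
    return hashlib.sha256(canonical_json(obj).encode("utf-8")).hexdigest()

a = {"objective": "blueprint", "env": "production"}
b = {"env": "production", "objective": "blueprint"}

print(canonical_json(a))                # {"env":"production","objective":"blueprint"}
print(sha256_obj(a) == sha256_obj(b))   # True
# Naive serialization preserves insertion order, so the raw strings differ:
print(json.dumps(a) == json.dumps(b))   # False
```

This is canonical only at the JSON-text level: `1` and `1.0` still serialize (and therefore hash) differently, so requests should normalize numeric types before hashing if that distinction should not matter.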

3) Evidence ledger (append-only)

The evidence ledger is what makes a control plane defensible. It is not “logging.” It is a structured, append-only record of what happened: policy decisions, tool invocations, gate outcomes, approvals, and artifacts produced. Without this ledger, you can still generate outputs, but you cannot answer executive questions when something goes wrong.

Append-only matters because it reduces the temptation to “fix the record.” If you need corrections, you append new evidence rather than rewriting history. In production, you might add hash chains or write-once storage. Here, the important idea is that evidence events are first-class, queryable data, not scattered strings across service logs.

# file: control_plane/evidence.py
from __future__ import annotations
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict

from .db import db

def utc_now() -> str:
    return datetime.now(timezone.utc).isoformat()

class EvidenceLedger:
    def append(self, run_id: str, kind: str, payload: Dict[str, Any]) -> str:
        evidence_id = str(uuid.uuid4())
        with db() as conn:
            conn.execute(
                "INSERT INTO Evidence(EvidenceId, RunId, Kind, PayloadJson, CreatedAtUtc) VALUES(?,?,?,?,?)",
                (evidence_id, run_id, kind, json.dumps(payload, ensure_ascii=False), utc_now())
            )
        return evidence_id
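Hash chains are mentioned above as a production hardening step. A minimal sketch, assuming an in-memory list instead of the Evidence table: each event stores the hash of the previous event, so editing or deleting any earlier entry breaks verification. `ChainedLedger`, `GENESIS`, and the event field names are hypothetical.

```python
import copy
import hashlib
import json
from typing import Any, Dict, List

GENESIS = "0" * 64  # hypothetical "previous hash" for the first event

def _event_hash(prev_hash: str, run_id: str, kind: str, payload: Dict[str, Any]) -> str:
    body = json.dumps({"prev": prev_hash, "runId": run_id, "kind": kind, "payload": payload},
                      sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(body.encode("utf-8")).hexdigest()

class ChainedLedger:
    """Hypothetical tamper-evident ledger: each event commits to its predecessor."""

    def __init__(self) -> None:
        self.events: List[Dict[str, Any]] = []

    def append(self, run_id: str, kind: str, payload: Dict[str, Any]) -> str:
        prev = self.events[-1]["hash"] if self.events else GENESIS
        payload = copy.deepcopy(payload)  # later edits to the caller's dict cannot leak in
        h = _event_hash(prev, run_id, kind, payload)
        self.events.append({"runId": run_id, "kind": kind, "payload": payload, "prev": prev, "hash": h})
        return h

    def verify(self) -> bool:
        # Recompute every hash from the start; any edit, insertion, or deletion breaks the chain.
        prev = GENESIS
        for ev in self.events:
            if ev["prev"] != prev or _event_hash(prev, ev["runId"], ev["kind"], ev["payload"]) != ev["hash"]:
                return False
            prev = ev["hash"]
        return True

ledger = ChainedLedger()
ledger.append("r1", "RunCreated", {"env": "production"})
ledger.append("r1", "PolicyDecision", {"policyVersion": "v1.0.0"})
ledger.append("r1", "ArtifactSaved", {"version": 1})
print(ledger.verify())  # True

ledger.events[1]["payload"]["policyVersion"] = "v0.9.0"  # simulate rewriting history
print(ledger.verify())  # False
```

A chain only proves tampering if the latest hash is also anchored somewhere the writer cannot rewrite (a separate store or a signed manifest); otherwise someone with write access could recompute the whole chain.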

4) Policy engine (executable decisions)

A policy engine is only useful if it outputs an explicit decision that the rest of the system can enforce. The mistake many teams make is mixing policy logic into scattered code paths. That leads to drift: one service enforces a rule, another forgets it, and audits become impossible because behavior is inconsistent.

This blueprint uses a deterministic function that evaluates environment and data classification and returns a PolicyDecision. In production, you could replace this with OPA/Rego or a structured rules engine, but the interface should remain stable: given context, return a machine-readable, versioned decision. That decision must be recorded in the evidence ledger for reproducibility.

# file: control_plane/policy.py
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Literal

Env = Literal["innovation", "production"]
Classification = Literal["Public", "Internal", "Confidential", "Restricted"]

@dataclass(frozen=True)
class PolicyDecision:
    policy_version: str
    allow_tools: List[str]
    require_approval_for: List[str]  # actions
    max_tool_calls: int
    retention_days: int
    output_classification: Classification

class PolicyEngine:
    POLICY_VERSION = "v1.0.0"

    def evaluate(self, *, env: Env, classification: Classification, workflow_key: str) -> PolicyDecision:
        allow_tools = ["read_repo", "search_docs"]
        require_approval_for = []

        max_tool_calls = 20 if env == "innovation" else 10
        retention_days = 7 if env == "innovation" else 90
        output_classification = classification

        if env == "production":
            require_approval_for.append("publish")
            require_approval_for.append("commit_repo")

        if classification == "Restricted":
            allow_tools = ["search_docs"]
            max_tool_calls = min(max_tool_calls, 5)
            require_approval_for.extend(["export", "send_external"])

        return PolicyDecision(
            policy_version=self.POLICY_VERSION,
            allow_tools=allow_tools,
            require_approval_for=sorted(set(require_approval_for)),
            max_tool_calls=max_tool_calls,
            retention_days=retention_days,
            output_classification=output_classification
        )
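`require_approval_for` only matters if something checks it before the action runs; the orchestrator in this blueprint records it but does not yet act on it. A minimal sketch of that check, assuming approvals arrive as a set of already-approved action names (in a fuller system they would come from an approval service and be recorded as evidence). `ApprovalPolicy`, `ApprovalRequired`, and `enforce_action` are hypothetical names.

```python
from dataclasses import dataclass
from typing import List, Set

@dataclass(frozen=True)
class ApprovalPolicy:
    # Hypothetical subset of PolicyDecision: only the fields this check needs.
    policy_version: str
    require_approval_for: List[str]

class ApprovalRequired(PermissionError):
    """Raised when an action needs an approval that has not been recorded."""

def enforce_action(decision: ApprovalPolicy, action: str, approved_actions: Set[str]) -> None:
    # Actions not listed in require_approval_for pass through unchanged.
    if action in decision.require_approval_for and action not in approved_actions:
        raise ApprovalRequired(
            f"Action '{action}' requires approval under policy {decision.policy_version}."
        )

prod = ApprovalPolicy("v1.0.0", ["commit_repo", "publish"])
enforce_action(prod, "read_repo", approved_actions=set())      # not gated
enforce_action(prod, "publish", approved_actions={"publish"})  # gated, but approved
try:
    enforce_action(prod, "commit_repo", approved_actions=set())
except ApprovalRequired as ex:
    print(ex)  # Action 'commit_repo' requires approval under policy v1.0.0.
```

Subclassing `PermissionError` is a deliberate choice: the FastAPI entrypoint already maps `PermissionError` to HTTP 403, so a missing approval surfaces as "forbidden" rather than as a server error.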

5) Entitlements (budget + throttling)

Entitlements turn AI from “unbounded usage” into a managed capability. Without entitlements, your control plane can enforce policy but still fail operationally: usage spikes unpredictably, spend becomes difficult to forecast, and critical workflows get starved by low-value runs. Entitlements provide the structure to allocate capacity and budget intentionally.

This minimal example returns a per-run cost ceiling and a per-day run limit; it resolves and records them but does not yet enforce them. In a production system, you would tie entitlements to plans, contracts, or internal chargeback models. The key is that entitlements are resolved early and recorded as evidence. That way, every run is explainable: it executed under a specific capacity and budget envelope.

# file: control_plane/entitlements.py
from __future__ import annotations
from dataclasses import dataclass

@dataclass
class Entitlement:
    max_runs_per_day: int
    max_cost_usd_per_run: float

class EntitlementsService:
    def resolve(self, tenant_id: str, user_id: str, workflow_key: str) -> Entitlement:
        return Entitlement(max_runs_per_day=50, max_cost_usd_per_run=2.50)
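Nothing in the orchestrator checks `max_runs_per_day` yet. A minimal sketch of enforcing it against the Runs table, assuming "day" means the current UTC calendar day and that `CreatedAtUtc` holds `isoformat()` strings with a `+00:00` offset (as `utc_now()` produces), so string comparison matches chronological order. `enforce_daily_quota` and `QuotaExceeded` are hypothetical, and the demo table keeps only the columns the query needs.

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Optional

class QuotaExceeded(RuntimeError):
    pass

def enforce_daily_quota(conn: sqlite3.Connection, tenant_id: str, user_id: str,
                        max_runs_per_day: int, now: Optional[datetime] = None) -> int:
    """Hypothetical check: raise if today's quota is used up, else return runs left."""
    now = now or datetime.now(timezone.utc)
    day_start = now.replace(hour=0, minute=0, second=0, microsecond=0).isoformat()
    (used,) = conn.execute(
        "SELECT COUNT(*) FROM Runs WHERE TenantId=? AND UserId=? AND CreatedAtUtc>=?",
        (tenant_id, user_id, day_start),
    ).fetchone()
    if used >= max_runs_per_day:
        raise QuotaExceeded(f"{used}/{max_runs_per_day} runs already used today.")
    return max_runs_per_day - used

# Demo: two runs today and one yesterday for the same tenant/user.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Runs(RunId TEXT PRIMARY KEY, TenantId TEXT, UserId TEXT, CreatedAtUtc TEXT)")
now = datetime(2024, 5, 1, 15, 0, tzinfo=timezone.utc)
conn.executemany("INSERT INTO Runs VALUES(?,?,?,?)", [
    ("r1", "t1", "u1", (now - timedelta(hours=2)).isoformat()),
    ("r2", "t1", "u1", (now - timedelta(hours=1)).isoformat()),
    ("r3", "t1", "u1", (now - timedelta(days=1)).isoformat()),  # yesterday: not counted
])
print(enforce_daily_quota(conn, "t1", "u1", max_runs_per_day=3, now=now))  # 1
```

Count-then-insert is racy when runs are created concurrently; a production version would do the check and the `INSERT INTO Runs` in one transaction or use an atomic counter.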

6) Tool proxy (least-privilege choke point)

The tool proxy is where agentic AI becomes safe. Models are not the biggest risk. Tools are. If an agent can call external systems directly, you have no enforceable boundary. A tool proxy ensures that every action is checked against policy and entitlements and that evidence is produced consistently.

This proxy enforces allowlists and a max call limit, and it logs each tool call as evidence. In production, you would add parameter constraints (path allowlists, repo scopes, query constraints), secret mediation, and network sandboxing. But the core principle remains: tools are never invoked directly by agents. They are invoked through a policy-enforcing choke point.

# file: control_plane/tools.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, Callable

from .evidence import EvidenceLedger

@dataclass
class ToolContext:
    # Not frozen: the proxy decrements remaining_calls on each permitted call,
    # which is what makes the per-run tool-call limit actually bind.
    run_id: str
    allowed_tools: set[str]
    remaining_calls: int

class ToolProxy:
    def __init__(self, ledger: EvidenceLedger):
        self._ledger = ledger
        self._tools: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {}

    def register(self, name: str, fn: Callable[[Dict[str, Any]], Dict[str, Any]]) -> None:
        self._tools[name] = fn

    def call(self, ctx: ToolContext, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
        # 1) Allowlist: the tool must be permitted by the policy decision.
        if tool_name not in ctx.allowed_tools:
            self._ledger.append(ctx.run_id, "ToolDenied", {"tool": tool_name, "reason": "NotAllowedByPolicy"})
            raise PermissionError(f"Tool '{tool_name}' is not allowed by policy.")

        # 2) Budget: the run must still have tool calls remaining.
        if ctx.remaining_calls <= 0:
            self._ledger.append(ctx.run_id, "ToolDenied", {"tool": tool_name, "reason": "ToolCallLimitExceeded"})
            raise RuntimeError("Tool call limit exceeded.")

        # 3) Registration: the tool must have an implementation.
        fn = self._tools.get(tool_name)
        if fn is None:
            self._ledger.append(ctx.run_id, "ToolDenied", {"tool": tool_name, "reason": "ToolNotRegistered"})
            raise KeyError(f"Tool '{tool_name}' not registered.")

        # Consume one call before invoking, so failed calls still count against the limit.
        ctx.remaining_calls -= 1
        self._ledger.append(ctx.run_id, "ToolCallStarted", {"tool": tool_name, "params": params})
        try:
            result = fn(params)
        except Exception as ex:
            # Record failures too, so the evidence trail has no silent gaps.
            self._ledger.append(ctx.run_id, "ToolCallFailed", {"tool": tool_name, "error": str(ex)})
            raise
        self._ledger.append(ctx.run_id, "ToolCallCompleted", {"tool": tool_name, "result": result})
        return result
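The proxy decides which tools may run, not which arguments they may receive. A minimal sketch of one parameter constraint mentioned earlier, a path allowlist for `read_repo`, assuming repository paths are POSIX-style and relative to the repo root. `constrain_repo_path` and `ParamViolation` are hypothetical; a proxy could run such a check before invoking the tool and record a `ToolDenied` event when it fails.

```python
import posixpath
from typing import Tuple

class ParamViolation(ValueError):
    pass

def constrain_repo_path(path: str, allowed_prefixes: Tuple[str, ...]) -> str:
    """Hypothetical check: normalize a repo-relative path and keep it under an allowed prefix."""
    if not path or path.startswith("/"):
        raise ParamViolation("Path must be non-empty and relative to the repo root.")
    norm = posixpath.normpath(path)  # collapses "a/./b" and "a/../b" lexically
    if norm == ".." or norm.startswith("../"):
        raise ParamViolation(f"Path escapes the repo root: {path!r}")
    for prefix in allowed_prefixes:
        prefix = prefix.rstrip("/")
        # Match whole path segments so "docs" does not also allow "docs_private".
        if norm == prefix or norm.startswith(prefix + "/"):
            return norm
    raise ParamViolation(f"Path {norm!r} is outside the allowed prefixes.")

print(constrain_repo_path("docs/./spec.md", ("docs", "src")))  # docs/spec.md
for bad in ("/etc/passwd", "docs/../secrets/key", "../other_repo", "docs_private/x"):
    try:
        constrain_repo_path(bad, ("docs", "src"))
    except ParamViolation as ex:
        print("denied:", ex)
```

This check is purely lexical: a symlink inside an allowed directory could still point elsewhere, so a real tool would also resolve the path on the filesystem (or run in a sandbox) before reading.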

Example tool implementations:

These are placeholders to demonstrate the interface. In a real platform, “search docs” would call your internal retrieval service, and “read repo” would apply strict path constraints and tenant scoping. The proxy does not care what the tool does; it cares that the tool is allowed, invoked safely, and evidence is captured.

# file: control_plane/tool_impl.py
from __future__ import annotations
from typing import Dict, Any

def search_docs(params: Dict[str, Any]) -> Dict[str, Any]:
    q = str(params.get("q", "")).strip()
    return {"hits": [{"title": "AI Control Plane Spec", "score": 0.91}], "query": q}

def read_repo(params: Dict[str, Any]) -> Dict[str, Any]:
    path = str(params.get("path", "")).strip()
    return {"path": path, "content_preview": "..."[:200]}

7) Validators (quality gates)

Validators are your system’s automated reviewers. They reduce rework, prevent low-quality deliverables from moving forward, and create measurable quality signals. The common misconception is that quality gates “slow teams down.” In practice, they speed teams up by catching expensive errors early and standardizing what “good” looks like.

This minimal validator checks for completeness. In production, you would add cross-artifact consistency checks, policy compliance checks, and readiness checks. The important property is that validators are deterministic and produce structured findings. That makes them explainable, testable, and auditable.

# file: control_plane/validators.py
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Any, List

@dataclass
class GateResult:
    passed: bool
    findings: List[Dict[str, Any]]

class Validator:
    def validate(self, artifact: Dict[str, Any]) -> GateResult:
        raise NotImplementedError

class CompletenessValidator(Validator):
    def validate(self, artifact: Dict[str, Any]) -> GateResult:
        findings = []
        required_keys = ["title", "summary", "controls", "owner"]
        for k in required_keys:
            if not artifact.get(k):
                findings.append({"key": k, "issue": "Missing"})
        return GateResult(passed=len(findings) == 0, findings=findings)
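Stage gates usually run several validators, not one. A minimal sketch of a gate runner that runs every validator without short-circuiting, so a single pass reports all findings. `ControlsShapeValidator` and `run_gates` are hypothetical; `GateResult` and a compact `CompletenessValidator` are copied from validators.py so the snippet runs standalone.

```python
from dataclasses import dataclass
from typing import Any, Dict, List

@dataclass
class GateResult:
    passed: bool
    findings: List[Dict[str, Any]]

class CompletenessValidator:
    def validate(self, artifact: Dict[str, Any]) -> GateResult:
        findings = [{"key": k, "issue": "Missing"}
                    for k in ["title", "summary", "controls", "owner"] if not artifact.get(k)]
        return GateResult(passed=not findings, findings=findings)

class ControlsShapeValidator:
    """Hypothetical: every control must map to a non-empty text description."""
    def validate(self, artifact: Dict[str, Any]) -> GateResult:
        controls = artifact.get("controls")
        if not isinstance(controls, dict):
            return GateResult(passed=False, findings=[{"key": "controls", "issue": "NotAMapping"}])
        findings = [{"key": f"controls.{name}", "issue": "EmptyDescription"}
                    for name, desc in controls.items()
                    if not isinstance(desc, str) or not desc.strip()]
        return GateResult(passed=not findings, findings=findings)

def run_gates(artifact: Dict[str, Any], validators: List[Any]) -> Dict[str, GateResult]:
    # Run all validators so one pass yields every finding; results are keyed by validator name.
    return {type(v).__name__: v.validate(artifact) for v in validators}

artifact = {
    "title": "AI Control Plane Blueprint",
    "summary": "Minimal governed architecture.",
    "controls": {"policy": "Versioned decisions", "audit": ""},
    "owner": {"tenant": "t1"},
}
results = run_gates(artifact, [CompletenessValidator(), ControlsShapeValidator()])
print(all(r.passed for r in results.values()))      # False
print(results["ControlsShapeValidator"].findings)   # [{'key': 'controls.audit', 'issue': 'EmptyDescription'}]
```

Each entry in `results` maps directly onto one `GateResult` evidence event, which keeps per-validator outcomes separately queryable.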

8) Artifact store (versioned outputs)

Artifacts are how AI output becomes an enterprise asset rather than disposable text. Versioning solves a real operational problem: teams need to know what changed, which version was approved, and what the provenance is. Hashing enables integrity checks and deduplication, and classification drives retention and access rules.

This artifact store persists JSON to disk and tracks metadata in SQLite. In production, you would store content in object storage and track metadata in SQL, but the pattern is the same. Every artifact write is a durable event: versioned, hashed, classified, and associated with a run.

# file: control_plane/artifacts.py
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict

from .db import db
from .hashing import sha256_text, canonical_json

ARTIFACT_ROOT = Path("artifacts")
ARTIFACT_ROOT.mkdir(exist_ok=True)

def utc_now() -> str:
    return datetime.now(timezone.utc).isoformat()

class ArtifactStore:
    def save(self, *, run_id: str, type_key: str, content: Dict[str, Any], classification: str) -> Dict[str, Any]:
        artifact_id = str(uuid.uuid4())
        content_text = canonical_json(content)
        content_hash = sha256_text(content_text)

        with db() as conn:
            cur = conn.execute(
                "SELECT COALESCE(MAX(Version), 0) FROM Artifacts WHERE RunId=? AND TypeKey=?",
                (run_id, type_key),
            )
            version = int(cur.fetchone()[0]) + 1

            path = ARTIFACT_ROOT / f"{run_id}_{type_key}_v{version}.json"
            path.write_text(content_text, encoding="utf-8")

            conn.execute(
                "INSERT INTO Artifacts(ArtifactId, RunId, TypeKey, Version, ContentHash, StoragePath, Classification, CreatedAtUtc) VALUES(?,?,?,?,?,?,?,?)",
                (artifact_id, run_id, type_key, version, content_hash, str(path), classification, utc_now())
            )

        return {
            "artifactId": artifact_id,
            "type": type_key,
            "version": version,
            "hash": content_hash,
            "path": str(path)
        }
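The stored `ContentHash` is what makes integrity checks possible later. A minimal sketch, assuming a temporary directory in place of `ARTIFACT_ROOT`: write canonical JSON as `ArtifactStore.save` does, record its hash, then recompute it from disk. `verify_artifact` is a hypothetical helper; `canonical_json` and `sha256_text` are copied from hashing.py.

```python
import hashlib
import json
import tempfile
from pathlib import Path
from typing import Any

def canonical_json(obj: Any) -> str:
    return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False)

def sha256_text(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def verify_artifact(path: Path, expected_hash: str) -> bool:
    """Hypothetical check: recompute the stored content's hash and compare with ContentHash."""
    return sha256_text(path.read_text(encoding="utf-8")) == expected_hash

with tempfile.TemporaryDirectory() as tmp:
    content = {"title": "AI Control Plane Blueprint", "owner": {"tenant": "t1"}}
    text = canonical_json(content)
    path = Path(tmp) / "run1_control_plane_blueprint_v1.json"
    path.write_text(text, encoding="utf-8")
    recorded = sha256_text(text)  # what ArtifactStore.save records as ContentHash

    ok_before = verify_artifact(path, recorded)
    path.write_text(text.replace("t1", "t2"), encoding="utf-8")  # simulate tampering
    ok_after = verify_artifact(path, recorded)

print(ok_before, ok_after)  # True False
```

The same comparison supports deduplication: if a new artifact's hash equals the latest version's `ContentHash`, the content has not changed and a new version may be unnecessary.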

9) Orchestrator (staged execution example)

The orchestrator is where policy, entitlements, tools, gates, and artifacts are composed into an actual workflow. A common mistake is to treat orchestration as “call the model and return text.” A control plane orchestrator is more like a pipeline runner: it builds a plan, enforces constraints, calls tools via the proxy, validates outputs, and records evidence at each step.

This example is deliberately simple: it resolves entitlements, evaluates policy, calls a tool, constructs an artifact, runs a validator, then stores the artifact and completes the run. In production, you would have multiple stages, retries, node state, and approvals. But this is enough to demonstrate the non-negotiable flow: every stage emits evidence, and every output becomes a versioned artifact.

# file: control_plane/orchestrator.py
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from typing import Any, Dict

from .db import db
from .hashing import sha256_obj
from .evidence import EvidenceLedger
from .policy import PolicyEngine, Env, Classification
from .entitlements import EntitlementsService
from .tools import ToolProxy, ToolContext
from .validators import CompletenessValidator
from .artifacts import ArtifactStore

def utc_now() -> str:
    return datetime.now(timezone.utc).isoformat()

class ControlPlaneOrchestrator:
    def __init__(self):
        self.ledger = EvidenceLedger()
        self.policy = PolicyEngine()
        self.entitlements = EntitlementsService()
        self.artifacts = ArtifactStore()
        self.validator = CompletenessValidator()
        self.tools = ToolProxy(self.ledger)

    def create_run(self, *, tenant_id: str, user_id: str, workflow_key: str, env: Env, request: Dict[str, Any]) -> str:
        run_id = str(uuid.uuid4())
        request_hash = sha256_obj(request)
        with db() as conn:
            conn.execute(
                "INSERT INTO Runs(RunId, TenantId, UserId, WorkflowKey, Environment, Status, RequestHash, CreatedAtUtc) VALUES(?,?,?,?,?,?,?,?)",
                (run_id, tenant_id, user_id, workflow_key, env, "Created", request_hash, utc_now())
            )
        self.ledger.append(run_id, "RunCreated", {"workflow": workflow_key, "env": env, "requestHash": request_hash})
        return run_id

    def run(self, *, tenant_id: str, user_id: str, workflow_key: str, env: Env, classification: Classification, request: Dict[str, Any]) -> Dict[str, Any]:
        run_id = self.create_run(tenant_id=tenant_id, user_id=user_id, workflow_key=workflow_key, env=env, request=request)

        ent = self.entitlements.resolve(tenant_id, user_id, workflow_key)
        self.ledger.append(run_id, "EntitlementResolved", {"maxRunsPerDay": ent.max_runs_per_day, "maxCostUsdPerRun": ent.max_cost_usd_per_run})

        decision = self.policy.evaluate(env=env, classification=classification, workflow_key=workflow_key)
        self.ledger.append(run_id, "PolicyDecision", {
            "policyVersion": decision.policy_version,
            "allowTools": decision.allow_tools,
            "requireApprovalFor": decision.require_approval_for,
            "maxToolCalls": decision.max_tool_calls,
            "retentionDays": decision.retention_days,
            "outputClassification": decision.output_classification
        })

        ctx = ToolContext(run_id=run_id, allowed_tools=set(decision.allow_tools), remaining_calls=decision.max_tool_calls)

        try:
            _ = self.tools.call(ctx, "search_docs", {"q": "AI control plane policy engine"})
        except Exception as ex:
            self.ledger.append(run_id, "ToolCallError", {"error": str(ex)})

        artifact = {
            "title": "AI Control Plane Blueprint",
            "summary": "A minimal governed architecture for agentic AI including policy, entitlements, audit, and quality gates.",
            "controls": {
                "policy": "Executable policy decisions with versioning",
                "entitlements": "Budget envelopes and rate limits",
                "audit": "Append-only evidence ledger",
                "tools": "Least-privilege tool proxy",
                "quality": "Stage gates and validators"
            },
            "owner": {"tenant": tenant_id, "user": user_id},
        }

        gate = self.validator.validate(artifact)
        self.ledger.append(run_id, "GateResult", {"gate": "CompletenessValidator", "passed": gate.passed, "findings": gate.findings})

        if not gate.passed:
            with db() as conn:
                conn.execute("UPDATE Runs SET Status=? WHERE RunId=?", ("FailedGate", run_id))
            return {"runId": run_id, "status": "FailedGate", "findings": gate.findings}

        saved = self.artifacts.save(run_id=run_id, type_key="control_plane_blueprint", content=artifact, classification=decision.output_classification)
        self.ledger.append(run_id, "ArtifactSaved", saved)

        with db() as conn:
            conn.execute("UPDATE Runs SET Status=? WHERE RunId=?", ("Completed", run_id))

        return {"runId": run_id, "status": "Completed", "artifact": saved}

10) FastAPI entrypoint (run a workflow)

An API boundary makes the control plane usable by real systems: UIs, CLIs, CI pipelines, or other services. It also gives you a clean place to enforce authentication and tenancy. In production, you would integrate OIDC and attach tenant/user identity from claims rather than trusting fields in the request body.

This entrypoint demonstrates how the orchestrator is invoked as a single “run” operation. The key is that errors become structured responses and that run creation and evidence capture happen inside the orchestrator. That means even failures are auditable and queryable, which is essential for incident response and system hardening.

# file: app.py
from __future__ import annotations
from typing import Any, Dict

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from control_plane.db import init_db
from control_plane.orchestrator import ControlPlaneOrchestrator
from control_plane.policy import Env, Classification  # reuse policy types instead of redefining them
from control_plane.tool_impl import search_docs, read_repo

app = FastAPI(title="AI Control Plane (Minimal)")

# Create tables and wire up tools once, at import time.
init_db()
orch = ControlPlaneOrchestrator()
orch.tools.register("search_docs", search_docs)
orch.tools.register("read_repo", read_repo)

class RunRequest(BaseModel):
    tenantId: str
    userId: str
    workflowKey: str = "control_plane_demo"
    env: Env = "innovation"
    classification: Classification = "Internal"
    payload: Dict[str, Any] = {}

@app.post("/runs")
def run_workflow(req: RunRequest):
    try:
        result = orch.run(
            tenant_id=req.tenantId,
            user_id=req.userId,
            workflow_key=req.workflowKey,
            env=req.env,
            classification=req.classification,
            request=req.payload,
        )
        return result
    except PermissionError as ex:
        raise HTTPException(status_code=403, detail=str(ex))
    except Exception as ex:
        raise HTTPException(status_code=500, detail=str(ex))
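The request model above still trusts `tenantId` and `userId` from the body. A minimal sketch of the claims-based alternative, assuming an upstream OIDC layer has already verified the token and handed over its claims as a dict. `sub` is the standard OIDC subject claim; reading the tenant from `tid` (the claim Microsoft Entra ID uses) is an assumption you would adjust for your identity provider. `resolve_identity` and `IdentityMismatch` are hypothetical.

```python
from typing import Any, Dict, Optional, Tuple

class IdentityMismatch(PermissionError):
    pass

def resolve_identity(claims: Dict[str, Any],
                     body_tenant: Optional[str] = None,
                     body_user: Optional[str] = None,
                     tenant_claim: str = "tid") -> Tuple[str, str]:
    """Hypothetical: take tenant/user from verified claims; reject conflicting body fields."""
    tenant, user = claims.get(tenant_claim), claims.get("sub")
    if not tenant or not user:
        raise IdentityMismatch("Token is missing tenant or subject claims.")
    # Body fields become optional hints; if present they must agree with the token.
    if body_tenant is not None and body_tenant != tenant:
        raise IdentityMismatch("Request tenant does not match the authenticated tenant.")
    if body_user is not None and body_user != user:
        raise IdentityMismatch("Request user does not match the authenticated user.")
    return str(tenant), str(user)

claims = {"sub": "u1", "tid": "t1"}  # illustrative, already-verified claims
print(resolve_identity(claims))                    # ('t1', 'u1')
print(resolve_identity(claims, body_tenant="t1"))  # ('t1', 'u1')
try:
    resolve_identity(claims, body_tenant="t2")     # cross-tenant attempt
except IdentityMismatch as ex:
    print(ex)
```

Because `IdentityMismatch` subclasses `PermissionError`, the existing handler in `run_workflow` would already return 403 for it.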

Run it:

pip install fastapi uvicorn pydantic
uvicorn app:app --reload

Example request:

curl -X POST http://127.0.0.1:8000/runs \
  -H "Content-Type: application/json" \
  -d '{"tenantId":"t1","userId":"u1","env":"production","classification":"Internal","payload":{"objective":"blueprint"}}'

Where to extend this into real production

If you want this blueprint to survive production load, you will harden three areas first: identity/tenancy, enforcement boundaries, and durability. Identity and tenancy controls ensure the system cannot be tricked into cross-tenant access. Enforcement boundaries ensure tools and approvals cannot be bypassed. Durability ensures evidence and artifacts survive failures and can be queried reliably.

A practical production upgrade list:

  • Replace SQLite with Postgres/SQL Server and add migrations

  • Add real authN/authZ (OIDC) and tenant isolation

  • Implement true budgets (token metering + tool costs) and enforcement

  • Add a real policy engine (OPA/Rego or your deterministic evaluator)

  • Add async orchestration (Celery, Dramatiq, Temporal) for long-running runs

  • Add a real tool sandbox (filesystem and network constraints)

  • Add durable object storage for artifacts and signed run manifests

  • Expand validators into a library and attach them per workflow stage

  • Add approval service endpoints and UI integration
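For the "true budgets" item, the core mechanism is a meter that charges each model or tool call against the per-run ceiling before the work runs. A minimal sketch, assuming costs are computed from token counts with per-1K-token prices passed in as parameters (the prices below are placeholders, not real vendor rates). `BudgetMeter`, `BudgetExceeded`, and `token_cost` are hypothetical; `Decimal` avoids float rounding drift in the running total.

```python
from decimal import Decimal
from typing import List, Tuple

class BudgetExceeded(RuntimeError):
    pass

class BudgetMeter:
    """Hypothetical per-run spend tracker enforcing max_cost_usd_per_run."""

    def __init__(self, max_cost_usd: str) -> None:
        self.limit = Decimal(max_cost_usd)
        self.spent = Decimal("0")
        self.charges: List[Tuple[str, Decimal]] = []  # itemized record, suitable for evidence

    def charge(self, label: str, cost_usd: Decimal) -> Decimal:
        # Reserve the cost before the call runs; refuse if it would breach the ceiling.
        if self.spent + cost_usd > self.limit:
            raise BudgetExceeded(f"{label}: {self.spent + cost_usd} USD exceeds {self.limit} USD")
        self.spent += cost_usd
        self.charges.append((label, cost_usd))
        return self.limit - self.spent

def token_cost(prompt_tokens: int, completion_tokens: int,
               usd_per_1k_prompt: str, usd_per_1k_completion: str) -> Decimal:
    # Prices are parameters (placeholders), not real rates for any provider.
    return (Decimal(prompt_tokens) * Decimal(usd_per_1k_prompt)
            + Decimal(completion_tokens) * Decimal(usd_per_1k_completion)) / Decimal(1000)

meter = BudgetMeter("2.50")  # same ceiling as the entitlement default above
meter.charge("model_call_1", token_cost(12_000, 3_000, "0.01", "0.03"))  # 0.21 USD
meter.charge("search_docs", Decimal("0.05"))                             # flat tool cost
try:
    meter.charge("model_call_2", token_cost(200_000, 20_000, "0.01", "0.03"))  # 2.60 USD
except BudgetExceeded as ex:
    print(ex)
print(meter.spent)
```

Charging before the call means an expensive request is refused up front; after the call, the estimate can be reconciled against actual usage reported by the model provider.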

From there, the platform becomes an operating system for governed agentic workflows. You add workflows one by one, and each new workflow automatically inherits policy enforcement, evidence capture, and quality gating. That is the compounding value of a control plane: governance becomes the default path, not a manual add-on.