
AI Agents in Practice: Event-Driven Agents over Streams & CDC

Introduction

After wiring agents to warehouses and OLTP databases, the next leap is real-time: letting agents perceive and act on events as they happen—orders placed, thresholds breached, schemas changed. This article shows how to make agents first-class citizens of streaming and change-data-capture (CDC) systems (Kafka, Pulsar, Kinesis, Redpanda; Debezium/GoldenGate/SQL Server CDC), with contracts, governance, and receipts that stand up in production.


Why streams + CDC change the game

Polling a database or warehouse adds latency and cost. With CDC and event buses, agents get low-latency signals (row inserts/updates, metric deltas, anomaly alerts) and can trigger bounded, reversible actions (escalate a case, create a payment hold, notify a customer) without heavy batch jobs. The trade-off is complexity: ordering, deduplication, backpressure, and exactly-once semantics. Solve these at the contract and tool layer, not in fragile prompts.


Reference architecture

  • Ingress: CDC connectors (e.g., Debezium) emit change events to topics (e.g., db.customer.orders). Producers for domain events (e.g., payments.authorized) do the same.

  • Stream processing: Flink/Spark/Kafka Streams derive features (rolling sums, thresholds) and emit well-typed facts agents can subscribe to (e.g., risk.order_threshold_breach).

  • Agent runtime: An agent subscribes via a typed consumer, evaluates policy and context, and proposes tool calls (tickets, emails, payments, flag flips), each requiring idempotency and receipts.

  • Governance: Topic ACLs, data contracts (Avro/Protobuf/JSON Schema), PII tokenization, and dead-letter queues (DLQ).

  • Observability: Per-message traces (event key, partition/offset, watermark), action receipts, and outbox tables for exactly-once writes.
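
To make the ingress layer concrete, here is a minimal sketch of normalizing a Debezium-style change event into a typed envelope before agent logic sees it. The before/after/op/ts_ms fields follow Debezium's change event envelope, and the sketch assumes the event value has already been unwrapped from any schema/payload wrapper. The AgentEnvelope dataclass, the normalize helper, and the sample values are hypothetical and not part of any library.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AgentEnvelope:
    topic: str
    key: str
    op: str      # "c" create, "u" update, "d" delete, "r" snapshot read
    ts_ms: int
    row: dict    # row image: "after" for c/u/r, "before" for d


def normalize(topic: str, key: str, value: dict) -> AgentEnvelope:
    """Pick the relevant row image and fail loudly when it is missing."""
    op = value["op"]
    row = value.get("before") if op == "d" else value.get("after")
    if row is None:
        raise ValueError(f"change event on {topic} has no row image for op={op!r}")
    return AgentEnvelope(topic=topic, key=key, op=op, ts_ms=int(value["ts_ms"]), row=row)


# Hypothetical sample event for an insert into public.orders.
event = {
    "before": None,
    "after": {"order_id": "12345", "account_id": "acct-9", "vip_flag": True},
    "op": "c",
    "ts_ms": 1700000000000,
}
env = normalize("cdc.public.orders", "12345", event)
print(env.op, env.row["order_id"])
```

Normalizing once at the edge keeps the rest of the agent code free of connector-specific shapes.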


Agent–event contract (YAML)

# contracts/streaming_agent_v1.yaml
role: "StreamingOpsAgent"
scope: >
  React to certified stream topics and CDC events; propose bounded actions with idempotency and receipts.
  Never emit PII; respect sensitivity labels; ensure at-least-once processing without duplicating side effects.
inputs:
  topics:
    - name: "risk.order_threshold_breach"   # derived event
      schema: "avro://risk/order_breach@v3"
    - name: "cdc.public.orders"             # Debezium change event
      schema: "json://debezium/order@v2"
governance:
  pii_fields: ["email", "phone", "ssn_last4"]      # must be tokenized upstream
  sensitivity_ceiling: "Confidential"
  max_actions_per_minute: 120
  dlq_topic: "ops.agent_dlq"
output:
  type: object
  required: [summary, actions, citations, receipts]
  properties:
    summary: {type: string, maxWords: 120}
    actions:
      type: array
      items: {type: object, required: [tool, args, idempotency_key]}
    citations: {type: array, items: {type: string}}   # topic@schema_version
    receipts: {type: array, items: {type: string}}    # external IDs from tools
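
A contract is only useful if something enforces it. The following is a rough sketch of checking an agent response against the output section above, using only the standard library. The validate_output function and its error strings are hypothetical; a real deployment would more likely load the YAML and use a schema validator.

```python
REQUIRED_FIELDS = ("summary", "actions", "citations", "receipts")
ACTION_FIELDS = ("tool", "args", "idempotency_key")
MAX_SUMMARY_WORDS = 120  # mirrors maxWords in the contract


def validate_output(out: dict) -> list:
    """Return a list of contract violations; an empty list means the output passes."""
    errors = []
    for name in REQUIRED_FIELDS:
        if name not in out:
            errors.append(f"missing field: {name}")
    summary = out.get("summary", "")
    if not isinstance(summary, str):
        errors.append("summary must be a string")
    elif len(summary.split()) > MAX_SUMMARY_WORDS:
        errors.append(f"summary exceeds {MAX_SUMMARY_WORDS} words")
    for i, action in enumerate(out.get("actions", [])):
        if not isinstance(action, dict):
            errors.append(f"actions[{i}] must be an object")
            continue
        for name in ACTION_FIELDS:
            if name not in action:
                errors.append(f"actions[{i}] missing {name}")
    return errors


good = {
    "summary": "Opened a ticket for order 12345",
    "actions": [{"tool": "CreateTicket", "args": {}, "idempotency_key": "idem-1"}],
    "citations": ["risk.order_threshold_breach@v3"],
    "receipts": ["CreateTicket:TKT-000001"],
}
print(validate_output(good))
```

Returning a list of violations rather than raising on the first one makes the DLQ record more useful to whoever triages it.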

Typed tools (receipts or it didn’t happen)

# tools.py
from pydantic import BaseModel
from typing import Optional, Dict

class CreateTicketArgs(BaseModel):
    summary: str
    labels: list[str]
    idem_key: str

class PlacePaymentHoldArgs(BaseModel):
    order_id: str
    reason: str
    idem_key: str

class SendNoticeArgs(BaseModel):
    to_account: str
    template_id: str
    idem_key: str

class ToolReceipt(BaseModel):
    tool: str
    ok: bool
    ref: str             # external receipt ID (ticket id, hold id, message id)
    message: str = ""
    data: Optional[Dict] = None

# adapters.py (demo stubs; wire these to Jira/ServiceNow, payments, and comms in production)
from tools import CreateTicketArgs, PlacePaymentHoldArgs, SendNoticeArgs, ToolReceipt

def create_ticket(a: CreateTicketArgs) -> ToolReceipt:
    # The ref is derived from the idempotency key, so a retry yields the same receipt.
    return ToolReceipt(tool="CreateTicket", ok=True, ref=f"TKT-{a.idem_key[-6:]}", message="Ticket opened")

def place_payment_hold(a: PlacePaymentHoldArgs) -> ToolReceipt:
    return ToolReceipt(tool="PlacePaymentHold", ok=True, ref=f"HOLD-{a.order_id}", message="Hold placed")

def send_notice(a: SendNoticeArgs) -> ToolReceipt:
    return ToolReceipt(tool="SendNotice", ok=True, ref=f"MSG-{a.idem_key[-8:]}", message="Notice queued")
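
The demo adapters return receipts but do not remember them, so a real backend behind them would still be called on every retry. One way to get upsert/no-op behavior is to wrap each adapter so a repeated idempotency key returns the stored receipt. This is a sketch under assumptions: Receipt and IdempotentTool are hypothetical names, plain dataclasses stand in for the pydantic models to keep the example dependency-free, and the in-memory dict would be a durable store in production.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass(frozen=True)
class Receipt:
    tool: str
    ok: bool
    ref: str


class IdempotentTool:
    """Wrap a side-effecting function so each idempotency key executes at most once."""

    def __init__(self, name: str, fn: Callable[..., Receipt]):
        self.name = name
        self.calls = 0                         # number of real executions
        self._fn = fn
        self._done: Dict[str, Receipt] = {}    # idem_key -> stored receipt

    def __call__(self, idem_key: str, **args) -> Receipt:
        if idem_key in self._done:
            return self._done[idem_key]        # replay: no second side effect
        self.calls += 1
        receipt = self._fn(idem_key=idem_key, **args)
        if receipt.ok:                         # failed calls stay retryable
            self._done[idem_key] = receipt
        return receipt


def _place_hold(idem_key: str, order_id: str) -> Receipt:
    return Receipt(tool="PlacePaymentHold", ok=True, ref=f"HOLD-{order_id}")


place_hold = IdempotentTool("PlacePaymentHold", _place_hold)
first = place_hold("idem-abc", order_id="12345")
again = place_hold("idem-abc", order_id="12345")
print(first.ref, place_hold.calls)
```

Only successful receipts are cached, so a failed call can be retried with the same key.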

Event consumer with dedup + backpressure

# consumer.py
import hashlib

from pydantic import BaseModel

from adapters import create_ticket, place_payment_hold, send_notice
from tools import CreateTicketArgs, PlacePaymentHoldArgs, SendNoticeArgs

ALLOWED_TOPICS = {"risk.order_threshold_breach@v3", "cdc.public.orders@v2"}
SEEN = set()   # in-memory demo only; replace with Redis/Bloom filter in prod

# Tool name -> adapter. Only tools listed here can ever be executed.
TOOLS = {
    "CreateTicket": create_ticket,
    "PlacePaymentHold": place_payment_hold,
    "SendNotice": send_notice,
}

class Envelope(BaseModel):
    topic: str
    key: str
    ts_ms: int
    schema_version: str   # e.g. "v3"; a field named "schema" would shadow BaseModel.schema
    payload: dict

def idem_key(envelope: Envelope) -> str:
    # Join with a separator so ("ab", "c") and ("a", "bc") cannot produce the same hash input.
    raw = "|".join([envelope.topic, envelope.key, str(envelope.ts_ms)])
    return "idem-" + hashlib.sha256(raw.encode()).hexdigest()

def handle(envelope: Envelope):
    topic_sig = f"{envelope.topic}@{envelope.schema_version}"
    # Raise instead of assert: assert statements are stripped when Python runs with -O.
    if topic_sig not in ALLOWED_TOPICS:
        raise PermissionError(f"Topic not allowed: {topic_sig}")

    ik = idem_key(envelope)
    if ik in SEEN:
        return {"summary": "duplicate", "actions": [], "receipts": [], "citations": [topic_sig]}

    actions, receipts, failed = [], [], []
    p = envelope.payload

    # Pattern 1: derived breach event → ticket + hold
    if envelope.topic == "risk.order_threshold_breach":
        actions.append({"tool": "CreateTicket", "args": CreateTicketArgs(
            summary=f"Order {p['order_id']} risk breach ({p['rule']})",
            labels=["risk", "orders"], idem_key=ik), "idempotency_key": ik})
        if p.get("auto_hold", True):   # holds are placed unless the event opts out
            actions.append({"tool": "PlacePaymentHold", "args": PlacePaymentHoldArgs(
                order_id=p["order_id"], reason=p["rule"], idem_key=ik), "idempotency_key": ik})

    # Pattern 2: raw CDC create/update → friendly notice for VIP accounts
    after = p.get("after") or {}   # "after" is null on deletes
    if envelope.topic == "cdc.public.orders" and p.get("op") in {"c", "u"} and after.get("vip_flag"):
        actions.append({"tool": "SendNotice", "args": SendNoticeArgs(
            to_account=after["account_id"], template_id="vip-order-received", idem_key=ik), "idempotency_key": ik})

    # Execute with receipts
    for a in actions:
        r = TOOLS[a["tool"]](a["args"])
        if r.ok:
            receipts.append(f"{r.tool}:{r.ref}")
        else:
            failed.append(a["tool"])

    # Mark the event as seen only after every action succeeded, so failures stay retryable.
    if not failed:
        SEEN.add(ik)

    summary = f"Processed {envelope.topic} key={envelope.key}"
    if failed:
        summary += f" (failed: {', '.join(failed)})"
    return {
        "summary": summary,
        # Match the contract: each action carries tool, args, and idempotency_key.
        "actions": [{"tool": a["tool"], "args": dict(a["args"]),
                     "idempotency_key": a["idempotency_key"]} for a in actions],
        "receipts": receipts,
        "citations": [topic_sig]
    }
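
The handler covers deduplication; backpressure is not shown in it. One possible shape is a bounded inbox between the consumer loop and the handler, plus a degrade policy that keeps only essential tools when the backlog grows. This is a sketch with hypothetical names (BoundedInbox, plan_actions, ESSENTIAL_TOOLS) and made-up thresholds, built on the standard library queue module.

```python
import queue

# Hypothetical policy: under load, keep tickets and holds, skip customer notices.
ESSENTIAL_TOOLS = {"CreateTicket", "PlacePaymentHold"}


class BoundedInbox:
    """Bounded buffer that signals when the consumer should pause or degrade."""

    def __init__(self, maxsize: int, degrade_at: int):
        self._q = queue.Queue(maxsize=maxsize)
        self._degrade_at = degrade_at

    def offer(self, item) -> bool:
        """Return False when full; the caller should stop polling the broker."""
        try:
            self._q.put_nowait(item)
            return True
        except queue.Full:
            return False

    def take(self):
        return self._q.get_nowait()

    def degraded(self) -> bool:
        return self._q.qsize() >= self._degrade_at


def plan_actions(tools: list, degraded: bool) -> list:
    """Drop non-essential tools while the inbox is backlogged."""
    if not degraded:
        return list(tools)
    return [t for t in tools if t in ESSENTIAL_TOOLS]


inbox = BoundedInbox(maxsize=3, degrade_at=2)
accepted = [inbox.offer(f"event-{i}") for i in range(4)]
print(accepted, inbox.degraded())
print(plan_actions(["CreateTicket", "SendNotice"], inbox.degraded()))
```

Refusing new items when full pushes the pressure back to the broker, where unread messages are retained, instead of growing memory in the agent process.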

Notes that matter in production

  • Replace SEEN with a persistent dedup store (e.g., Redis with a TTL) keyed by (topic, key, event timestamp), matching the fields hashed in idem_key.

  • Respect watermarks and out-of-order windows (e.g., 10 minutes) in the stream processor upstream.

  • For exactly-once side effects: use a transactional outbox in the action service, or a saga pattern with compensating steps.
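
The transactional outbox idea can be sketched with SQLite: the dedup marker and the pending actions are written in one transaction, so either both are recorded or neither is. The table names, column names, and helper functions here are hypothetical, and a production service would use its own database and a separate relay process to deliver the outbox rows.

```python
import json
import sqlite3


def open_db(path: str = ":memory:") -> sqlite3.Connection:
    db = sqlite3.connect(path)
    db.execute("CREATE TABLE IF NOT EXISTS processed (idem_key TEXT PRIMARY KEY)")
    db.execute(
        "CREATE TABLE IF NOT EXISTS outbox ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " idem_key TEXT NOT NULL, tool TEXT NOT NULL, args TEXT NOT NULL,"
        " sent INTEGER NOT NULL DEFAULT 0)"
    )
    db.commit()
    return db


def record_actions(db: sqlite3.Connection, idem_key: str, actions: list) -> bool:
    """Atomically mark the event processed and enqueue its actions.

    Returns False when the idempotency key was already recorded (a replay).
    """
    try:
        with db:  # commits on success, rolls back if any statement fails
            db.execute("INSERT INTO processed (idem_key) VALUES (?)", (idem_key,))
            db.executemany(
                "INSERT INTO outbox (idem_key, tool, args) VALUES (?, ?, ?)",
                [(idem_key, tool, json.dumps(args)) for tool, args in actions],
            )
        return True
    except sqlite3.IntegrityError:  # primary key violation: duplicate event
        return False


def pending(db: sqlite3.Connection) -> list:
    return db.execute("SELECT id, tool, args FROM outbox WHERE sent = 0 ORDER BY id").fetchall()


def mark_sent(db: sqlite3.Connection, outbox_id: int) -> None:
    with db:
        db.execute("UPDATE outbox SET sent = 1 WHERE id = ?", (outbox_id,))


db = open_db()
actions = [
    ("CreateTicket", {"summary": "Order 12345 risk breach"}),
    ("PlacePaymentHold", {"order_id": "12345"}),
]
first = record_actions(db, "idem-abc", actions)
replay = record_actions(db, "idem-abc", actions)
print(first, replay, len(pending(db)))
```

The relay that drains the outbox still delivers at least once, so the downstream tools need to honor the idempotency key for the end result to be a single side effect.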


Governance & safety rails

  • Schema contracts: Require Avro/Protobuf/JSON Schema with backward-compatible evolution; fail closed on unknown required fields.

  • PII stance: PII must be tokenized at source; the agent never sees raw PII fields.

  • Rate limits: Max actions per minute per key (account/order) to avoid floods.

  • DLQ: Any validation failure or tool error goes to ops.agent_dlq with the envelope and error, not silently dropped.

  • Replayability: Agents must be idempotent on replays; rely on idempotency keys and receipts.
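
The per-key rate limit can be sketched as a sliding window over recent action timestamps. PerKeyRateLimiter and its parameters are hypothetical names, the limit of 2 per minute is chosen only to keep the demo short, and the state is in-memory, so a multi-instance deployment would need a shared store.

```python
import time
from collections import defaultdict, deque


class PerKeyRateLimiter:
    """Allow at most max_actions per key within a sliding window of window_s seconds."""

    def __init__(self, max_actions: int, window_s: float = 60.0, clock=time.monotonic):
        self._max = max_actions
        self._window_s = window_s
        self._clock = clock                      # injectable for tests
        self._events = defaultdict(deque)        # key -> timestamps of allowed actions

    def allow(self, key: str) -> bool:
        now = self._clock()
        q = self._events[key]
        while q and now - q[0] >= self._window_s:
            q.popleft()                          # forget actions outside the window
        if len(q) >= self._max:
            return False                         # over the limit: caller should defer or DLQ
        q.append(now)
        return True


# Demo with a fake clock so the behavior is deterministic.
fake_now = [0.0]
limiter = PerKeyRateLimiter(max_actions=2, window_s=60.0, clock=lambda: fake_now[0])
results = [limiter.allow("acct-9") for _ in range(3)]
fake_now[0] = 61.0
after_window = limiter.allow("acct-9")
print(results, after_window)
```

Limiting per account or order key, rather than globally, keeps one noisy entity from starving actions for everyone else.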


Observability & audit

Log per envelope: topic, partition/offset, event time vs processing time (lag), schema version, idempotency key, executed actions, external receipts, and outcome. Emit metrics for p50/p95 latency, DLQ rate, action rate, and dedup hits. Keep golden traces: a small set of canonical events and expected actions you replay in CI.
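
As an illustration of the per-envelope log line, the sketch below builds one structured record and derives lag from event time and processing time. TraceRecord and its field names are hypothetical, chosen to mirror the list above; the sample values are invented.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class TraceRecord:
    topic: str
    partition: int
    offset: int
    schema_version: str
    idempotency_key: str
    event_ts_ms: int        # when the event happened
    processed_ts_ms: int    # when the agent handled it
    actions: list = field(default_factory=list)
    receipts: list = field(default_factory=list)
    outcome: str = "ok"

    @property
    def lag_ms(self) -> int:
        return self.processed_ts_ms - self.event_ts_ms

    def to_json(self) -> str:
        record = asdict(self)
        record["lag_ms"] = self.lag_ms   # derived value, stored for easy querying
        return json.dumps(record, sort_keys=True)


rec = TraceRecord(
    topic="risk.order_threshold_breach",
    partition=3,
    offset=1042,
    schema_version="v3",
    idempotency_key="idem-abc",
    event_ts_ms=1700000000000,
    processed_ts_ms=1700000000250,
    actions=["CreateTicket", "PlacePaymentHold"],
    receipts=["CreateTicket:TKT-000abc", "PlacePaymentHold:HOLD-12345"],
)
print(rec.to_json())
```

Emitting one JSON object per envelope keeps the log greppable by idempotency key and makes golden-trace comparisons in CI a simple equality check on the parsed record.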


Common failure modes (and fixes)

  • Duplicate side effects on retries → Require idempotency keys; design tools as upsert/no-op.

  • Out-of-order breaches → Add a stream processor that computes sessionized state and emits a single breach per window/key.

  • Schema drift breaks agent → Pin to schema versions and adopt compatibility testing in CI with sample payloads.

  • Backpressure stalls → Implement bounded queues and explicit drop-or-degrade policies (e.g., ticket only, skip notice) when backlogged.

  • PII leakage via passthrough → Enforce topic allowlists and strip/deny unapproved fields before agent logic.
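
The strip/deny step for PII passthrough might look like the sketch below: unapproved fields are dropped, and a PII field that is not tokenized fails closed. The PII field names come from the contract; ALLOWED_FIELDS, the "tok_" prefix convention, and the sanitize helper are assumptions made for illustration, since the article does not specify how tokens are marked.

```python
PII_FIELDS = {"email", "phone", "ssn_last4"}   # from the contract's governance section
TOKEN_PREFIX = "tok_"                          # assumed marker for tokenized values
# Hypothetical allowlist of fields the agent logic may see.
ALLOWED_FIELDS = {"order_id", "account_id", "rule", "vip_flag"} | PII_FIELDS


def sanitize(payload: dict) -> dict:
    """Return a copy with only approved fields; raise if a PII field is not tokenized."""
    clean = {}
    for name, value in payload.items():
        if name not in ALLOWED_FIELDS:
            continue                           # strip unapproved fields silently
        if name in PII_FIELDS and not str(value).startswith(TOKEN_PREFIX):
            # Fail closed: the caller should route the event to the DLQ.
            raise ValueError(f"untokenized PII field: {name}")
        clean[name] = value
    return clean


safe = sanitize({"order_id": "12345", "email": "tok_9f2c", "debug_blob": "internal"})
print(safe)
```

When routing a rejected event to the DLQ, it is safer to record the field names and the error than the raw values, so the DLQ does not become a second place where PII leaks.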


End-to-end example scenario

A Flink job publishes risk.order_threshold_breach@v3 when a customer places 3 high-value orders within 10 minutes. The agent consumes the event, opens a ticket with the rule context, and places a payment hold (receipt HOLD-12345). Separately, when a cdc.public.orders@v2 change event arrives for an order whose row is flagged VIP, the agent sends a VIP notice. Each response includes citations to the topic and schema version plus all receipts. On replay, idempotency keys ensure no duplicate holds, tickets, or notices.


Conclusion

Streaming and CDC let agents move from “smart dashboards” to operational reflexes. By treating topics and schemas as contracts, using typed, idempotent tools, and insisting on receipts, you get low-latency automation that’s safe, auditable, and easy to reason about. Pair this with your database and warehouse patterns from earlier parts, and you’ve got an agent platform that spans real-time, transactional, and analytical worlds—without the 3 a.m. pager.