Introduction
After wiring agents to warehouses and OLTP databases, the next leap is real-time: letting agents perceive and act on events as they happen—orders placed, thresholds breached, schemas changed. This article shows how to make agents first-class citizens of streaming and change-data-capture (CDC) systems (Kafka, Pulsar, Kinesis, Redpanda; Debezium/GoldenGate/SQL Server CDC), with contracts, governance, and receipts that stand up in production.
Why streams + CDC change the game
Polling a database or warehouse adds latency and cost. With CDC and event buses, agents get low-latency signals (row inserts/updates, metric deltas, anomaly alerts) and can trigger bounded, reversible actions (escalate a case, create a payment hold, notify a customer) without heavy batch jobs. The trade-off is complexity: ordering, deduplication, backpressure, and exactly-once semantics. Solve these at the contract and tool layer, not in fragile prompts.
Reference architecture
Ingress: CDC connectors (e.g., Debezium) emit change events to topics (e.g., db.customer.orders). Producers for domain events (e.g., payments.authorized) do the same.
Stream processing: Flink/Spark/Kafka Streams derive features (rolling sums, thresholds) and emit well-typed facts agents can subscribe to (e.g., risk.order_threshold_breach).
Agent runtime: An agent subscribes via a typed consumer, evaluates policy and context, and proposes tool calls (tickets, emails, payments, flag flips), each requiring idempotency and receipts.
Governance: Topic ACLs, data contracts (Avro/Protobuf/JSON Schema), PII tokenization, and dead-letter queues (DLQ).
Observability: Per-message traces (event key, partition/offset, watermark), action receipts, and outbox tables for exactly-once writes.
Agent–event contract (YAML)
# contracts/streaming_agent_v1.yaml
role: "StreamingOpsAgent"
scope: >
  React to certified stream topics and CDC events; propose bounded actions with idempotency and receipts.
  Never emit PII; respect sensitivity labels; ensure at-least-once processing without duplicating side effects.
inputs:
  topics:
    - name: "risk.order_threshold_breach"   # derived event
      schema: "avro://risk/order_breach@v3"
    - name: "cdc.public.orders"             # Debezium change event
      schema: "json://debezium/order@v2"
governance:
  pii_fields: ["email", "phone", "ssn_last4"]   # must be tokenized upstream
  sensitivity_ceiling: "Confidential"
  max_actions_per_minute: 120
  dlq_topic: "ops.agent_dlq"
output:
  type: object
  required: [summary, actions, citations, receipts]
  properties:
    summary: {type: string, maxWords: 120}
    actions:
      type: array
      items: {type: object, required: [tool, args, idempotency_key]}
    citations: {type: array, items: string}   # topic@schema_version
    receipts: {type: array, items: string}    # external IDs from tools
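The output block of the contract can be enforced in code before a response leaves the agent. The following is a minimal sketch, assuming the contract's `required` lists and the 120-word summary limit are checked by hand; `validate_output` and its error strings are hypothetical names for illustration, not part of any library.

```python
from typing import Any

REQUIRED_TOP = ("summary", "actions", "citations", "receipts")
REQUIRED_ACTION = ("tool", "args", "idempotency_key")
MAX_SUMMARY_WORDS = 120  # mirrors maxWords in the contract


def validate_output(out: dict[str, Any]) -> list[str]:
    """Return contract violations; an empty list means the output conforms."""
    errors = [f"missing field: {f}" for f in REQUIRED_TOP if f not in out]
    if errors:
        return errors

    if not isinstance(out["summary"], str):
        errors.append("summary must be a string")
    elif len(out["summary"].split()) > MAX_SUMMARY_WORDS:
        errors.append("summary exceeds 120 words")

    if not isinstance(out["actions"], list):
        errors.append("actions must be an array")
    else:
        for i, action in enumerate(out["actions"]):
            if not isinstance(action, dict):
                errors.append(f"actions[{i}] must be an object")
                continue
            for f in REQUIRED_ACTION:
                if f not in action:
                    errors.append(f"actions[{i}] missing {f}")

    for name in ("citations", "receipts"):
        value = out[name]
        if not isinstance(value, list) or not all(isinstance(v, str) for v in value):
            errors.append(f"{name} must be an array of strings")
    return errors
```

A response that fails this check is a candidate for the DLQ rather than for delivery, which keeps contract violations visible instead of silently passing malformed output downstream.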
Typed tools (receipts or it didn’t happen)
# tools.py
from typing import Dict, Optional

from pydantic import BaseModel


class CreateTicketArgs(BaseModel):
    summary: str
    labels: list[str]
    idem_key: str


class PlacePaymentHoldArgs(BaseModel):
    order_id: str
    reason: str
    idem_key: str


class SendNoticeArgs(BaseModel):
    to_account: str
    template_id: str
    idem_key: str


class ToolReceipt(BaseModel):
    tool: str
    ok: bool
    ref: str  # external receipt ID (ticket id, hold id, message id)
    message: str = ""
    data: Optional[Dict] = None
# adapters.py (demo: wire to Jira/SNOW, payments, comms)
from tools import CreateTicketArgs, PlacePaymentHoldArgs, SendNoticeArgs, ToolReceipt


def create_ticket(a: CreateTicketArgs) -> ToolReceipt:
    return ToolReceipt(tool="CreateTicket", ok=True, ref=f"TKT-{a.idem_key[-6:]}", message="Ticket opened")


def place_payment_hold(a: PlacePaymentHoldArgs) -> ToolReceipt:
    return ToolReceipt(tool="PlacePaymentHold", ok=True, ref=f"HOLD-{a.order_id}", message="Hold placed")


def send_notice(a: SendNoticeArgs) -> ToolReceipt:
    return ToolReceipt(tool="SendNotice", ok=True, ref=f"MSG-{a.idem_key[-8:]}", message="Notice queued")
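The demo adapters always report success, so they do not show what an idempotent tool does on a replay. Below is a minimal sketch of that behavior, assuming an in-memory store keyed by idempotency key; `IdempotentHoldService` and `Receipt` are hypothetical stand-ins for a real payments API and are written with the standard library only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Receipt:
    tool: str
    ok: bool
    ref: str  # external receipt ID


class IdempotentHoldService:
    """Hypothetical stand-in for a payments API that honors idempotency keys."""

    def __init__(self) -> None:
        self._by_key: dict[str, Receipt] = {}
        self.holds_created = 0  # counts real side effects, for illustration

    def place_hold(self, order_id: str, idem_key: str) -> Receipt:
        # Replay of a known key: return the stored receipt, perform no side effect.
        existing = self._by_key.get(idem_key)
        if existing is not None:
            return existing
        # First time this key is seen: perform the side effect and remember the receipt.
        self.holds_created += 1
        receipt = Receipt(tool="PlacePaymentHold", ok=True, ref=f"HOLD-{order_id}")
        self._by_key[idem_key] = receipt
        return receipt
```

Calling `place_hold` twice with the same key returns the same receipt and creates one hold. In a real service the key-to-receipt map would live in durable storage on the tool side, so that the guarantee survives restarts of both the agent and the tool.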
Event consumer with dedup + backpressure
# consumer.py
import hashlib

from pydantic import BaseModel

from adapters import create_ticket, place_payment_hold, send_notice
from tools import CreateTicketArgs, PlacePaymentHoldArgs, SendNoticeArgs

ALLOWED_TOPICS = {"risk.order_threshold_breach@v3", "cdc.public.orders@v2"}
SEEN = set()  # replace with Redis/Bloom filter in prod
TOOLS = {
    "CreateTicket": create_ticket,
    "PlacePaymentHold": place_payment_hold,
    "SendNotice": send_notice,
}


class Envelope(BaseModel):
    topic: str
    key: str
    ts_ms: int
    schema_version: str  # e.g. "v3"; a field named `schema` would shadow BaseModel.schema
    payload: dict


def idem_key(envelope: Envelope) -> str:
    # Delimit the parts so ("ab", "c") and ("a", "bc") cannot hash to the same key.
    raw = f"{envelope.topic}|{envelope.key}|{envelope.ts_ms}"
    return "idem-" + hashlib.sha256(raw.encode()).hexdigest()


def handle(envelope: Envelope) -> dict:
    topic_sig = f"{envelope.topic}@{envelope.schema_version}"
    # Raise instead of assert: assertions are stripped under `python -O`.
    if topic_sig not in ALLOWED_TOPICS:
        raise ValueError(f"Topic not allowed: {topic_sig}")

    ik = idem_key(envelope)
    if ik in SEEN:
        return {"summary": "duplicate", "actions": [], "receipts": [], "citations": [topic_sig]}

    actions, receipts = [], []
    p = envelope.payload

    # Pattern 1: derived breach event → ticket + hold
    if envelope.topic == "risk.order_threshold_breach":
        actions.append({"tool": "CreateTicket", "idempotency_key": ik, "args": CreateTicketArgs(
            summary=f"Order {p['order_id']} risk breach ({p['rule']})",
            labels=["risk", "orders"], idem_key=ik)})
        if p.get("auto_hold", True):
            actions.append({"tool": "PlacePaymentHold", "idempotency_key": ik, "args": PlacePaymentHoldArgs(
                order_id=p["order_id"], reason=p["rule"], idem_key=ik)})

    # Pattern 2: raw CDC upsert → friendly notice for VIP accounts
    after = p.get("after") or {}  # "after" can be null or absent on some change events
    if envelope.topic == "cdc.public.orders" and p.get("op") in {"c", "u"} and after.get("vip_flag"):
        actions.append({"tool": "SendNotice", "idempotency_key": ik, "args": SendNoticeArgs(
            to_account=after["account_id"], template_id="vip-order-received", idem_key=ik)})

    # Execute with receipts; each tool is expected to scope the shared key to itself.
    for a in actions:
        r = TOOLS[a["tool"]](a["args"])
        if r.ok:
            receipts.append(f"{r.tool}:{r.ref}")

    # Mark as seen only after the side effects ran, so a crash mid-way leads to a
    # retry (safe because the tools are idempotent) rather than a lost event.
    SEEN.add(ik)
    return {
        "summary": f"Processed {envelope.topic} key={envelope.key}",
        # Objects with tool/args/idempotency_key, as the contract's output block requires.
        "actions": [{"tool": a["tool"], "args": dict(a["args"]),
                     "idempotency_key": a["idempotency_key"]} for a in actions],
        "receipts": receipts,
        "citations": [topic_sig],
    }
Notes that matter in production
Replace SEEN with a persistent dedup keyed by (topic, key, watermark).
Respect watermarks and out-of-order windows (e.g., 10 minutes) in the stream processor upstream.
For exactly-once side effects: use transactional outbox in the action service or a saga pattern with compensating steps.
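The persistent dedup and the transactional outbox from the notes above can share one transaction. Here is a minimal sketch using the standard-library `sqlite3` module as a stand-in for the action service's database; the table names (`processed`, `outbox`) and the helpers `open_store`, `record_once`, and `pending` are assumptions made for illustration.

```python
import json
import sqlite3


def open_store(path: str = ":memory:") -> sqlite3.Connection:
    """Create the dedup and outbox tables if they do not exist yet."""
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS processed (idem_key TEXT PRIMARY KEY)")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS outbox ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, idem_key TEXT NOT NULL, "
        "tool TEXT NOT NULL, args TEXT NOT NULL, sent INTEGER NOT NULL DEFAULT 0)"
    )
    conn.commit()
    return conn


def record_once(conn: sqlite3.Connection, idem_key: str, actions: list) -> bool:
    """Atomically mark the event as processed and enqueue its actions.

    Returns False when the idempotency key was already recorded.
    """
    try:
        with conn:  # commits on success, rolls back if any statement fails
            conn.execute("INSERT INTO processed (idem_key) VALUES (?)", (idem_key,))
            conn.executemany(
                "INSERT INTO outbox (idem_key, tool, args) VALUES (?, ?, ?)",
                [(idem_key, a["tool"], json.dumps(a["args"])) for a in actions],
            )
        return True
    except sqlite3.IntegrityError:
        # Primary-key conflict on idem_key: a replay, nothing was written.
        return False


def pending(conn: sqlite3.Connection) -> list:
    """Outbox rows a relay process still has to deliver, oldest first."""
    return conn.execute(
        "SELECT tool, args FROM outbox WHERE sent = 0 ORDER BY id"
    ).fetchall()
```

Because the dedup row and the outbox rows commit together, a replayed event either finds its key already present and enqueues nothing, or enqueues everything exactly once. A separate relay would read `pending`, call the tools with their idempotency keys, and flip `sent`.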
Governance & safety rails
Schema contracts: Require Avro/Protobuf/JSON Schema with backward-compatible evolution; fail closed on unknown required fields.
PII stance: PII must be tokenized at source; the agent never sees raw PII fields.
Rate limits: Max actions per minute per key (account/order) to avoid floods.
DLQ: Any validation failure or tool error goes to ops.agent_dlq with the envelope and error, not silently dropped.
Replayability: Agents must be idempotent on replays; rely on idempotency keys and receipts.
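The PII and DLQ rules above can be combined into a single guard that runs before any agent logic. This is a rough sketch under stated assumptions: tokenized values are taken to carry a `tok_` prefix, `APPROVED_FIELDS` is an invented allowlist, and a plain Python list stands in for the `ops.agent_dlq` topic. None of these come from a real library.

```python
from typing import Optional

PII_FIELDS = {"email", "phone", "ssn_last4"}  # from the contract's governance block
APPROVED_FIELDS = {"order_id", "rule", "auto_hold", "account_id", "vip_flag", "email"}  # assumed
TOKEN_PREFIX = "tok_"  # assumed marker for values tokenized upstream


def guard(topic: str, payload: dict, dlq: list) -> Optional[dict]:
    """Return a payload safe for agent logic, or None after routing it to the DLQ."""
    for name in sorted(PII_FIELDS.intersection(payload)):
        value = payload[name]
        if not (isinstance(value, str) and value.startswith(TOKEN_PREFIX)):
            # Fail closed: record the error, but never copy raw PII into the DLQ.
            redacted = {k: ("[REDACTED]" if k in PII_FIELDS else v) for k, v in payload.items()}
            dlq.append({"topic": topic, "error": f"untokenized PII field: {name}", "payload": redacted})
            return None
    # Strip anything not explicitly approved so new upstream fields cannot pass through.
    return {k: v for k, v in payload.items() if k in APPROVED_FIELDS}
```

Redacting before writing to the DLQ is a design choice in this sketch: the DLQ is read by operators and tooling, so it should not become a second place where raw PII accumulates.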
Observability & audit
Log per envelope: topic, partition/offset, event time vs processing time (lag), schema version, idempotency key, executed actions, external receipts, and outcome. Emit metrics for p50/p95 latency, DLQ rate, action rate, and dedup hits. Keep golden traces: a small set of canonical events and expected actions you replay in CI.
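One way to make the per-envelope log concrete is a small record type that carries the listed fields and derives lag from event time and processing time. The sketch below uses hypothetical names (`AuditRecord`, `percentile`) and a nearest-rank percentile as a simple stand-in for whatever your metrics backend computes.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class AuditRecord:
    topic: str
    partition: int
    offset: int
    event_ts_ms: int       # event time from the envelope
    processed_ts_ms: int   # wall-clock time when the agent handled it
    schema_version: str
    idempotency_key: str
    actions: list
    receipts: list
    outcome: str

    @property
    def lag_ms(self) -> int:
        """Processing time minus event time."""
        return self.processed_ts_ms - self.event_ts_ms

    def to_json(self) -> str:
        record = asdict(self)
        record["lag_ms"] = self.lag_ms
        return json.dumps(record, sort_keys=True)


def percentile(values: list, pct: int) -> int:
    """Nearest-rank percentile; pct is an integer from 1 to 100."""
    ordered = sorted(values)
    rank = max(1, -(-pct * len(ordered) // 100))  # ceiling division without floats
    return ordered[rank - 1]
```

Emitting one JSON line per envelope keeps the trace greppable by idempotency key, and the same records can double as golden traces when replayed in CI.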
Common failure modes (and fixes)
Duplicate side effects on retries → Require idempotency keys; design tools as upsert/no-op.
Out-of-order breaches → Add a stream processor that computes sessionized state and emits a single breach per window/key.
Schema drift breaks agent → Pin to schema versions and adopt compatibility testing in CI with sample payloads.
Backpressure stalls → Implement bounded queues and drop or degrade policies (e.g., ticket only, skip notice) when backlogged.
PII leakage via passthrough → Enforce topic allowlists and strip/deny unapproved fields before agent logic.
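The backpressure fix above (ticket only, skip notice) can be sketched as a bounded queue that sheds non-essential actions once it passes a depth threshold. The class name, the `ESSENTIAL_TOOLS` set, and the three return values are assumptions for illustration; a real deployment would also pause the broker consumer when the queue reports it is full.

```python
from collections import deque
from typing import Optional

ESSENTIAL_TOOLS = {"CreateTicket", "PlacePaymentHold"}  # assumed: notices are optional


class BoundedActionQueue:
    def __init__(self, capacity: int, degrade_depth: int) -> None:
        self.capacity = capacity            # hard limit on queued actions
        self.degrade_depth = degrade_depth  # depth at which optional actions are shed
        self._items = deque()
        self.shed = 0

    def offer(self, action: dict) -> str:
        """Return "accepted", "shed" (degraded), or "full" (caller should pause intake)."""
        depth = len(self._items)
        if depth >= self.capacity:
            return "full"
        if depth >= self.degrade_depth and action["tool"] not in ESSENTIAL_TOOLS:
            self.shed += 1
            return "shed"
        self._items.append(action)
        return "accepted"

    def poll(self) -> Optional[dict]:
        """Remove and return the oldest queued action, or None when empty."""
        return self._items.popleft() if self._items else None
```

Returning "full" instead of dropping an essential action keeps the decision with the caller, which can stop polling the broker until the backlog drains. Shed actions are counted so the degradation shows up in metrics.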
End-to-end example scenario
A Flink job publishes risk.order_threshold_breach@v3 when a customer places 3 high-value orders within 10 minutes. The agent consumes the event, opens a ticket with the rule context, places a payment hold (receipt HOLD-12345), and sends a VIP notice if the account is marked VIP in the CDC payload. The response includes citations to the topic+schema and all receipts. On replay, idempotency keys ensure no duplicate holds or tickets.
Conclusion
Streaming and CDC let agents move from “smart dashboards” to operational reflexes. By treating topics and schemas as contracts, using typed, idempotent tools, and insisting on receipts, you get low-latency automation that’s safe, auditable, and easy to reason about. Pair this with your database and warehouse patterns from earlier parts, and you’ve got an agent platform that spans real-time, transactional, and analytical worlds—without the 3 a.m. pager.