AI Automation & Agents  

AI Agents in Practice: Master Data & Entity Resolution Agent (Prompts + Code)

Introduction

As your agents begin to read and write across apps, warehouses, and streams, identity fragmentation becomes the silent killer: the same customer appears as five records; suppliers are duplicated across ERPs; employees show up as both contractors and full-timers. This article delivers a Master Data & Entity Resolution Agent that deduplicates and links records into golden entities—safely, audibly, and with receipts—so every downstream agent (support, billing, analytics) acts on a consistent truth.


The Use Case

Traditional MDM projects are slow and centralized. Meanwhile, agents need good-enough, explainable resolution now. Our agent ingests candidate records (customers/suppliers/products), computes match proposals (deterministic + fuzzy), requests approvals for risky merges, executes merges through governed APIs, and emits linkage edges for analytics. Crucially, it never claims success without receipts (merge job IDs, link IDs, ticket IDs) and it records why two records were linked (feature attributions and thresholds).


Prompt Contract (agent interface)

# file: contracts/entity_resolution_v1.yaml
role: "EntityResolutionAgent"
scope: >
  Propose, approve, and execute entity links/merges across systems using governed rules.
  Ask once if critical fields are missing (domain, records[], ruleset, risk_tolerance).
  Never assert success without a receipt (link_id, merge_id, ticket id).
inputs:
  domain: enum["customer","supplier","employee","product"]
  ruleset: string                  # e.g., "cust_rules_v3"
  risk_tolerance: enum["low","medium","high"]
  records:                         # candidates from one or more systems
    - {system: string, id: string, attrs: object}
output:
  type: object
  required: [summary, proposals, decisions, citations, next_steps, tool_proposals]
  properties:
    summary: {type: string, maxWords: 100}
    proposals:
      type: array
      items:
        type: object
        required: [pair, score, features, action]
        properties:
          pair: {type: array, items: string}         # ["crm:123","billing:987"]
          score: {type: number}                      # 0..1
          features: {type: array, items: string}     # e.g., "email_exact","name_jaro=0.92"
          action: {type: string, enum: ["auto_link","review","reject"]}
    decisions:
      type: array
      items:
        type: object
        required: [pair, decision, reason, receipt]
        properties:
          pair: {type: array, items: string}
          decision: {type: string, enum: ["linked","merged","rejected","needs_review"]}
          reason: {type: string}
          receipt: {type: string}
    citations: {type: array, items: string}          # ruleset id, policy claims
    next_steps: {type: array, items: string, maxItems: 6}
    tool_proposals:
      type: array
      items:
        type: object
        required: [name, args, preconditions, idempotency_key]
        properties:
          name:
            type: string
            enum: [LoadRules, ComputeFeatures, ProposeMatches, RequestApproval,
                   CreateLink, MergeRecords, OpenTicket, EmitLineage]
          args: {type: object}
          preconditions: {type: string}
          idempotency_key: {type: string}
policy_id: "mdm_policy.v4"
citation_rule: "Minimal-span references to ruleset id and feature thresholds."

Tool Interfaces (typed, with receipts)

# tools.py
from pydantic import BaseModel
from typing import List, Dict, Optional

class LoadRulesArgs(BaseModel):
    ruleset: str

class ComputeFeaturesArgs(BaseModel):
    domain: str
    records: List[Dict]  # [{system,id,attrs}]

class ProposeMatchesArgs(BaseModel):
    features: List[Dict] # output from ComputeFeatures
    risk_tolerance: str  # "low"|"medium"|"high"

class RequestApprovalArgs(BaseModel):
    pair: List[str]
    score: float
    features: List[str]
    approvers: List[str]

class CreateLinkArgs(BaseModel):
    canonical_id: str
    member_id: str
    domain: str

class MergeRecordsArgs(BaseModel):
    domain: str
    into_id: str
    from_id: str
    field_strategy: Dict[str,str]  # {"email":"prefer_non_null","name":"longest","address":"most_recent"}

class OpenTicketArgs(BaseModel):
    title: str
    description: str
    severity: str
    owners: List[str]

class EmitLineageArgs(BaseModel):
    domain: str
    canonical_id: str
    members: List[str]

class ToolReceipt(BaseModel):
    tool: str
    ok: bool
    ref: str        # e.g., link_id, merge_id, ticket id
    message: str = ""
    data: Optional[Dict] = None
# adapters.py  (demo logic; wire to your MDM/CRM/ERP/catalog in prod)
from tools import *
import uuid, random

RULES = {
  "cust_rules_v3": {
    "auto_link_threshold": 0.93,
    "review_threshold": 0.80,
    "feature_weights": {"email_exact":0.6,"phone_e164":0.25,"name_jaro":0.1,"addr_city_exact":0.05}
  }
}
APPROVERS = {"customer": ["dataops@company","crm-owner@company"]}

def load_rules(a: LoadRulesArgs) -> ToolReceipt:
    return ToolReceipt(tool="LoadRules", ok=True, ref=a.ruleset, data=RULES.get(a.ruleset, {}))

def compute_features(a: ComputeFeaturesArgs) -> ToolReceipt:
    feats = []
    recs = a.records
    # toy pairwise features for first two records
    for i in range(len(recs)):
        for j in range(i+1, len(recs)):
            r1, r2 = recs[i], recs[j]
            email_exact = int(r1["attrs"].get("email","").lower()==r2["attrs"].get("email","").lower() and r1["attrs"].get("email"))
            phone_e164 = int(r1["attrs"].get("phone")==r2["attrs"].get("phone") and r1["attrs"].get("phone"))
            name_jaro  = round(random.uniform(0.7, 0.99), 2)  # stand-in
            feats.append({"pair":[f"{r1['system']}:{r1['id']}", f"{r2['system']}:{r2['id']}"],
                          "features":[f"email_exact={email_exact}", f"phone_e164={phone_e164}", f"name_jaro={name_jaro}"]})
    return ToolReceipt(tool="ComputeFeatures", ok=True, ref="feats-1", data={"features":feats})

def propose_matches(a: ProposeMatchesArgs) -> ToolReceipt:
    # toy scorer reading features
    out = []
    for f in a.features:
        score = 0.0
        for feat in f["features"]:
            k,v = feat.split("="); v = float(v)
            if k=="email_exact": score += 0.6*v
            if k=="phone_e164":  score += 0.25*v
            if k=="name_jaro":   score += 0.15*v
        action = "reject"
        if score >= 0.93: action = "auto_link"
        elif score >= 0.80: action = "review" if a.risk_tolerance!="high" else "auto_link"
        out.append({"pair":f["pair"], "score":round(score,2), "features":f["features"], "action": action})
    return ToolReceipt(tool="ProposeMatches", ok=True, ref="prop-1", data={"proposals":out})

def request_approval(a: RequestApprovalArgs) -> ToolReceipt:
    return ToolReceipt(tool="RequestApproval", ok=True, ref="APR-001", message="Approval requested", data={"approvers":a.approvers})

def create_link(a: CreateLinkArgs) -> ToolReceipt:
    return ToolReceipt(tool="CreateLink", ok=True, ref=f"LINK-{uuid.uuid4().hex[:8]}", message="Linked to canonical")

def merge_records(a: MergeRecordsArgs) -> ToolReceipt:
    return ToolReceipt(tool="MergeRecords", ok=True, ref=f"MERGE-{uuid.uuid4().hex[:8]}", message="Merged into canonical")

def open_ticket(a: OpenTicketArgs) -> ToolReceipt:
    return ToolReceipt(tool="OpenTicket", ok=True, ref="MDM-217", message="Review ticket opened")

def emit_lineage(a: EmitLineageArgs) -> ToolReceipt:
    return ToolReceipt(tool="EmitLineage", ok=True, ref=f"EDGE-{uuid.uuid4().hex[:8]}", message="Lineage emitted")

Agent Loop (proposal → verification → execution → receipts)

# agent_entity_resolution.py
import uuid, json
from typing import Any, Dict, List
from tools import *
from adapters import *

ALLOWED = {"LoadRules","ComputeFeatures","ProposeMatches","RequestApproval","CreateLink","MergeRecords","OpenTicket","EmitLineage"}

def new_idem(): return f"idem-{uuid.uuid4()}"

def verify(p: Dict[str,Any]) -> str:
    need = {"name","args","preconditions","idempotency_key"}
    if not need.issubset(p): return "Missing proposal fields"
    if p["name"] not in ALLOWED: return "Tool not allowed"
    return ""

def run(p: Dict[str,Any]) -> ToolReceipt:
    n,a = p["name"], p["args"]
    return (
        load_rules(LoadRulesArgs(**a)) if n=="LoadRules" else
        compute_features(ComputeFeaturesArgs(**a)) if n=="ComputeFeatures" else
        propose_matches(ProposeMatchesArgs(**a)) if n=="ProposeMatches" else
        request_approval(RequestApprovalArgs(**a)) if n=="RequestApproval" else
        create_link(CreateLinkArgs(**a)) if n=="CreateLink" else
        merge_records(MergeRecordsArgs(**a)) if n=="MergeRecords" else
        open_ticket(OpenTicketArgs(**a)) if n=="OpenTicket" else
        emit_lineage(EmitLineageArgs(**a)) if n=="EmitLineage" else
        ToolReceipt(tool=n, ok=False, ref="none", message="Unknown tool")
    )

# --- Model shim producing a plan per contract (replace with your LLM) ---
def plan(req: Dict[str,Any]) -> Dict[str,Any]:
    return {
      "summary": f"Resolve {req['domain']} records using {req['ruleset']} with {req['risk_tolerance']} risk tolerance.",
      "proposals": [],
      "decisions": [],
      "citations": [req["ruleset"]],
      "next_steps": ["Load rules","Compute features","Propose matches","Auto-link high confidence","Route reviews","Emit lineage"],
      "tool_proposals": [
        {"name":"LoadRules","args":{"ruleset":req["ruleset"]},
         "preconditions":"Get thresholds & weights.","idempotency_key": new_idem()},
        {"name":"ComputeFeatures","args":{"domain":req["domain"],"records":req["records"]},
         "preconditions":"Build pairwise features.","idempotency_key": new_idem()},
        {"name":"ProposeMatches","args":{"features":[],"risk_tolerance":req["risk_tolerance"]},
         "preconditions":"Score pairs.","idempotency_key": new_idem()},
        # The following steps will be expanded after proposals, but we include exemplars:
        {"name":"OpenTicket","args":{"title":"MDM review queue","description":"Approve medium-confidence matches","severity":"low","owners":APPROVERS[req["domain"]]},
         "preconditions":"Track manual reviews.","idempotency_key": new_idem()}
      ]
    }

def handle(req: Dict[str,Any]) -> str:
    p = plan(req)
    receipts: List[ToolReceipt] = []
    # 1) Load rules, compute features, propose matches
    for tp in p["tool_proposals"][:3]:
        err = verify(tp)
        if err: receipts.append(ToolReceipt(tool=tp["name"], ok=False, ref="blocked", message=err)); continue
        if tp["name"]=="ProposeMatches":
            # fill features from prior step
            feats = [r for r in receipts if r.tool=="ComputeFeatures"][-1].data["features"] if any(r.tool=="ComputeFeatures" for r in receipts) else []
            tp["args"]["features"] = feats
        receipts.append(run(tp))

    idx = {r.tool:r for r in receipts}
    proposals = idx["ProposeMatches"].data["proposals"] if idx.get("ProposeMatches") else []

    # 2) Execute decisions: auto-link; review others; (optional) merge
    decisions = []
    for pr in proposals:
        pair = pr["pair"]; score = pr["score"]
        if pr["action"]=="auto_link":
            # choose first as canonical for demo
            rec = run({"name":"CreateLink","args":{"canonical_id":pair[0],"member_id":pair[1],"domain":req["domain"]},
                       "preconditions":"High confidence link.","idempotency_key": new_idem()})
            receipts.append(rec)
            decisions.append({"pair":pair,"decision":"linked","reason":f"score={score}>=auto_threshold","receipt":rec.ref})
        elif pr["action"]=="review":
            rec = run({"name":"RequestApproval","args":{"pair":pair,"score":score,"features":pr["features"],"approvers":APPROVERS[req["domain"]]},
                       "preconditions":"Medium confidence requires approval.","idempotency_key": new_idem()})
            receipts.append(rec)
            decisions.append({"pair":pair,"decision":"needs_review","reason":"below auto threshold","receipt":rec.ref})
        else:
            decisions.append({"pair":pair,"decision":"rejected","reason":"low score","receipt":""})

    # 3) Emit lineage for any new canonical links
    linked = [d for d in decisions if d["decision"]=="linked"]
    if linked:
        # group by canonical
        by_can = {}
        for d in linked:
            can, mem = d["pair"][0], d["pair"][1]
            by_can.setdefault(can, []).append(mem)
        for can, members in by_can.items():
            receipts.append(run({"name":"EmitLineage","args":{"domain":req["domain"],"canonical_id":can,"members":members},
                                 "preconditions":"Expose edges to catalog/BI.","idempotency_key": new_idem()}))

    # 4) Open ticket (already planned) for reviews
    receipts.append(run(p["tool_proposals"][-1]))

    lines = [p["summary"], ""]
    lines.append("Proposals:")
    for pr in proposals:
        lines.append(f"- {pr['pair']} score={pr['score']} → {pr['action']} (features: {', '.join(pr['features'])})")
    lines.append("\nDecisions:")
    for d in decisions:
        lines.append(f"- {d['pair']} → {d['decision']} ({d['reason']}) {('['+d['receipt']+']') if d['receipt'] else ''}")
    # show receipts of emitted edges/links
    for r in receipts:
        if r.tool in {"CreateLink","MergeRecords","EmitLineage","OpenTicket"} and r.ok:
            lines.append(f"{r.tool}: {r.ref} — {r.message}")
    lines.append("\nNext steps:")
    for s in p["next_steps"]: lines.append(f"• {s}")
    lines.append("\nCitations: " + ", ".join(p["citations"]))
    return "\n".join(lines)

if __name__ == "__main__":
    example = {
      "domain":"customer",
      "ruleset":"cust_rules_v3",
      "risk_tolerance":"medium",
      "records":[
        {"system":"crm","id":"123","attrs":{"email":"[email protected]","phone":"+14085551234","name":"Alex Carter","city":"SF"}},
        {"system":"billing","id":"987","attrs":{"email":"[email protected]","phone":"+14085551234","name":"A. Carter","city":"San Francisco"}},
        {"system":"support","id":"S-45","attrs":{"email":"[email protected]","phone":"+14085551234","name":"Alex C","city":"San Fran"}}
      ]
    }
    print(handle(example))

The Prompt You’d Send to the Model (concise and testable)

System:
You are EntityResolutionAgent. Follow the contract:
- Ask once if domain, ruleset, risk_tolerance, or records[] are missing.
- Cite the ruleset id and feature thresholds in minimal spans.
- Propose tools; never assert success without receipts.
- Output JSON with: summary, proposals[], decisions[], citations[], next_steps[], tool_proposals[].

User:
Resolve these customer records using cust_rules_v3, medium risk tolerance:
[{"system":"crm","id":"123","attrs":{"email":"[email protected]","phone":"+14085551234","name":"Alex Carter","city":"SF"}},
 {"system":"billing","id":"987","attrs":{"email":"[email protected]","phone":"+14085551234","name":"A. Carter","city":"San Francisco"}},
 {"system":"support","id":"S-45","attrs":{"email":"[email protected]","phone":"+14085551234","name":"Alex C","city":"San Fran"}}]

How to adapt quickly

  • Wire to your MDM/CRM/ERP: Map CreateLink/MergeRecords to first-party APIs (Salesforce merge, SAP/Oracle vendor merge) with idempotency keys.

  • Ground the scorer: Replace the toy scorer with a hybrid: deterministic keys (email/phone/tax id) + ML similarity (name/address/companies). Persist per-feature explanations for every decision.

  • Governance: Encode thresholds in versioned rulesets; enforce sensitivity ceilings (e.g., tax IDs never leave the secure enclave); require human approvals for merges above a blast-radius size.

  • Observability: Log feature vectors, ruleset version, decision, and receipts. Emit edges to your catalog/lineage so BI and other agents can traverse canonical→member graphs.

  • Safety: Prefer link over merge by default; schedule merges in low-risk windows; keep a revert procedure with restore receipts.


Conclusion

Agents become dependable when they act on consistent identities. A Master Data & Entity Resolution Agent gives you just that: explainable proposals, governed approvals, idempotent links/merges, and auditable receipts. Put this alongside your database, streaming, data-quality, and cost agents, and your AI stack will operate on a single, trustworthy view of customers, suppliers, and products—no more duplicate surprises.