
Dynamic Transaction Routing with LangGraph

Introduction

In financial services, not all transactions are created equal. A $50 domestic transfer requires a fundamentally different processing path than a $5M cross-border wire to a high-risk jurisdiction. Traditional linear LLM chains fail here because they force every input through the same sequence of operations, wasting compute and introducing latency.

LangGraph solves this by enabling stateful, cyclic, and conditionally routed multi-agent systems. By leveraging Conditional Edges, we can dynamically route transactions to specialized sub-agents (Auto-Approval, Enhanced Due Diligence, or Human-in-the-Loop Compliance) based on real-time risk scoring.

This article provides an end-to-end guide to building a real-time Anti-Money Laundering (AML) and Fraud Detection routing engine using LangGraph.

Prerequisites

Before building this system, ensure your environment meets the following requirements:

Python 3.10+

Required for the X | None union type syntax used in the state schema, and for current asyncio behavior.

Core Libraries

  • langgraph>=0.2.0 (For stateful graph orchestration)

  • langchain-core, langchain-openai (For LLM interactions and structured output)

  • pydantic>=2.0 (For strict state validation, critical in finance)

API Keys

An OpenAI API key (or equivalent endpoint for models like gpt-4o or locally hosted Llama-3) configured via environment variables.

Conceptual Knowledge

Basic understanding of LangGraph nodes, edges, and the StateGraph paradigm.

Observability Setup (Recommended)

A LangSmith account to trace decision paths for regulatory audits.

Installation

pip install "langgraph>=0.2.0" langchain-core langchain-openai "pydantic>=2.0"

The Real-Time Use Case: Cross-Border Payment Triage

Scenario

A payment gateway receives a high-velocity stream of cross-border wire transfers. Each transaction payload includes sender/receiver details, amount, and jurisdiction.

The Goal

  1. Ingest the transaction.

  2. Use an LLM (augmented with deterministic rules) to assign a risk_level (LOW, MEDIUM, HIGH).

  3. Conditionally route the transaction:

    • LOW: Route to Auto-Approve Agent (sub-100ms latency).

    • MEDIUM: Route to EDD Agent (Enhanced Due Diligence) to check mock sanctions lists.

    • HIGH: Route to Compliance Agent, which triggers a Human-in-the-Loop (HITL) interrupt for manual review.
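The "deterministic rules" mentioned in step 2 can be applied independently of the LLM, so that hard thresholds never depend on model output. The following is a minimal sketch, assuming the same thresholds and example jurisdictions that the triage prompt in Step 2 uses. The names rule_based_floor, escalate, and HIGH_RISK_COUNTRIES are hypothetical, and "XXX" is this article's placeholder jurisdiction rather than a real ISO code.

```python
from typing import Literal

RiskLevel = Literal["LOW", "MEDIUM", "HIGH"]

# Hypothetical watchlist taken from the article's examples; "XXX" is a placeholder.
HIGH_RISK_COUNTRIES = {"XXX", "RUS", "IRN"}

# Ordering used to compare risk levels.
_SEVERITY = {"LOW": 0, "MEDIUM": 1, "HIGH": 2}


def rule_based_floor(amount_usd: float, sender: str, receiver: str) -> RiskLevel:
    """Return the minimum risk level implied by hard rules alone."""
    if amount_usd > 100_000 or {sender, receiver} & HIGH_RISK_COUNTRIES:
        return "HIGH"
    if amount_usd >= 10_000:
        return "MEDIUM"
    return "LOW"


def escalate(llm_level: RiskLevel, floor: RiskLevel) -> RiskLevel:
    """Take the stricter of the LLM's assessment and the rule-based floor."""
    return max(llm_level, floor, key=_SEVERITY.__getitem__)


print(rule_based_floor(500.0, "USA", "CAN"))                       # LOW
print(rule_based_floor(45_000.0, "GBR", "ARE"))                    # MEDIUM
print(escalate("LOW", rule_based_floor(250_000.0, "DEU", "XXX")))  # HIGH
```

With this design the LLM can raise a risk level (for example, because of a vague description) but can never lower it below what the rules require.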

Step 1: Define the Strict State Schema

In financial systems, state must be strictly typed to prevent hallucinated or malformed data from propagating. We use Pydantic.

from typing import Annotated, List, Literal
from pydantic import BaseModel, Field
import operator

# Define the possible risk levels
RiskLevel = Literal["LOW", "MEDIUM", "HIGH"]
Decision = Literal["APPROVED", "FLAGGED_EDD", "PENDING_MANUAL_REVIEW", "REJECTED"]

class TransactionPayload(BaseModel):
    tx_id: str = Field(description="Unique transaction identifier")
    amount_usd: float = Field(description="Transaction amount in USD")
    sender_country: str = Field(description="ISO 3166-1 alpha-3 code of sender")
    receiver_country: str = Field(description="ISO 3166-1 alpha-3 code of receiver")
    description: str = Field(description="Payment reference or description")

class GraphState(BaseModel):
    transaction: TransactionPayload
    risk_level: RiskLevel | None = Field(default=None, description="Assessed risk level")
    risk_reasoning: str | None = Field(default=None, description="LLM reasoning for the risk score")
    edd_checks: List[str] = Field(default_factory=list, description="Results of EDD sanctions checks")
    final_decision: Decision | None = Field(default=None, description="Final routing decision")
    messages: Annotated[List[str], operator.add] = Field(default_factory=list, description="Audit trail of agent actions")
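The Annotated[List[str], operator.add] annotation on messages tells LangGraph to combine each node's update with the existing value instead of overwriting it. The sketch below imitates that merge in plain Python to show the effect. It is a simplified illustration, not LangGraph's internal implementation, and apply_update and REDUCERS are hypothetical names.

```python
import operator
from typing import Any, Callable

# Fields with a reducer are merged; all other fields are overwritten.
REDUCERS: dict[str, Callable[[Any, Any], Any]] = {"messages": operator.add}


def apply_update(state: dict[str, Any], update: dict[str, Any]) -> dict[str, Any]:
    """Return a new state with `update` merged in, honoring per-field reducers."""
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)  # e.g. list concatenation
        else:
            merged[key] = value  # plain overwrite
    return merged


state = {"risk_level": None, "messages": []}
state = apply_update(state, {"risk_level": "LOW", "messages": ["[TRIAGE] ..."]})
state = apply_update(state, {"messages": ["[AUTO-APPROVE] ..."]})

print(state["messages"])    # ['[TRIAGE] ...', '[AUTO-APPROVE] ...']
print(state["risk_level"])  # LOW
```

This is why each node in the next step returns only a one-element messages list: the reducer appends it to the trail accumulated so far.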

Step 2: Define the Agent Nodes

Each node represents a specialized agent or function. We will use gpt-4o-mini for fast, structured risk assessment.

import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langgraph.graph import StateGraph, END

# Initialize a fast, structured-output-capable LLM
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.0)

# 1. Triage Node: Assesses initial risk
def triage_node(state: GraphState) -> dict:
    tx = state.transaction
    
    prompt = ChatPromptTemplate.from_messages([
        ("system", """You are an expert AML compliance officer. Assess the risk of this transaction.
        Rules:
        - HIGH: Amount > $100,000 OR involves high-risk jurisdictions (e.g., 'XXX', 'RUS', 'IRN').
        - MEDIUM: Amount between $10,000 and $100,000 OR vague payment descriptions.
        - LOW: Amount < $10,000 and clear, legitimate descriptions between low-risk countries.
        Output ONLY the risk level and a brief reasoning."""),
        ("human", "Transaction: {tx_data}")
    ])
    
    # Use structured output for deterministic parsing
    class RiskAssessment(BaseModel):
        risk_level: RiskLevel
        reasoning: str

    structured_llm = llm.with_structured_output(RiskAssessment)
    chain = prompt | structured_llm
    
    result = chain.invoke({"tx_data": tx.model_dump_json()})
    
    return {
        "risk_level": result.risk_level,
        "risk_reasoning": result.reasoning,
        "messages": [f"[TRIAGE] Assessed risk as {result.risk_level}: {result.reasoning}"]
    }

# 2. Auto-Approve Node (LOW Risk)
def auto_approve_node(state: GraphState) -> dict:
    return {
        "final_decision": "APPROVED",
        "messages": [f"[AUTO-APPROVE] Transaction {state.transaction.tx_id} cleared instantly."]
    }

# 3. Enhanced Due Diligence Node (MEDIUM Risk)
def edd_node(state: GraphState) -> dict:
    tx = state.transaction
    # Mock external API call to a sanctions database (e.g., Refinitiv, LexisNexis)
    mock_sanctions_check = "CLEAR" if tx.receiver_country != "XXX" else "MATCH_FOUND"
    
    decision = "REJECTED" if mock_sanctions_check == "MATCH_FOUND" else "FLAGGED_EDD"
    
    return {
        "edd_checks": [f"Sanctions check on {tx.receiver_country}: {mock_sanctions_check}"],
        "final_decision": decision,
        "messages": [f"[EDD] Performed sanctions check. Result: {mock_sanctions_check}. Decision: {decision}"]
    }

# 4. Compliance / HITL Node (HIGH Risk)
def compliance_node(state: GraphState) -> dict:
    # In a real system, this would trigger a LangGraph "interrupt" to pause execution 
    # and wait for human approval via an external UI/API.
    return {
        "final_decision": "PENDING_MANUAL_REVIEW",
        "messages": [f"[COMPLIANCE] HIGH RISK DETECTED. Halted transaction {state.transaction.tx_id}. Escalated to human reviewer."]
    }

Step 3: Implement the Conditional Edge (The Core Logic)

This is where LangGraph shines. Instead of hardcoding the next step, we define a router function that inspects the state and returns the name of the next node to execute.

# The Router Function
def route_by_risk(state: GraphState) -> Literal["auto_approve", "edd", "compliance"]:
    """
    Inspects the state's risk_level and routes to the appropriate specialized agent.
    """
    if state.risk_level == "LOW":
        return "auto_approve"
    elif state.risk_level == "MEDIUM":
        return "edd"
    else:
        return "compliance"

# Build the Graph
builder = StateGraph(GraphState)

# Add all nodes
builder.add_node("triage", triage_node)
builder.add_node("auto_approve", auto_approve_node)
builder.add_node("edd", edd_node)
builder.add_node("compliance", compliance_node)

# Define the ENTRY point
builder.set_entry_point("triage")

# Add the CONDITIONAL EDGE
# This tells LangGraph: "After 'triage' finishes, run 'route_by_risk'. 
# Depending on the string it returns, go to that specific node."
builder.add_conditional_edges(
    source="triage",
    path=route_by_risk,
    # Explicit mapping for graph visualization and validation
    path_map={
        "auto_approve": "auto_approve",
        "edd": "edd",
        "compliance": "compliance"
    }
)

# Define the EXIT points (all specialized nodes lead to END)
builder.add_edge("auto_approve", END)
builder.add_edge("edd", END)
builder.add_edge("compliance", END)

# Compile the graph
graph = builder.compile()

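# Optional: Visualize the graph. draw_mermaid_png() returns PNG bytes and, by default,
# renders through the remote Mermaid.ink API (needs network access, not graphviz).
# graph.get_graph().draw_mermaid_png()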
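Because route_by_risk is a pure function of the state, it can be unit-tested without an LLM or a compiled graph. A small sketch follows, using SimpleNamespace as a stand-in for GraphState (an assumption made to keep the example dependency-free); the router body is copied from above.

```python
from types import SimpleNamespace


def route_by_risk(state) -> str:
    if state.risk_level == "LOW":
        return "auto_approve"
    elif state.risk_level == "MEDIUM":
        return "edd"
    else:
        return "compliance"


# Expected destination for every possible risk_level, including "not yet assessed".
cases = {
    "LOW": "auto_approve",
    "MEDIUM": "edd",
    "HIGH": "compliance",
    None: "compliance",
}

for level, expected in cases.items():
    assert route_by_risk(SimpleNamespace(risk_level=level)) == expected

print("router OK")
```

Note the None case: because the final branch is a bare else, a transaction whose risk_level was never set is sent to compliance rather than auto-approved.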

Step 4: Real-Time Execution & Testing

Let's simulate a real-time stream of three distinct transactions hitting our gateway. We will use graph.astream, the graph's async streaming API, so each agent's step is printed as it completes without blocking the event loop.

import asyncio

async def process_stream():
    # Simulated high-velocity transaction stream
    live_transactions = [
        TransactionPayload(
            tx_id="TXN-001", amount_usd=500.0, sender_country="USA", 
            receiver_country="CAN", description="Software subscription payment"
        ),
        TransactionPayload(
            tx_id="TXN-002", amount_usd=45000.0, sender_country="GBR", 
            receiver_country="ARE", description="Consulting services invoice #992"
        ),
        TransactionPayload(
            tx_id="TXN-003", amount_usd=250000.0, sender_country="DEU", 
            receiver_country="XXX", description="Unspecified wire transfer"
        )
    ]

    print(" Starting Real-Time Transaction Processing Stream...\n")

    for tx in live_transactions:
        print(f"--- Processing {tx.tx_id} (${tx.amount_usd}) ---")
        
        initial_state = GraphState(transaction=tx)
        
        # Stream the execution once and print each agent's actions as they happen.
        # Each event maps a node name to the state update that node returned.
        final_decision = None
        async for event in graph.astream(initial_state, stream_mode="updates"):
            for node_name, node_output in event.items():
                print(f"  ↳ [{node_name.upper()}] Executed")
                for msg in node_output.get("messages", []):
                    print(f"     {msg}")
                # Remember the decision set by the terminal node
                final_decision = node_output.get("final_decision", final_decision)

        # Report the decision without invoking the graph (and the LLM) a second time
        print(f"  FINAL DECISION: {final_decision}\n")

if __name__ == "__main__":
    asyncio.run(process_stream())

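Expected Output

The reasoning text is generated by the LLM, so its exact wording will vary between runs; the routing path and final decision for each transaction should match.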

 Starting Real-Time Transaction Processing Stream...

--- Processing TXN-001 ($500.0) ---
  ↳ [TRIAGE] Executed
     [TRIAGE] Assessed risk as LOW: Amount is under $10,000 and involves low-risk jurisdictions (USA to CAN) with a clear description.
  ↳ [AUTO_APPROVE] Executed
     [AUTO-APPROVE] Transaction TXN-001 cleared instantly.
  FINAL DECISION: APPROVED

--- Processing TXN-002 ($45000.0) ---
  ↳ [TRIAGE] Executed
     [TRIAGE] Assessed risk as MEDIUM: Amount is between $10,000 and $100,000.
  ↳ [EDD] Executed
     [EDD] Performed sanctions check. Result: CLEAR. Decision: FLAGGED_EDD
  FINAL DECISION: FLAGGED_EDD

--- Processing TXN-003 ($250000.0) ---
  ↳ [TRIAGE] Executed
     [TRIAGE] Assessed risk as HIGH: Amount exceeds $100,000 and involves a high-risk jurisdiction (XXX).
  ↳ [COMPLIANCE] Executed
     [COMPLIANCE] HIGH RISK DETECTED. Halted transaction TXN-003. Escalated to human reviewer.
  FINAL DECISION: PENDING_MANUAL_REVIEW

Step 5: Integrating with Real-Time Streams (Kafka/WebSockets)

In production, you won't iterate over a static list. You will tie the astream or ainvoke to an async message broker.

from aiokafka import AIOKafkaConsumer
import json

async def kafka_consumer_loop():
    consumer = AIOKafkaConsumer(
        'financial_transactions',
        bootstrap_servers='localhost:9092',
        value_deserializer=lambda v: json.loads(v.decode('utf-8'))
    )
    await consumer.start()
    
    try:
        async for msg in consumer:
            tx_data = TransactionPayload(**msg.value)
            state = GraphState(transaction=tx_data)
            
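            # Await the graph result. This yields to the event loop, but the
            # next message is not consumed until this transaction finishes.
            final_state = await graph.ainvoke(state)
            
            # Emit decision to a downstream topic (e.g., 'transaction_decisions').
            # final_state is a dict, so index it instead of calling model_dump():
            # await producer.send('transaction_decisions', {"tx_id": tx_data.tx_id, "final_decision": final_state["final_decision"]})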
    finally:
        await consumer.stop()
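Awaiting graph.ainvoke inside the consumer loop handles one message at a time. If throughput matters, one option is to run several graph invocations concurrently under a fixed bound. The following is a minimal sketch using only asyncio; handle_transaction is a hypothetical stand-in for the graph call, and MAX_IN_FLIGHT is an assumed tuning knob rather than a LangGraph or aiokafka setting.

```python
import asyncio

MAX_IN_FLIGHT = 2  # assumed limit; tune to your LLM provider's rate limits


async def handle_transaction(tx_id: str) -> str:
    """Stand-in for `await graph.ainvoke(state)`."""
    await asyncio.sleep(0.01)  # simulate LLM latency
    return f"{tx_id}:done"


async def process_all(tx_ids: list[str]) -> list[str]:
    semaphore = asyncio.Semaphore(MAX_IN_FLIGHT)

    async def bounded(tx_id: str) -> str:
        # At most MAX_IN_FLIGHT coroutines pass this point at the same time.
        async with semaphore:
            return await handle_transaction(tx_id)

    # gather returns results in the same order as its inputs.
    return await asyncio.gather(*(bounded(t) for t in tx_ids))


results = asyncio.run(process_all(["TXN-001", "TXN-002", "TXN-003"]))
print(results)  # ['TXN-001:done', 'TXN-002:done', 'TXN-003:done']
```

Concurrent processing means transactions may finish out of order, so offsets should be committed only after the corresponding decision has been emitted downstream.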

Production Best Practices for Financial LangGraph Systems

Human-in-the-Loop (HITL) via Breakpoints

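For the compliance node, use LangGraph's breakpoint (interrupt) feature, which requires a checkpointer:

from langgraph.checkpoint.memory import MemorySaver

# In-memory checkpointer for local testing; use a persistent one in production
memory_saver = MemorySaver()

# In compilation:
graph = builder.compile(
    checkpointer=memory_saver,
    interrupt_before=["compliance"]
)

This pauses the graph before the compliance node runs and saves the state through the checkpointer (in production, one backed by a database such as PostgreSQL or Redis). A compliance officer can then approve or reject via an API, and the graph resumes with graph.invoke(None, config), where config carries the same thread_id used for the original invocation, e.g. {"configurable": {"thread_id": tx_id}}.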

Idempotency

Ensure your tx_id is used as a deduplication key. If a Kafka message is replayed, the system should recognize the tx_id and return the cached decision rather than re-running the LLM.

Observability & Auditability

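Financial regulators require explainability. Enable LangSmith tracing for graph executions (a LANGCHAIN_API_KEY must also be set):

os.environ["LANGCHAIN_TRACING_V2"] = "true"

The messages list in our GraphState acts as a built-in, append-only audit trail of why a decision was made, because its operator.add reducer concatenates each node's entries rather than overwriting them.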

Fallback Mechanisms

If the LLM API times out or returns a schema validation error, the triage_node should catch the exception and default to a HIGH-risk routing (fail-safe/fail-closed) to ensure no risky transaction slips through auto-approval.
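One way to implement this fail-closed behavior is a wrapper around the triage function. The sketch below is a minimal illustration, not the article's triage_node: fail_closed and flaky_assess are hypothetical names, and the assessor takes a plain dict to keep the example dependency-free.

```python
from typing import Callable


def fail_closed(assess: Callable[[dict], dict]) -> Callable[[dict], dict]:
    """Wrap a triage function so that any exception routes the transaction to HIGH."""
    def wrapper(tx: dict) -> dict:
        try:
            return assess(tx)
        except Exception as exc:  # timeouts, schema validation errors, etc.
            return {
                "risk_level": "HIGH",
                "risk_reasoning": f"Triage failed ({type(exc).__name__}); defaulting to HIGH.",
            }
    return wrapper


@fail_closed
def flaky_assess(tx: dict) -> dict:
    """Hypothetical assessor that simulates an LLM timeout."""
    raise TimeoutError("LLM request timed out")


result = flaky_assess({"tx_id": "TXN-004"})
print(result["risk_level"])  # HIGH
```

Recording the exception type in risk_reasoning keeps the audit trail honest: a reviewer can see that the escalation came from a system failure rather than from an actual risk assessment.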

State Pruning

For long-running HITL workflows, ensure your checkpointer doesn't bloat. Prune intermediate LLM token outputs from the saved state, retaining only the structured GraphState Pydantic model.

Conclusion

Conditional edges in LangGraph transform AI from a static text generator into a dynamic, state-aware routing engine. By defining a strict Pydantic state, specialized agent nodes, and a deterministic routing function, you can build a financial triage system that is both efficient (low-risk transactions clear after a single triage call, with no further checks) and rigorously compliant (escalating high-risk anomalies).

As regulatory scrutiny on AI in finance intensifies in 2026, this pattern of explicit state tracking, conditional routing, and human-in-the-loop fallbacks will become the gold standard for production AI architecture.