Handling API Payload Truncation in Enterprise RAG

Modern LLMs boast context windows of 128K–200K tokens, but enterprise RAG applications still routinely hit context limits when integrating with external APIs like Salesforce. Naive truncation (e.g., string slicing, dropping trailing records) destroys relational context, breaks retrieval quality, and introduces hallucination risks.

This article presents a production-grade, LangGraph-orchestrated pattern for adaptive payload truncation, combining semantic filtering, dynamic chunking, contextual compression, and fallback routing. We’ll walk through a real-world enterprise use case: a Sales Intelligence Assistant that queries Salesforce Accounts, Opportunities, and Case histories while keeping every request within the context window and preserving auditability and RAG fidelity.

The Context Window Trap in Enterprise RAG

External APIs return unpredictable, deeply nested payloads:

  • Salesforce SOQL queries can return thousands of records with related objects (Opportunity → Cases → Activities → Attachments)

  • A single API response can easily exceed 50K–150K tokens after JSON serialization

  • LLM context budgets must be split across: system prompt, user query, retrieved chunks, tool outputs, and output generation

  • Blind truncation loses critical business context (e.g., dropping a pricing objection case because it appeared late in the payload)

Enterprise Requirement: Truncation must be semantic, bounded, auditable, and retry-aware.
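
As a rough illustration of the budget split described above, the sketch below reserves tokens for each fixed component and treats the remainder as the tool-output budget. The class name ContextBudget, the method tool_budget, and all of the numbers are assumptions chosen for illustration, not values tied to a specific model.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ContextBudget:
    """Hypothetical split of a model's context window, in tokens."""
    context_window: int      # total window advertised by the model
    system_prompt: int       # reserved for instructions
    user_query: int          # reserved for the user's question
    retrieved_chunks: int    # reserved for RAG context
    output_generation: int   # reserved for the model's answer

    def tool_budget(self) -> int:
        """Tokens left for tool outputs after the fixed reservations."""
        reserved = (self.system_prompt + self.user_query
                    + self.retrieved_chunks + self.output_generation)
        remaining = self.context_window - reserved
        if remaining <= 0:
            raise ValueError("Reservations exceed the context window")
        return remaining


# Illustrative numbers only
budget = ContextBudget(
    context_window=128_000,
    system_prompt=4_000,
    user_query=1_000,
    retrieved_chunks=55_000,
    output_generation=8_000,
)
print(budget.tool_budget())  # 60000
```

Making the reservations explicit means an over-committed budget raises an error up front instead of surfacing later as a silently truncated payload.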

Real-World Use Case: Enterprise Sales Intelligence Assistant

Query: "Summarize the top risks for ACME Corp's Q3 renewal opportunity and pull recent case notes mentioning pricing objections or SLA breaches."

Expected Salesforce Payload:

  • 1 Account record (50 fields)

  • 12 Opportunity records with line items

  • 84 Case records with comments & activity history

  • Custom object metadata (Contract__c, Pricing_History__c)

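Problem: The raw JSON payload is roughly 110K tokens, but the LLM context budget for tool output is 60K tokens. Cutting the payload to fit discards about 50K tokens (roughly 45% of the content), and because slicing removes whatever comes last, it can drop most of the Case records, including the exact pricing objections the query targets.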

Solution: LangGraph orchestrates an adaptive truncation pipeline that:

  1. Prunes non-essential fields

  2. Filters records by semantic relevance to the query

  3. Chunks payloads into budget-aligned segments

  4. Routes chunks through a RAG retriever for contextual compression

  5. Falls back to lightweight summarization if budget is still exceeded
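
Step 2 of the pipeline above can be sketched without any external service. The example below scores each record by word overlap with the query, which is a deliberately crude lexical stand-in for embedding similarity. The function names tokenize and filter_by_relevance and the min_overlap parameter are hypothetical; a production version would more likely compare embedding vectors.

```python
import json
import re
from typing import Any, Dict, List, Set


def tokenize(text: str) -> Set[str]:
    """Lowercase word set; a crude stand-in for an embedding."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def filter_by_relevance(records: List[Dict[str, Any]], query: str,
                        min_overlap: int = 1) -> List[Dict[str, Any]]:
    """Keep records sharing at least min_overlap words with the query,
    ordered from most to least overlap (ties keep their original order)."""
    query_terms = tokenize(query)
    scored = []
    for rec in records:
        overlap = len(query_terms & tokenize(json.dumps(rec, default=str)))
        if overlap >= min_overlap:
            scored.append((overlap, rec))
    # Sort on the score only, so the record dicts are never compared
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [rec for _, rec in scored]


cases = [
    {"Subject": "Login issue", "Description": "User cannot reset password"},
    {"Subject": "Renewal pricing objection", "Description": "Customer disputes Q3 pricing"},
    {"Subject": "SLA breach", "Description": "Response time exceeded SLA"},
]
relevant = filter_by_relevance(cases, "pricing objections or SLA breaches")
print([c["Subject"] for c in relevant])
# ['Renewal pricing objection', 'SLA breach']
```

Exact word matching misses inflections ("objection" vs. "objections"), which is one reason an embedding-based score is usually preferred once the basic flow works.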

Architecture: LangGraph State Machine for Adaptive Payload Handling

LangGraph’s state machine enables:

  • Deterministic budget tracking across nodes

  • Conditional routing for fallback strategies

  • Parallel chunk processing via Send

  • Full audit trails for compliance

Step-by-Step Implementation

1. State Definition

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, List, Dict, Any, Optional
import tiktoken

class SalesAgentState(TypedDict):
    query: str
    sf_account_id: str
    raw_payload: List[Dict[str, Any]]
    token_budget: int
    processed_chunks: List[List[Dict[str, Any]]]
    compressed_context: List[str]
    response: str
    truncation_metadata: Dict[str, Any]
    retry_count: int
    error: Optional[str]
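
Because several nodes read keys such as retry_count before any node has written them, it can help to start the graph from a fully populated state. The helper below is a hypothetical convenience (make_initial_state is not part of LangGraph) that mirrors the keys of SalesAgentState. The default budget of 60,000 tokens and the account ID are illustrative values.

```python
from typing import Any, Dict


def make_initial_state(query: str, sf_account_id: str,
                       token_budget: int = 60_000) -> Dict[str, Any]:
    """Build a starting state with every key populated (hypothetical helper)."""
    if token_budget <= 0:
        raise ValueError("token_budget must be positive")
    return {
        "query": query,
        "sf_account_id": sf_account_id,
        "raw_payload": [],
        "token_budget": token_budget,
        "processed_chunks": [],
        "compressed_context": [],
        "response": "",
        "truncation_metadata": {},
        "retry_count": 0,
        "error": None,
    }


# Made-up 18-character account ID, for illustration only
state = make_initial_state(
    "Summarize the top risks for ACME Corp's Q3 renewal opportunity",
    "001000000000001AAA",
)
print(state["token_budget"], state["retry_count"])  # 60000 0
```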

2. Salesforce Data Fetcher (Paginated)

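from simple_salesforce import Salesforce
import os
import re

def fetch_salesforce_data(state: SalesAgentState) -> SalesAgentState:
    # Salesforce record IDs are 15 or 18 alphanumeric characters; validating
    # the ID before interpolating it guards against SOQL injection.
    account_id = state["sf_account_id"]
    if not re.fullmatch(r"[A-Za-z0-9]{15}|[A-Za-z0-9]{18}", account_id):
        raise ValueError(f"Invalid Salesforce account ID: {account_id!r}")

    sf = Salesforce(username=os.getenv("SF_USER"), 
                    password=os.getenv("SF_PASS"), 
                    security_token=os.getenv("SF_TOKEN"),
                    domain="login")
    
    # SOQL with child-relationship subqueries. Note: this assumes the org
    # defines a Case-to-Opportunity lookup whose child relationship is named
    # "Cases"; if it does not, query Case by AccountId in a separate call.
    soql = f"""
    SELECT Id, Name, StageName, Amount, CloseDate, Description, 
           (SELECT Id, Subject, Description, Status FROM Cases),
           (SELECT Id, ActivityDate, Description FROM ActivityHistories)
    FROM Opportunity 
    WHERE AccountId = '{account_id}' 
    ORDER BY CloseDate DESC
    """
    
    # query_all follows the result pages until every record is fetched
    records = sf.query_all(soql)['records']
    # Remove the top-level SF "attributes" metadata key
    clean = [{k: v for k, v in r.items() if k != "attributes"} for r in records]
    
    state["raw_payload"] = clean
    return state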
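
The comprehension in the fetcher only removes the top-level attributes key. Subquery results arrive wrapped in an object with totalSize, done, and records keys, and each nested record carries its own attributes entry, all of which still consume tokens. The following is a minimal sketch of a recursive cleaner. The name strip_sf_metadata is hypothetical, and the sample record is made up for illustration.

```python
from typing import Any


def strip_sf_metadata(value: Any) -> Any:
    """Recursively drop 'attributes' keys and unwrap subquery result wrappers."""
    if isinstance(value, dict):
        # Subquery results look like {"totalSize": n, "done": bool, "records": [...]}
        if "records" in value and "totalSize" in value:
            return [strip_sf_metadata(r) for r in value["records"]]
        return {k: strip_sf_metadata(v) for k, v in value.items()
                if k != "attributes"}
    if isinstance(value, list):
        return [strip_sf_metadata(v) for v in value]
    return value


raw = {
    "attributes": {"type": "Opportunity"},
    "Id": "006000000000001AAA",
    "Name": "ACME Q3 Renewal",
    "ActivityHistories": {
        "totalSize": 1,
        "done": True,
        "records": [
            {"attributes": {"type": "ActivityHistory"},
             "Id": "00T000000000001AAA",
             "Description": "Pricing call"},
        ],
    },
    "Cases": None,  # a subquery with no rows comes back as null
}
print(strip_sf_metadata(raw))
```

After cleaning, child relationships are plain lists (or None when empty), which also makes them simpler to chunk and embed in the later steps.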

3. Adaptive Truncation & Semantic Chunking

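This node prunes each record to the fields the query needs, then packs records into chunks that each stay within 80% of the token budget. Query-aware filtering happens in the compression step that follows.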

import json
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS

def adaptive_truncate(state: SalesAgentState) -> SalesAgentState:
    enc = tiktoken.get_encoding("cl100k_base")
    budget = state["token_budget"]
    raw = state["raw_payload"]
    
    # 1. Field Pruning: Keep only query-relevant fields
    essential_fields = {"Id", "Name", "StageName", "Amount", "CloseDate", "Description", "Cases", "ActivityHistories"}
    filtered = [{k: v for k, v in r.items() if k in essential_fields} for r in raw]
    
    # 2. Token Estimation & Chunking
    chunk_limit = budget * 0.8  # use 80% of the budget; keep 20% headroom for system/retrieval overhead
    chunks = []
    current_chunk = []
    current_tokens = 0
    total_tokens = 0
    
    for rec in filtered:
        rec_str = json.dumps(rec, default=str)
        rec_tokens = len(enc.encode(rec_str))
        total_tokens += rec_tokens
        
        # Start a new chunk only when the current one is non-empty, so an
        # oversized first record never produces an empty chunk.
        if current_chunk and current_tokens + rec_tokens > chunk_limit:
            chunks.append(current_chunk)
            current_chunk = [rec]
            current_tokens = rec_tokens
        else:
            current_chunk.append(rec)
            current_tokens += rec_tokens
    
    if current_chunk:
        chunks.append(current_chunk)
        
    state["processed_chunks"] = chunks
    state["truncation_metadata"] = {
        "total_records": len(raw),
        "kept_records": sum(len(c) for c in chunks),
        "chunks_generated": len(chunks),
        "estimated_tokens": total_tokens  # tokens across all chunks, not just the last one
    }
    return state
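
The packing loop treats each record as indivisible, so a single Opportunity with a very long case history can still exceed the per-chunk limit on its own. One possible way to handle that, sketched below under stated assumptions, is to split such a record into several copies of the parent fields, each carrying a slice of the children. The names split_oversized_record and approx_tokens are hypothetical, the counter is a rough 4-characters-per-token stand-in for tiktoken, and the child relationship is assumed to be a plain list.

```python
import json
from typing import Any, Callable, Dict, List


def approx_tokens(text: str) -> int:
    """Stand-in counter (about 4 characters per token); swap in tiktoken in practice."""
    return max(1, len(text) // 4)


def split_oversized_record(rec: Dict[str, Any], limit: int,
                           child_key: str = "Cases",
                           count_tokens: Callable[[str], int] = approx_tokens
                           ) -> List[Dict[str, Any]]:
    """Split one record into parts that repeat the parent fields and carry
    consecutive slices of rec[child_key]. A part holding a single child may
    still exceed the limit; this sketch does not split individual children."""
    def size(r: Dict[str, Any]) -> int:
        return count_tokens(json.dumps(r, default=str))

    if size(rec) <= limit:
        return [rec]

    children = rec.get(child_key) or []
    parent = {k: v for k, v in rec.items() if k != child_key}
    parts: List[Dict[str, Any]] = []
    batch: List[Any] = []
    for child in children:
        candidate = {**parent, child_key: batch + [child]}
        if batch and size(candidate) > limit:
            parts.append({**parent, child_key: batch})
            batch = [child]
        else:
            batch.append(child)
    if batch or not parts:
        parts.append({**parent, child_key: batch})
    return parts


record = {
    "Id": "006A",
    "Name": "ACME",
    "Cases": [{"Id": f"500{i}", "Description": "x" * 100} for i in range(4)],
}
parts = split_oversized_record(record, limit=80)
print([len(p["Cases"]) for p in parts])  # [2, 2]
```

Repeating the parent fields in every part costs a few tokens but keeps each part self-describing, so a retrieved fragment still identifies which Opportunity it belongs to.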

4. RAG Integration & Contextual Compression

Instead of passing raw chunks to the LLM, we embed, retrieve, and compress.

from langchain_text_splitters import RecursiveJsonSplitter
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain_openai import ChatOpenAI

def compress_with_rag(state: SalesAgentState) -> SalesAgentState:
    if not state["processed_chunks"]:
        state["compressed_context"] = []
        return state
        
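    # Flatten chunks into documents. RecursiveJsonSplitter expects JSON-like
    # dicts rather than strings, so wrap each chunk's record list in a dict;
    # the json round-trip coerces non-serializable values to strings.
    splitter = RecursiveJsonSplitter(max_chunk_size=1000)
    docs = []
    for i, chunk in enumerate(state["processed_chunks"]):
        chunk_data = json.loads(json.dumps({"records": chunk}, default=str))
        docs.extend(splitter.create_documents(
            [chunk_data], convert_lists=True, metadatas=[{"chunk_id": i}]))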
    
    # Embed & store
    embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
    vectorstore = FAISS.from_documents(docs, embeddings)
    
    # Contextual Compression
    # temperature=0 keeps extraction as repeatable as possible for audits
    compressor = LLMChainExtractor.from_llm(ChatOpenAI(model="gpt-4o-mini", temperature=0))
    retriever = ContextualCompressionRetriever(
        base_compressor=compressor,
        base_retriever=vectorstore.as_retriever(search_kwargs={"k": 8})
    )
    
    # Retrieve & compress based on query
    retrieved = retriever.invoke(state["query"])
    state["compressed_context"] = [doc.page_content for doc in retrieved]
    return state
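
The retriever returns up to k compressed documents, but nothing in the node caps their combined size. A small selection step can keep the highest-ranked passages that fit and record what was left out for the audit trail. The sketch below assumes the input is already ordered by relevance. The names take_within_budget and approx_tokens are hypothetical, and the counter is a rough stand-in for tiktoken.

```python
from typing import Callable, List, Tuple


def approx_tokens(text: str) -> int:
    """Stand-in counter (about 4 characters per token); swap in tiktoken in practice."""
    return max(1, len(text) // 4)


def take_within_budget(ranked_texts: List[str], budget: int,
                       count_tokens: Callable[[str], int] = approx_tokens
                       ) -> Tuple[List[str], List[str]]:
    """Greedily keep passages, in rank order, while they fit the budget.
    Passages that do not fit are skipped (not cut mid-text) and returned
    separately so the omission can be logged."""
    kept: List[str] = []
    dropped: List[str] = []
    used = 0
    for text in ranked_texts:
        cost = count_tokens(text)
        if used + cost <= budget:
            kept.append(text)
            used += cost
        else:
            dropped.append(text)
    return kept, dropped


passages = ["a" * 40, "b" * 400, "c" * 40]   # about 10, 100, and 10 tokens
kept, dropped = take_within_budget(passages, budget=25)
print(len(kept), len(dropped))  # 2 1
```

Skipping an oversized passage rather than stopping at it lets smaller, lower-ranked passages still use the remaining budget. Whether that trade-off is desirable depends on how much rank order matters for the query.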

5. Graph Assembly & Conditional Routing

LangGraph handles fallbacks if compression still exceeds budget.

def check_context_budget(state: SalesAgentState) -> str:
    enc = tiktoken.get_encoding("cl100k_base")
    total_tokens = sum(len(enc.encode(ctx)) for ctx in state["compressed_context"])
    
    if total_tokens <= state["token_budget"]:
        return "proceed_to_llm"
    elif state.get("retry_count", 0) < 2:  # tolerate a state that never set retry_count
        return "fallback_summarize"
    else:
        return "abort_with_warning"

def fallback_summarize(state: SalesAgentState) -> SalesAgentState:
    # Lightweight summarization when RAG compression isn't enough
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    # Join the passages into plain text instead of interpolating a Python list repr
    context = "\n\n".join(state["compressed_context"])
    summary = llm.invoke(f"Summarize these Salesforce records for sales risk analysis:\n{context}")
    state["compressed_context"] = [summary.content]
    state["retry_count"] = state.get("retry_count", 0) + 1
    return state

def build_sales_graph():
    workflow = StateGraph(SalesAgentState)
    
    workflow.add_node("fetch_sf", fetch_salesforce_data)
    workflow.add_node("truncate", adaptive_truncate)
    workflow.add_node("compress_rag", compress_with_rag)
    workflow.add_node("summarize", fallback_summarize)
    workflow.add_node("generate", lambda s: {"response": "LLM generation step (omitted for brevity)"})
    
    workflow.add_edge(START, "fetch_sf")
    workflow.add_edge("fetch_sf", "truncate")
    workflow.add_edge("truncate", "compress_rag")
    workflow.add_conditional_edges(
        "compress_rag",
        check_context_budget,
        {
            "proceed_to_llm": "generate",
            "fallback_summarize": "summarize",
            "abort_with_warning": "generate"
        }
    )
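    # Re-check the budget directly after summarizing. Routing back through
    # "compress_rag" would rebuild the context and discard the summary.
    workflow.add_conditional_edges(
        "summarize",
        check_context_budget,
        {
            "proceed_to_llm": "generate",
            "fallback_summarize": "summarize",
            "abort_with_warning": "generate"
        }
    )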
    workflow.add_edge("generate", END)
    
    return workflow.compile()
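
In the graph above, the abort_with_warning route goes straight to generate without recording that the budget was exceeded. One option, sketched here as an assumption rather than as part of the original design, is a small node that returns a partial state update carrying the warning. The function abort_with_warning and the budget_exceeded flag are hypothetical names.

```python
from typing import Any, Dict


def abort_with_warning(state: Dict[str, Any]) -> Dict[str, Any]:
    """Return a partial state update that flags an over-budget context."""
    retries = state.get("retry_count", 0)
    budget = state.get("token_budget")
    message = (
        f"Context still exceeds the token budget ({budget}) after "
        f"{retries} summarization attempt(s); the response may omit relevant records."
    )
    # Copy the metadata so the incoming state is not mutated
    metadata = dict(state.get("truncation_metadata") or {})
    metadata["budget_exceeded"] = True
    return {"error": message, "truncation_metadata": metadata}


update = abort_with_warning({"token_budget": 60000, "retry_count": 2,
                             "truncation_metadata": {"total_records": 12}})
print(update["error"])
```

To wire it in, the node could be registered with workflow.add_node("warn", abort_with_warning), the "abort_with_warning" route mapped to "warn", and an edge added from "warn" to "generate" so the final answer can surface the warning.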

Enterprise Hardening: Production Considerations

Concern | Implementation Pattern
--- | ---
PII/Compliance | Add a pii_redaction node before truncation using regex + NER models. Log only anonymized metadata.
Observability | Emit OpenTelemetry spans per node. Track truncation_metadata in LangSmith for audit trails.
Rate Limits | Wrap SF calls with exponential backoff. Use the Salesforce Bulk API (exposed as sf.bulk in simple_salesforce) for >10K records.
Cost Control | Cache embeddings per account. Use gpt-4o-mini for compression, gpt-4o for final reasoning.
Determinism | Set temperature=0 in compression nodes. Use structured output schemas for truncation decisions.
Fallback Resilience | Implement circuit breakers. If the SF API fails, return a cached summary plus an explicit warning in the response.
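
As a minimal sketch of the regex half of the PII/Compliance pattern, the example below masks email addresses and phone-like numbers in every string of a nested record. The patterns are intentionally simple and will both miss some formats and over-match others, so a real deployment would pair them with an NER model as the table suggests. The names redact_text and redact_pii are hypothetical.

```python
import re
from typing import Any

# Deliberately simple patterns; assumptions for illustration only
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
PHONE_RE = re.compile(r"\+?\d[\d\s().-]{7,}\d")


def redact_text(text: str) -> str:
    """Mask emails first, then phone-like digit runs."""
    text = EMAIL_RE.sub("[EMAIL]", text)
    return PHONE_RE.sub("[PHONE]", text)


def redact_pii(value: Any) -> Any:
    """Apply redact_text to every string inside nested dicts and lists."""
    if isinstance(value, str):
        return redact_text(value)
    if isinstance(value, dict):
        return {k: redact_pii(v) for k, v in value.items()}
    if isinstance(value, list):
        return [redact_pii(v) for v in value]
    return value


case = {
    "Subject": "Pricing objection",
    "Description": "Contact jane.doe@acme.example or +1 (415) 555-0100 about pricing.",
}
print(redact_pii(case)["Description"])
# Contact [EMAIL] or [PHONE] about pricing.
```

Running redaction before truncation means token counts, embeddings, and logs are all computed on the masked text.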

Monitoring Truncation Impact

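# Post-graph analysis
def audit_truncation_quality(state: SalesAgentState):
    meta = state["truncation_metadata"]
    print(f"Records dropped: {meta['total_records'] - meta['kept_records']}")
    # Compare token counts: list lengths mix units (passages vs. records)
    enc = tiktoken.get_encoding("cl100k_base")
    raw_tokens = len(enc.encode(json.dumps(state["raw_payload"], default=str)))
    ctx_tokens = sum(len(enc.encode(c)) for c in state["compressed_context"])
    print(f"Compression ratio: {ctx_tokens / max(raw_tokens, 1):.2f}")
    # Integrate with human-in-the-loop feedback for continuous tuning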
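
For audit trails, printed output is hard to query later. A structured record is usually easier to ship to a log pipeline. The sketch below builds one JSON line from the state's metadata only, so no record content (and therefore no PII) is logged. The function name build_audit_record and the field names are assumptions for illustration.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict


def build_audit_record(state: Dict[str, Any]) -> str:
    """Serialize truncation metadata (never record contents) as one JSON line."""
    meta = state.get("truncation_metadata") or {}
    total = meta.get("total_records", 0)
    kept = meta.get("kept_records", 0)
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "sf_account_id": state.get("sf_account_id"),
        "total_records": total,
        "kept_records": kept,
        "dropped_records": total - kept,
        "chunks_generated": meta.get("chunks_generated", 0),
        "retry_count": state.get("retry_count", 0),
        "context_items": len(state.get("compressed_context") or []),
    }
    return json.dumps(record, sort_keys=True)


line = build_audit_record({
    "sf_account_id": "001000000000001AAA",
    "truncation_metadata": {"total_records": 12, "kept_records": 9,
                            "chunks_generated": 2},
    "retry_count": 1,
    "compressed_context": ["summary text"],
})
print(line)
```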

Why This Works for Enterprise RAG

  1. Budget-Aware by Design: Token counting is explicit, not implicit. Nodes fail gracefully instead of silently truncating.

  2. Semantic Preservation: Field pruning + RAG compression keeps query-relevant context while discarding noise.

  3. Deterministic Routing: LangGraph’s conditional edges ensure fallbacks are auditable and bounded.

  4. Scalable: LangGraph’s Send API can fan chunks out to parallel workers, and a persisted vector store can cache embeddings across sessions (the in-memory FAISS index shown here is rebuilt on every run).

  5. Compliance-Ready: Full metadata tracking, PII redaction hooks, and structured logging support SOC 2 and GDPR audit requirements.

Conclusion

API payload truncation isn’t a limitation to hack around. It’s a design surface for enterprise RAG reliability. By combining LangGraph’s stateful orchestration with adaptive field pruning, semantic chunking, and contextual compression, you can safely integrate Salesforce (or any high-volume API) into production RAG pipelines without sacrificing accuracy, overrunning the context window, or compromising compliance.