AI Automation & Agents  

Orchestrating GenAI Agents at Scale: Batch Processing and Stateful Entities for Enterprise Knowledge Synthesis

Table of Contents

  • Introduction

  • How Do You Handle Batch Processing in Event Hub or Queue Triggered Functions?

  • What Are Durable Entities in Durable Functions Used For?

  • Conclusion

Introduction

At NexusMind, we’ve deployed a fleet of autonomous GenAI agents that continuously synthesize enterprise knowledge from SharePoint, Confluence, Salesforce, and ServiceNow. These agents don’t just answer questions—they proactively detect knowledge gaps, reconcile conflicting documentation, and generate executive briefings from fragmented data sources.

But processing millions of documents in real time demands more than just LLMs. It requires intelligent batching to avoid throttling APIs and stateful coordination to track agent progress across days. As the lead cloud architect, I engineered a hybrid serverless system using Azure Functions, Event Hubs, and Durable Entities to power this cognitive backbone.

This article dives into two critical patterns—through the lens of live GenAI agent orchestration—with production-grade code.

System Design

[PlantUML architecture diagram omitted: Event Hubs feed batch-triggered Functions, with Durable Entities tracking agent state]

How Do You Handle Batch Processing in Event Hub or Queue Triggered Functions?

GenAI agents must process thousands of document updates per minute without overwhelming SharePoint’s API (which throttles at 10K requests/hour per user). The solution: batch-aware triggers.

Event Hub Trigger with Batch Processing

We ingest document change events from Microsoft Graph via Event Hubs. Azure Functions natively batch-process these:

import azure.functions as func
import json
from typing import List

def main(events: List[func.EventHubEvent]) -> None:
    """
    Processes a batch of document change events.
    Each batch = 100–1000 events (configurable in host.json).
    """
    documents_to_index = []
    
    for event in events:
        change = json.loads(event.get_body().decode('utf-8'))
        if change['eventType'] == 'updated':
            documents_to_index.append({
                'id': change['resourceId'],
                'source': change['source'],
                'lastModified': change['lastModified']
            })
    
    # Single RAG pipeline call for the entire batch
    if documents_to_index:
        update_vector_store_in_batch(documents_to_index)

Critical host.json Configuration

{
  "version": "2.0",
  "extensions": {
    "eventHubs": {
      "batchCheckpointFrequency": 5,
      "eventProcessorOptions": {
        "maxBatchSize": 1000,
        "prefetchCount": 1000,
        "enableReceiverRuntimeMetric": true
      }
    }
  },
  "functionTimeout": "00:10:00"
}

Note: maxBatchSize, prefetchCount, and enableReceiverRuntimeMetric belong under eventProcessorOptions in the v3/v4 Event Hubs extension; in v5+ the equivalent setting is maxEventBatchSize directly under eventHubs.

Why Batching Matters:

  • Reduces LLM/vector DB calls by 90%

  • Respects SharePoint throttling limits

  • Minimizes cold-start overhead per document
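The update_vector_store_in_batch helper referenced above is left undefined. One possible shape, sketched with the embedding call and vector-store writer passed in as callables for testability (in the trigger itself they would be module-level clients), and assuming each document dict carries its text:

```python
from typing import Callable, Dict, List

def update_vector_store_in_batch(
    documents: List[Dict],
    embed_batch: Callable[[List[str]], List[List[float]]],
    upsert: Callable[[List[Dict]], None],
    chunk_size: int = 100,
) -> int:
    """Embeds and upserts documents in fixed-size chunks so one Event Hub
    batch never exceeds the embedding API's per-call input limit.
    Returns the number of chunks written."""
    chunks_written = 0
    for start in range(0, len(documents), chunk_size):
        chunk = documents[start:start + chunk_size]
        # One embedding API call per chunk instead of one per document
        vectors = embed_batch([d['text'] for d in chunk])
        upsert([{**d, 'vector': v} for d, v in zip(chunk, vectors)])
        chunks_written += 1
    return chunks_written
```

The chunk_size knob is what keeps a 1000-event Event Hub batch from turning into a single oversized embedding request.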

Queue Trigger Alternative (for Retry Control)

For high-value documents (e.g., HR policies), we use Service Bus sessions to ensure ordered, retriable batches:

def main(msg: func.ServiceBusMessage) -> None:
    batch = json.loads(msg.get_body().decode('utf-8'))
    
    try:
        # Process entire batch as a unit
        rag_pipeline.ingest_batch(batch['documents'])
        
        # Emit success event
        emit_to_event_grid({
            'type': 'KnowledgeBatchProcessed',
            'batchId': batch['id'],
            'count': len(batch['documents'])
        })
    except RateLimitError:
        # Re-raise so the runtime abandons the message; Service Bus
        # redelivers it until MaxDeliveryCount, then dead-letters it
        raise
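Retry behavior here is ultimately bounded by the queue itself: Service Bus redelivers abandoned messages up to the queue's MaxDeliveryCount and then dead-letters them. A sketch of provisioning the session-enabled queue (resource names are placeholders):

```shell
az servicebus queue create \
  --resource-group nexusmind-rg \
  --namespace-name nexusmind-sb \
  --name rag-ingest-queue \
  --enable-session true \
  --max-delivery-count 5
```

Sessions give the ordered, per-batch processing the text describes; max-delivery-count caps how many times a poisoned batch is retried before landing in the dead-letter queue for inspection.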

What Are Durable Entities in Durable Functions Used For?

GenAI agents need long-lived state. Example: An agent monitoring a product launch must:

  • Track 200+ Confluence pages

  • Detect when all required docs are updated

  • Generate a compliance report only when complete

Durable Entities provide lightweight, scalable state management:

Entity Definition: DocumentSetTracker

import azure.durable_functions as df

def entity_function(context: df.DurableEntityContext):
    """Tracks status of a set of enterprise documents."""
    state = context.get_state(lambda: {
        'total_docs': 0,
        'processed_docs': [],  # a list, not a set: entity state must be JSON-serializable
        'owner': None,
        'deadline': None
    })

    op = context.operation_name
    if op == "add_document":
        doc_id = context.get_input()
        if doc_id not in state['processed_docs']:  # idempotent under redelivery
            state['processed_docs'].append(doc_id)
        context.set_state(state)

        # Auto-trigger report when complete (guard against firing before "initialize")
        if state['total_docs'] > 0 and len(state['processed_docs']) >= state['total_docs']:
            context.signal_entity(
                df.EntityId("ReportGenerator", state['owner']),
                "generate_report",
                state
            )
            
    elif op == "initialize":
        config = context.get_input()
        state.update({
            'total_docs': config['doc_count'],
            'owner': config['owner'],
            'deadline': config['deadline']
        })
        context.set_state(state)

main = df.Entity.create(entity_function)
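The add/complete bookkeeping inside that entity is easy to get wrong under message redelivery, so it helps to factor it into a pure function you can unit-test outside Azure. A sketch (record_document is our name, not part of the Durable SDK):

```python
from typing import Dict, List, Tuple

def record_document(state: Dict, doc_id: str) -> Tuple[Dict, bool]:
    """Idempotently records a processed document and reports completion.

    Duplicates are ignored, so a retried delivery cannot over-count;
    completion only fires once total_docs has been initialized."""
    processed: List[str] = state['processed_docs']
    if doc_id not in processed:
        processed.append(doc_id)
    complete = (
        state['total_docs'] > 0
        and len(processed) >= state['total_docs']
    )
    return state, complete
```

The entity's add_document branch then reduces to calling this helper and signaling ReportGenerator when it returns True.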

Orchestrator: Launch Agent Workflow

def orchestrator_function(context: df.DurableOrchestrationContext):
    """Launches a GenAI agent for a new product launch."""
    launch_config = context.get_input()
    
    # Create a unique entity per launch
    entity_id = df.EntityId("DocumentSetTracker", launch_config['launch_id'])
    
    # Initialize tracker
    yield context.call_entity(entity_id, "initialize", launch_config)
    
    # Ingest all documents (triggers batch processing)
    yield context.call_activity("QueueDocumentBatch", launch_config['doc_list'])
    
    # Wait for completion; the report pipeline the entity signals must
    # raise this event against this orchestration's instance ID
    report = yield context.wait_for_external_event("ReportReady")
    return report

main = df.Orchestrator.create(orchestrator_function)

Activity Function: Queue for Batch Processing

import json
import os
import uuid
from typing import List

from azure.servicebus import ServiceBusClient, ServiceBusMessage


def main(doc_list: List[str]) -> None:
    """Sends docs to Service Bus for batch RAG ingestion."""
    batch_id = str(uuid.uuid4())
    batch = {
        'id': batch_id,
        'documents': doc_list,
        'agent_id': os.environ['AGENT_ID']
    }
    
    # Send to session-enabled queue (session = batch_id)
    service_bus_client = ServiceBusClient.from_connection_string(
        conn_str=os.environ['SERVICE_BUS_CONN']
    )
    with service_bus_client:
        sender = service_bus_client.get_queue_sender("rag-ingest-queue")
        message = ServiceBusMessage(
            body=json.dumps(batch),
            session_id=batch_id  # Ensures ordered processing
        )
        sender.send_messages(message)
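One practical wrinkle: a standard-tier Service Bus message body tops out at 256 KB, so a very large doc_list needs splitting before sending. A hedged helper (split_batch is hypothetical) that keeps every sub-batch under the cap while sharing the parent batch id, so all pieces land in the same session and stay ordered:

```python
import json
from typing import Dict, Iterator, List

MAX_BODY_BYTES = 200_000  # headroom under the 256 KB standard-tier cap

def split_batch(batch_id: str, doc_list: List[str],
                max_bytes: int = MAX_BODY_BYTES) -> Iterator[Dict]:
    """Yields sub-batches whose serialized JSON bodies stay under max_bytes.

    All sub-batches carry the same batch_id, so using it as the
    session_id keeps them in one ordered Service Bus session."""
    current: List[str] = []
    for doc in doc_list:
        candidate = current + [doc]
        body = {'id': batch_id, 'documents': candidate}
        if current and len(json.dumps(body).encode()) > max_bytes:
            yield {'id': batch_id, 'documents': current}
            current = [doc]
        else:
            current = candidate
    if current:
        yield {'id': batch_id, 'documents': current}
```

The activity above would then send one ServiceBusMessage per yielded sub-batch instead of a single message.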

Why Durable Entities?

  • State isolation: Each product launch has its own entity

  • Event-driven completion: No polling needed

  • Cost efficiency: Entities cost pennies/month vs. VMs or databases


Conclusion

In GenAI-powered enterprises, batching prevents chaos, and entities enable intelligence.

  • Use Event Hub/Queue batch triggers to respect API limits and optimize LLM costs

  • Leverage Durable Entities for lightweight, long-lived agent state

  • Always design for idempotency—GenAI pipelines will retry
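That idempotency point deserves one concrete sketch: key each ingest on a deterministic hash of (id, lastModified) so a retried batch skips documents already indexed. The names here (ingest_idempotently, index_fn) are illustrative, and in production the seen set would live somewhere durable, such as the tracker entity:

```python
import hashlib
from typing import Callable, Dict, List, Set

def ingest_idempotently(docs: List[Dict], seen: Set[str],
                        index_fn: Callable[[Dict], None]) -> int:
    """Indexes only documents not processed before, keyed by a
    deterministic hash of (id, lastModified). Safe to call on retry."""
    indexed = 0
    for doc in docs:
        key = hashlib.sha256(
            f"{doc['id']}:{doc['lastModified']}".encode()
        ).hexdigest()
        if key in seen:
            continue  # already indexed by an earlier (possibly failed) attempt
        index_fn(doc)
        seen.add(key)
        indexed += 1
    return indexed
```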

At NexusMind, this architecture processes 4.2M documents daily with 99.98% success rate, enabling agents to synthesize knowledge faster than human teams.