
Advanced RAG in Python with FastAPI – Multi-Source Retrieval and Evaluation

Abstract / Overview

Direct answer: deploy a reliable, multi-source RAG API by combining FAISS (local) and Pinecone (cloud) under a FastAPI service with quantitative evaluation.

You will implement retrieval, generation, observability, and a simple auth layer. You will ship a Docker image and test with curl. Assumptions: Python 3.10+, Linux container, OPENAI_API_KEY and PINECONE_API_KEY available.

Conceptual Background

RAG joins retrieval with generation so answers cite fresh, external sources. Multi-source retrieval increases recall and reduces single-index blind spots. Evaluation turns “sounds right” into measurable precision and faithfulness.
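Under the hood, LangChain's EnsembleRetriever (used later in this walkthrough) merges the ranked lists from each retriever with weighted Reciprocal Rank Fusion. A minimal pure-Python sketch of that fusion, with made-up document IDs:

```python
from collections import defaultdict

def weighted_rrf(ranked_lists, weights, c=60):
    """Weighted Reciprocal Rank Fusion: each list contributes
    weight / (c + rank) per document; highest fused score wins."""
    scores = defaultdict(float)
    for docs, w in zip(ranked_lists, weights):
        for rank, doc_id in enumerate(docs, start=1):
            scores[doc_id] += w / (c + rank)
    return sorted(scores, key=scores.get, reverse=True)

# Two retrievers agree on "doc-a"; fusion promotes it to the top.
faiss_hits = ["doc-a", "doc-b", "doc-c"]
pinecone_hits = ["doc-d", "doc-a", "doc-e"]
print(weighted_rrf([faiss_hits, pinecone_hits], weights=[0.5, 0.5]))
```

A document ranked moderately by both sources can outrank one ranked first by only a single source, which is exactly the blind-spot reduction multi-source retrieval is after.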

Architecture at a glance

(Diagram: FastAPI RAG request sequence)

Step-by-Step Walkthrough

1) Install runtime

pip install "fastapi[all]" uvicorn "langchain<0.1" openai faiss-cpu "pinecone-client<3.0" python-dotenv tiktoken numpy

The pins matter: the code below uses the classic langchain.* module layout and the v2 pinecone.init(...) client, both of which changed in later releases.

2) Environment

Create .env:

OPENAI_API_KEY=YOUR_API_KEY
PINECONE_API_KEY=YOUR_PINECONE_KEY
PINECONE_ENV=us-east-1
PINECONE_INDEX=rag-multisource
RAG_TOP_K=3
AUTH_TOKEN=change-me # simple bearer for demo

3) FastAPI service with multi-source retrieval

The service exposes:

  • POST /rag/query → answer with sources

  • POST /rag/eval → on-demand evaluation for a query/expected pair

  • GET /healthz → liveness

# app/main.py
import os, time
from typing import List, Optional
from dotenv import load_dotenv
from fastapi import FastAPI, Depends, HTTPException, Header
from pydantic import BaseModel
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import FAISS, Pinecone
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.retrievers import EnsembleRetriever
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import TextLoader
import pinecone

load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_ENV = os.getenv("PINECONE_ENV", "us-east-1")
PINECONE_INDEX = os.getenv("PINECONE_INDEX", "rag-multisource")
TOP_K = int(os.getenv("RAG_TOP_K", "3"))
AUTH_TOKEN = os.getenv("AUTH_TOKEN", "")

if not OPENAI_API_KEY:
    raise RuntimeError("OPENAI_API_KEY missing")

# --- Auth dependency ---
def require_auth(authorization: Optional[str] = Header(None)):
    if not AUTH_TOKEN:
        return
    if not authorization or not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing bearer token")
    token = authorization.split(" ", 1)[1]
    if token != AUTH_TOKEN:
        raise HTTPException(status_code=403, detail="Invalid token")

# --- Bootstrap vector stores ---
embeddings = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)

# Local FAISS
def build_faiss_from_path(path: str) -> FAISS:
    loader = TextLoader(path)
    docs = loader.load()
    splitter = RecursiveCharacterTextSplitter(chunk_size=800, chunk_overlap=120)
    chunks = splitter.split_documents(docs)
    return FAISS.from_documents(chunks, embeddings)

faiss_store = build_faiss_from_path("data/local_docs.txt")

# Pinecone (dimension 1536 matches OpenAI's text-embedding-ada-002)
pinecone.init(api_key=PINECONE_API_KEY, environment=PINECONE_ENV)
if PINECONE_INDEX not in pinecone.list_indexes():
    pinecone.create_index(name=PINECONE_INDEX, dimension=1536)
    # note: a freshly created index may take a moment before it is queryable
pinecone_store = Pinecone.from_existing_index(PINECONE_INDEX, embeddings)

faiss_retriever = faiss_store.as_retriever(search_kwargs={"k": TOP_K})
pine_retriever = pinecone_store.as_retriever(search_kwargs={"k": TOP_K})

ensemble = EnsembleRetriever(retrievers=[faiss_retriever, pine_retriever], weights=[0.5, 0.5])

llm = ChatOpenAI(model_name="gpt-4-turbo", temperature=0, openai_api_key=OPENAI_API_KEY)

rag_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=ensemble,
    return_source_documents=True
)

# --- API models ---
class RAGQuery(BaseModel):
    question: str
    top_k: Optional[int] = None

class RAGAnswer(BaseModel):
    answer: str
    sources: List[dict]
    latency_ms: float

class RAGEvalInput(BaseModel):
    question: str
    expected: str

class RAGEvalOutput(BaseModel):
    answer: str
    faithfulness: float
    latency_ms: float

# --- App ---
app = FastAPI(title="RAG API", version="1.0.0")

@app.get("/healthz")
def health():
    return {"ok": True}

@app.post("/rag/query", response_model=RAGAnswer, dependencies=[Depends(require_auth)])
def rag_query(payload: RAGQuery):
    k = payload.top_k or TOP_K
    # temporarily adjust k without rebuilding retrievers
    # (caution: mutates shared retriever state; not safe under concurrent requests)
    faiss_retriever.search_kwargs["k"] = k
    pine_retriever.search_kwargs["k"] = k
    t0 = time.time()
    out = rag_chain({"query": payload.question})
    ms = (time.time() - t0) * 1000.0

    sources = []
    for d in out.get("source_documents", []):
        sources.append({
            "source": d.metadata.get("source"),
            "loc": d.metadata.get("loc"),
            "score": d.metadata.get("score")
        })

    return RAGAnswer(answer=out["result"], sources=sources, latency_ms=round(ms, 2))

# Lightweight faithfulness proxy: embedding similarity between the answer
# and the concatenated source passages
import numpy as np

def cosine(a, b):
    a, b = np.array(a), np.array(b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-8))

@app.post("/rag/eval", response_model=RAGEvalOutput, dependencies=[Depends(require_auth)])
def rag_eval(payload: RAGEvalInput):
    t0 = time.time()
    out = rag_chain({"query": payload.question})
    ms = (time.time() - t0) * 1000.0

    sources_text = "\n\n".join(d.page_content for d in out.get("source_documents", []))
    ans_emb = embeddings.embed_query(out["result"])
    # fallback: with no retrieved sources, this compares the answer to itself (score ≈ 1.0)
    src_emb = embeddings.embed_query(sources_text if sources_text else out["result"])
    faithfulness = max(0.0, min(1.0, 0.5 + 0.5 * cosine(ans_emb, src_emb)))  # map [-1, 1] → [0, 1]
    return RAGEvalOutput(answer=out["result"], faithfulness=round(faithfulness, 3), latency_ms=round(ms, 2))

Notes:

  • The faithfulness proxy is a deterministic similarity score for quick checks. Use human review or an LLM-as-judge for audits.

  • For large corpora, preload Pinecone entries offline. The code assumes the index exists or is empty.
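The cosine-to-[0, 1] mapping used by the eval endpoint can be verified with plain Python; the vectors below are toy stand-ins for real embeddings:

```python
import math

def cosine(a, b):
    # same formula as the service, written without numpy
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return dot / (na * nb + 1e-8)

def faithfulness(answer_emb, sources_emb):
    # same transform as the endpoint: map cosine in [-1, 1] to [0, 1]
    return max(0.0, min(1.0, 0.5 + 0.5 * cosine(answer_emb, sources_emb)))

identical = [1.0, 0.0, 2.0]
opposite = [-1.0, 0.0, -2.0]
print(faithfulness(identical, identical))  # ≈ 1.0
print(faithfulness(identical, opposite))   # ≈ 0.0
```

An answer that restates its sources scores near 1.0; one that contradicts them scores near 0.0, with unrelated answers landing around 0.5.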

4) Minimal data scaffold

Put a small seed file at data/local_docs.txt. Example:

RAG improves factuality by retrieving external context at query time.
Multi-source retrieval combines FAISS and Pinecone to increase recall.
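The splitter in step 3 cuts files like this into overlapping windows (800 characters, 120 overlap). The sliding-window idea, stripped to its core:

```python
def chunk_text(text, chunk_size=800, overlap=120):
    """Naive character windows with overlap. RecursiveCharacterTextSplitter
    additionally prefers paragraph and sentence boundaries when cutting."""
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, max(len(text) - overlap, 1), step)]

chunks = chunk_text("x" * 2000, chunk_size=800, overlap=120)
print([len(c) for c in chunks])  # → [800, 800, 640]
```

The 120-character overlap keeps sentences that straddle a cut point retrievable from both neighboring chunks.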

5) Run locally

uvicorn app.main:app --host 0.0.0.0 --port 8080 --workers 1

Test:

curl -s -X POST http://localhost:8080/rag/query \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer change-me" \
  -d '{"question":"Why use multi-source retrieval in RAG?"}' | jq

6) Dockerize

# Dockerfile
FROM python:3.10-slim

ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1

WORKDIR /app
COPY app /app/app
COPY data /app/data
COPY requirements.txt /app/requirements.txt

RUN pip install --no-cache-dir -r /app/requirements.txt

EXPOSE 8080
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080", "--workers", "1"]

requirements.txt:

fastapi[all]
uvicorn
langchain<0.1
openai
faiss-cpu
pinecone-client<3.0
python-dotenv
tiktoken
numpy

Build and run:

docker build -t rag-fastapi:latest .
docker run --rm -p 8080:8080 \
  -e OPENAI_API_KEY=YOUR_API_KEY \
  -e PINECONE_API_KEY=YOUR_PINECONE_KEY \
  -e AUTH_TOKEN=change-me \
  rag-fastapi:latest

Code / JSON Snippets

Production prompt template (simple, robust)

from langchain.prompts import PromptTemplate

template = """
You are a precise assistant. Cite only from the provided context.
If the answer is not in context, say "I don't know".

Question: {question}

Context:
{context}

Answer with 2–4 sentences and list cited filenames in brackets.
"""

prompt = PromptTemplate.from_template(template)
# In RetrievalQA.from_chain_type(...), pass chain_type_kwargs={"prompt": prompt}

Workflow JSON for deployment

{
  "workflow": {
    "service": "fastapi",
    "endpoints": ["/rag/query", "/rag/eval", "/healthz"],
    "retrievers": ["faiss", "pinecone"],
    "weights": [0.5, 0.5],
    "model": "gpt-4-turbo",
    "k": 3,
    "auth": "bearer",
    "observability": ["latency_ms", "faithfulness"],
    "docker": "rag-fastapi:latest"
  }
}

Use Cases / Scenarios

  • Internal support copilot with local manuals in FAISS and regulated PDFs indexed in Pinecone.

  • API doc assistant that blends private READMEs (FAISS) with public guides (Pinecone).

  • Compliance Q&A where answers must cite specific files and locations.

Limitations / Considerations

  • Security: Do not expose raw documents with PII. Redact at ingestion.

  • Cost: embedding large corpora is non-trivial. Batch and cache aggressively.

  • Latency: Pinecone adds RTT. Use adaptive k and request batching.

  • Relevance: consider rerankers for hard queries.

Fixes (Pitfalls → Solutions)

  • Over-long contexts → Use 800–1,200-character chunks (as in the splitter above) and a strict prompt.

  • Stale embeddings → Re-embed on content update. Track document last_modified.

  • Weak citations → Attach source, page, loc metadata at ingest.

  • Cold starts → Warm the service at startup with a probe that runs a trivial prompt.

FAQs

How do I add reranking?
Insert a reranker stage after merging FAISS+Pinecone results. Keep the top 4–6 by cross-encoder score.
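A reranker stage is, structurally, a scored sort over the merged candidates. In the sketch below, overlap_score is a hypothetical stand-in for a real cross-encoder call:

```python
def rerank(query, candidates, score_fn, keep=5):
    """Score each (query, passage) pair and keep the top `keep` passages."""
    scored = sorted(candidates, key=lambda c: score_fn(query, c), reverse=True)
    return scored[:keep]

# toy scorer: shared-word overlap stands in for a cross-encoder
def overlap_score(query, passage):
    return len(set(query.lower().split()) & set(passage.lower().split()))

docs = [
    "RAG retrieves external context",
    "Docker packages the service",
    "Multi-source retrieval boosts recall",
]
print(rerank("why multi-source retrieval", docs, overlap_score, keep=2))
```

Swapping overlap_score for a cross-encoder predict call keeps the same pipeline shape while upgrading relevance.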

How do I scale?
Front with an API gateway. Use autoscaling pods. Cache frequent Q&A responses.

Can I run fully offline?
Yes. Replace OpenAI with a local model and Pinecone with a local vector DB like Chroma or Milvus.

How do I log prompts safely?
Hash user inputs and mask entities. Store only metadata and latency.
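A minimal sketch of that policy using only the standard library; the safe_log_entry helper and its field names are illustrative, not part of the service above:

```python
import hashlib
import json
import time

def safe_log_entry(question: str, latency_ms: float) -> str:
    """Log a stable hash of the input plus metadata; never the raw text."""
    entry = {
        "question_sha256": hashlib.sha256(question.encode("utf-8")).hexdigest(),
        "question_len": len(question),
        "latency_ms": latency_ms,
        "ts": int(time.time()),
    }
    return json.dumps(entry)

print(safe_log_entry("Why use multi-source retrieval?", 412.5))
```

The hash still lets you deduplicate and count repeat questions without ever persisting user text.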

Conclusion

You now have a unique, production-leaning RAG stack:

  • Multi-source retrieval with FAISS + Pinecone

  • Quantitative evaluation endpoint for faithfulness and latency

  • FastAPI service with simple auth, Docker packaging, and clear interfaces

The result is developer-ready: measurable answers, reliable citations, and an API you can ship today.