The AI pipeline is not optional
Most AI systems don't fail because of a bad model. They fail because there's no structure around it. Here's how to build one that holds in production.
I've seen production systems where the model was excellent — GPT-4, Claude, Gemini — and the surrounding code was a mess of hardcoded prompts, zero validation, and no separation of concerns. The model couldn't save them.
The fix is never a better model. It's a better pipeline.
What is a pipeline, exactly?
A pipeline is the ordered sequence of processing stages your system runs through before and after the LLM call. Every input passes through it. Every output exits through it.
Think of it as the connective tissue between user intent and model response. The model does one thing well. The pipeline handles everything the model shouldn't have to handle: input sanity, security, context assembly, cost routing, output safety.
Two flavors exist:
Workflow pipelines run linearly — stage 1 → 2 → 3 → LLM → done. Predictable, debuggable, easy to test. Right for 80% of production use cases.
Agent pipelines let the LLM decide the enrichment order. More flexible, higher cost, harder to reason about. Use only when the task genuinely requires dynamic orchestration.
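The difference is easiest to see as control flow. A sketch, with pick_next_stage standing in for a hypothetical LLM-backed chooser:

# Workflow pipeline: the stage order is fixed in code.
def run_workflow(ctx, stages):
    for stage in stages:                      # 1 -> 2 -> 3 -> LLM, every time
        ctx = stage.run(ctx)
    return ctx

# Agent pipeline: the LLM picks the next stage on each turn.
def run_agent(ctx, stages, pick_next_stage, max_steps=10):
    for _ in range(max_steps):                # always cap agent loops
        stage = pick_next_stage(ctx, stages)  # hypothetical LLM-backed chooser
        if stage is None:                     # the model decides it is done
            return ctx
        ctx = stage.run(ctx)
    return ctx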
The design principles
- Block early, spend late. Cheap checks go first. The LLM call is the most expensive operation — it only runs after everything else has cleared.
- Fail fast. A blocked context stops the pipeline immediately. No downstream stage runs on a blocked request.
- Single responsibility. Each stage does one thing. Validation doesn't touch security. Security doesn't touch RAG.
- Configurable. Any stage can be disabled for testing or bypassed by environment flag. New stages can be inserted without touching existing ones.
- Observable. The context is serializable. Every stage snapshots it. Every failure has a named reason.
The context object
Every stage communicates through a shared context object. The rules are simple: each stage reads what previous stages added and appends its own data. Nothing is deleted. The context only grows.
This makes the pipeline fully auditable — you can inspect exactly what each stage contributed. And because it's serializable, you can snapshot it after every stage for debugging, crash recovery, and observability.
from dataclasses import dataclass, field
from typing import Any
import json


@dataclass
class PipelineContext:
    """
    Shared state passed through every stage.
    Each stage reads previous enrichments and adds its own.
    Never deletes. Always serializable.
    """
    raw_input: str
    user_id: str
    stages_passed: list[str] = field(default_factory=list)
    enrichments: dict[str, Any] = field(default_factory=dict)
    final_prompt: str | None = None
    response: str | None = None
    blocked: bool = False
    block_reason: str | None = None

    def add(self, key: str, value: Any):
        self.enrichments[key] = value
        self.stages_passed.append(key)

    def block(self, reason: str):
        self.blocked = True
        self.block_reason = reason

    def serialize(self) -> str:
        # Snapshot at any stage — for logging, retries, crash recovery
        return json.dumps(self.__dict__, default=str)
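Every stage shares one contract: take the context, return it enriched or blocked. A minimal sketch of that interface, matching how the runner below calls its stages:

from typing import Protocol

class Stage(Protocol):
    """Take the context, enrich or block it, hand it back."""
    def run(self, ctx: PipelineContext) -> PipelineContext: ...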
Stage by stage
Stage 1: Rate limiting (~free)
Token bucket or sliding window per user. Catches burst abuse before any parsing happens. This is the absolute cheapest gate — a single counter lookup.
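A sketch of the gate, using the PipelineContext above and an in-memory bucket store (production would back this with Redis or similar):

import time

class RateLimitStage:
    """Refill-on-read token bucket, one bucket per user."""
    def __init__(self, limit: int = 100, window_seconds: int = 60):
        self.capacity = limit
        self.rate = limit / window_seconds  # tokens refilled per second
        self.buckets: dict[str, tuple[float, float]] = {}  # user_id -> (tokens, last_seen)

    def run(self, ctx: PipelineContext) -> PipelineContext:
        now = time.monotonic()
        tokens, last = self.buckets.get(ctx.user_id, (self.capacity, now))
        tokens = min(self.capacity, tokens + (now - last) * self.rate)  # lazy refill
        if tokens < 1:
            ctx.block("rate_limited")  # named reason, per the principles
        else:
            self.buckets[ctx.user_id] = (tokens - 1, now)
            ctx.add("rate_limit", {"tokens_left": int(tokens - 1)})
        return ctx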
Stage 2: Validation (~free)
Regex, length checks, null bytes, encoding checks. No ML. Rejects garbage before it wastes a single downstream cycle. Saves more cost than any other stage.
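One possible implementation, deterministic and dependency-free; ValidationConfig is the small wrapper the usage example below passes in:

from dataclasses import dataclass

@dataclass
class ValidationConfig:
    max_length: int = 2000
    min_length: int = 1

class ValidationStage:
    """Deterministic input checks. No ML, no network calls."""
    def __init__(self, config: ValidationConfig):
        self.config = config

    def run(self, ctx: PipelineContext) -> PipelineContext:
        text = ctx.raw_input
        if len(text.strip()) < self.config.min_length:
            ctx.block("empty_input")
        elif len(text) > self.config.max_length:
            ctx.block("input_too_long")
        elif "\x00" in text:
            ctx.block("null_bytes")
        else:
            try:
                text.encode("utf-8")  # catches lone surrogates and similar junk
                ctx.add("validation", {"length": len(text)})
            except UnicodeEncodeError:
                ctx.block("bad_encoding")
        return ctx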
Stage 3: Security (low cost)
Compiled regex for known injection patterns. Optional embedding-based classifier for borderline cases. Goal: raise the cost of attack, not achieve perfection.
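For illustration, the pattern-based screen; the pattern list is a placeholder starter set, not a real blocklist:

import re

# Starter patterns only -- real blocklists are larger and maintained over time.
INJECTION_PATTERNS = re.compile(
    r"ignore (all )?previous instructions"
    r"|disregard (the )?system prompt"
    r"|you are now\b",
    re.IGNORECASE,
)

class SecurityStage:
    """Pattern screening: raises the cost of attack, guarantees nothing."""
    def run(self, ctx: PipelineContext) -> PipelineContext:
        if INJECTION_PATTERNS.search(ctx.raw_input):
            ctx.block("injection_pattern")
        else:
            ctx.add("security", {"pattern_hit": False})
        return ctx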
Stage 4: Query shaping (low cost)
Classify intent, expand sparse queries, normalize language. Users write "fix my code" — your system needs to know it's a debugging task in Python before retrieval.
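A cheap keyword version works as a starting point; the intent buckets here are hypothetical and should match your product's actual tasks:

# Hypothetical intent buckets -- tune these to your product's actual tasks.
INTENT_KEYWORDS = {
    "debugging": ("fix", "error", "bug", "broken", "traceback"),
    "explanation": ("what is", "explain", "how does"),
    "generation": ("write", "create", "generate"),
}

class QueryShapingStage:
    """Cheap keyword intent guess; swap in a small classifier when it matters."""
    def run(self, ctx: PipelineContext) -> PipelineContext:
        text = ctx.raw_input.lower()
        intent = next(
            (name for name, words in INTENT_KEYWORDS.items()
             if any(w in text for w in words)),
            "general",
        )
        ctx.add("query_shaping", {"intent": intent})
        return ctx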
Stage 5: Intelligence (medium cost)
RAG retrieval, tool manifest selection, user profile loading. Filter retrieved chunks by relevance score — don't dump noise into the context window.
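A sketch of retrieval with score filtering; the vector_store interface (a .search call returning chunk/score pairs) is assumed, and profile loading is omitted for brevity:

class IntelligenceStage:
    """Retrieve, then filter by score so noise never reaches the context window."""
    def __init__(self, vector_store, user_store=None, min_score: float = 0.75, top_k: int = 5):
        self.vector_store = vector_store  # assumed: .search(text, k) -> [(chunk, score)]
        self.user_store = user_store      # profile loading omitted in this sketch
        self.min_score = min_score
        self.top_k = top_k

    def run(self, ctx: PipelineContext) -> PipelineContext:
        hits = self.vector_store.search(ctx.raw_input, k=self.top_k)
        chunks = [chunk for chunk, score in hits if score >= self.min_score]
        ctx.add("intelligence", {"chunks": chunks, "dropped": len(hits) - len(chunks)})
        return ctx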
Stage 6: Context hydration (medium cost)
Load chat history. Apply sliding window, summarization, or compaction to stay within the token budget. This is separate from preparation — data loading vs. decision making.
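A sliding-window sketch, using a rough four-characters-per-token estimate; the history_store interface is assumed:

class ContextHydrationStage:
    """Newest-first sliding window over chat history, capped by token budget."""
    def __init__(self, history_store, max_tokens: int = 2000):
        self.history_store = history_store  # assumed: .fetch(user_id) -> [str], oldest first
        self.max_tokens = max_tokens

    def run(self, ctx: PipelineContext) -> PipelineContext:
        kept, budget = [], self.max_tokens
        for message in reversed(self.history_store.fetch(ctx.user_id)):
            cost = len(message) // 4  # rough estimate: ~4 characters per token
            if cost > budget:
                break  # window is full; older messages fall off
            kept.append(message)
            budget -= cost
        ctx.add("context_hydration", {"history": list(reversed(kept))})
        return ctx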
Stage 7: Preparation (medium cost)
Assemble the final prompt from all enrichments. Route to the right model — simple tasks go to fast models. Decide whether the task needs extended thinking.
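One possible assembly plus a first-cut routing decision; the model names are placeholders:

class PreparationStage:
    """Assemble the final prompt from enrichments and pick a model."""
    def __init__(self, system_prompt: str):
        self.system_prompt = system_prompt

    def run(self, ctx: PipelineContext) -> PipelineContext:
        e = ctx.enrichments
        parts = [self.system_prompt]
        if chunks := e.get("intelligence", {}).get("chunks", []):
            parts.append("Context:\n" + "\n".join(chunks))
        if history := e.get("context_hydration", {}).get("history", []):
            parts.append("History:\n" + "\n".join(history))
        parts.append("User: " + ctx.raw_input)
        ctx.final_prompt = "\n\n".join(parts)
        # First-cut routing: only clearly hard intents get the expensive model.
        intent = e.get("query_shaping", {}).get("intent", "general")
        model = "strong-model" if intent == "debugging" else "fast-model"
        ctx.add("preparation", {"model": model})
        return ctx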
Stage 8: Execution (high cost)
The LLM call. Finally. Streaming, tool use, reasoning — all here. Also includes output security: scan for policy violations before the response leaves this stage.
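A sketch of the execution stage; the llm_client interface (a single .complete call) is assumed, and the output screen is a deliberately crude placeholder:

POLICY_MARKERS = ("BEGIN PRIVATE KEY",)  # crude placeholder output screen

class ExecutionStage:
    """The only stage that talks to the model. Output screening lives here too."""
    def __init__(self, llm_client):
        self.llm = llm_client  # assumed interface: .complete(prompt, model) -> str

    def run(self, ctx: PipelineContext) -> PipelineContext:
        model = ctx.enrichments.get("preparation", {}).get("model", "fast-model")
        response = self.llm.complete(ctx.final_prompt, model=model)
        if any(marker in response for marker in POLICY_MARKERS):
            ctx.block("output_policy_violation")
        else:
            ctx.response = response
            ctx.add("execution", {"model": model})
        return ctx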
Stage 9: Output formatting
This stage deserves its own note. PII redaction, response contract enforcement (JSON schema, markdown stripping for voice channels, truncation for UI constraints) are genuinely distinct from generation. Keeping them as a separate, swappable stage means you can change output format per channel without touching the execution layer.
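A sketch of channel-aware formatting; the regexes are illustrative, not production-grade PII detection:

import re

EMAIL = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
PHONE = re.compile(r"\+?\d[\d\s().-]{7,}\d")

class OutputFormattingStage:
    """Channel-specific shaping: redact PII, then enforce the response contract."""
    def __init__(self, channel: str = "web"):
        self.channel = channel

    def run(self, ctx: PipelineContext) -> PipelineContext:
        text = ctx.response or ""
        text = EMAIL.sub("[email]", text)
        text = PHONE.sub("[phone]", text)
        if self.channel == "voice":
            text = re.sub(r"[*_`#]", "", text)  # markdown stripping for voice
        ctx.response = text
        ctx.add("output_formatting", {"channel": self.channel})
        return ctx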
Wiring it together
The pipeline runner is intentionally minimal. It loops through stages, skips disabled ones, stops on a blocked context, and snapshots after each stage.
import logging

class AIPipeline:
    def __init__(self, stages: list, enabled: set[str] | None = None):
        self.stages = stages
        self.enabled = enabled  # None = all stages active

    def run(self, raw_input: str, user_id: str) -> PipelineContext:
        ctx = PipelineContext(raw_input=raw_input, user_id=user_id)
        for stage in self.stages:
            name = stage.__class__.__name__
            if self.enabled and name not in self.enabled:
                continue  # configurable: skip disabled stages
            if ctx.blocked:
                break  # fail fast: don't process blocked context
            ctx = stage.run(ctx)
            self._snapshot(ctx, stage_name=name)  # observe every step
        return ctx

    def _snapshot(self, ctx: PipelineContext, stage_name: str):
        # Hook point: ship the serialized context to your logger or tracer
        logging.debug("after %s: %s", stage_name, ctx.serialize())
# Usage — framework and LLM agnostic
# (vs, us, hs, SYSTEM_PROMPT, and llm are your own stores, prompt, and client)
pipeline = AIPipeline(stages=[
    RateLimitStage(limit=100, window_seconds=60),
    ValidationStage(ValidationConfig(max_length=2000)),
    SecurityStage(),
    QueryShapingStage(),
    IntelligenceStage(vector_store=vs, user_store=us),
    ContextHydrationStage(history_store=hs, max_tokens=2000),
    PreparationStage(system_prompt=SYSTEM_PROMPT),
    ExecutionStage(llm_client=llm),
    OutputFormattingStage(),
])

result = pipeline.run(raw_input="How do I fix this?", user_id="user_42")
if result.blocked:
    print(f"Blocked: {result.block_reason}")
else:
    print(result.response)
What model routing actually saves you
Most teams default to their strongest model for every request. That's expensive and unnecessary. A simple clarification question doesn't need the same model as a multi-step debugging task.
A primitive routing rule in the preparation layer — based on intent classification and query complexity from earlier stages — can reduce LLM costs by 40–60% with no quality regression on the requests that matter. The data to make that decision is already in the context by stage 7. You just have to use it.
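Concretely, the rule can be a few lines over data that earlier stages already wrote into the context. Model names and thresholds here are placeholders to calibrate against your own traffic:

def route_model(enrichments: dict) -> str:
    intent = enrichments.get("query_shaping", {}).get("intent", "general")
    n_chunks = len(enrichments.get("intelligence", {}).get("chunks", []))
    if intent == "debugging" or n_chunks > 3:
        return "strong-model"  # multi-step work earns the expensive call
    return "fast-model"        # clarifications and chit-chat stay cheap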
Most AI reliability problems are pipeline problems. The model is the last mile. Everything before it determines whether that last mile goes well.
Build the pipeline first. Then worry about the model.
Building something like this? Hit a pattern that worked better?