Skip to content

Production Patterns

Battle-tested patterns for deploying anchor in production: error handling, observability, performance tuning, and testing strategies.


Error Handling and Resilience

Production pipelines must handle failures gracefully. Use on_error policies and callbacks to keep your system running even when individual steps fail.

Step-Level Error Policies

from anchor import ContextPipeline, retriever_step, filter_step

# Build a pipeline with a per-step error policy: both retrieval steps use
# on_error="skip", while the quality filter uses on_error="raise".
# `primary_retriever` and `fallback_retriever` are placeholders that your
# application must define.
pipeline = (
    ContextPipeline(max_tokens=8192)
    # If retrieval fails, skip this step and continue with other sources
    .add_step(
        retriever_step("primary-search", primary_retriever, top_k=10),
        on_error="skip",
    )
    # Fallback retriever in case the primary is down
    # NOTE(review): this step is registered unconditionally, exactly like the
    # primary one -- confirm whether it only contributes when the primary
    # fails or always adds its own results.
    .add_step(
        retriever_step("fallback-search", fallback_retriever, top_k=5),
        on_error="skip",
    )
    # If filtering fails, raise immediately -- data quality matters
    # The predicate keeps items whose score is strictly greater than 0.3.
    .add_step(
        filter_step("quality-gate", lambda item: item.score > 0.3),
        on_error="raise",
    )
)

Error Policy Options

  • on_error="raise" (default) -- stop the pipeline and propagate the exception
  • on_error="skip" -- log the error and continue with items from previous steps
  • on_error="empty" -- log the error and continue with an empty item list

Error Monitoring with Callbacks

import logging

from anchor import ContextPipeline, TracingCallback

logger = logging.getLogger("anchor")

class ErrorAlertCallback(TracingCallback):
    """Send alerts when pipeline steps fail.

    Overrides two ``TracingCallback`` hooks: ``on_step_error`` logs the
    failure and sends an alert, and ``on_build_complete`` warns when the
    finished build reports skipped steps. ``logger`` and ``send_alert`` are
    expected to exist at module level.
    """

    def on_step_error(self, step_name: str, error: Exception) -> None:
        """Log a failed step (including its traceback) and send an alert.

        Args:
            step_name: Name of the pipeline step that failed.
            error: The exception the step raised.
        """
        # Pass the exception through exc_info so the traceback reaches the
        # log; the message text alone is rarely enough to diagnose a failure.
        logger.error("Step '%s' failed: %s", step_name, error, exc_info=error)
        # In production: send to PagerDuty, Slack, etc.
        # `send_alert` is a placeholder for your alerting integration.
        send_alert(
            severity="warning",
            message=f"Pipeline step '{step_name}' failed: {error}",
        )

    def on_build_complete(self, result) -> None:
        """Warn if the finished build skipped any steps.

        Args:
            result: Build result; only ``result.diagnostics`` is read here.
        """
        # Read the counter once; a missing key is treated as zero.
        steps_skipped = result.diagnostics.get("steps_skipped", 0)
        if steps_skipped > 0:
            logger.warning(
                "Pipeline completed with %s skipped steps", steps_skipped
            )

# Attach an ErrorAlertCallback instance to the pipeline via with_callback();
# add your steps to the chain where indicated.
pipeline = (
    ContextPipeline(max_tokens=8192)
    .with_callback(ErrorAlertCallback())
    # ... steps ...
)

Callback Safety

Callback errors are swallowed and logged at WARNING level. A failing callback never breaks the pipeline.


Token Budget Tuning

Choosing the right token budget is critical for balancing context quality and cost. Start with a preset and tune from there.

Built-in Presets

from anchor import ContextPipeline, TokenBudget

# Presets for common model context windows
# Each preset is passed directly as the pipeline's max_tokens argument; the
# trailing comments give the token count each preset stands for.
pipeline_fast = ContextPipeline(max_tokens=TokenBudget.SMALL)    # 4,096 tokens
pipeline_std  = ContextPipeline(max_tokens=TokenBudget.MEDIUM)   # 16,384 tokens
pipeline_lg   = ContextPipeline(max_tokens=TokenBudget.LARGE)    # 32,768 tokens
pipeline_xl   = ContextPipeline(max_tokens=TokenBudget.XL)       # 65,536 tokens

Custom Budget Allocation

For fine-grained control, allocate tokens across sources:

from anchor import ContextPipeline, TokenBudgetConfig

# Split a 16,384-token budget across sources. The four named allocations
# below sum to 13,824 tokens, which leaves 2,560 tokens of `total` unassigned.
# NOTE(review): confirm how the pipeline treats the unassigned remainder.
budget = TokenBudgetConfig(
    total=16384,
    system_prompt=1024,       # reserve for system prompt
    memory_conversation=4096, # conversation history
    memory_facts=512,         # persistent facts
    retrieval=8192,           # retrieved documents
    overflow_policy="truncate_lowest_priority",
)

# The config is passed via `budget=` rather than a flat `max_tokens` value.
pipeline = ContextPipeline(budget=budget)

Overflow Policies

  • "truncate_lowest_priority" -- drop lowest-priority items first (default)
  • "truncate_oldest" -- drop oldest items first
  • "error" -- raise TokenBudgetExceeded if total is exceeded

Monitoring Token Usage

# Build once, then read the token accounting from the result's diagnostics.
# `pipeline`, `query`, and `logger` must already be defined by your code.
result = pipeline.build(query)
diag = result.diagnostics

# token_utilization is formatted with `.1%`, i.e. it is treated as a fraction
# (0.3 prints as 30.0%).
print(f"Tokens used:     {diag['tokens_used']}")
print(f"Tokens budget:   {diag['tokens_budget']}")
print(f"Utilization:     {diag['token_utilization']:.1%}")
print(f"Items included:  {diag['items_included']}")
print(f"Items overflow:  {diag['items_overflow']}")

# Flag low utilization (possible wasted budget). This checks a single build
# only; watch the trend across many builds before shrinking the budget.
if diag["token_utilization"] < 0.3:
    logger.info("Token utilization below 30% -- consider reducing budget")

Memory Management at Scale

Eviction Strategies

from anchor import (
    MemoryManager,
    SlidingWindowMemory,
    ImportanceEviction,
    SummaryEviction,
    InMemoryEntryStore,
)

# Three SlidingWindowMemory configurations, all capped at max_tokens=4096 and
# differing only in the eviction policy passed in.

# FIFO eviction (default) -- simplest, good for most cases
fifo_memory = SlidingWindowMemory(max_tokens=4096)

# Importance-based eviction -- keeps high-value turns longer
# The example score_fn returns 1.0 for turns whose content contains "action"
# (case-insensitive) and 0.5 for every other turn.
importance_memory = SlidingWindowMemory(
    max_tokens=4096,
    eviction_policy=ImportanceEviction(
        score_fn=lambda turn: 1.0 if "action" in turn.content.lower() else 0.5,
    ),
)

# Summary eviction -- summarizes evicted turns into a condensed fact
# `my_summarize_function` is a placeholder for a summarizer you supply.
summary_memory = SlidingWindowMemory(
    max_tokens=4096,
    eviction_policy=SummaryEviction(
        summarize_fn=my_summarize_function,
        summary_max_tokens=256,
    ),
)

Consolidation and Garbage Collection

# Memory manager configured with conversation_tokens=4096 and an in-memory
# entry store as its persistent store.
memory = MemoryManager(
    conversation_tokens=4096,
    persistent_store=InMemoryEntryStore(),
)

# Periodically consolidate duplicate/overlapping facts
# The return value is reported below as the number of consolidated facts.
consolidated = memory.consolidate_facts(
    similarity_threshold=0.85,
    merge_strategy="keep_newest",
)
print(f"Consolidated {consolidated} duplicate facts")

# Remove stale facts older than 30 days
# The return value is reported below as the number of removed facts.
removed = memory.gc_facts(max_age_days=30)
print(f"Garbage collected {removed} stale facts")

Consolidation in Production

Run consolidate_facts() on a schedule (e.g., daily cron job) rather than on every request. Consolidation requires comparing all fact pairs, which is O(n^2) in the number of facts.


Hybrid Retrieval Optimization

Weight Tuning

from anchor import HybridRetriever, DenseRetriever, SparseRetriever

# Start with 70/30 dense/sparse and tune based on benchmarks
hybrid = HybridRetriever(
    retrievers=[dense_retriever, sparse_retriever],
    weights=[0.7, 0.3],
    rrf_k=60,
)

# Grid search over weights using retrieval metrics.
# `best_weights` keeps the 50/50 default unless a candidate scores above 0.
best_score = 0
best_weights = [0.5, 0.5]

for dense_weight in [0.5, 0.6, 0.7, 0.8, 0.9]:
    # Round away binary float error: 1.0 - 0.7 evaluates to
    # 0.30000000000000004, which would leak into the weights and the report.
    sparse_weight = round(1.0 - dense_weight, 2)
    # Use a separate name so the search does not overwrite `hybrid` with
    # whichever candidate happens to be tried last.
    candidate = HybridRetriever(
        retrievers=[dense_retriever, sparse_retriever],
        weights=[dense_weight, sparse_weight],
        rrf_k=60,
    )

    # Benchmark using your dataset (see the Evaluation Workflow cookbook)
    score = run_benchmark(candidate, benchmark_dataset)
    if score > best_score:
        best_score = score
        best_weights = [dense_weight, sparse_weight]

print(f"Best weights: dense={best_weights[0]}, sparse={best_weights[1]}")

# Rebuild the retriever with the winning weights so that `hybrid` is the
# tuned configuration rather than the initial 70/30 guess.
hybrid = HybridRetriever(
    retrievers=[dense_retriever, sparse_retriever],
    weights=best_weights,
    rrf_k=60,
)

Reranker Selection

from anchor import CrossEncoderReranker, reranker_step

# Word-overlap scorer (fast, no API needed)
def overlap_scorer(query: str, doc: str) -> float:
    """Return the fraction of distinct query words that also occur in ``doc``.

    Both strings are lower-cased and split on whitespace before comparison.
    An empty query scores 0.0 because the divisor is clamped to at least 1.
    """
    query_terms = set(query.lower().split())
    shared_terms = query_terms.intersection(doc.lower().split())
    # Clamp the divisor so an empty query cannot divide by zero.
    divisor = max(len(query_terms), 1)
    return len(shared_terms) / divisor

# Plug the word-overlap scorer into the reranker as its score_fn.
fast_reranker = CrossEncoderReranker(score_fn=overlap_scorer, top_k=10)

# Cross-encoder scorer (slower, more accurate, requires a model)
# from sentence_transformers import CrossEncoder
# model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
# def cross_encoder_scorer(query: str, doc: str) -> float:
#     # predict() returns an array of scores; convert the single element to a
#     # built-in float so the result matches the annotated return type.
#     return float(model.predict([(query, doc)])[0])
# accurate_reranker = CrossEncoderReranker(
#     score_fn=cross_encoder_scorer, top_k=10
# )

Reranker Trade-offs

  • Word-overlap: fast (< 1ms per doc), no dependencies, moderate quality
  • Cross-encoder: slow (50-100ms per doc), high quality, needs GPU for scale
  • Compromise: retrieve the top 50 candidates with dense retrieval, then rerank them with the cross-encoder and keep the top 10

Observability Setup

TracingCallback

import time
from anchor import ContextPipeline, TracingCallback

class ProductionTracer(TracingCallback):
    """Full lifecycle tracing for production monitoring.

    Logs every build and step hook and, when a build completes, emits the
    build time and token utilization through ``metrics``. ``logger`` and
    ``metrics`` are expected to exist at module level.

    NOTE(review): the start time is stored on the instance, so one tracer
    shared by overlapping builds would mix up timings -- confirm how
    callbacks are shared before reusing an instance concurrently.
    """

    def on_build_start(self, query) -> None:
        """Record the start time and log the first 50 characters of the query."""
        # perf_counter() is monotonic, so the measured duration stays correct
        # even if the system clock is adjusted while a build is running.
        # The stored value is only meaningful as a difference, not as a
        # wall-clock timestamp.
        self.start_time = time.perf_counter()
        logger.info("Pipeline build started: %s", query.query_str[:50])

    def on_step_start(self, step_name: str) -> None:
        """Log that a step has started."""
        logger.debug("Step '%s' started", step_name)

    def on_step_complete(self, step_name: str, items_count: int) -> None:
        """Log a finished step together with its item count."""
        logger.debug("Step '%s' completed: %s items", step_name, items_count)

    def on_step_error(self, step_name: str, error: Exception) -> None:
        """Log a failed step at ERROR level."""
        logger.error("Step '%s' failed: %s", step_name, error)

    def on_build_complete(self, result) -> None:
        """Log the build summary and emit timing and utilization metrics.

        Args:
            result: Build result; ``result.diagnostics`` must provide
                ``items_included`` and ``token_utilization``.
        """
        elapsed = time.perf_counter() - self.start_time
        diagnostics = result.diagnostics
        logger.info(
            "Pipeline build complete: %s items, %.2fs",
            diagnostics["items_included"],
            elapsed,
        )
        # Emit metrics to your monitoring system
        # `elapsed` is in seconds; the histogram is reported in milliseconds.
        metrics.histogram("pipeline.build_time_ms", elapsed * 1000)
        metrics.gauge(
            "pipeline.token_utilization",
            diagnostics["token_utilization"],
        )

CostTracker

from anchor import CostTracker

# The tracker is registered through with_callback(), like the other callbacks
# in this guide, and is read back after the build.
tracker = CostTracker()

pipeline = (
    ContextPipeline(max_tokens=8192)
    .with_callback(tracker)
    # ... steps ...
)

# After building context
# `query` must already be defined by your code.
result = pipeline.build(query)

# Inspect costs
# estimated_cost is printed with four decimal places and a "$" prefix.
print(f"Total tokens: {tracker.total_tokens}")
print(f"Estimated cost: ${tracker.estimated_cost:.4f}")
print(f"Per-step breakdown: {tracker.step_costs}")

OpenTelemetry (OTLP) Export

from anchor import ContextPipeline
from anchor.observability import OTLPExporter

# Export traces to your OTLP-compatible backend
# NOTE(review): port 4317 is conventionally the OTLP/gRPC port -- confirm
# which transport OTLPExporter uses before pointing it at a real collector.
# "<token>" is a placeholder; load the real credential from configuration or
# a secret store rather than hard-coding it.
exporter = OTLPExporter(
    endpoint="http://localhost:4317",
    service_name="my-rag-service",
    headers={"Authorization": "Bearer <token>"},
)

# The exporter is attached through with_callback(), like any other callback.
pipeline = (
    ContextPipeline(max_tokens=8192)
    .with_callback(exporter)
    # ... steps ...
)

Structured Logging

Combine TracingCallback with structured logging (e.g., structlog) to get machine-parseable logs with correlation IDs that link pipeline builds to upstream HTTP requests.

OTLP Dependencies

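OTLP export requires the optional otlp extra. Install it with: pip install "astro-anchor[otlp]" (the quotes stop shells such as zsh from interpreting the square brackets).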


Testing Context Pipelines

Unit Testing Individual Steps

import pytest
from anchor import ContextItem, QueryBundle, SourceType

def test_quality_filter():
    """Check that a 0.3 score threshold drops the low-scoring item."""
    scores_by_content = {
        "Good result": 0.9,
        "Bad result": 0.05,
        "OK result": 0.4,
    }
    candidates = [
        ContextItem(content=text, source=SourceType.RETRIEVAL, score=value)
        for text, value in scores_by_content.items()
    ]

    def passes_quality_gate(item):
        # Same predicate as the "quality-gate" filter: strictly above 0.3.
        return item.score > 0.3

    survivors = list(filter(passes_quality_gate, candidates))

    assert len(survivors) == 2
    assert all(item.score > 0.3 for item in survivors)


def test_custom_retriever_returns_context_items():
    """Check that a custom retriever returns a bounded list of ContextItems."""
    limit = 5
    custom = MyCustomRetriever()
    custom.index(sample_items)

    hits = custom.retrieve(QueryBundle(query_str="test query"), top_k=limit)

    # The result must be a plain list containing only ContextItem objects ...
    assert isinstance(hits, list)
    for hit in hits:
        assert isinstance(hit, ContextItem)
    # ... and must respect the requested top_k.
    assert len(hits) <= limit

Integration Testing the Full Pipeline

def test_full_pipeline_build():
    """Build a pipeline with retrieval, memory, and a system prompt end to end."""
    # Arrange: a retriever over the sample docs and a memory with one exchange
    doc_retriever = create_test_retriever(sample_docs)
    conversation = create_test_memory()
    conversation.add_user_message("Hello")
    conversation.add_assistant_message("Hi there!")

    search_step = retriever_step("search", doc_retriever, top_k=5)
    pipeline = (
        ContextPipeline(max_tokens=4096)
        .add_step(search_step)
        .with_memory(conversation)
        .with_formatter(GenericTextFormatter())
        .add_system_prompt("You are a test assistant.")
    )

    # Act
    built = pipeline.build("What is context engineering?")

    # Assert: output exists and the diagnostics are within sane bounds
    assert built.formatted_output is not None
    diagnostics = built.diagnostics
    assert diagnostics["items_included"] > 0
    assert 0 < diagnostics["token_utilization"] <= 1.0

    # Every configured source must have contributed at least one item
    present_sources = {entry.source for entry in built.window.items}
    for expected in (SourceType.SYSTEM, SourceType.RETRIEVAL, SourceType.MEMORY):
        assert expected in present_sources


def test_pipeline_handles_empty_retrieval():
    """Check that a build succeeds when the retriever has nothing to return."""
    no_docs_retriever = create_test_retriever([])  # no docs
    search_step = retriever_step("search", no_docs_retriever, top_k=5)

    pipeline = (
        ContextPipeline(max_tokens=4096)
        .add_step(search_step)
        .add_system_prompt("You are a test assistant.")
    )

    built = pipeline.build("query with no results")

    # The build should still succeed with just the system prompt
    assert built.formatted_output is not None
    included = built.diagnostics["items_included"]
    assert included >= 1  # at least system prompt

Deterministic Tests

Use deterministic embedding functions (like the embed_fn in the examples) for tests. Real embedding models produce slightly different vectors across runs, which makes assertions flaky.


Performance Tips

Async Pipelines

import asyncio
from anchor import ContextPipeline, async_retriever_step

# Use async steps for I/O-bound operations (API calls, database queries)
# `async_retriever`, `async_graph_retriever`, and `query` are placeholders
# that your application must define.
pipeline = (
    ContextPipeline(max_tokens=8192)
    .add_step(async_retriever_step("vector-db", async_retriever, top_k=10))
    .add_step(async_retriever_step("graph-db", async_graph_retriever, top_k=5))
)


async def build_context():
    """Build the context inside a coroutine and return the build result."""
    # abuild() runs async steps concurrently when possible
    return await pipeline.abuild(query)


# `await` is only valid inside a coroutine, so synchronous code drives the
# build with asyncio.run(). If you are already inside a running event loop
# (for example an async request handler), call
# `result = await pipeline.abuild(query)` there instead.
result = asyncio.run(build_context())

Caching Retrieved Results

from functools import lru_cache
from anchor import ContextItem, QueryBundle

class CachedRetriever:
    """Retriever wrapper that memoizes results per (query string, top_k).

    NOTE(review): the cache key is only ``query.query_str`` and ``top_k``, and
    the inner retriever receives a fresh ``QueryBundle`` built from the string
    alone -- confirm that no other field of the incoming query matters.
    """

    def __init__(self, inner_retriever, cache_size: int = 256):
        """Wrap ``inner_retriever`` with an LRU cache of ``cache_size`` entries."""
        self._inner = inner_retriever
        self._cache_size = cache_size

        def _cached_retrieve(query_str: str, top_k: int):
            # Results are stored as a tuple so a cached entry cannot be
            # resized in place by a caller.
            bundle = QueryBundle(query_str=query_str)
            hits = self._inner.retrieve(bundle, top_k=top_k)
            return tuple(hits)

        # The cache wraps a closure, so each instance gets its own cache.
        self._cached_retrieve = lru_cache(maxsize=cache_size)(_cached_retrieve)

    def retrieve(self, query: QueryBundle, top_k: int = 10) -> list[ContextItem]:
        """Return the results for ``query`` as a new list on every call."""
        cached_hits = self._cached_retrieve(query.query_str, top_k)
        return [*cached_hits]

Lazy Loading Heavy Dependencies

from anchor import ContextPipeline

class LazyEmbeddingRetriever:
    """Load the embedding model only on first use.

    Construction is cheap: the ``sentence_transformers`` import, the model
    load, and the ``DenseRetriever`` setup are deferred to the first
    ``retrieve()`` call.

    NOTE: ``DenseRetriever``, ``InMemoryVectorStore``, and
    ``InMemoryContextStore`` must be importable in the enclosing module.
    """

    def __init__(self, model_name: str):
        """Remember ``model_name``; nothing is loaded yet."""
        self._model_name = model_name
        self._model = None
        self._retriever = None

    def _ensure_loaded(self):
        """Load the model and build the retriever if that has not happened yet."""
        # Guard on the retriever, because that is what retrieve() uses. If
        # building it fails after the model was loaded, the next call retries
        # instead of hitting a permanently missing retriever.
        if self._retriever is not None:
            return

        if self._model is None:
            # Heavy import and model load happens only once
            from sentence_transformers import SentenceTransformer

            self._model = SentenceTransformer(self._model_name)

        self._retriever = DenseRetriever(
            vector_store=InMemoryVectorStore(),
            context_store=InMemoryContextStore(),
            embed_fn=lambda text: self._model.encode(text).tolist(),
        )

    def retrieve(self, query, top_k=10):
        """Load on first use, then delegate to the underlying retriever."""
        self._ensure_loaded()
        return self._retriever.retrieve(query, top_k=top_k)

Profile Before Optimizing

Use result.diagnostics["steps"] to identify which pipeline steps are slowest before adding complexity like caching or async. The step timing breakdown is included in every BuildResult.

Connection Pooling

When using external vector databases (Pinecone, Weaviate, Qdrant), reuse client connections across requests. Create the client once at startup and pass it to your retriever, rather than creating a new connection per query.