Pipeline API Reference

API reference for the pipeline module -- the orchestration layer of anchor.

ContextPipeline

The main orchestrator that assembles context from multiple sources into a token-aware, priority-ranked context window.

from anchor import ContextPipeline

class ContextPipeline:
    def __init__(
        self,
        max_tokens: int = 8192,
        tokenizer: Tokenizer | None = None,
        budget: TokenBudget | None = None,
    ) -> None: ...

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| max_tokens | int | 8192 | Maximum token budget for the context window. Must be positive. |
| tokenizer | Tokenizer \| None | None | Custom tokenizer. Falls back to the built-in TiktokenCounter. |
| budget | TokenBudget \| None | None | Optional token budget for fine-grained per-source allocation. |

Raises: ValueError if max_tokens <= 0.
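
To make "token-aware, priority-ranked" concrete, the following is a minimal sketch of one way a context window could be packed: take items in descending priority and keep each one that still fits in the budget. It is not anchor's actual algorithm. `Item`, `count_tokens`, and `pack` are hypothetical names, and the whitespace word count is a crude stand-in for a real tokenizer.

```python
from dataclasses import dataclass


@dataclass
class Item:
    """Hypothetical stand-in for ContextItem (only the fields this sketch needs)."""
    content: str
    priority: int


def count_tokens(text: str) -> int:
    # Assumption: one token per whitespace-separated word. A real tokenizer differs.
    return len(text.split())


def pack(items: list[Item], max_tokens: int) -> list[Item]:
    """Greedily keep the highest-priority items that fit within max_tokens."""
    if max_tokens <= 0:
        raise ValueError("max_tokens must be positive")
    chosen: list[Item] = []
    used = 0
    for item in sorted(items, key=lambda i: i.priority, reverse=True):
        cost = count_tokens(item.content)
        if used + cost <= max_tokens:
            chosen.append(item)
            used += cost
    return chosen


window = pack(
    [
        Item("You are a helpful assistant.", priority=10),
        Item("A long retrieved passage with quite a few words in it.", priority=5),
        Item("Short note.", priority=3),
    ],
    max_tokens=8,
)
```

With a budget of 8 the long passage is skipped because it does not fit, while the lower-priority short note still does.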

Properties

| Property | Type | Description |
| --- | --- | --- |
| max_tokens | int | The maximum token budget for the context window. |
| formatter | Formatter | The current output formatter. |
| steps | list[PipelineStep] | A copy of the registered pipeline steps. |
| system_items | list[ContextItem] | A copy of the registered system items. |
| budget | TokenBudget \| None | The optional token budget. |

Methods

add_step(step) -> ContextPipeline

Add a pipeline step. Returns self for chaining.

def add_step(self, step: PipelineStep) -> ContextPipeline: ...

with_memory(memory) -> ContextPipeline

Attach a memory provider. Any object satisfying the MemoryProvider protocol (i.e. having a get_context_items() -> list[ContextItem] method) is accepted.

def with_memory(self, memory: MemoryProvider) -> ContextPipeline: ...
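
Because acceptance is structural, a memory provider does not need to inherit from anything. The sketch below shows the idea with stand-in types: `Item` and `MemoryProviderLike` are hypothetical substitutes for ContextItem and MemoryProvider, and marking the protocol `runtime_checkable` is an assumption made here so the check can be demonstrated.

```python
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@dataclass
class Item:
    """Hypothetical stand-in for ContextItem."""
    content: str


@runtime_checkable
class MemoryProviderLike(Protocol):
    """Stand-in protocol with the single method named in the text above."""

    def get_context_items(self) -> list[Item]: ...


class NotesMemory:
    """Satisfies the protocol purely by having the right method."""

    def __init__(self, notes: list[str]) -> None:
        self._notes = list(notes)

    def get_context_items(self) -> list[Item]:
        return [Item(note) for note in self._notes]


memory = NotesMemory(["user prefers metric units"])
is_provider = isinstance(memory, MemoryProviderLike)
```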

with_budget(budget) -> ContextPipeline

Attach a TokenBudget for fine-grained allocation.

def with_budget(self, budget: TokenBudget) -> ContextPipeline: ...

with_formatter(formatter) -> ContextPipeline

Set the output formatter.

def with_formatter(self, formatter: Formatter) -> ContextPipeline: ...

add_system_prompt(content, priority=10) -> ContextPipeline

Add a system prompt as a high-priority context item with source=SourceType.SYSTEM.

def add_system_prompt(self, content: str, priority: int = 10) -> ContextPipeline: ...

add_callback(callback) -> ContextPipeline

Register an event callback for pipeline observability.

def add_callback(self, callback: PipelineCallback) -> ContextPipeline: ...

with_query_enricher(enricher) -> ContextPipeline

Attach a query enricher for memory-aware query expansion. The enricher is called after memory items are collected but before pipeline steps execute.

def with_query_enricher(self, enricher: ContextQueryEnricher) -> ContextPipeline: ...

step(fn=None, *, name=None, on_error="raise")

Decorator to register a synchronous function as a pipeline step. Usable with or without arguments: @pipeline.step or @pipeline.step(name="x", on_error="skip").

Raises: TypeError if the function is async (use async_step instead).

async_step(fn=None, *, name=None, on_error="raise")

Decorator to register an async function as a pipeline step. Same usage pattern as step.

Raises: TypeError if the function is not async.
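
The "with or without arguments" pattern can be sketched as follows. `MiniPipeline` is a hypothetical stand-in written for this example, not anchor's ContextPipeline; it only shows how one decorator can support both call styles and reject async functions.

```python
import inspect
from typing import Callable, Optional


class MiniPipeline:
    """Hypothetical stand-in that records (name, fn, on_error) tuples."""

    def __init__(self) -> None:
        self.steps: list[tuple[str, Callable, str]] = []

    def step(
        self,
        fn: Optional[Callable] = None,
        *,
        name: Optional[str] = None,
        on_error: str = "raise",
    ):
        def register(func: Callable) -> Callable:
            if inspect.iscoroutinefunction(func):
                raise TypeError(f"{func.__name__} is async; use async_step instead")
            self.steps.append((name or func.__name__, func, on_error))
            return func

        # Bare use (@pipeline.step): fn is the decorated function itself.
        # Called use (@pipeline.step(...)): fn is None, so return the decorator.
        return register(fn) if fn is not None else register


pipeline = MiniPipeline()


@pipeline.step
def keep_all(items, query):
    return items


@pipeline.step(name="trim", on_error="skip")
def keep_first_three(items, query):
    return items[:3]
```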

build(query) -> ContextResult

Execute the full pipeline synchronously and return assembled context.

def build(self, query: str | QueryBundle) -> ContextResult: ...

Accepts either a plain string (auto-wrapped in QueryBundle) or a QueryBundle.

abuild(query) -> ContextResult

Execute the full pipeline asynchronously. Supports both sync and async steps: sync steps are called directly, async steps are awaited.

async def abuild(self, query: str | QueryBundle) -> ContextResult: ...
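
A minimal sketch of how an async runner can mix both kinds of step: call each function, and await the result only if it is awaitable. `run_steps` and the two example steps are hypothetical, and plain strings stand in for ContextItem objects; anchor's internals may differ.

```python
import asyncio
import inspect


async def run_steps(steps, query: str) -> list[str]:
    """Run steps in order; sync results are used directly, async ones awaited."""
    items: list[str] = []
    for fn in steps:
        result = fn(items, query)
        if inspect.isawaitable(result):
            result = await result
        items = result
    return items


def sync_step(items, query):
    return items + [f"sync:{query}"]


async def async_step(items, query):
    await asyncio.sleep(0)  # placeholder for real async I/O
    return items + [f"async:{query}"]


result = asyncio.run(run_steps([sync_step, async_step], "q"))
```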

PipelineStep

A single composable step in the context pipeline. This is a dataclass.

from anchor import PipelineStep

@dataclass(slots=True)
class PipelineStep:
    name: str
    fn: SyncStepFn | AsyncStepFn
    is_async: bool = False
    on_error: Literal["raise", "skip"] = "raise"
    metadata: dict[str, Any] = field(default_factory=dict)

Fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | (required) | Human-readable name for diagnostics. |
| fn | StepFn | (required) | The callable implementing the step logic. |
| is_async | bool | False | Whether fn is an async function. |
| on_error | "raise" \| "skip" | "raise" | Error handling policy. |
| metadata | dict[str, Any] | {} | Arbitrary metadata. |

Methods

execute(items, query) -> list[ContextItem]

Execute the step synchronously. Raises TypeError if the step is async.

aexecute(items, query) -> list[ContextItem]

Execute the step asynchronously. Works for both sync and async step functions.


Factory Functions

retriever_step(name, retriever, top_k=10) -> PipelineStep

Create a step from a Retriever protocol implementation. Appends retrieved items to the current list.

from anchor import retriever_step
step = retriever_step("search", my_retriever, top_k=5)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | (required) | Step name for diagnostics. |
| retriever | Retriever | (required) | Object with retrieve(query, top_k) method. |
| top_k | int | 10 | Maximum items to retrieve. |

async_retriever_step(name, retriever, top_k=10) -> PipelineStep

Async variant. Wraps an AsyncRetriever (must have aretrieve(query, top_k)).

from anchor import async_retriever_step
step = async_retriever_step("async-search", my_async_retriever, top_k=5)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | (required) | Step name for diagnostics. |
| retriever | AsyncRetriever | (required) | Object with aretrieve(query, top_k) method. |
| top_k | int | 10 | Maximum items to retrieve. |

filter_step(name, predicate) -> PipelineStep

Create a step that filters items by a predicate function. Items for which the predicate returns False are removed.

from anchor import filter_step
step = filter_step("score-gate", lambda item: item.score > 0.5)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | (required) | Step name for diagnostics. |
| predicate | Callable[[ContextItem], bool] | (required) | Returns True to keep an item. |
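
The filtering behaviour can be sketched without the library. `Item` and `make_filter` are hypothetical names, and the `(items, query)` step signature is assumed from the execute(items, query) method documented above.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Item:
    """Hypothetical stand-in for ContextItem."""
    id: str
    score: float


def make_filter(predicate: Callable[[Item], bool]):
    """Build a step function that keeps only items the predicate accepts."""

    def step_fn(items: list[Item], query: str) -> list[Item]:
        return [item for item in items if predicate(item)]

    return step_fn


score_gate = make_filter(lambda item: item.score > 0.5)
kept = score_gate([Item("a", 0.9), Item("b", 0.5), Item("c", 0.2)], "any query")
```

Note that the comparison is strict, so the item with a score of exactly 0.5 is dropped.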

postprocessor_step(name, processor) -> PipelineStep

Create a step from a PostProcessor protocol implementation.

from anchor import postprocessor_step
step = postprocessor_step("dedup", my_deduplicator)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | (required) | Step name for diagnostics. |
| processor | PostProcessor | (required) | Object with process(items, query) method. |

async_postprocessor_step(name, processor) -> PipelineStep

Async variant. Wraps an AsyncPostProcessor (must have aprocess(items, query)).

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | (required) | Step name for diagnostics. |
| processor | AsyncPostProcessor | (required) | Object with aprocess(items, query) method. |

reranker_step(name, reranker, top_k=10) -> PipelineStep

Create a step from a Reranker protocol implementation. The reranker scores and returns the top-k most relevant items.

from anchor import reranker_step
step = reranker_step("rerank", my_reranker, top_k=3)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | (required) | Step name for diagnostics. |
| reranker | Reranker | (required) | Object with rerank(query, items, top_k) method. |
| top_k | int | 10 | Maximum items the reranker should return. |
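
For illustration, here is a toy object with the rerank(query, items, top_k) shape described above. `Item` and `KeywordOverlapReranker` are hypothetical, and the word-overlap scoring is only a placeholder for a real relevance model.

```python
from dataclasses import dataclass


@dataclass
class Item:
    """Hypothetical stand-in for ContextItem."""
    id: str
    content: str


class KeywordOverlapReranker:
    """Toy reranker: ranks items by how many query words they contain."""

    def rerank(self, query: str, items: list[Item], top_k: int) -> list[Item]:
        query_words = set(query.lower().split())

        def overlap(item: Item) -> int:
            return len(query_words & set(item.content.lower().split()))

        # Highest overlap first, then truncate to top_k.
        return sorted(items, key=overlap, reverse=True)[:top_k]


docs = [
    Item("1", "token budgets in pipelines"),
    Item("2", "cooking pasta"),
    Item("3", "pipelines and token counting"),
]
top = KeywordOverlapReranker().rerank("token budgets pipelines", docs, top_k=2)
```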

async_reranker_step(name, reranker, top_k=10) -> PipelineStep

Async variant. Wraps an AsyncReranker (must have arerank(query, items, top_k)).

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | (required) | Step name for diagnostics. |
| reranker | AsyncReranker | (required) | Object with arerank(query, items, top_k) method. |
| top_k | int | 10 | Maximum items the reranker should return. |

query_transform_step(name, transformer, retriever, top_k=10) -> PipelineStep

Create a step that expands the query into multiple variants, retrieves for each, and merges results using Reciprocal Rank Fusion (RRF). New items are deduplicated by ID.

from anchor import query_transform_step
step = query_transform_step("multi-query", my_transformer, my_retriever, top_k=5)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | (required) | Step name for diagnostics. |
| transformer | QueryTransformer | (required) | Expands a single query into multiple queries. |
| retriever | Retriever | (required) | Retriever to run against each expanded query. |
| top_k | int | 10 | Maximum items to retrieve per query variant. |
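
Reciprocal Rank Fusion scores each item as the sum of 1 / (k + rank) over every ranked list it appears in, so items ranked well by several query variants rise to the top. The sketch below is a generic implementation over item IDs, not anchor's own code; `rrf_merge` is a hypothetical name, and k = 60 is the commonly used constant, assumed here because the source does not say which value the library uses.

```python
from collections import defaultdict


def rrf_merge(ranked_lists: list[list[str]], k: int = 60) -> list[str]:
    """Fuse several rankings of item IDs into one, best first."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in ranked_lists:
        for rank, item_id in enumerate(ranking, start=1):
            # Each appearance contributes 1 / (k + rank); IDs are merged by key,
            # so every ID appears exactly once in the output.
            scores[item_id] += 1.0 / (k + rank)
    return sorted(scores, key=lambda item_id: scores[item_id], reverse=True)


# One ranking per query variant.
merged = rrf_merge([["a", "b", "c"], ["b", "a", "d"], ["b", "c"]])
```

Here "b" wins because it appears in all three rankings, twice in first place.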

classified_retriever_step(name, classifier, retrievers, default=None, top_k=10) -> PipelineStep

Create a step that classifies the query and routes to the appropriate retriever.

from anchor import classified_retriever_step
step = classified_retriever_step(
    "router",
    classifier=my_classifier,
    retrievers={"technical": tech_retriever, "general": general_retriever},
    default="general",
    top_k=5,
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | (required) | Step name for diagnostics. |
| classifier | QueryClassifier | (required) | Object with classify(query) -> str method. |
| retrievers | dict[str, Retriever] | (required) | Mapping from class label to retriever. |
| default | str \| None | None | Fallback key when the label is not found. |
| top_k | int | 10 | Maximum items to retrieve. |

Raises: RetrieverError if the label has no matching retriever and no default.
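
The routing rule described above (use the labelled retriever, else the default, else fail) can be sketched as follows. Everything here is a stand-in: `route` is a hypothetical helper, `RoutingError` substitutes for RetrieverError, and retrievers are modelled as plain callables rather than Retriever objects.

```python
from typing import Callable, Optional

Retrieve = Callable[[str, int], list[str]]


class RoutingError(Exception):
    """Stand-in for the RetrieverError named above."""


def route(
    query: str,
    classify: Callable[[str], str],
    retrievers: dict[str, Retrieve],
    default: Optional[str] = None,
    top_k: int = 10,
) -> list[str]:
    label = classify(query)
    retriever = retrievers.get(label)
    if retriever is None and default is not None:
        # Label not found: fall back to the retriever registered under `default`.
        retriever = retrievers.get(default)
    if retriever is None:
        raise RoutingError(f"no retriever for label {label!r} and no usable default")
    return retriever(query, top_k)


retrievers = {
    "technical": lambda q, k: [f"tech:{q}"][:k],
    "general": lambda q, k: [f"general:{q}"][:k],
}


def classify(q: str) -> str:
    return "technical" if "API" in q else "smalltalk"


routed = route("API limits", classify, retrievers, default="general")
fallback = route("hello", classify, retrievers, default="general")
```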

auto_promotion_step(extractor, store, consolidator=None, name="auto_promotion", on_error="skip") -> PipelineStep

Create a step that extracts and stores memories from context. This is a side-effect-only step that returns items unchanged.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| extractor | MemoryExtractor | (required) | Extracts MemoryEntry objects from conversation turns. |
| store | MemoryEntryStore | (required) | Persistence backend for memory entries. |
| consolidator | MemoryConsolidator \| None | None | Optional deduplication against existing entries. |
| name | str | "auto_promotion" | Step name for diagnostics. |
| on_error | "raise" \| "skip" | "skip" | Error handling policy. |

graph_retrieval_step(graph, store, entity_extractor, ...) -> PipelineStep

Create a step that retrieves memory entries linked to graph entities via BFS traversal.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| graph | SimpleGraphMemory | (required) | Graph memory instance to traverse. |
| store | MemoryEntryStore | (required) | Store holding MemoryEntry objects. |
| entity_extractor | Callable[[str], list[str]] | (required) | Maps query string to entity IDs. |
| max_depth | int | 2 | Maximum BFS traversal depth. |
| max_items | int | 5 | Maximum ContextItem objects to return. |
| name | str | "graph_retrieval" | Step name for diagnostics. |
| on_error | "raise" \| "skip" | "skip" | Error handling policy. |
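
A depth-limited BFS of the kind described above might look like the sketch below. The graph is modelled as a plain adjacency dict and memory entries as strings keyed by entity ID; both are assumptions for illustration, since the source does not show how SimpleGraphMemory or MemoryEntryStore expose this data. `bfs_entities` and `linked_entries` are hypothetical names.

```python
from collections import deque


def bfs_entities(graph: dict[str, list[str]], seeds: list[str], max_depth: int = 2) -> list[str]:
    """Return entities reachable from the seeds within max_depth hops, in BFS order."""
    seen: set[str] = set()
    order: list[str] = []
    queue: deque[tuple[str, int]] = deque()
    for seed in seeds:
        if seed not in seen:
            seen.add(seed)
            order.append(seed)
            queue.append((seed, 0))
    while queue:
        node, depth = queue.popleft()
        if depth == max_depth:
            continue  # do not expand beyond the depth limit
        for neighbour in graph.get(node, []):
            if neighbour not in seen:
                seen.add(neighbour)
                order.append(neighbour)
                queue.append((neighbour, depth + 1))
    return order


def linked_entries(entities: list[str], entries_by_entity: dict[str, list[str]], max_items: int = 5) -> list[str]:
    """Collect entries for the visited entities, stopping at max_items."""
    found: list[str] = []
    for entity in entities:
        for entry in entries_by_entity.get(entity, []):
            if len(found) == max_items:
                return found
            found.append(entry)
    return found


graph = {"alice": ["acme"], "acme": ["berlin"], "berlin": ["germany"]}
visited = bfs_entities(graph, ["alice"], max_depth=2)
entries = linked_entries(visited, {"acme": ["Alice works at Acme"], "germany": ["unreached"]})
```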

create_eviction_promoter(extractor, store, consolidator=None) -> Callable

Create an on_evict callback that promotes evicted conversation turns to long-term memory. Designed for SlidingWindowMemory(on_evict=...).

from anchor import create_eviction_promoter, SlidingWindowMemory

promoter = create_eviction_promoter(extractor, store, consolidator)
memory = SlidingWindowMemory(max_tokens=4096, on_evict=promoter)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| extractor | MemoryExtractor | (required) | Extracts MemoryEntry objects from turns. |
| store | MemoryEntryStore | (required) | Persistence backend. |
| consolidator | MemoryConsolidator \| None | None | Optional deduplication. |

Returns: A callable with signature (list[ConversationTurn]) -> None.

Note

Errors inside the eviction promoter are logged but never propagated, so a failing promoter cannot crash the memory pipeline.
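
The log-and-swallow behaviour in the note can be sketched generically. `make_safe_promoter` and `flaky_promote` are hypothetical, the logger name is arbitrary, and conversation turns are modelled as plain strings; the real promoter's internals are not shown in the source.

```python
import logging
from typing import Callable

logger = logging.getLogger("eviction_promoter_sketch")


def make_safe_promoter(promote: Callable[[list[str]], None]) -> Callable[[list[str]], None]:
    """Wrap a promotion function so its failures are logged, not raised."""

    def on_evict(turns: list[str]) -> None:
        try:
            promote(turns)
        except Exception:
            # Log with traceback and carry on; eviction must not fail.
            logger.exception("promotion failed for %d evicted turn(s)", len(turns))

    return on_evict


stored: list[str] = []


def flaky_promote(turns: list[str]) -> None:
    if any("boom" in turn for turn in turns):
        raise RuntimeError("store unavailable")
    stored.extend(turns)


on_evict = make_safe_promoter(flaky_promote)
on_evict(["hello"])
on_evict(["boom"])  # the error is logged; no exception reaches the caller
```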


PipelineCallback

A runtime-checkable protocol for pipeline event callbacks. All methods are optional -- implement only the ones you need.

from anchor import PipelineCallback

class PipelineCallback(Protocol):
    def on_pipeline_start(self, query: QueryBundle) -> None: ...
    def on_step_start(self, step_name: str, items: list[ContextItem]) -> None: ...
    def on_step_end(self, step_name: str, items: list[ContextItem], time_ms: float) -> None: ...
    def on_step_error(self, step_name: str, error: Exception) -> None: ...
    def on_pipeline_end(self, result: ContextResult) -> None: ...

| Method | Called When |
| --- | --- |
| on_pipeline_start | Pipeline execution begins. |
| on_step_start | A step is about to execute. |
| on_step_end | A step completed successfully. |
| on_step_error | A step raised an exception. |
| on_pipeline_end | Pipeline execution completed. |
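
As an illustration of implementing only the hooks you need, the sketch below defines a callback with two of the five methods. `StepTimer` and the `fire` dispatcher are hypothetical; in particular, looking hooks up with `getattr` is an assumption about how missing methods could be tolerated, not a description of anchor's dispatch code.

```python
class StepTimer:
    """Records step timings and failures; other events are ignored."""

    def __init__(self) -> None:
        self.timings: dict[str, float] = {}
        self.failed: list[str] = []

    def on_step_end(self, step_name, items, time_ms) -> None:
        self.timings[step_name] = time_ms

    def on_step_error(self, step_name, error) -> None:
        self.failed.append(step_name)


def fire(callbacks, event: str, *args) -> None:
    """Hypothetical dispatcher: call the hook on each callback that defines it."""
    for callback in callbacks:
        hook = getattr(callback, event, None)
        if callable(hook):
            hook(*args)


timer = StepTimer()
fire([timer], "on_pipeline_start", "query")  # StepTimer has no such hook: skipped
fire([timer], "on_step_end", "search", [], 12.5)
fire([timer], "on_step_error", "rerank", ValueError("bad score"))
```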

See Also