Pipeline API Reference¶
API reference for the pipeline module -- the orchestration layer of anchor.
ContextPipeline¶
The main orchestrator that assembles context from multiple sources into a token-aware, priority-ranked context window.
```python
from anchor import ContextPipeline

class ContextPipeline:
    def __init__(
        self,
        max_tokens: int = 8192,
        tokenizer: Tokenizer | None = None,
        budget: TokenBudget | None = None,
    ) -> None: ...
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
max_tokens | int | 8192 | Maximum token budget for the context window. Must be positive. |
tokenizer | Tokenizer \| None | None | Custom tokenizer. Falls back to the built-in TiktokenCounter. |
budget | TokenBudget \| None | None | Optional token budget for fine-grained per-source allocation. |
Raises: ValueError if max_tokens <= 0.
Properties¶
| Property | Type | Description |
|---|---|---|
max_tokens | int | The maximum token budget for the context window. |
formatter | Formatter | The current output formatter. |
steps | list[PipelineStep] | A copy of the registered pipeline steps. |
system_items | list[ContextItem] | A copy of the registered system items. |
budget | TokenBudget \| None | The optional token budget. |
Methods¶
add_step(step) -> ContextPipeline¶
Add a pipeline step. Returns self for chaining.
with_memory(memory) -> ContextPipeline¶
Attach a memory provider. Any object satisfying the MemoryProvider protocol (i.e. having a get_context_items() -> list[ContextItem] method) is accepted.
with_budget(budget) -> ContextPipeline¶
Attach a TokenBudget for fine-grained allocation.
with_formatter(formatter) -> ContextPipeline¶
Set the output formatter.
add_system_prompt(content, priority=10) -> ContextPipeline¶
Add a system prompt as a high-priority context item with source=SourceType.SYSTEM.
add_callback(callback) -> ContextPipeline¶
Register an event callback for pipeline observability.
with_query_enricher(enricher) -> ContextPipeline¶
Attach a query enricher for memory-aware query expansion. The enricher is called after memory items are collected but before pipeline steps execute.
step(fn=None, *, name=None, on_error="raise")¶
Decorator to register a synchronous function as a pipeline step. Usable with or without arguments: @pipeline.step or @pipeline.step(name="x", on_error="skip").
Raises: TypeError if the function is async (use async_step instead).
async_step(fn=None, *, name=None, on_error="raise")¶
Decorator to register an async function as a pipeline step. Same usage pattern as step.
Raises: TypeError if the function is not async.
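Both decorators follow the standard "with or without arguments" pattern. The sketch below illustrates how a single entry point can serve both `@pipeline.step` and `@pipeline.step(name="x")` call forms; it is a generic illustration, not anchor's actual implementation:

```python
registered = []

def step(fn=None, *, name=None, on_error="raise"):
    def register(f):
        # Record the step under an explicit name, or the function's own name.
        registered.append((name or f.__name__, on_error))
        return f
    if fn is not None:
        # Used bare, as @step -- fn is the decorated function itself.
        return register(fn)
    # Used with arguments, as @step(...) -- return the real decorator.
    return register

@step
def clean(items, query):
    return items

@step(name="dedupe", on_error="skip")
def dedupe(items, query):
    return items
```

In the bare form the function arrives as the first positional argument; in the parenthesized form the outer call returns the decorator that receives it.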
build(query) -> ContextResult¶
Execute the full pipeline synchronously and return assembled context.
Accepts either a plain string (auto-wrapped in QueryBundle) or a QueryBundle.
abuild(query) -> ContextResult¶
Execute the full pipeline asynchronously. Supports both sync and async steps: sync steps are called directly, async steps are awaited.
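The mixed sync/async dispatch can be pictured with a small sketch (a hypothetical simplification, not anchor's internal code): call the step function, and await the result only if it is awaitable.

```python
import asyncio
import inspect

async def run_step(fn, items, query):
    # Sync steps return a list directly; async steps return a coroutine.
    result = fn(items, query)
    if inspect.isawaitable(result):
        result = await result
    return result

def tag_sync(items, query):
    return items + ["sync"]

async def tag_async(items, query):
    return items + ["async"]

async def main():
    items = await run_step(tag_sync, [], "q")
    items = await run_step(tag_async, items, "q")
    return items

result = asyncio.run(main())
```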
PipelineStep¶
A single composable step in the context pipeline. This is a dataclass.
```python
from anchor import PipelineStep

@dataclass(slots=True)
class PipelineStep:
    name: str
    fn: SyncStepFn | AsyncStepFn
    is_async: bool = False
    on_error: Literal["raise", "skip"] = "raise"
    metadata: dict[str, Any] = field(default_factory=dict)
```
Fields:
| Field | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Human-readable name for diagnostics. |
fn | SyncStepFn \| AsyncStepFn | (required) | The callable implementing the step logic. |
is_async | bool | False | Whether fn is an async function. |
on_error | "raise" \| "skip" | "raise" | Error handling policy. |
metadata | dict[str, Any] | {} | Arbitrary metadata. |
Methods¶
execute(items, query) -> list[ContextItem]¶
Execute the step synchronously. Raises TypeError if the step is async.
aexecute(items, query) -> list[ContextItem]¶
Execute the step asynchronously. Works for both sync and async step functions.
Factory Functions¶
retriever_step(name, retriever, top_k=10) -> PipelineStep¶
Create a step from a Retriever protocol implementation. Appends retrieved items to the current list.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Step name for diagnostics. |
retriever | Retriever | (required) | Object with retrieve(query, top_k) method. |
top_k | int | 10 | Maximum items to retrieve. |
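Because Retriever is a structural protocol, any object with a matching `retrieve` method qualifies; no base class is needed. A minimal sketch (plain strings stand in for ContextItem, and the query is assumed to arrive as a plain string, to keep the example self-contained):

```python
# Minimal object satisfying the Retriever protocol shape.
class KeywordRetriever:
    def __init__(self, docs):
        self.docs = docs

    def retrieve(self, query, top_k=10):
        # Naive substring match; a real retriever would return list[ContextItem].
        hits = [d for d in self.docs if query.lower() in d.lower()]
        return hits[:top_k]

retriever = KeywordRetriever(["Python basics", "Rust ownership", "Python async"])
```

Such an object can then be wrapped with `retriever_step("search", retriever, top_k=5)`.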
async_retriever_step(name, retriever, top_k=10) -> PipelineStep¶
Async variant. Wraps an AsyncRetriever (must have aretrieve(query, top_k)).
```python
from anchor import async_retriever_step

step = async_retriever_step("async-search", my_async_retriever, top_k=5)
```
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Step name for diagnostics. |
retriever | AsyncRetriever | (required) | Object with aretrieve(query, top_k) method. |
top_k | int | 10 | Maximum items to retrieve. |
filter_step(name, predicate) -> PipelineStep¶
Create a step that filters items by a predicate function. Items where the predicate returns False are removed.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Step name for diagnostics. |
predicate | Callable[[ContextItem], bool] | (required) | Returns True to keep an item. |
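A typical predicate reads item metadata. In this sketch plain dicts stand in for ContextItem objects, and `score` is an assumed metadata key, not part of anchor's data model; in real use the predicate would inspect `item.metadata`:

```python
def above_threshold(item) -> bool:
    # Keep only items whose score clears 0.5; items with no score are dropped.
    return item.get("score", 0.0) >= 0.5

items = [{"score": 0.9}, {"score": 0.2}, {}]
kept = [i for i in items if above_threshold(i)]
```

The predicate would be registered with `filter_step("relevance-filter", above_threshold)`.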
postprocessor_step(name, processor) -> PipelineStep¶
Create a step from a PostProcessor protocol implementation.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Step name for diagnostics. |
processor | PostProcessor | (required) | Object with process(items, query) method. |
async_postprocessor_step(name, processor) -> PipelineStep¶
Async variant. Wraps an AsyncPostProcessor (must have aprocess(items, query)).
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Step name for diagnostics. |
processor | AsyncPostProcessor | (required) | Object with aprocess(items, query) method. |
reranker_step(name, reranker, top_k=10) -> PipelineStep¶
Create a step from a Reranker protocol implementation. The reranker scores and returns the top-k most relevant items.
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Step name for diagnostics. |
reranker | Reranker | (required) | Object with rerank(query, items, top_k) method. |
top_k | int | 10 | Maximum items the reranker should return. |
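Like Retriever, Reranker is satisfied structurally by any object with a matching `rerank` method. A hypothetical sketch that scores by naive term overlap (strings stand in for ContextItem):

```python
class OverlapReranker:
    def rerank(self, query, items, top_k=10):
        terms = set(query.lower().split())

        def score(item):
            # Count query terms appearing in the item text.
            return len(terms & set(item.lower().split()))

        return sorted(items, key=score, reverse=True)[:top_k]

reranker = OverlapReranker()
ranked = reranker.rerank(
    "python async",
    ["rust notes", "python async tips", "python intro"],
    top_k=2,
)
```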
async_reranker_step(name, reranker, top_k=10) -> PipelineStep¶
Async variant. Wraps an AsyncReranker (must have arerank(query, items, top_k)).
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Step name for diagnostics. |
reranker | AsyncReranker | (required) | Object with arerank(query, items, top_k) method. |
top_k | int | 10 | Maximum items the reranker should return. |
query_transform_step(name, transformer, retriever, top_k=10) -> PipelineStep¶
Create a step that expands the query into multiple variants, retrieves for each, and merges results using Reciprocal Rank Fusion (RRF). New items are deduplicated by ID.
```python
from anchor import query_transform_step

step = query_transform_step("multi-query", my_transformer, my_retriever, top_k=5)
```
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Step name for diagnostics. |
transformer | QueryTransformer | (required) | Expands a single query into multiple queries. |
retriever | Retriever | (required) | Retriever to run against each expanded query. |
top_k | int | 10 | Maximum items to retrieve per query variant. |
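The merge phase can be sketched as standard Reciprocal Rank Fusion: each item earns 1/(k + rank) per ranking it appears in, and items are ordered by summed score. The constant k=60 is the common default from the RRF literature; anchor's internal constant and tie-breaking may differ.

```python
def rrf_merge(rankings, k=60):
    scores = {}
    for ranking in rankings:
        for rank, item_id in enumerate(ranking, start=1):
            # Items ranked highly in any list accumulate the most score.
            scores[item_id] = scores.get(item_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

fused = rrf_merge([["a", "b", "c"], ["a", "c", "d"]])
```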
classified_retriever_step(name, classifier, retrievers, default=None, top_k=10) -> PipelineStep¶
Create a step that classifies the query and routes to the appropriate retriever.
```python
from anchor import classified_retriever_step

step = classified_retriever_step(
    "router",
    classifier=my_classifier,
    retrievers={"technical": tech_retriever, "general": general_retriever},
    default="general",
    top_k=5,
)
```
| Parameter | Type | Default | Description |
|---|---|---|---|
name | str | (required) | Step name for diagnostics. |
classifier | QueryClassifier | (required) | Object with classify(query) -> str method. |
retrievers | dict[str, Retriever] | (required) | Mapping from class label to retriever. |
default | str \| None | None | Fallback key when the label is not found. |
top_k | int | 10 | Maximum items to retrieve. |
Raises: RetrieverError if the label has no matching retriever and no default.
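The routing rule amounts to a two-stage lookup. A hypothetical sketch, not anchor's code (string values stand in for retriever objects, and `KeyError` stands in for RetrieverError):

```python
def route(label, retrievers, default=None):
    # First try the classifier's label, then the configured fallback.
    if label in retrievers:
        return retrievers[label]
    if default is not None and default in retrievers:
        return retrievers[default]
    raise KeyError(f"no retriever for label {label!r} and no default")

retrievers = {"technical": "tech_retriever", "general": "general_retriever"}
picked = route("legal", retrievers, default="general")
```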
auto_promotion_step(extractor, store, consolidator=None, name="auto_promotion", on_error="skip") -> PipelineStep¶
Create a step that extracts and stores memories from context. This is a side-effect-only step that returns items unchanged.
| Parameter | Type | Default | Description |
|---|---|---|---|
extractor | MemoryExtractor | (required) | Extracts MemoryEntry objects from conversation turns. |
store | MemoryEntryStore | (required) | Persistence backend for memory entries. |
consolidator | MemoryConsolidator \| None | None | Optional deduplication against existing entries. |
name | str | "auto_promotion" | Step name for diagnostics. |
on_error | "raise" \| "skip" | "skip" | Error handling policy. |
graph_retrieval_step(graph, store, entity_extractor, ...) -> PipelineStep¶
Create a step that retrieves memory entries linked to graph entities via BFS traversal.
| Parameter | Type | Default | Description |
|---|---|---|---|
graph | SimpleGraphMemory | (required) | Graph memory instance to traverse. |
store | MemoryEntryStore | (required) | Store holding MemoryEntry objects. |
entity_extractor | Callable[[str], list[str]] | (required) | Maps query string to entity IDs. |
max_depth | int | 2 | Maximum BFS traversal depth. |
max_items | int | 5 | Maximum ContextItem objects to return. |
name | str | "graph_retrieval" | Step name for diagnostics. |
on_error | "raise" \| "skip" | "skip" | Error handling policy. |
create_eviction_promoter(extractor, store, consolidator=None) -> Callable¶
Create an on_evict callback that promotes evicted conversation turns to long-term memory. Designed for SlidingWindowMemory(on_evict=...).
```python
from anchor import create_eviction_promoter, SlidingWindowMemory

promoter = create_eviction_promoter(extractor, store, consolidator)
memory = SlidingWindowMemory(max_tokens=4096, on_evict=promoter)
```
| Parameter | Type | Default | Description |
|---|---|---|---|
extractor | MemoryExtractor | (required) | Extracts MemoryEntry objects from turns. |
store | MemoryEntryStore | (required) | Persistence backend. |
consolidator | MemoryConsolidator \| None | None | Optional deduplication. |
Returns: A callable with signature (list[ConversationTurn]) -> None.
Note
Errors inside the eviction promoter are logged but never propagated to prevent crashing the memory pipeline.
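The never-propagate behaviour described in the note can be pictured as a wrapper that logs and swallows any exception. This is an illustration of the contract only, not anchor's actual wrapper:

```python
import logging

def swallow_errors(fn):
    def wrapper(turns):
        try:
            fn(turns)
        except Exception:
            # Log with traceback, but never let the error reach the caller.
            logging.exception("eviction promotion failed")
    return wrapper

@swallow_errors
def failing_promoter(turns):
    raise RuntimeError("extractor backend down")

failing_promoter([])  # logs the error; nothing propagates
```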
PipelineCallback¶
A runtime-checkable protocol for pipeline event callbacks. All methods are optional -- implement only the ones you need.
```python
from anchor import PipelineCallback

class PipelineCallback(Protocol):
    def on_pipeline_start(self, query: QueryBundle) -> None: ...
    def on_step_start(self, step_name: str, items: list[ContextItem]) -> None: ...
    def on_step_end(self, step_name: str, items: list[ContextItem], time_ms: float) -> None: ...
    def on_step_error(self, step_name: str, error: Exception) -> None: ...
    def on_pipeline_end(self, result: ContextResult) -> None: ...
```
| Method | Called When |
|---|---|
on_pipeline_start | Pipeline execution begins. |
on_step_start | A step is about to execute. |
on_step_end | A step completed successfully. |
on_step_error | A step raised an exception. |
on_pipeline_end | Pipeline execution completed. |
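Since every method is optional, a callback can implement only the events it cares about. A minimal step-timing callback (a sketch; an instance would be registered via `pipeline.add_callback(...)`):

```python
class StepTimer:
    def __init__(self):
        self.timings = {}
        self.errors = {}

    def on_step_end(self, step_name, items, time_ms):
        # Record how long each successful step took.
        self.timings[step_name] = time_ms

    def on_step_error(self, step_name, error):
        self.errors[step_name] = repr(error)

timer = StepTimer()
timer.on_step_end("retrieve", [], 12.5)
timer.on_step_error("rerank", ValueError("bad score"))
```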
See Also¶
- Pipeline Guide -- walkthrough with complete examples
- Models API Reference -- ContextItem, ContextResult, TokenBudget
- Exceptions Reference -- error classes