Advanced Retrieval¶
This guide covers advanced retrieval features: reranker pipelines, async retrievers, query routing, cross-modal search, late interaction, and memory-aware retrieval. For core retrieval, see Retrieval Guide.
Reranker Pipeline¶
All rerankers conform to the Reranker protocol with rerank(query, items, top_k).
CrossEncoderReranker¶
Scores each item with a user-provided (query_str, doc_content) -> float function.
from anchor.retrieval import CrossEncoderReranker
reranker = CrossEncoderReranker(score_fn=my_scorer, top_k=10)
results = reranker.rerank(query, items)
CohereReranker¶
Batch reranking via a callback that takes (query_str, documents, top_k) and returns (original_index, score) tuples.
from anchor.retrieval import CohereReranker
def cohere_rerank(query: str, docs: list[str], top_k: int) -> list[tuple[int, float]]:
response = co.rerank(query=query, documents=docs, top_n=top_k)
return [(r.index, r.relevance_score) for r in response.results]
reranker = CohereReranker(rerank_fn=cohere_rerank, top_k=10)
FlashRankReranker¶
Local cross-encoder reranking using flashrank. The model is lazily loaded.
Warning
Requires: pip install astro-anchor[flashrank]
from anchor.retrieval import FlashRankReranker
reranker = FlashRankReranker(model_name="ms-marco-MiniLM-L-12-v2", top_k=10)
results = reranker.rerank(query, items)
RoundRobinReranker¶
Merges multiple result sets round-robin (deduplicating by ID), or re-sorts by existing score via rerank().
from anchor.retrieval import RoundRobinReranker
rr = RoundRobinReranker(top_k=10)
merged = rr.rerank_multiple(query, [dense_results, sparse_results], top_k=10)
RerankerPipeline¶
Chains multiple rerankers sequentially. Intermediate stages pass all items through; top_k is applied only at the end.
from anchor.retrieval import RerankerPipeline
pipeline = RerankerPipeline(rerankers=[cross_encoder, cohere_reranker], top_k=5)
results = pipeline.rerank(query, items)
Tip
Place cheaper rerankers first (e.g., FlashRankReranker) to reduce the candidate set before expensive API-based rerankers (CohereReranker).
Async Retrievers¶
Async variants implement the AsyncRetriever protocol (aretrieve() method) for non-blocking I/O during embedding lookups and API calls.
AsyncDenseRetriever¶
Async embedding-based retriever. Use aindex() to embed items, or index() for pre-embedded items (those with "embedding" in metadata).
from anchor.retrieval import AsyncDenseRetriever
async def async_embed(text: str) -> list[float]:
return await embedding_service.embed(text)
retriever = AsyncDenseRetriever(embed_fn=async_embed)
await retriever.aindex(items)
results = await retriever.aretrieve(query, top_k=5)
AsyncHybridRetriever¶
Fans out to multiple async retrievers concurrently via asyncio.gather and fuses results with RRF.
from anchor.retrieval import AsyncHybridRetriever
hybrid = AsyncHybridRetriever(retrievers=[ret_a, ret_b], weights=[0.7, 0.3], k=60)
results = await hybrid.aretrieve(query, top_k=10)
Note
Failed sub-retrievers are logged and skipped. If all fail, an empty list is returned.
Async Rerankers¶
AsyncCrossEncoderReranker-- scores items concurrently viaasyncio.gather.AsyncCohereReranker-- batch reranking via an async callback.
from anchor.retrieval import AsyncCrossEncoderReranker
reranker = AsyncCrossEncoderReranker(score_fn=async_scorer)
results = await reranker.arerank(query, items, top_k=5)
Query Routing¶
Query routers direct queries to different retriever backends based on the query content. Use RoutedRetriever to wrap a router and a set of named retrievers.
KeywordRouter¶
Routes based on keyword matching. First keyword match wins; falls back to a default route.
from anchor.retrieval import KeywordRouter, RoutedRetriever
router = KeywordRouter(
routes={
"code": ["function", "class", "import", "def"],
"docs": ["documentation", "guide", "tutorial"],
},
default="general",
case_sensitive=False,
)
routed = RoutedRetriever(
router=router,
retrievers={"code": code_retriever, "docs": docs_retriever, "general": general_retriever},
)
results = routed.retrieve(query, top_k=10)
CallbackRouter¶
Routes using a user-provided function that takes a QueryBundle and returns a route name (or None for the default).
from anchor.retrieval import CallbackRouter, RoutedRetriever
def classify(q):
if len(q.query_str) > 100:
return "detailed"
return None # falls back to default
router = CallbackRouter(callback=classify, default="quick")
MetadataRouter¶
Routes based on a metadata field in the QueryBundle. Inspects query.metadata[key] and uses its value as the route name.
from anchor.retrieval import MetadataRouter, RoutedRetriever
from anchor.models.query import QueryBundle
router = MetadataRouter(metadata_key="domain", default="general")
query = QueryBundle(query_str="explain auth", metadata={"domain": "security"})
# Routes to the "security" retriever
RoutedRetriever¶
Wraps any router and a mapping of named retrievers.
| Parameter | Type | Description |
|---|---|---|
router | KeywordRouter \| CallbackRouter \| MetadataRouter | The query router. |
retrievers | dict[str, Retriever] | Mapping of route names to retrievers. |
default_retriever | str \| None | Fallback retriever name if route not found in retrievers. |
Warning
If the route maps to an unknown retriever and no default_retriever is configured, RetrieverError is raised.
Cross-Modal Retrieval¶
Cross-modal retrieval searches across different content types (text, image, audio) in a shared embedding space.
CrossModalEncoder¶
Encodes content from multiple modalities using modality-specific callbacks.
import math
from anchor.retrieval import CrossModalEncoder
def text_encoder(text: str) -> list[float]:
vals = [ord(c) for c in text[:20]]
raw = [math.sin(sum(vals)), math.cos(sum(vals))]
norm = math.sqrt(sum(v * v for v in raw)) or 1.0
return [v / norm for v in raw]
encoder = CrossModalEncoder(
encoders={"text": text_encoder, "image": image_encoder}
)
embedding = encoder.encode("hello world", modality="text")
print(encoder.modalities) # ["image", "text"]
SharedSpaceRetriever¶
Retriever that searches across modalities in a shared embedding space.
from anchor.retrieval import SharedSpaceRetriever
retriever = SharedSpaceRetriever(
encoder=encoder,
query_modality="text",
similarity_fn=None, # defaults to cosine similarity
)
retriever.index(items, modality="text")
results = retriever.retrieve(query, top_k=5)
Items can specify their modality via metadata["modality"]. When no modality is provided to index(), it defaults to "text".
Late Interaction¶
Late interaction models (ColBERT-style) produce per-token embeddings and compute fine-grained relevance via MaxSim scoring.
MaxSimScorer¶
ColBERT-style scorer: for each query token, finds the maximum cosine similarity across all document tokens, then sums the maxima.
from anchor.retrieval import MaxSimScorer
scorer = MaxSimScorer()
score = scorer.score(query_tokens, doc_tokens)
LateInteractionScorer¶
Configurable wrapper that defaults to MaxSimScorer but accepts a custom scoring function.
from anchor.retrieval import LateInteractionScorer
scorer = LateInteractionScorer(score_fn=None) # uses MaxSim
LateInteractionRetriever¶
Two-stage retriever: a first-stage retriever generates candidates, then a token-level encoder re-scores each candidate.
from anchor.retrieval import LateInteractionRetriever, LateInteractionScorer
retriever = LateInteractionRetriever(
first_stage=dense_retriever,
encoder=token_level_encoder, # implements TokenLevelEncoder protocol
scorer=LateInteractionScorer(),
first_stage_k=100,
)
results = retriever.retrieve(query, top_k=10)
| Parameter | Type | Description |
|---|---|---|
first_stage | Retriever | Candidate generation retriever. |
encoder | TokenLevelEncoder | Produces per-token embeddings. |
scorer | LateInteractionScorer \| None | Scoring function. Defaults to MaxSim. |
first_stage_k | int | Number of candidates from the first stage (default 100). |
The TokenLevelEncoder protocol requires a single method:
Memory Retrieval¶
ScoredMemoryRetriever combines recency, relevance, and importance signals to retrieve from a MemoryEntryStore.
ScoredMemoryRetriever¶
from anchor.retrieval import ScoredMemoryRetriever
retriever = ScoredMemoryRetriever(
store=memory_store,
embed_fn=embed_fn, # optional
vector_store=vector_store, # optional
decay=None, # optional MemoryDecay
alpha=0.3, # recency weight
beta=0.5, # relevance weight
gamma=0.2, # importance weight
)
# Add entries
retriever.add_entry(memory_entry)
# Retrieve with filters
top_memories = retriever.retrieve(
"what did the user say about testing?",
top_k=5,
user_id="user-123",
memory_type="preference",
)
The composite score formula:
- recency: Exponential decay (7-day half-life) or custom
MemoryDecay. - relevance: Cosine similarity from
VectorStoreor keyword overlap. - importance: The entry's
relevance_scorefield.
MemoryRetrieverAdapter¶
Bridges ScoredMemoryRetriever to the Retriever protocol for pipeline integration. Converts MemoryEntry objects to ContextItem objects.
What's Next¶
- Retrieval Guide -- core dense, sparse, and hybrid retrieval
- Storage Guide -- store protocols and implementations
- Retrieval API Reference -- full signatures for all classes