Observability

anchor provides built-in tracing, metrics collection, and cost tracking for pipeline execution. Every span, metric point, and cost entry is a Pydantic model, so each one is fully typed and serialisable.


Tracing with Tracer

Tracer is the low-level API for creating traces and spans. A trace groups all operations from a single pipeline execution; each operation is represented as a span.

from anchor.observability import Tracer, SpanKind

tracer = Tracer()

# Start a trace
trace = tracer.start_trace("my-pipeline", attributes={"user": "demo"})

# Start a span within the trace
span = tracer.start_span(
    trace_id=trace.trace_id,
    name="retrieval",
    kind=SpanKind.RETRIEVAL,
    attributes={"top_k": 10},
)

# ... perform retrieval work ...

# End the span
ended_span = tracer.end_span(span, status="ok", attributes={"items": 5})
print(f"Span took {ended_span.duration_ms:.1f} ms")

# End the trace
ended_trace = tracer.end_trace(trace)
print(f"Trace took {ended_trace.total_duration_ms:.1f} ms")
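
The start/end pairing above can be wrapped in a context manager so that a span is always closed, even when the work inside it raises. The following is a minimal sketch that relies only on the start_span and end_span calls shown above; the helper name traced_span and the "error" status string are assumptions, not part of anchor.

```python
from contextlib import contextmanager


@contextmanager
def traced_span(tracer, trace, name, kind, **attributes):
    """Open a span on `tracer` and always end it (hypothetical helper)."""
    span = tracer.start_span(
        trace_id=trace.trace_id,
        name=name,
        kind=kind,
        attributes=attributes,
    )
    try:
        yield span
    except Exception as exc:
        # "error" is an assumed status value; check what your version accepts.
        tracer.end_span(
            span,
            status="error",
            attributes={"exception": type(exc).__name__},
        )
        raise
    else:
        tracer.end_span(span, status="ok", attributes={})
```

With a real tracer this would read `with traced_span(tracer, trace, "retrieval", SpanKind.RETRIEVAL, top_k=10): ...`, and the span is ended with the appropriate status on exit.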

SpanKind values

| Value | When to use |
| --- | --- |
| SpanKind.PIPELINE | Root pipeline span |
| SpanKind.RETRIEVAL | Retrieval / search operations |
| SpanKind.RERANKING | Reranking steps |
| SpanKind.FORMATTING | Output formatting |
| SpanKind.MEMORY | Memory / history operations |
| SpanKind.INGESTION | Indexing and ingestion |
| SpanKind.QUERY_TRANSFORM | Query rewriting / expansion |

Note

Tracer is not thread-safe. Use one instance per thread or synchronise externally for concurrent tracing.
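
One way to follow the one-instance-per-thread advice is to keep tracers in thread-local storage. This is a sketch only: PerThread is a hypothetical helper, and it assumes Tracer can be constructed with no arguments, as in the example above. A plain `object` stands in for Tracer so the snippet runs without anchor installed.

```python
import threading


class PerThread:
    """Lazily create one object per thread from `factory` (hypothetical helper)."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def get(self):
        # Each thread sees its own `instance` attribute on the local object.
        try:
            return self._local.instance
        except AttributeError:
            self._local.instance = self._factory()
            return self._local.instance


# With anchor installed: tracers = PerThread(Tracer)
tracers = PerThread(object)  # `object` is a stand-in for Tracer

seen = []


def worker():
    # Every worker thread gets its own instance on first access.
    seen.append(tracers.get())


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Each thread calls `tracers.get()` instead of sharing a module-level tracer, so no external locking is needed.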


Automatic Pipeline Tracing

TracingCallback hooks into the pipeline lifecycle and creates spans automatically for the overall execution and each individual step.

from anchor.observability import (
    TracingCallback,
    InMemorySpanExporter,
    InMemoryMetricsCollector,
)

exporter = InMemorySpanExporter()
metrics = InMemoryMetricsCollector()

callback = TracingCallback(
    exporters=[exporter],
    metrics_collector=metrics,
)

# Attach to your pipeline (`pipeline` is an existing pipeline object built elsewhere)
pipeline.add_callback(callback)

# Run the pipeline -- spans are recorded automatically
result = pipeline.build("What is context engineering?")

# Inspect the trace
trace = callback.last_trace
print(f"Trace ID: {trace.trace_id}")
print(f"Duration: {trace.total_duration_ms:.1f} ms")
print(f"Spans:    {len(trace.spans)}")

# Inspect individual spans
for span in exporter.get_spans():
    print(f"  {span.name}: {span.duration_ms:.1f} ms [{span.status}]")

Tip

TracingCallback automatically infers SpanKind from the step name using heuristics (e.g. a step named "rerank" maps to SpanKind.RERANKING).
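
To illustrate what name-based inference can look like, here is a small keyword-matching sketch. It is not anchor's actual implementation: only the "rerank" mapping is documented above, and the other keywords, the PIPELINE fallback, and the stand-in SpanKind enum are assumptions made for the example.

```python
from enum import Enum, auto


class SpanKind(Enum):
    """Stand-in enum mirroring the SpanKind names listed in this section."""

    PIPELINE = auto()
    RETRIEVAL = auto()
    RERANKING = auto()
    FORMATTING = auto()
    MEMORY = auto()
    INGESTION = auto()
    QUERY_TRANSFORM = auto()


# Assumed keyword table; only the "rerank" entry is confirmed by the docs.
_KEYWORDS = [
    ("rerank", SpanKind.RERANKING),
    ("retriev", SpanKind.RETRIEVAL),
    ("search", SpanKind.RETRIEVAL),
    ("format", SpanKind.FORMATTING),
    ("memory", SpanKind.MEMORY),
    ("ingest", SpanKind.INGESTION),
    ("query", SpanKind.QUERY_TRANSFORM),
]


def infer_kind(step_name, default=SpanKind.PIPELINE):
    """Return the first kind whose keyword appears in the step name."""
    lowered = step_name.lower()
    for keyword, kind in _KEYWORDS:
        if keyword in lowered:
            return kind
    return default  # assumed fallback for unrecognised names
```

If the inferred kind matters for your dashboards, naming steps after the operation they perform keeps any such heuristic predictable.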


Span Exporters

Exporters receive a batch of completed spans and deliver them to a backend. All exporters implement the SpanExporter protocol.
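
A custom exporter could follow the same shape. The sketch below assumes the protocol consists of a single method that receives a batch of spans; the method name `export`, the SpanExporterLike protocol, and the SlowSpanExporter class are all hypothetical, so check the SpanExporter definition in your installed version before relying on them. SimpleNamespace objects stand in for real spans.

```python
from types import SimpleNamespace
from typing import Any, List, Protocol, Sequence


class SpanExporterLike(Protocol):
    """Assumed shape of the protocol: one method taking a batch of spans."""

    def export(self, spans: Sequence[Any]) -> None: ...


class SlowSpanExporter:
    """Keeps only spans slower than a threshold (hypothetical example)."""

    def __init__(self, threshold_ms: float) -> None:
        self.threshold_ms = threshold_ms
        self.slow: List[Any] = []

    def export(self, spans: Sequence[Any]) -> None:
        # duration_ms is the span field used in the examples in this section.
        self.slow.extend(s for s in spans if s.duration_ms > self.threshold_ms)


# Stand-in spans so the sketch runs without anchor installed.
batch = [
    SimpleNamespace(name="retrieval", duration_ms=12.5),
    SimpleNamespace(name="rerank", duration_ms=250.0),
]
exporter = SlowSpanExporter(threshold_ms=100.0)
exporter.export(batch)
print([s.name for s in exporter.slow])
```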

ConsoleSpanExporter

Logs spans as JSON via Python's logging module. Useful for development.

import logging
from anchor.observability import ConsoleSpanExporter, TracingCallback

logging.basicConfig(level=logging.INFO)
exporter = ConsoleSpanExporter(log_level=logging.INFO)

callback = TracingCallback(exporters=[exporter])

InMemorySpanExporter

Stores spans in an in-memory list. Ideal for testing and debugging.

from anchor.observability import InMemorySpanExporter

exporter = InMemorySpanExporter()

# After pipeline execution:
spans = exporter.get_spans()
exporter.clear()  # reset for the next run

FileSpanExporter

Appends spans as JSON-Lines to a file on disk.

from anchor.observability import FileSpanExporter, TracingCallback

exporter = FileSpanExporter(path="traces.jsonl")

callback = TracingCallback(exporters=[exporter])
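
Because the output is JSON-Lines, it can be read back with the standard library alone. A minimal sketch follows; read_spans is a hypothetical helper, and it makes no assumption about which fields each span record contains.

```python
import json
from pathlib import Path


def read_spans(path):
    """Yield one dict per non-empty line of a JSON-Lines file."""
    with Path(path).open(encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if line:  # tolerate blank lines between records
                yield json.loads(line)
```

For example, `list(read_spans("traces.jsonl"))` loads every record written so far; since the exporter appends, the file accumulates spans across runs.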

OTLPSpanExporter

Exports spans to an OpenTelemetry collector via OTLP/HTTP. Requires the otlp extra.

pip install "astro-anchor[otlp]"

from anchor.observability import OTLPSpanExporter, TracingCallback

exporter = OTLPSpanExporter(
    endpoint="http://localhost:4318",
    service_name="my-app",
    headers={"Authorization": "Bearer ..."},
)

callback = TracingCallback(exporters=[exporter])

# Shut down cleanly when done
exporter.shutdown()

Warning

OTLPSpanExporter raises ImportError at construction if the opentelemetry-exporter-otlp-proto-http and opentelemetry-sdk packages are not installed.
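
If the OTLP extra is optional in your deployment, you can check for the packages before constructing the exporter. This sketch uses only the standard library; otlp_available is a hypothetical helper, and the module paths are the import names that correspond to the two packages named above.

```python
from importlib.util import find_spec


def otlp_available() -> bool:
    """Return True if the OpenTelemetry SDK and OTLP/HTTP exporter are importable."""
    modules = ("opentelemetry.sdk", "opentelemetry.exporter.otlp.proto.http")
    try:
        return all(find_spec(name) is not None for name in modules)
    except ModuleNotFoundError:
        # find_spec imports parent packages first; a missing parent raises
        # instead of returning None.
        return False
```

A caller could then choose OTLPSpanExporter when `otlp_available()` is true and fall back to ConsoleSpanExporter otherwise, rather than catching the ImportError at construction.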


Metrics Collection

Metrics collectors record MetricPoint values. TracingCallback records step durations and pipeline build times automatically when a collector is configured.

InMemoryMetricsCollector

Stores metrics in memory with summary statistics.

from anchor.observability import InMemoryMetricsCollector, MetricPoint

collector = InMemoryMetricsCollector()

# Record manually
collector.record(MetricPoint(name="step.duration_ms", value=12.5, tags={"step": "retrieval"}))
collector.record(MetricPoint(name="step.duration_ms", value=8.3, tags={"step": "rerank"}))

# Query metrics
all_step_durations = collector.get_metrics("step.duration_ms")
print(f"Recorded {len(all_step_durations)} duration metrics")

# Get summary statistics
summary = collector.get_summary("step.duration_ms")
print(f"  avg: {summary['avg']:.1f} ms")
print(f"  p95: {summary['p95']:.1f} ms")
print(f"  min: {summary['min']:.1f} ms")
print(f"  max: {summary['max']:.1f} ms")

collector.clear()
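
For intuition about the summary keys, the sketch below computes the same four statistics for the two values recorded above. It is an illustration, not the collector's implementation: the nearest-rank percentile method and the empty-dict result for no data are assumptions.

```python
def summarise(values):
    """Return avg/p95/min/max for a list of numbers, or {} if it is empty."""
    if not values:
        return {}
    ordered = sorted(values)
    # Nearest-rank p95: the smallest value with at least 95% of points at
    # or below it. Integer arithmetic gives ceil(0.95 * n) without rounding error.
    rank = (95 * len(ordered) + 99) // 100
    return {
        "avg": sum(ordered) / len(ordered),
        "p95": ordered[rank - 1],
        "min": ordered[0],
        "max": ordered[-1],
    }


summary = summarise([12.5, 8.3])
print(f"avg: {summary['avg']:.1f} ms")  # avg: 10.4 ms
print(f"p95: {summary['p95']:.1f} ms")  # p95: 12.5 ms
```

With only two samples the p95 equals the maximum, so percentile figures are only informative once a metric has many recorded points.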

LoggingMetricsCollector

Emits each metric as a structured JSON log message immediately on record.

import logging
from anchor.observability import LoggingMetricsCollector

logging.basicConfig(level=logging.INFO)
collector = LoggingMetricsCollector(log_level=logging.INFO)

OTLPMetricsExporter

Exports metrics to an OpenTelemetry collector via OTLP/HTTP. Requires the otlp extra.

from anchor.observability import OTLPMetricsExporter, TracingCallback

exporter = OTLPMetricsExporter(
    endpoint="http://localhost:4318",
    service_name="my-app",
)

callback = TracingCallback(metrics_collector=exporter)

# Flush and shut down when done
exporter.flush()
exporter.shutdown()

Cost Tracking

CostTracker accumulates per-operation cost entries (tokens, USD) and produces aggregated summaries. It is thread-safe.

from anchor.observability import CostTracker

tracker = CostTracker()

# Record embedding cost
tracker.record(
    operation="embedding",
    model="text-embedding-3-small",
    input_tokens=500,
    cost_per_input_token=0.00002,
)

# Record reranking cost
tracker.record(
    operation="rerank",
    model="rerank-v3",
    input_tokens=2000,
    cost_per_input_token=0.00001,
)

# Get summary
summary = tracker.summary()
print(f"Total cost:    ${summary.total_cost_usd:.4f}")
print(f"Input tokens:  {summary.total_input_tokens}")
print(f"By model:      {summary.by_model}")
print(f"By operation:  {summary.by_operation}")

# Reset for next run
tracker.reset()
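
As a cross-check on the figures, the arithmetic behind the two records above can be reproduced by hand. The sketch assumes cost is simply tokens multiplied by the per-token price, and that the per-model and per-operation breakdowns are sums of those costs; the exact contents of by_model and by_operation in anchor may differ.

```python
from collections import defaultdict

# (operation, model, input_tokens, cost_per_input_token) for the two records above
entries = [
    ("embedding", "text-embedding-3-small", 500, 0.00002),
    ("rerank", "rerank-v3", 2000, 0.00001),
]

by_model = defaultdict(float)
by_operation = defaultdict(float)
total_cost = 0.0
total_input_tokens = 0

for operation, model, tokens, price in entries:
    cost = tokens * price  # assumed formula: tokens x USD per token
    by_model[model] += cost
    by_operation[operation] += cost
    total_cost += cost
    total_input_tokens += tokens

print(f"Total cost:   ${total_cost:.4f}")  # Total cost:   $0.0300
print(f"Input tokens: {total_input_tokens}")  # Input tokens: 2500
```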

CostTrackingCallback

CostTrackingCallback is a pipeline callback that automatically records cost entries when pipeline steps produce items with cost-related metadata.

from anchor.observability import CostTracker, CostTrackingCallback

tracker = CostTracker()
cost_callback = CostTrackingCallback(tracker=tracker)

# Attach the cost-tracking callback (add a TracingCallback too if you also want traces)
pipeline.add_callback(cost_callback)

# After pipeline execution
summary = tracker.summary()
print(f"Pipeline cost: ${summary.total_cost_usd:.4f}")

The callback looks for these metadata keys on ContextItem.metadata:

| Key | Type | Description |
| --- | --- | --- |
| cost_model | str | Model identifier (triggers recording) |
| cost_input_tokens | int | Input tokens consumed |
| cost_output_tokens | int | Output tokens produced |
| cost_per_input_token | float | USD per input token |
| cost_per_output_token | float | USD per output token |
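
The keys above could be populated and interpreted as follows. This is a sketch under stated assumptions: cost_from_metadata is a hypothetical helper, missing token or price keys are treated as zero, and the model name and prices are placeholders rather than real rates.

```python
def cost_from_metadata(metadata):
    """Return the USD cost implied by the keys above, or None without cost_model."""
    if "cost_model" not in metadata:
        return None  # cost_model is the key that triggers recording
    input_cost = metadata.get("cost_input_tokens", 0) * metadata.get(
        "cost_per_input_token", 0.0
    )
    output_cost = metadata.get("cost_output_tokens", 0) * metadata.get(
        "cost_per_output_token", 0.0
    )
    return input_cost + output_cost


# Placeholder values for illustration only.
metadata = {
    "cost_model": "example-model",
    "cost_input_tokens": 1000,
    "cost_output_tokens": 200,
    "cost_per_input_token": 0.000001,
    "cost_per_output_token": 0.000002,
}

cost = cost_from_metadata(metadata)
print(f"${cost:.4f}")  # $0.0014
```

A pipeline step would attach a dictionary like this to the metadata of the items it produces so that the callback can pick it up.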

Tip

Combine TracingCallback and CostTrackingCallback on the same pipeline to get both performance traces and cost breakdowns.


Full Example: Tracing + Metrics + Cost

from anchor.observability import (
    CostTracker,
    CostTrackingCallback,
    InMemoryMetricsCollector,
    InMemorySpanExporter,
    TracingCallback,
)

# Set up all observers
span_exporter = InMemorySpanExporter()
metrics_collector = InMemoryMetricsCollector()
cost_tracker = CostTracker()

tracing_cb = TracingCallback(
    exporters=[span_exporter],
    metrics_collector=metrics_collector,
)
cost_cb = CostTrackingCallback(tracker=cost_tracker)

pipeline.add_callback(tracing_cb)
pipeline.add_callback(cost_cb)

# Run the pipeline
result = pipeline.build("What is context engineering?")

# Inspect results
print("--- Spans ---")
for span in span_exporter.get_spans():
    print(f"  {span.name}: {span.duration_ms:.1f} ms")

print("--- Metrics ---")
summary = metrics_collector.get_summary("pipeline.build_time_ms")
if summary:
    print(f"  Build time avg: {summary['avg']:.1f} ms")

print("--- Cost ---")
cost_summary = cost_tracker.summary()
print(f"  Total: ${cost_summary.total_cost_usd:.4f}")

Next Steps