Distributed Tracing for ML Pipelines: OpenTelemetry Integration
You're staring at logs from your LLM inference pipeline. A user's request is taking 8 seconds instead of the expected 2 seconds. But where? Is it the embedding model hanging? The reranking step? Network latency between services? Without distributed tracing, you're debugging blind.
Welcome to the problem that kills production ML systems: lack of visibility into multi-stage inference pipelines. When requests flow through preprocessing, multiple model forward passes, postprocessing, and async Kafka steps - all happening across different services - traditional logging becomes a needle-in-a-haystack exercise.
That's where OpenTelemetry (OTel) comes in. It gives you end-to-end visibility into your ML pipeline with structured traces that follow requests across services, models, and async operations. In this article, we'll build a production-ready distributed tracing setup using OTel SDKs, context propagation, and visualization in Grafana Tempo.
Let's trace the journey of a request through your system and never lose visibility again.
Table of Contents
- Why Distributed Tracing Matters for ML Pipelines
- The Problem with Traditional Monitoring
- Why ML Pipelines Are Special
- Setting Up the OpenTelemetry SDK for ML Workloads
- Installation and Basic Setup
- Instrumenting Your ML Pipeline with Spans
- Context Propagation: Keeping the Trace Connected
- Propagating via Kafka Headers
- Propagating via gRPC Metadata
- Propagating via Ray Remote Calls
- Tracing Inference Path Latency: Finding the Critical Path
- Sampling Strategies: Reducing Trace Volume Without Losing Signals
- Head-Based Sampling: Decide Early
- Tail-Based Sampling: Decide at the End
- Custom Sampling: Intelligent Decisions
- Collecting and Exporting: The OTLP Collector
- Collector Configuration
- Updating Your SDK to Use the Collector
- Visualization: Understanding Your Traces in Grafana Tempo
- Querying Traces
- Custom Dashboards
- Correlating with Metrics and Logs
- Production Considerations
- Trace Retention
- Cost Optimization
- High-Throughput Setup
- Performance Debugging With Traces: Real-World Scenarios
- Scenario 1: Identifying Unexplained Latency
- Scenario 2: Queueing and Resource Contention
- Scenario 3: Service Degradation Detection
- Instrumentation Best Practices
- Span Naming Conventions
- Critical Attributes
- Error Handling in Traces
- Working Example: Complete Multi-Stage LLM Pipeline
- The Economics of Observability: When to Instrument and When to Sample
- Advanced Patterns: Tracing for Cost Optimization and Model Performance
- Getting Started: The Minimal Viable Tracing Setup
- Implementation Beyond the Basics: What Real Teams Build
- Advanced Instrumentation: Building ML-Aware Tracing
- The Path to Observability Excellence: Building on Your Traces
- Operational Excellence Through Continuous Monitoring
- Summary
Why Distributed Tracing Matters for ML Pipelines
Regular monitoring tells you what is slow. Distributed tracing tells you why and where.
A typical ML inference pipeline looks deceptively simple: request comes in, models run, response goes out. But the reality is messier. You've got:
- Preprocessing steps that tokenize, normalize, or vectorize input
- Multiple model stages (embedding → reranking → generation)
- Async operations pushed to Kafka for non-blocking enrichment
- Cross-service calls via gRPC, HTTP, or Ray remotes
- Caching layers that might short-circuit the entire flow
When latency spikes, you need to know which stage is the culprit. Distributed tracing instruments each stage with spans - atomic units of work - and connects them with trace IDs that flow across service boundaries.
Here's the payoff: you can see that embedding took 150ms, reranking took 1200ms (oh, there's your problem), and generation took 800ms. You can identify the critical path and optimize ruthlessly.
The Problem with Traditional Monitoring
Most teams rely on logs and basic metrics for observability. When your embedding service logs "request completed in 2 seconds," and your reranking service logs "started processing," you can't connect the dots. Was the 2 seconds inside the embedding service, or was it waiting in the queue? Did reranking overlap with embedding, or was it sequential?
Traditional monitoring answers these questions poorly. You're left piecing together timestamps from multiple log files, trying to reconstruct what happened. With thousands of requests per second, this approach becomes impossible.
Distributed tracing solves this by treating the entire request - across all services - as a single "trace." Every operation within that trace gets a unique "span" that records when it started, when it ended, and what happened. The trace ID flows with the request, so you can reconstruct the exact sequence of events.
Why ML Pipelines Are Special
ML inference pipelines are uniquely complex. Unlike typical microservices that are mostly I/O-bound (waiting for databases or APIs), ML services are compute-bound. A single model forward pass can take 2 seconds. You have multiple models running in sequence. Resource contention (GPUs, memory) can cause unpredictable latency.
This means optimization requires understanding not just which service is slow, but which operation within that service is slow. Is the embedding model's first forward pass slow because the model is loading from disk? Is reranking slow because it's doing redundant computations? Is your Kafka consumer lagging, causing downstream components to wait?
Distributed tracing lets you answer these questions with precision. You can see exactly which part of your pipeline is the bottleneck, then optimize it.
Setting Up the OpenTelemetry SDK for ML Workloads
Let's instrument a real ML pipeline. We'll use Python's OTel SDK because that's where most data science happens.
Installation and Basic Setup
pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-jaeger \
  opentelemetry-instrumentation-requests opentelemetry-instrumentation-grpc \
  opentelemetry-instrumentation-sqlalchemy
Now, initialize the tracer and exporter:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource
# Configure resource (metadata about your service)
resource = Resource.create({
"service.name": "ml-inference-pipeline",
"service.version": "1.0.0",
"deployment.environment": "production",
})
# Set up Jaeger exporter
jaeger_exporter = JaegerExporter(
agent_host_name="localhost",
agent_port=6831,
)
# Create tracer provider with batch processor
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(BatchSpanProcessor(jaeger_exporter))
trace.set_tracer_provider(tracer_provider)
# Get tracer
tracer = trace.get_tracer(__name__)
Why batch processing? Because sending every span individually would crush your network. The BatchSpanProcessor collects spans and ships them in batches, which is far more efficient for high-throughput systems.
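If you need to tune throughput, the SDK's BatchSpanProcessor exposes queue and batch knobs. A minimal sketch with illustrative values - the right numbers depend on your span volume and memory budget:
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Illustrative tuning for a high-throughput inference service
processor = BatchSpanProcessor(
    jaeger_exporter,
    max_queue_size=4096,         # buffer more spans before dropping
    schedule_delay_millis=2000,  # flush every 2s instead of the 5s default
    max_export_batch_size=1024,  # ship larger batches per export call
)
tracer_provider.add_span_processor(processor)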
Instrumenting Your ML Pipeline with Spans
Now let's trace each stage of inference. We'll instrument preprocessing, model forward passes, and postprocessing:
import time
from opentelemetry import trace
tracer = trace.get_tracer(__name__)
def preprocess_input(raw_text: str) -> dict:
"""Tokenize and normalize input."""
with tracer.start_as_current_span("preprocessing") as span:
span.set_attribute("input.length", len(raw_text))
# Simulate tokenization
time.sleep(0.1)
tokens = raw_text.lower().split()
span.set_attribute("token.count", len(tokens))
span.set_attribute("status", "success")
return {"tokens": tokens, "text": raw_text}
def embed_text(tokens: list) -> dict:
"""Generate embeddings."""
with tracer.start_as_current_span("embedding") as span:
span.set_attribute("model.name", "sentence-transformers/all-MiniLM-L6-v2")
span.set_attribute("model.version", "1.2.0")
span.set_attribute("batch_size", len(tokens))
# Simulate embedding model forward pass
time.sleep(0.15)
embeddings = [[0.1 * i for i in range(384)] for _ in tokens]
span.set_attribute("embedding.dimension", 384)
return {"embeddings": embeddings, "tokens": tokens}
def rerank_results(embeddings: dict, query: str) -> dict:
"""Rerank results using a reranker model."""
with tracer.start_as_current_span("reranking") as span:
span.set_attribute("model.name", "cross-encoder/ms-marco-MiniLM-L-12-v2")
span.set_attribute("candidate.count", len(embeddings["embeddings"]))
# Simulate reranking (usually the slowest step)
time.sleep(1.2)
span.set_attribute("output.count", len(embeddings["embeddings"]))
return {"ranked": embeddings["embeddings"], "scores": [0.95, 0.87, 0.72]}
def generate_response(context: dict, query: str) -> str:
"""Generate final response using LLM."""
with tracer.start_as_current_span("generation") as span:
span.set_attribute("model.name", "meta-llama/Llama-2-7b-chat")
span.set_attribute("input_tokens", 50)
span.set_attribute("max_new_tokens", 200)
# Simulate LLM forward pass
time.sleep(0.8)
span.set_attribute("output_tokens", 87)
span.set_attribute("stop_reason", "max_tokens")
return "Generated response based on ranked context."
def inference_pipeline(query: str) -> str:
"""End-to-end inference pipeline."""
with tracer.start_as_current_span("inference_request") as root_span:
root_span.set_attribute("query.length", len(query))
root_span.set_attribute("user.id", "user_123")
# Execute pipeline stages
preprocessed = preprocess_input(query)
embedded = embed_text(preprocessed["tokens"])
reranked = rerank_results(embedded, query)
response = generate_response(reranked, query)
root_span.set_attribute("total_stages", 4)
return response
# Run the pipeline
result = inference_pipeline("What is distributed tracing?")
print(result)
Notice the pattern: each function wraps its work in a span. You set attributes - custom metadata that describes what happened. The model.version, batch_size, and input_tokens attributes give context to your traces. When reranking takes 1.2 seconds, you'll see it in the trace timeline.
Context Propagation: Keeping the Trace Connected
Here's the tricky part: your pipeline doesn't run in a single process. Preprocessing might happen synchronously, but expensive operations (like document retrieval or semantic search) get pushed to Kafka. Model inference might run on a Ray cluster. How do you keep the trace connected across these boundaries?
The answer is trace context propagation. You attach the current trace context (trace ID, span ID) to outgoing messages and HTTP requests, then extract it on the receiving end.
Propagating via Kafka Headers
from opentelemetry.propagate import inject, extract
from opentelemetry import trace
from kafka import KafkaProducer
import json
tracer = trace.get_tracer(__name__)
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
def send_to_retrieval_service(query: str):
"""Send query to async retrieval service via Kafka."""
with tracer.start_as_current_span("send_retrieval_request") as span:
# Create headers dict for Kafka
headers = {}
# Inject current trace context into headers
inject(headers)
# Convert headers to Kafka format (list of tuples)
kafka_headers = [(k, v.encode() if isinstance(v, str) else v)
for k, v in headers.items()]
message = {
"query": query,
"request_id": span.get_span_context().trace_id
}
producer.send(
"retrieval-requests",
value=json.dumps(message).encode(),
headers=kafka_headers
)
span.set_attribute("kafka.topic", "retrieval-requests")
# Consumer side: extract context
from kafka import KafkaConsumer
consumer = KafkaConsumer('retrieval-requests')
def process_retrieval_request(record):
"""Process request and continue the trace."""
    # Extract trace context from Kafka headers (header values arrive as bytes)
    carrier = {k: v.decode("utf-8") if isinstance(v, (bytes, bytearray)) else v
               for k, v in record.headers}
    context = extract(carrier)
# Use extracted context for this span
with trace.get_tracer(__name__).start_as_current_span(
"retrieve_documents",
context=context
) as span:
query = json.loads(record.value)["query"]
# Do expensive retrieval
documents = retrieve_semantic_neighbors(query)
span.set_attribute("document.count", len(documents))
span.set_attribute("retrieval.latency_ms", 450)
        return documents
The magic: inject(headers) serializes the trace context into the message headers, and extract(...) deserializes it on the consumer. The consumer continues the trace as a child span, maintaining the complete request flow.
Propagating via gRPC Metadata
For synchronous services, gRPC metadata works similarly:
import grpc
from grpc import aio
from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorClient, GrpcAioInstrumentorServer
# Instrument asyncio gRPC client
GrpcAioInstrumentorClient().instrument()
# Instrument asyncio gRPC server
GrpcAioInstrumentorServer().instrument()
async def call_ranking_service(candidates: list) -> list:
"""Call ranking service via gRPC."""
with tracer.start_as_current_span("rank_service_rpc") as span:
# gRPC instrumentation handles context propagation automatically
# No manual inject/extract needed!
        credentials = grpc.ssl_channel_credentials()
        channel = aio.secure_channel('ranking-service:50051', credentials)
        stub = RankingServiceStub(channel)  # generated from your .proto
response = await stub.RankCandidates(
RankRequest(candidates=candidates)
)
span.set_attribute("rpc.service", "RankingService")
span.set_attribute("rpc.method", "RankCandidates")
        return response.ranked
The built-in instrumentation handles propagation for you. This is why auto-instrumentation packages are so valuable - they solve the propagation problem at the framework level.
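As a concrete example, the requests instrumentation installed earlier is enabled with one call at startup:
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# Patch the requests library once; outgoing HTTP calls now emit client
# spans and propagate W3C traceparent headers automatically.
RequestsInstrumentor().instrument()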
Propagating via Ray Remote Calls
Ray requires explicit context passing:
import os
import ray
from opentelemetry import trace
tracer = trace.get_tracer(__name__)
@ray.remote
def run_model_parallel(batch_id: str, trace_context: dict):
"""Run model inference on Ray worker."""
# Extract context on worker
from opentelemetry.propagate import extract
context = extract(trace_context)
with tracer.start_as_current_span(
"ray_model_inference",
context=context
) as span:
span.set_attribute("ray.batch_id", batch_id)
span.set_attribute("ray.worker_pid", os.getpid())
# Model inference happens here
results = model.forward(batch_id)
return results
def distribute_inference(batches: list):
"""Distribute model runs across Ray cluster."""
with tracer.start_as_current_span("distribute_inference") as span:
# Serialize current trace context
from opentelemetry.propagate import inject
trace_context = {}
inject(trace_context)
# Send to Ray workers with context
futures = [
run_model_parallel.remote(batch_id, trace_context)
for batch_id in batches
]
results = ray.get(futures)
span.set_attribute("batch.count", len(batches))
        return results
For Ray, you manually serialize and pass the context because Ray's execution model lacks the built-in instrumentation that gRPC enjoys.
Tracing Inference Path Latency: Finding the Critical Path
Here's where distributed tracing shines: identifying which stage in your multi-model pipeline is the bottleneck.
graph LR
A["Request In<br/>0ms"] -->|100ms| B["Preprocessing<br/>100ms"]
B -->|150ms| C["Embedding<br/>250ms"]
C -->|1200ms| D["Reranking<br/>1450ms"]
D -->|800ms| E["LLM Generation<br/>2250ms"]
E -->|50ms| F["Response Out<br/>2300ms"]
style D fill:#ff6b6b
style C fill:#ffd43b
style E fill:#74c0fc
style B fill:#a8e6cf
This trace tells you reranking is your critical path - 1200ms of your 2300ms total. That's 52% of latency! Your optimization strategy: parallelize reranking, switch to a faster model, or batch requests.
In Jaeger or Grafana Tempo, you'd see this waterfall view automatically. But let's also instrument custom metrics to track per-stage latency:
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.prometheus import PrometheusMetricReader
# Set up metrics (exposed for Prometheus to scrape; spans keep flowing to Jaeger)
reader = PrometheusMetricReader()
meter_provider = MeterProvider(metric_readers=[reader])
metrics.set_meter_provider(meter_provider)
meter = metrics.get_meter(__name__)
# Create histograms for each stage
preprocessing_latency = meter.create_histogram(
"ml.preprocessing.latency_ms",
description="Preprocessing latency in milliseconds"
)
embedding_latency = meter.create_histogram(
"ml.embedding.latency_ms",
description="Embedding model latency"
)
reranking_latency = meter.create_histogram(
"ml.reranking.latency_ms",
description="Reranking model latency"
)
generation_latency = meter.create_histogram(
"ml.generation.latency_ms",
description="LLM generation latency"
)
def inference_pipeline_with_metrics(query: str) -> str:
"""Pipeline with explicit latency tracking."""
import time
with tracer.start_as_current_span("inference_request") as root_span:
# Preprocessing
start = time.time()
preprocessed = preprocess_input(query)
preprocessing_latency.record(int((time.time() - start) * 1000))
# Embedding
start = time.time()
embedded = embed_text(preprocessed["tokens"])
embedding_latency.record(int((time.time() - start) * 1000))
# Reranking
start = time.time()
reranked = rerank_results(embedded, query)
reranking_latency.record(int((time.time() - start) * 1000))
# Generation
start = time.time()
response = generate_response(reranked, query)
generation_latency.record(int((time.time() - start) * 1000))
        return response
Now you can query Prometheus: "What's the p95 latency of reranking?" The answer drives your optimization roadmap.
Sampling Strategies: Reducing Trace Volume Without Losing Signals
If your inference pipeline handles 10,000 requests per second, collecting every single trace would generate petabytes of data. You need sampling strategies.
There are three main approaches:
Head-Based Sampling: Decide Early
Sample at the beginning of the request, before you know if it's interesting.
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased
# Sample 10% of requests
sampler = TraceIdRatioBased(0.1)
tracer_provider = TracerProvider(
resource=resource,
sampler=sampler
)
Simple but dumb - you might miss the slow requests hiding in the 90% you discard.
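One refinement worth making even in a head-based setup: wrap the ratio sampler in ParentBased so downstream services honor the decision made at the trace root instead of re-rolling the dice. A minimal sketch:
from opentelemetry.sdk.trace.sampling import ParentBased, TraceIdRatioBased

# Apply the 10% ratio only when this service starts a new trace;
# otherwise follow the caller's sampling decision for consistent traces.
sampler = ParentBased(root=TraceIdRatioBased(0.1))
tracer_provider = TracerProvider(resource=resource, sampler=sampler)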
Tail-Based Sampling: Decide at the End
Collect all traces, but only export the interesting ones (slow, errors, etc.).
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter
from opentelemetry.trace import StatusCode
class TailSamplingProcessor:
    """Export only slow or error spans."""
    def __init__(self, exporter: SpanExporter, latency_threshold_ms: int = 500):
        self.exporter = exporter
        self.latency_threshold_ms = latency_threshold_ms
    def on_start(self, span, parent_context=None) -> None:
        pass
    def on_end(self, span: ReadableSpan) -> None:
        # Span timestamps are integers in nanoseconds
        duration_ms = (span.end_time - span.start_time) / 1_000_000
        # Export if slow or errored
        should_export = (
            duration_ms > self.latency_threshold_ms or
            span.status.status_code is StatusCode.ERROR
        )
        if should_export:
            self.exporter.export([span])
    def shutdown(self) -> None:
        self.exporter.shutdown()
    def force_flush(self, timeout_millis: int = 30000) -> bool:
        return True
# Use it
tail_processor = TailSamplingProcessor(jaeger_exporter, latency_threshold_ms=500)
tracer_provider.add_span_processor(tail_processor)
Tail-based sampling captures anomalies but requires buffering all traces - expensive memory-wise.
Custom Sampling: Intelligent Decisions
Combine both approaches with custom logic:
from opentelemetry.sdk.trace.sampling import Sampler, SamplingResult, Decision
class SmartSampler(Sampler):
    """Sample all errors, all premium users, and 10% of everything else."""
    def should_sample(self, parent_context, trace_id, name,
                      kind=None, attributes=None, links=None, trace_state=None):
        attributes = attributes or {}
        # Always keep errors
        if attributes.get("error", False):
            return SamplingResult(Decision.RECORD_AND_SAMPLE)
        # Always keep premium users
        user_id = attributes.get("user.id", "")
        if isinstance(user_id, str) and user_id.startswith("premium_"):
            return SamplingResult(Decision.RECORD_AND_SAMPLE)
        # Fall back to ratio-based for everyone else
        if trace_id % 100 < 10:  # 10% sampling
            return SamplingResult(Decision.RECORD_AND_SAMPLE)
        return SamplingResult(Decision.DROP)
    def get_description(self) -> str:
        return "SmartSampler"
tracer_provider = TracerProvider(resource=resource, sampler=SmartSampler())
This sampler captures all errors, all premium user requests, and 10% of others. You get complete visibility where it matters.
Collecting and Exporting: The OTLP Collector
The OpenTelemetry Protocol (OTLP) collector is the hub of your observability stack. Your services send traces to it; it processes, batches, and exports to backend systems.
Collector Configuration
Create collector-config.yaml:
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318
processors:
  batch:
    send_batch_size: 512
    timeout: 5s
    send_batch_max_size: 1024
  memory_limiter:
    check_interval: 1s
    limit_mib: 512
    spike_limit_mib: 128
  attributes:
    actions:
      - key: deployment.environment
        value: production
        action: upsert
exporters:
  jaeger:
    endpoint: jaeger-collector:14250
    tls:
      insecure: true
  otlp:
    endpoint: tempo:4317
    tls:
      insecure: true
  awsxray:
    region: us-east-1
service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, batch, attributes]
      exporters: [jaeger, otlp, awsxray]
The pipeline:
- receivers/otlp: Accept gRPC and HTTP traces
- processors/batch: Collect spans, send in batches (efficient!)
- processors/memory_limiter: Don't explode memory if traces back up
- processors/attributes: Add environment metadata
- exporters: Send to Jaeger, Tempo, AWS X-Ray simultaneously
Run the collector:
docker run -p 4317:4317 -p 4318:4318 \
  -v $(pwd)/collector-config.yaml:/etc/otel/config.yaml \
  otel/opentelemetry-collector:latest \
  --config=/etc/otel/config.yaml
Updating Your SDK to Use the Collector
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# Export to collector instead of Jaeger directly
otlp_exporter = OTLPSpanExporter(
endpoint="localhost:4317",
insecure=True
)
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(tracer_provider)
The collector handles the rest - batching, buffering, retrying, and exporting to multiple backends.
Visualization: Understanding Your Traces in Grafana Tempo
Grafana Tempo is a distributed tracing backend designed for scale. It queries traces by trace ID and integrates with Loki logs and Prometheus metrics.
Querying Traces
In Tempo's query interface, search by tags:
service.name = "ml-inference-pipeline"
root_span.duration > 2000
ml.reranking.latency_ms > 1000
error = true
Or jump directly to a trace by ID from your logs:
[timestamp] service=ml-inference trace_id=a1b2c3d4e5f60718 duration=2450ms
Click the trace ID in your logs, and Tempo renders the waterfall.
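Getting trace IDs into your logs in the first place takes a few lines of Python. A sketch using a logging filter - the format string and logger name are illustrative:
import logging
from opentelemetry import trace

class TraceIdFilter(logging.Filter):
    """Stamp the active trace ID onto every log record."""
    def filter(self, record: logging.LogRecord) -> bool:
        ctx = trace.get_current_span().get_span_context()
        # format_trace_id renders the 128-bit ID as 32 hex characters
        record.trace_id = trace.format_trace_id(ctx.trace_id) if ctx.is_valid else "-"
        return True

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    "%(asctime)s service=ml-inference trace_id=%(trace_id)s %(message)s"))
logger = logging.getLogger("ml-inference")
logger.addHandler(handler)
logger.addFilter(TraceIdFilter())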
Custom Dashboards
Create a Tempo dashboard showing:
graph TD
A["Trace Volume<br/>(requests/sec)"]
B["P95 Latency<br/>(by stage)"]
C["Error Rate<br/>(by service)"]
D["Critical Path Analysis<br/>(slowest stage)"]
E["Model Performance<br/>(inference latency)"]
F["Queue Depth<br/>(Kafka lag)"]
style B fill:#ff6b6b
style D fill:#ff6b6b
style F fill:#ffd43b
Add panels querying:
- Trace count: count(traces where service.name="ml-inference-pipeline")
- P95 latency: quantile(0.95, root_span.duration)
- Error rate: count(traces where error=true) / count(traces)
- Reranking latency: quantile(0.95, ml.reranking.latency_ms)
Correlating with Metrics and Logs
Link traces to metrics in Prometheus and logs in Loki:
# In Grafana, configure trace correlation:
# when you click a trace, it shows related metrics/logs
derivedFields:
  - name: trace_id
    matcherRegex: 'trace_id=(\w+)'
    url: "/api/v1/traces/$${trace_id}"
Now when you find a slow trace in Tempo, you can instantly jump to:
- The application logs for that trace
- The Prometheus metrics at that timestamp
- The Loki logs from the same time window
Production Considerations
Trace Retention
Tempo's local storage is ephemeral. For production:
# Use object storage
storage:
  trace:
    backend: s3
    s3:
      bucket: ml-traces-prod
      endpoint: s3.amazonaws.com
      access_key: ${AWS_ACCESS_KEY}
      secret_key: ${AWS_SECRET_KEY}
# Configure retention
retention_duration: 720h  # Keep 30 days
Cost Optimization
Traces can be expensive. Optimize:
- Sampling: Use smart sampling to reduce volume
- Batching: The collector already does this; tune batch sizes
- Filtering: Drop low-value traces such as health checks and static endpoints (see the sampler sketch after this list)
- Compression: OTLP exporter compresses by default
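A sketch of the filtering idea as a custom sampler - drop health checks outright and delegate everything else. The route names here are hypothetical:
from opentelemetry.sdk.trace.sampling import (
    Decision, ParentBased, Sampler, SamplingResult, TraceIdRatioBased,
)

class DropHealthChecks(Sampler):
    """Never record health-check spans; delegate all other decisions."""
    def __init__(self, delegate: Sampler):
        self._delegate = delegate

    def should_sample(self, parent_context, trace_id, name,
                      kind=None, attributes=None, links=None, trace_state=None):
        if name in ("GET /healthz", "GET /ready"):  # hypothetical route names
            return SamplingResult(Decision.DROP)
        return self._delegate.should_sample(
            parent_context, trace_id, name, kind, attributes, links, trace_state)

    def get_description(self) -> str:
        return "DropHealthChecks"

sampler = ParentBased(root=DropHealthChecks(TraceIdRatioBased(0.1)))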
High-Throughput Setup
For 10K+ requests/sec:
processors:
batch:
send_batch_size: 1024
timeout: 1s
send_batch_max_size: 2048
memory_limiter:
check_interval: 100ms
limit_mib: 2048 # Allocate more memory
spike_limit_mib: 256Scale horizontally - deploy multiple collectors behind a load balancer.
Performance Debugging With Traces: Real-World Scenarios
Now that you understand the mechanics, let's apply tracing to real problems. This is where the value becomes obvious.
Scenario 1: Identifying Unexplained Latency
Your team reports that inference latency has increased from 2 seconds to 8 seconds. Your Prometheus graphs show CPU usage is normal, GPU utilization is high, and there are no obvious errors. What's happening?
With distributed tracing, you query Tempo for slow traces (duration > 5 seconds) and see:
- Preprocessing: 100ms (unchanged)
- Embedding: 150ms (unchanged)
- Reranking: 7000ms (was 1200ms - there's your problem!)
- Generation: 150ms (unchanged)
The reranking model got slower. Now you investigate: did the model size increase? Did a dependency upgrade break something? Did GPU memory pressure increase? Tracing pointed you directly to the problem.
Scenario 2: Queueing and Resource Contention
Your API servers are reporting 10 second latencies, but when you look at the traces, the actual processing time is only 2 seconds. Where are the other 8 seconds going?
Looking at the detailed traces, you see that the span "queue_wait" is 7950ms. The requests are sitting in a Kafka queue waiting for the retrieval service to process them. This tells you:
- Your embedding service can emit requests faster than your retrieval service can consume them
- You need to either speed up retrieval (optimize queries, add caching) or throttle embedding (add backpressure)
Without tracing, you'd be guessing. With tracing, you know exactly where the bottleneck is.
Scenario 3: Service Degradation Detection
One of your model serving replicas goes bad (maybe it's memory-leaked or hung). Without tracing, you'd see a 50% increase in average latency (since half your traffic hits the bad replica). You wouldn't know which replica is bad until you manually inspect each one.
With distributed tracing and automatic error tracking, you can see:
- Traces routed to replica A: 200ms latency, 0% errors
- Traces routed to replica B: 15000ms latency, 30% errors (timeout)
- Traces routed to replica C: 210ms latency, 0% errors
Now you know exactly which replica to restart.
Instrumentation Best Practices
Effective tracing isn't just about collecting data - it's about collecting useful data. Here are the patterns that work.
Span Naming Conventions
Use hierarchical span names that reflect the operation structure:
- inference_request (root)
  - preprocessing
  - embedding
  - retrieve_documents
  - reranking
  - generation
  - postprocessing
This makes it easy to filter and aggregate traces by operation type.
Critical Attributes
Always log these attributes on key spans:
- model.name, model.version: Which model ran
- input.size, output.size: Data dimensions
- latency_ms: Operation duration
- error, error.type: Failures
- cache.hit: Whether an operation was cached
- user.id: For per-user analysis
These attributes enable powerful filtering and analytics.
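To keep these attributes consistent across services, many teams wrap them in a small helper. A hypothetical convention helper, not an OTel API:
from opentelemetry import trace

def set_standard_attributes(span: trace.Span, *, model_name: str,
                            model_version: str, input_size: int,
                            cache_hit: bool = False) -> None:
    """Apply the baseline attribute set so every model span is queryable the same way."""
    span.set_attribute("model.name", model_name)
    span.set_attribute("model.version", model_version)
    span.set_attribute("input.size", input_size)
    span.set_attribute("cache.hit", cache_hit)

# Usage inside any instrumented stage:
with trace.get_tracer(__name__).start_as_current_span("embedding") as span:
    set_standard_attributes(span, model_name="all-MiniLM-L6-v2",
                            model_version="1.2.0", input_size=42)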
Error Handling in Traces
When an operation fails, record it:
with tracer.start_as_current_span("model_inference") as span:
try:
result = model.forward(input_data)
span.set_attribute("status", "success")
except Exception as e:
span.set_attribute("status", "error")
span.set_attribute("error.type", type(e).__name__)
span.set_attribute("error.message", str(e))
        raise
This makes errors immediately visible in trace analysis.
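Beyond custom attributes, the OTel API has first-class error support: record_exception attaches a structured exception event to the span, and set_status marks the span as failed so backends can filter on it. The same example using those calls (same placeholder model and input as above):
from opentelemetry.trace import Status, StatusCode

with tracer.start_as_current_span("model_inference") as span:
    try:
        result = model.forward(input_data)
    except Exception as e:
        span.record_exception(e)                           # structured exception event
        span.set_status(Status(StatusCode.ERROR, str(e)))  # mark span as failed
        raise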
Working Example: Complete Multi-Stage LLM Pipeline
Here's everything together - a realistic inference pipeline with proper instrumentation:
import asyncio
import json
import time
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.propagate import inject, extract
# Setup
resource = Resource.create({"service.name": "llm-pipeline"})
otlp_exporter = OTLPSpanExporter(endpoint="localhost:4317", insecure=True)
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer(__name__)
class LLMInferencePipeline:
def __init__(self):
self.embedding_model = MockEmbeddingModel()
self.reranker_model = MockRerankerModel()
self.llm_model = MockLLMModel()
self.kafka_producer = MockKafkaProducer()
async def run(self, query: str, user_id: str) -> str:
with tracer.start_as_current_span("inference_request") as root:
root.set_attribute("user.id", user_id)
root.set_attribute("query.length", len(query))
# Stage 1: Embedding
embeddings = await self._embed_query(query)
# Stage 2: Async retrieval via Kafka
documents = await self._retrieve_documents_async(query)
# Stage 3: Reranking
ranked = await self._rerank(documents, embeddings)
# Stage 4: LLM generation
response = await self._generate(ranked, query)
return response
async def _embed_query(self, query: str):
with tracer.start_as_current_span("embed_query") as span:
span.set_attribute("model.name", "all-MiniLM-L6-v2")
start = time.time()
embedding = self.embedding_model.forward(query)
span.set_attribute("latency_ms", int((time.time() - start) * 1000))
return embedding
async def _retrieve_documents_async(self, query: str):
with tracer.start_as_current_span("retrieve_documents") as span:
# Inject context into Kafka headers
headers = {}
inject(headers)
# Send to Kafka (async)
self.kafka_producer.send_with_context("retrieval-queue", query, headers)
span.set_attribute("kafka.topic", "retrieval-queue")
# Wait for results
await asyncio.sleep(0.3)
return ["doc1", "doc2", "doc3"]
async def _rerank(self, documents: list, query_embedding):
with tracer.start_as_current_span("rerank_documents") as span:
span.set_attribute("candidate.count", len(documents))
span.set_attribute("model.name", "cross-encoder/mmarco")
start = time.time()
ranked = self.reranker_model.forward(documents, query_embedding)
span.set_attribute("latency_ms", int((time.time() - start) * 1000))
return ranked[:5]
async def _generate(self, context: list, query: str):
with tracer.start_as_current_span("generate_response") as span:
span.set_attribute("model.name", "meta-llama/Llama-2-7b-chat")
span.set_attribute("context.docs", len(context))
start = time.time()
response = self.llm_model.forward(context, query)
span.set_attribute("latency_ms", int((time.time() - start) * 1000))
span.set_attribute("output_tokens", 87)
return response
# Mock implementations for demo
class MockEmbeddingModel:
def forward(self, text):
time.sleep(0.1)
return [0.1] * 384
class MockRerankerModel:
def forward(self, docs, embedding):
time.sleep(1.2)
return [(doc, 0.95) for doc in docs]
class MockLLMModel:
def forward(self, context, query):
time.sleep(0.8)
return "This is a generated response based on the provided context."
class MockKafkaProducer:
def send_with_context(self, topic, message, headers):
print(f"Sending to {topic}: {message}")
# Run it
async def main():
pipeline = LLMInferencePipeline()
response = await pipeline.run("What is distributed tracing?", "user_456")
print(response)
asyncio.run(main())
Run this, and traces flow to your collector. Visualize in Tempo: you'll see the full waterfall showing each stage, latencies, and the critical path.
The Economics of Observability: When to Instrument and When to Sample
Building comprehensive observability into ML pipelines isn't free. Spans consume resources. Trace export consumes bandwidth. Storing traces indefinitely consumes storage. Most teams need to make intentional choices about what to instrument, what to sample, and what to simply let go unobserved. Getting these choices right means you get maximum value from your observability investment without running into budget constraints.
The first decision is breadth versus depth. You could instrument every single operation - every GPU memory allocation, every network packet, every database query. But this creates massive trace volume. A single request through an LLM pipeline might generate thousands of spans. At 10,000 requests per second, you're looking at tens of millions of spans per second. Even with aggressive sampling, that's enormous. Most teams instrument the critical path deeply and leave everything else lightly instrumented or unobserved. The critical path is the set of operations that most directly impact the user-visible metric you care about - usually latency. For an LLM inference pipeline, the critical path includes embedding, reranking, and generation. It doesn't include internal vector database operations or metadata lookups unless those are blocking the user request.
The second decision is sampling strategy. We covered sampling earlier - head-based, tail-based, and smart sampling. In practice, most teams use a hybrid approach. They do head-based sampling to keep volume manageable, but they increase the sampling rate for certain conditions: all errors, all requests from premium customers, all requests that exceed latency thresholds. This lets you keep overall volume down while ensuring you capture the signals that matter most. The tricky part is tuning the thresholds and sampling rates. Too much sampling and you'll blow out your trace storage budget. Too little and you'll miss important signals. You need observability into your observability itself - tracking how much sampling you're doing, what you're keeping, and what you're discarding. This is harder than it sounds.
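One way to get that "observability into your observability" is to wrap whatever sampler you run with a counter metric that records every decision, so you can graph what you keep versus what you drop. A minimal sketch; the metric name is illustrative:
from opentelemetry import metrics
from opentelemetry.sdk.trace.sampling import Decision, Sampler

meter = metrics.get_meter(__name__)
sampling_decisions = meter.create_counter(
    "tracing.sampling.decisions",
    description="Sampling decisions by outcome",
)

class CountingSampler(Sampler):
    """Delegate sampling, but count every keep/drop decision."""
    def __init__(self, delegate: Sampler):
        self._delegate = delegate

    def should_sample(self, parent_context, trace_id, name,
                      kind=None, attributes=None, links=None, trace_state=None):
        result = self._delegate.should_sample(
            parent_context, trace_id, name, kind, attributes, links, trace_state)
        outcome = ("sampled" if result.decision == Decision.RECORD_AND_SAMPLE
                   else "dropped")
        sampling_decisions.add(1, {"outcome": outcome})
        return result

    def get_description(self) -> str:
        return f"Counting({self._delegate.get_description()})"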
The third decision is retention. How long do you keep traces? A week? A month? A year? Cloud trace storage services typically charge per GB-month. If you're sampling 10% of requests and averaging 1 KB per span, you could be looking at terabytes of trace data per year. Many teams keep high-detail traces (every attribute, every span) for 7 days, then aggregate traces (remove sensitive attributes, drop non-critical spans) and keep those for 30 days, then drop traces older than 30 days entirely unless they're relevant to an ongoing investigation. This tiered approach keeps costs manageable while providing good visibility for recent issues.
The fourth decision is what attributes to log. You could log every field in every request - customer ID, feature values, model inputs, model outputs, predictions, everything. But this creates two problems: privacy (you're storing sensitive data in your traces), and volume (more attributes means larger spans and more storage). Most teams log a minimal set of attributes for every span - timestamp, latency, status, error type - and then log additional attributes selectively for debugging purposes. This means that for 99% of normal traffic, traces are small and cheap. When you need to debug a specific issue, you can increase logging verbosity for that specific service or request.
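Here's one way to implement that selectivity - a sketch where cheap attributes are always set and debug attributes are gated behind a hypothetical environment flag:
import os

# Hypothetical flag: flip TRACE_VERBOSE=1 on one service while debugging
VERBOSE_TRACING = os.environ.get("TRACE_VERBOSE", "0") == "1"

def annotate(span, minimal: dict, debug: dict) -> None:
    """Always set the cheap attributes; add expensive or sensitive ones
    only when verbose tracing is enabled for this deployment."""
    for key, value in minimal.items():
        span.set_attribute(key, value)
    if VERBOSE_TRACING:
        for key, value in debug.items():
            span.set_attribute(key, value)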
Advanced Patterns: Tracing for Cost Optimization and Model Performance
Once you have basic distributed tracing in place, you can use it for purposes beyond just debugging. Traces are incredibly rich data. If you analyze them systematically, you can extract insights about cost, performance, and user experience.
For cost optimization, you can use traces to understand where compute dollars are being spent. You can see that reranking is using 40% of your GPU resources despite being only 20% of your latency. Maybe you should invest in a faster reranking model. You can see that embeddings are being recomputed repeatedly for the same queries because your caching is ineffective. Maybe you should improve the cache. You can identify specific user cohorts or request patterns that are disproportionately expensive to serve. Maybe you should throttle them differently or charge them appropriately.
For model performance, traces help you understand which parts of your pipeline contribute to accuracy versus latency. You might find that adding more reranking candidates improves accuracy by 2% but increases latency by 30%. Is that tradeoff worth it? Traces let you measure it empirically. You can see that for certain user cohorts, reranking is ineffective - it's not reordering the rankings. Maybe you need different reranking models for different cohorts.
For user experience, you can correlate trace latency with user satisfaction metrics. You might find that once latency exceeds 1 second, user satisfaction drops dramatically, but below 500ms there's no marginal benefit. This helps you set realistic SLOs and optimize toward what matters rather than toward meaningless targets.
Getting Started: The Minimal Viable Tracing Setup
You don't need to implement everything in this article on day one. In fact, most teams do better if they start minimal and expand gradually. Here's a realistic timeline for a team new to OpenTelemetry:
Week 1: Install the SDK, instrument your main request handler with a root span, export to a local Jaeger instance. Don't worry about sampling or complex instrumentation. Just get traces flowing into a backend.
Week 2: Instrument your critical path operations. Add spans for the expensive operations that dominate latency. Set custom attributes on those spans to capture what's happening.
Week 3: Set up the OTLP collector and export to your chosen backend (Jaeger, Tempo, or DataDog). Move from local development to production-ish setup.
Week 4: Implement context propagation for your async operations (Kafka, Ray, etc.). This is usually the point where you discover your architecture has implicit synchronization points that weren't obvious before.
Week 5: Add logging/metrics correlation. Hook your existing logging infrastructure to include trace IDs so you can jump from a log line to its full trace.
Week 6+: Iterate. Tune sampling strategies. Investigate slow traces. Optimize based on what you learn. Add more instrumentation as you understand what questions you want to answer.
This timeline assumes a focused effort by 1-2 engineers. The goal is to get basic observability working and then iterate based on what you learn.
Implementation Beyond the Basics: What Real Teams Build
After you've solved the immediate infrastructure problems, you start building domain-specific tracing that captures the subtle performance characteristics of your ML workload. You discover that generic tracing isn't enough when you're trying to understand why a particular model's latency behaves differently than others.
One pattern that emerges is model-specific span hierarchy. Rather than treating all model inference as a black box, you might add spans for model loading, weights initialization, input preprocessing at the model level, forward pass, and output postprocessing. You instrument cache hits and misses explicitly. You track GPU memory allocation and deallocation. You trace KV cache interactions in transformer models. This level of detail is invisible to generic instrumentation but critical for optimizing ML inference.
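As a sketch of what that hierarchy can look like in code - the preprocess/forward/postprocess phase boundaries here are assumptions about your serving stack, not a fixed API:
def traced_forward(model, batch):
    """Split one opaque inference call into the phases that matter for ML debugging."""
    with tracer.start_as_current_span("model_inference"):
        with tracer.start_as_current_span("input_preprocessing"):
            inputs = model.preprocess(batch)       # assumed model API
        with tracer.start_as_current_span("forward_pass") as fwd:
            fwd.set_attribute("batch.size", len(batch))
            outputs = model.forward(inputs)
        with tracer.start_as_current_span("output_postprocessing"):
            return model.postprocess(outputs)      # assumed model API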
Another consideration is multi-model orchestration. Many systems don't run a single model; they orchestrate multiple models in sequence. An embedding model might feed results into a ranker, which feeds results into a generator. Each model has different resource characteristics. The embedding might be memory-bound while the generator is compute-bound. Understanding how they interact, where resources are contended, and which stages become bottlenecks requires tracing at the orchestration level.
You also discover the importance of external service correlation. Your ML pipeline doesn't live in isolation. It calls vector databases, feature stores, model serving infrastructure, and external APIs. Tracing these interactions becomes essential. You might find that 200ms of your 250ms p99 latency is spent waiting for the feature store, not running models. Without traces that cross service boundaries, you'd never know.
Integration with feature stores presents its own challenges. Modern feature stores like Feast and Tecton track lineage and metadata. OpenTelemetry spans can complement this by showing which features were requested, how long retrieval took, cache hit rates, and whether features were available in the offline or online store. This information drives optimization decisions about which features to cache and how to prioritize retrieval.
Advanced Instrumentation: Building ML-Aware Tracing
Beyond basic span instrumentation lies ML-specific tracing patterns that capture the nuances of model serving. You might add spans for batch size decisions, dynamic batching queue depth, and prefill versus decode phase separation in LLM serving. These aren't generic tracing concerns; they're specific to how modern inference systems work.
A crucial pattern is tracking model state across requests. Some inference systems maintain model state - cached embeddings, running aggregations, or intermediate results. Tracing this state evolution lets you detect memory leaks and understand resource accumulation patterns that could lead to gradual degradation.
You also discover the value of tracing data transformation pipelines. ML models are sensitive to input shape, dtype, and numerical ranges. Tracing transformations - normalization, padding, tokenization, quantization - reveals whether data is being processed correctly and whether transformations are expensive. You might find that your quantization step takes more time than inference itself, suggesting an opportunity for optimization.
Error context becomes increasingly important at scale. When a span indicates an error, you want rich context. Not just the exception message, but what the model was trying to do, what the input was, what state the system was in. This helps distinguish between transient errors (retry-able) and systematic failures (requires investigation).
Finally, you start tracking end-to-end metrics that span multiple services. Latency is one dimension, but you also care about throughput, error rates, cache efficiency, and resource utilization. Traces provide the granular data to compute these metrics. You correlate trace data with metrics and logs to paint a complete picture of system behavior.
The Path to Observability Excellence: Building on Your Traces
Having comprehensive trace data is one thing. Using it effectively is another. The most successful organizations don't just collect traces; they build a culture of observability where traces inform every decision about system design and optimization.
Start by identifying your most expensive operations. Use traces to measure what's actually slow in production, not what you think is slow. The embedding model you optimized heavily might actually take only 10 percent of your latency. The semantic search you thought was optimized might be consuming 60 percent. Traces reveal these surprises.
Once you've identified expensive operations, instrument them more heavily. Add spans for suboperations. If vector database retrieval is expensive, trace the initialization, querying, and result processing separately. If model inference is slow, trace the model loading, input preparation, forward pass, and output decoding. This deeper instrumentation guides optimization efforts.
Create SLO dashboards based on trace data. What's the p50, p99, and p999 latency for your critical user journey? Track this over time. Set targets for what's acceptable. When you fall below target, investigate using traces. When you exceed target, use traces to understand why.
Share trace insights with your organization. When you discover that reranking is the bottleneck, communicate this to the team optimizing the reranker. When you find that users in certain regions experience poor latency, investigate whether it's network-based or compute-based using trace data. Traces become the source of truth for performance discussions.
Another powerful pattern is using traces to drive infrastructure changes. You might discover that a certain percentage of requests timeout in a way that indicates network flakiness between services. This drives a decision to increase timeouts or add retries. Or you might discover that certain model calls are orders of magnitude slower than others, revealing that you have some instances on slow hardware. This drives a decision to re-provision hardware or balance load differently.
Traces also reveal opportunities for caching. If you see the same embedding model query happening repeatedly, that's a caching opportunity. If you see the same retrieval happening for different requests, that's also a caching opportunity. Making these opportunities explicit through trace analysis helps prioritize optimization work.
Finally, traces reveal architectural inefficiencies. You might discover that your pipeline serializes operations that could be parallelized. One trace shows that preprocessing takes 50ms, then embedding takes 100ms, then reranking takes 200ms, for a total of 350ms. But these could be parallelized in some ways. Discovering this through traces drives architectural improvements.
Operational Excellence Through Continuous Monitoring
Trace data enables a level of operational visibility that's impossible with metrics alone. You can detect cascading failures, understand user impact, and diagnose problems minutes after they occur instead of hours or days.
Set up alerts triggered by trace patterns. An alert for high latency is basic. An alert for "latency increasing while error rate stays low" suggests resource contention or external dependencies slowing down, not application errors. An alert for "high error rate on reranking service but low error rate on embedding service" pinpoints the problem to a specific service.
Use traces to implement circuit breakers and graceful degradation. If your reranker is timing out, you could detect this through traces and skip reranking for subsequent requests, returning unranked results instead. This degrades user experience marginally while protecting system stability. Traces help you understand what "marginal" degradation means in your specific system.
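A simplified sketch of that idea - after repeated reranker timeouts, open a breaker and return unranked results for a cool-down window. Real breakers usually track error rates over sliding windows rather than raw counts:
import time

class RerankerBreaker:
    """Skip reranking for a cool-down period after repeated timeouts."""
    def __init__(self, max_failures: int = 3, cooldown_s: float = 30.0):
        self.failures = 0
        self.max_failures = max_failures
        self.cooldown_s = cooldown_s
        self.open_until = 0.0

    def allow(self) -> bool:
        return time.time() >= self.open_until

    def record_timeout(self) -> None:
        self.failures += 1
        if self.failures >= self.max_failures:
            self.open_until = time.time() + self.cooldown_s
            self.failures = 0

breaker = RerankerBreaker()

def rank_or_passthrough(documents, query):
    if not breaker.allow():
        return documents  # graceful degradation: unranked results
    try:
        return rerank_results(documents, query)  # defined earlier in the article
    except TimeoutError:
        breaker.record_timeout()
        return documents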
Build runbooks based on trace data patterns. "If you see this trace pattern, take this action." When a new incident occurs, you can pattern-match against historical trace data to find similar incidents and the actions that resolved them. This accelerates incident response and prevents knowledge loss when engineers leave.
Summary
Distributed tracing transforms ML pipeline debugging from black-box guessing to surgical precision. With OpenTelemetry, you:
- Instrument every stage with spans carrying custom attributes
- Propagate context across sync/async/distributed boundaries
- Sample intelligently to reduce volume without losing signals
- Export to scalable backends like Grafana Tempo via the OTLP collector
- Correlate traces with metrics and logs for complete observability
- Use traces for optimization, not just debugging
The next time your inference pipeline slows down, you won't debug blind. You'll see exactly which stage is the bottleneck, which model version caused the regression, and which user cohorts experience the worst performance.
Start with a single service, add spans for your critical path, and watch your visibility improve. Your observability investment will pay for itself the first time you debug a production issue in minutes instead of hours.