Observability for LLM Applications: Tracing, Costs, and Drift Detection
February 20, 2026 · 5 min read
LLM applications fail in ways traditional monitoring doesn't catch. A response can be grammatically correct but factually wrong. Latency can spike because the model is generating a 3000-token response instead of 300. Prompt drift — where a model's behavior changes after a version update — is invisible without a baseline.
Observability for LLM apps requires tracing individual calls, tracking costs, and continuously comparing outputs against a quality baseline.
LangFuse: Open-Source LLM Observability
LangFuse is a self-hostable observability platform with a generous free tier:
pip install langfuse anthropic
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
import anthropic
import os
# LangFuse client: credentials come from the environment; the host falls back
# to the hosted cloud instance when LANGFUSE_HOST is not set (self-hosters
# point it at their own deployment).
langfuse = Langfuse(
    public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
    secret_key=os.environ["LANGFUSE_SECRET_KEY"],
    host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
)
# Anthropic client — presumably picks up its API key from the environment;
# confirm ANTHROPIC_API_KEY is set in deployment.
client = anthropic.Anthropic()
@observe(name="summarize_article")
def summarize_article(article_text: str, max_words: int = 150) -> str:
# Update trace metadata
langfuse_context.update_current_observation(
input={"article_length": len(article_text), "max_words": max_words},
metadata={"user_id": "user-123"},
)
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=500,
messages=[{
"role": "user",
"content": f"Summarize in {max_words} words: {article_text}"
}]
)
summary = response.content[0].text
# Track token usage
langfuse_context.update_current_observation(
usage={
"input": response.usage.input_tokens,
"output": response.usage.output_tokens,
},
output=summary,
)
return summaryEvery call is now traced with input, output, latency, and token usage visible in the LangFuse dashboard.
Manual Tracing with Spans
For complex multi-step pipelines, add nested spans:
from langfuse.decorators import observe
@observe(name="rag_pipeline")
def answer_question(question: str) -> str:
# Retrieval span
with langfuse_context.new_span(name="retrieval") as span:
chunks = retrieve_documents(question)
span.update(
input={"query": question},
output={"chunk_count": len(chunks)},
)
# Generation span
with langfuse_context.new_span(name="generation") as span:
context = "\n".join([c["text"] for c in chunks])
answer = generate_answer(question, context)
span.update(
input={"question": question, "context_length": len(context)},
output={"answer_length": len(answer)},
)
return answerToken Cost Tracking
Track costs across model versions and over time:
from dataclasses import dataclass
from datetime import datetime
import json
# Pricing as of 2026 ($/MTok)
MODEL_PRICING = {
"claude-opus-4-6": {"input": 3.00, "output": 15.00},
"claude-sonnet-4-6": {"input": 0.80, "output": 4.00},
"claude-haiku-4-5-20251001": {"input": 0.25, "output": 1.25},
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
}
@dataclass
class LLMCall:
model: str
endpoint: str
input_tokens: int
output_tokens: int
latency_ms: int
timestamp: str = None
def __post_init__(self):
self.timestamp = self.timestamp or datetime.utcnow().isoformat()
@property
def cost_usd(self) -> float:
pricing = MODEL_PRICING.get(self.model, {"input": 0, "output": 0})
return (
(self.input_tokens / 1_000_000) * pricing["input"] +
(self.output_tokens / 1_000_000) * pricing["output"]
)
class CostTracker:
    """Accumulates LLMCall records and warns when daily spend nears budget."""

    def __init__(self, budget_daily_usd: float = 10.0):
        self._calls: list[LLMCall] = []
        self._budget = budget_daily_usd

    def record(self, call: LLMCall):
        """Store one call and re-check today's spend against the budget."""
        self._calls.append(call)
        self._check_budget()

    def _check_budget(self):
        # Timestamps are ISO-8601, so a date-prefix match selects today's calls.
        today = datetime.utcnow().date().isoformat()
        daily_cost = sum(
            c.cost_usd for c in self._calls if c.timestamp.startswith(today)
        )
        # Warn once spend crosses 80% of the configured daily budget.
        if daily_cost > self._budget * 0.8:
            pct = (daily_cost / self._budget) * 100
            print(f"⚠️ Daily LLM budget at {pct:.0f}% (${daily_cost:.2f}/${self._budget})")

    def summary(self) -> dict:
        """Aggregate totals and a per-model breakdown across every recorded call."""
        n = len(self._calls)
        return {
            "total_calls": n,
            "total_cost_usd": sum(c.cost_usd for c in self._calls),
            "total_input_tokens": sum(c.input_tokens for c in self._calls),
            "total_output_tokens": sum(c.output_tokens for c in self._calls),
            "avg_latency_ms": sum(c.latency_ms for c in self._calls) / max(n, 1),
            "by_model": self._breakdown_by_model(),
        }

    def _breakdown_by_model(self) -> dict:
        # {model: {"calls": count, "cost": usd}}
        per_model: dict = {}
        for call in self._calls:
            bucket = per_model.setdefault(call.model, {"calls": 0, "cost": 0})
            bucket["calls"] += 1
            bucket["cost"] += call.cost_usd
        return per_model
tracker = CostTracker(budget_daily_usd=20.0)

Prometheus Metrics for LLM Calls
Export LLM metrics to Prometheus for Grafana dashboards:
# Prometheus metric definitions for LLM traffic. Counters only ever increase;
# rates/deltas are derived at query time in PromQL.
# NOTE(review): Gauge is imported but never used below — confirm or drop.
from prometheus_client import Counter, Histogram, Gauge
# Request count, labeled so dashboards can split by model, endpoint, and
# success/error status.
llm_requests_total = Counter(
    "llm_requests_total",
    "Total LLM API requests",
    ["model", "endpoint", "status"]
)
# Latency histogram; buckets span sub-second responses up to minute-long
# generations, matching typical LLM latency ranges.
llm_request_duration_seconds = Histogram(
    "llm_request_duration_seconds",
    "LLM request latency",
    ["model", "endpoint"],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0]
)
# Token consumption, split by direction via the "type" label.
llm_tokens_total = Counter(
    "llm_tokens_total",
    "Total tokens consumed",
    ["model", "type"] # type: input/output
)
# Estimated spend per model, derived from MODEL_PRICING at record time.
llm_cost_dollars_total = Counter(
    "llm_cost_dollars_total",
    "Estimated USD cost of LLM calls",
    ["model"]
)
def tracked_llm_call(model: str, endpoint: str, call_fn):
import time
start = time.time()
try:
response = call_fn()
status = "success"
llm_tokens_total.labels(model=model, type="input").inc(response.usage.input_tokens)
llm_tokens_total.labels(model=model, type="output").inc(response.usage.output_tokens)
call = LLMCall(
model=model, endpoint=endpoint,
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens,
latency_ms=int((time.time() - start) * 1000)
)
llm_cost_dollars_total.labels(model=model).inc(call.cost_usd)
return response
except Exception as e:
status = "error"
raise
finally:
duration = time.time() - start
llm_requests_total.labels(model=model, endpoint=endpoint, status=status).inc()
llm_request_duration_seconds.labels(model=model, endpoint=endpoint).observe(duration)Drift Detection
Prompt drift occurs when a model update or prompt change causes output quality to degrade. Detect it by comparing against a baseline:
import json
import numpy as np
from pathlib import Path
class DriftDetector:
    """Compares current quality metrics against a stored JSON baseline.

    The baseline is a flat ``{metric_name: value}`` mapping. Metrics whose
    names end in ``_latency_ms`` alert when they *increase* by more than 50%;
    every other numeric metric alerts when it *drops* by more than 10%.
    """

    def __init__(self, baseline_file: str):
        self.baseline_file = Path(baseline_file)
        self.baseline = self._load_baseline()

    def _load_baseline(self) -> dict:
        # A missing file means no baseline yet: detect_drift() skips every
        # metric until save_baseline() has been called once.
        if self.baseline_file.exists():
            return json.loads(self.baseline_file.read_text())
        return {}

    def save_baseline(self, metrics: dict):
        """Persist *metrics* as the new comparison baseline."""
        self.baseline_file.write_text(json.dumps(metrics, indent=2))

    def detect_drift(self, current_metrics: dict) -> list[dict]:
        """Return one alert dict per drifted metric (empty list = no drift)."""
        alerts = []
        for metric_name, current_value in current_metrics.items():
            baseline_value = self.baseline.get(metric_name)
            if baseline_value is None:
                continue  # no baseline recorded for this metric
            if metric_name.endswith("_latency_ms"):
                # Latency: only an increase is a regression; alert above +50%.
                # (Fixed: previously a >10% latency *improvement* also raised
                # a spurious "drop" alert via the generic branch below.)
                if current_value > baseline_value * 1.5:
                    alerts.append({
                        "metric": metric_name,
                        "baseline": baseline_value,
                        "current": current_value,
                        "increase_pct": (current_value - baseline_value) / baseline_value * 100,
                    })
            elif isinstance(current_value, (int, float)) and current_value < baseline_value * 0.9:
                # Quality metric dropped by more than 10%. (Fixed: the old
                # float-only isinstance check silently ignored int metrics.)
                alerts.append({
                    "metric": metric_name,
                    "baseline": baseline_value,
                    "current": current_value,
                    "drop_pct": (baseline_value - current_value) / baseline_value * 100,
                })
        return alerts
def run_quality_check(golden_inputs: list[dict]) -> dict:
    """Run golden dataset evaluation and return metrics."""
    coverages = []
    for item in golden_inputs:
        response = summarize_article(item["text"])
        # Keyword coverage: fraction of expected keywords that appear
        # (case-insensitively) in the generated summary.
        summary_lower = response.lower()
        keywords = item["expected_keywords"]
        hits = sum(1 for kw in keywords if kw.lower() in summary_lower)
        coverages.append(hits / len(keywords))
    return {
        "keyword_coverage": np.mean(coverages),
        "min_coverage": np.min(coverages),
        "p10_coverage": np.percentile(coverages, 10),
    }

Alerting
# Alert on quality degradation
def send_drift_alert(alerts: list[dict], channel: str = "#llm-monitoring"):
if not alerts:
return
import httpx
message = f"⚠️ LLM Quality Drift Detected\n"
for alert in alerts:
if "drop_pct" in alert:
message += f"• {alert['metric']}: dropped {alert['drop_pct']:.1f}% (was {alert['baseline']:.2f}, now {alert['current']:.2f})\n"
else:
message += f"• {alert['metric']}: increased {alert['increase_pct']:.1f}%\n"
httpx.post(
os.environ["SLACK_WEBHOOK_URL"],
json={"text": message, "channel": channel}
)Observability for LLM applications is not optional in production — it's how you know the system is working. Costs without tracking grow silently. Quality without evaluation degrades invisibly. Latency without metrics spikes undetected. The instrumentation overhead is minimal; the operational insight is essential.