Observability¶
The pipeline uses a three-layer observability stack: structured logging (structlog), LLM tracing (Langfuse), and error tracking (Sentry). All three are initialized idempotently by `init_telemetry()` in `src/telemetry.py`.
Telemetry Initialization¶
Every `@flow` function and the dashboard must call `init_telemetry()` at startup. This function is safe to call multiple times -- only the first invocation has any effect.
```python
from src.telemetry import init_telemetry

init_telemetry()  # Activates structlog, optionally Sentry and Langfuse
```
The function checks environment variables to decide which backends to enable:
| Backend | Required Variable | Effect When Missing |
|---|---|---|
| structlog | (always active) | JSON output in non-local environments |
| Sentry | `SENTRY_DSN` | Error tracking disabled |
| Langfuse | `LANGFUSE_PUBLIC_KEY` | LLM tracing disabled |
Structured Logging (structlog)¶
All logging uses structlog with bound loggers. Key conventions:
- JSON output in staging/production, human-readable in local development
- Context binding via `bind_article(article_id)` at the start of each article flow, cleared in `finally` with `clear_contextvars()`
- Standard fields: `agent_name`, `model`, `tokens_in`, `tokens_out`, `cost_usd`, `duration_ms`, `tool_iterations`, `cache_read_tokens`, `cache_creation_tokens`
Every `agent_call_complete` log event includes the full cost and performance breakdown for that LLM call.
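structlog's contextvars support is built on the standard library's `contextvars` module; the bind/clear lifecycle can be sketched in pure stdlib terms. The `bind_article` name and field set come from the conventions above, but the implementation here is illustrative, not structlog's:

```python
import contextvars

# One context variable holds the fields bound for the current task/thread.
_log_context: contextvars.ContextVar[dict] = contextvars.ContextVar(
    "log_context", default={}
)


def bind_article(article_id: str) -> None:
    """Bind article_id so every subsequent log line carries it."""
    ctx = dict(_log_context.get())  # copy, never mutate the stored dict
    ctx["article_id"] = article_id
    _log_context.set(ctx)


def clear_contextvars() -> None:
    """Reset bound context, typically called in a finally block."""
    _log_context.set({})


def log_event(event: str, **fields) -> dict:
    """Merge bound context into an event dict, like a structlog processor."""
    return {**_log_context.get(), "event": event, **fields}
```

In the real flows, `bind_article` runs at the start of article production and `clear_contextvars()` runs in the `finally` block, so context never leaks between articles handled by the same worker.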
Langfuse Integration¶
Langfuse provides LLM-specific observability: traces, generations, sessions, tags, and scores.
Architecture¶
```text
Flow (root span)
|
+-- Agent call 1 (generation observation)
|   +-- input: message count, prompt length
|   +-- output: response preview (500 chars)
|   +-- usage: input/output tokens
|   +-- cost: total USD
|   +-- metadata: content_type, prompt_version, schema_class, cache stats
|
+-- Agent call 2 (generation observation)
|   ...
|
+-- Quality scores (posted to root span)
    +-- factuality_iter1, factuality_iter2, ...
    +-- seo_passed_iter1, seo_passed_iter2, ...
    +-- style_passed_iter1, style_passed_iter2, ...
```
Trace Structure¶
Each Prefect flow creates a root Langfuse span. Agent calls within that flow create nested generation observations automatically via Langfuse's context manager pattern.
Root spans are created with `lf.start_as_current_observation(as_type="span")`:

| Flow | Span Name | Session ID Pattern | Tags |
|---|---|---|---|
| `produce_article` | `produce_article` | `article:{article_id}` | `["article", content_type, language]` |
| `produce_digests` | `produce_digests` | `digest:{uuid}` | `["digest"]` |
| `research_news_scan` | `research_news_scan` | `scan:{uuid}` | `["research-scan"]` |
Generation Observations¶
Every `call_agent()` invocation creates a generation observation (when Langfuse is configured):
Input recorded:
- `messages_count`: number of conversation messages
- `system_prompt_len`: character length of the system prompt
Output recorded:
- First 500 characters of the response text
Usage recorded:
- `input`: input tokens
- `output`: output tokens
Cost recorded:
- `total`: calculated USD cost (including cache savings)
Metadata recorded:
- `content_type`: prompt content type
- `prompt_version`: active prompt version number
- `schema_class`: structured output schema title (if applicable)
- `duration_ms`: wall-clock time
- `cache_read_tokens`: tokens served from Anthropic's prompt cache
- `cache_creation_tokens`: tokens written to cache
- `tool_iterations`: number of tool-use loop cycles
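The cache-aware cost in the `total` field can be sketched as below. The default per-token rates are placeholders, not the pipeline's actual pricing table; the 10% read and 125% write multipliers follow Anthropic's published prompt-caching pricing, and the function assumes `tokens_in` counts only uncached input tokens (Anthropic reports cached tokens in separate fields):

```python
def call_cost_usd(
    tokens_in: int,
    tokens_out: int,
    cache_read_tokens: int = 0,
    cache_creation_tokens: int = 0,
    in_rate: float = 3.00 / 1_000_000,    # assumed $/input token
    out_rate: float = 15.00 / 1_000_000,  # assumed $/output token
) -> float:
    """Total USD cost of one LLM call, with prompt-cache adjustments.

    Cache reads are billed at 10% of the input rate and cache writes at
    125% of it (Anthropic's multipliers; the base rates are placeholders).
    """
    cost = (
        tokens_in * in_rate
        + cache_read_tokens * in_rate * 0.10
        + cache_creation_tokens * in_rate * 1.25
        + tokens_out * out_rate
    )
    return round(cost, 6)
```

With a warm cache most of the prompt arrives via `cache_read_tokens`, which is why the recorded `total` can be far below the naive input-tokens-times-rate figure.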
Session Grouping¶
Sessions group related traces in the Langfuse UI:
- Article sessions: `article:{article_id}` -- all LLM calls for one article production run
- Digest sessions: `digest:{uuid}` -- all calls for one digest production batch
- Scan sessions: `scan:{uuid}` -- all calls for one research news scan
This allows filtering the Langfuse dashboard by article to see the complete trace of research, writing, quality gates, and synthesis.
Quality Scores¶
Quality gate results are posted as Langfuse scores on the article's root span. This happens after every quality gate evaluation (not just the final iteration), providing visibility into quality progression:
```python
span.score(name="factuality_iter1", value=85.0, data_type="NUMERIC")
span.score(name="seo_passed_iter1", value=1.0, data_type="NUMERIC")
span.score(name="style_passed_iter1", value=0.0, data_type="NUMERIC")
```
Granular Sub-Scores¶
In addition to top-level pass/fail scores, granular sub-scores from each quality gate are posted:
- Factuality: `claims_checked`, `claims_verified`, `claims_flagged`
- SEO: `keyword_density`, `secondary_keywords`, `word_count`, `heading_hierarchy`, `faq_present`
- Style: `readability`, `structure`, `voice`, `humanizer`, `medical_language`
Synthesis feedback is also posted as a string score (`synthesis_feedback_iter1`) containing the change log, enabling post-hoc analysis of what the gates requested and how the writer responded.
Error Handling¶
All Langfuse operations are wrapped in try/except blocks. Failures are logged as warnings but never interrupt the pipeline. If Langfuse is down or misconfigured, the pipeline continues with degraded observability.
Each flow calls `lf.flush()` in its `finally` block to ensure pending observations are sent before the flow exits.
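The never-interrupt guarantee amounts to a wrapper like the following. This is a sketch of the pattern; the real code wraps individual Langfuse calls inline rather than via a decorator:

```python
import functools
import logging

logger = logging.getLogger(__name__)


def never_raise(fn):
    """Run a telemetry operation; log a warning on failure, never propagate."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            # Degraded observability is acceptable; a failed pipeline is not.
            logger.warning("telemetry operation %s failed", fn.__name__, exc_info=True)
            return None

    return wrapper
```

Applied to score posting or span creation, a Langfuse outage turns into warning log lines instead of a failed article run.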
Context Propagation¶
Langfuse context is propagated using `langfuse.propagate_attributes()`:
```python
from langfuse import propagate_attributes

with propagate_attributes(session_id="article:123", tags=["article", "satellite", "en"]):
    # All call_agent() invocations here inherit session_id and tags
    ...
```
This context manager is entered after the root span is created and ensures all nested generation observations inherit the session and tag metadata.
Sentry Integration¶
Sentry provides error tracking and exception monitoring:
- Integrations: `HttpxIntegration` (HTTP client errors), `SqlalchemyIntegration` (database errors)
- Traces sample rate: 0.0 (performance tracing disabled to avoid cost -- Langfuse handles LLM tracing)
- Environment: set from `settings.environment`
- Server name: set from `SENTRY_APP_NAME` (distinguishes the worker from the dashboard)
- Release: set from `SENTRY_RELEASE` environment variable (typically set by CI/CD)
- Filtered events: Prefect infrastructure exceptions (`PrefectHTTPStatusError`, `ObjectAlreadyExists`) are filtered via `before_send` to reduce Sentry noise from expected transient errors
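The `before_send` filter can be sketched without importing `sentry_sdk`: Sentry passes each event plus a hint whose `exc_info` carries the exception, and returning `None` drops the event. The exception names come from the list above; matching by class name (rather than importing Prefect's exception classes) is an illustrative choice:

```python
# Prefect infrastructure errors treated as expected transient noise.
FILTERED_EXCEPTIONS = {"PrefectHTTPStatusError", "ObjectAlreadyExists"}


def before_send(event: dict, hint: dict):
    """Drop events for filtered exception types; pass everything else through."""
    exc_info = hint.get("exc_info")
    if exc_info:
        exc_type = exc_info[0]
        if exc_type.__name__ in FILTERED_EXCEPTIONS:
            return None  # returning None tells Sentry to discard the event
    return event
```

Events without exception info (plain messages, enriched contexts) are forwarded untouched, so the filter only suppresses the named Prefect errors.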
Sentry context is enriched during agent validation failures:
```python
sentry_sdk.set_context("agent_validation_error", {
    "agent_name": agent_name,
    "schema": model_class.__name__,
    "response_preview": response_content[:500],
})
```
Configuration¶
Environment Variables¶
| Variable | Required | Default | Description |
|---|---|---|---|
| `LANGFUSE_PUBLIC_KEY` | No | (disabled) | Langfuse public key |
| `LANGFUSE_SECRET_KEY` | No | -- | Langfuse secret key |
| `LANGFUSE_HOST` | No | `https://cloud.langfuse.com` | Langfuse host URL |
| `SENTRY_DSN` | No | (disabled) | Sentry DSN |
| `SENTRY_APP_NAME` | No | -- | Application name for Sentry `server_name` |
| `SENTRY_RELEASE` | No | -- | Release identifier |
Dashboard¶
The dashboard also calls `init_telemetry()` after the Streamlit secrets bridge block. This enables structured logging and Sentry for the dashboard process (Langfuse is not used in the dashboard since it has no LLM calls).