Observability

The pipeline uses a three-layer observability stack: structured logging (structlog), LLM tracing (Langfuse), and error tracking (Sentry). All three are initialized idempotently by init_telemetry() in src/telemetry.py.

Telemetry Initialization

Every @flow function and the dashboard must call init_telemetry() at startup. This function is safe to call multiple times -- only the first invocation has any effect.

from src.telemetry import init_telemetry

init_telemetry()  # Activates structlog, optionally Sentry and Langfuse

The function checks environment variables to decide which backends to enable:

Backend     Required Variable      Effect When Missing
structlog   (always active)        JSON output in non-local environments
Sentry      SENTRY_DSN             Error tracking disabled
Langfuse    LANGFUSE_PUBLIC_KEY    LLM tracing disabled
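The gate logic can be sketched with a module-level flag. This is a hypothetical simplification of src/telemetry.py, not the real implementation, which actually wires up structlog, Sentry, and Langfuse:

```python
import os

_initialized = False

def init_telemetry() -> dict:
    """Idempotent bootstrap: only the first call configures anything."""
    global _initialized
    if _initialized:
        return {}  # subsequent calls are no-ops
    _initialized = True
    return {
        "structlog": True,  # always active
        "sentry": bool(os.environ.get("SENTRY_DSN")),
        "langfuse": bool(os.environ.get("LANGFUSE_PUBLIC_KEY")),
    }
```

The returned dict is illustrative only; it shows which backends the environment-variable checks would enable.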

Structured Logging (structlog)

All logging uses structlog with bound loggers. Key conventions:

  • JSON output in staging/production, human-readable in local development
  • Context binding via bind_article(article_id) at the start of each article flow, cleared in finally with clear_contextvars()
  • Standard fields: agent_name, model, tokens_in, tokens_out, cost_usd, duration_ms, tool_iterations, cache_read_tokens, cache_creation_tokens

Every agent_call_complete log event includes the full cost and performance breakdown for that LLM call.
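The binding convention can be illustrated with stdlib contextvars, which is the mechanism structlog's contextvars support is built on. The names below mirror the conventions above but the implementation is a stand-in, not the project's actual logging code:

```python
import contextvars

# Bound key/value pairs live in a ContextVar and get merged into every
# subsequent log event in the same context.
_log_ctx: contextvars.ContextVar[dict] = contextvars.ContextVar("log_ctx", default={})

def bind_article(article_id: str) -> None:
    """Attach article_id to all log events in the current context."""
    _log_ctx.set({**_log_ctx.get(), "article_id": article_id})

def clear_contextvars() -> None:
    """Drop all bound context (called in the flow's finally block)."""
    _log_ctx.set({})

def log_event(event: str, **fields) -> dict:
    """Stand-in for a structlog logger call: merge context into the event."""
    return {**_log_ctx.get(), "event": event, **fields}

bind_article("article-123")
try:
    entry = log_event("agent_call_complete", tokens_in=1200, cost_usd=0.031)
finally:
    clear_contextvars()
```

After clear_contextvars(), later events no longer carry article_id, which is why the clearing happens in finally rather than at the end of the happy path.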

Langfuse Integration

Langfuse provides LLM-specific observability: traces, generations, sessions, tags, and scores.

Architecture

Flow (root span)
  |
  +-- Agent call 1 (generation observation)
  |     +-- input: message count, prompt length
  |     +-- output: response preview (500 chars)
  |     +-- usage: input/output tokens
  |     +-- cost: total USD
  |     +-- metadata: content_type, prompt_version, schema_class, cache stats
  |
  +-- Agent call 2 (generation observation)
  |     ...
  |
  +-- Quality scores (posted to root span)
        +-- factuality_iter1, factuality_iter2, ...
        +-- seo_passed_iter1, seo_passed_iter2, ...
        +-- style_passed_iter1, style_passed_iter2, ...

Trace Structure

Each Prefect flow creates a root Langfuse span. Agent calls within that flow create nested generation observations automatically via Langfuse's context manager pattern.

Root spans are created with lf.start_as_current_observation(as_type="span"):

Flow                 Span Name            Session ID Pattern     Tags
produce_article      produce_article      article:{article_id}   ["article", content_type, language]
produce_digests      produce_digests      digest:{uuid}          ["digest"]
research_news_scan   research_news_scan   scan:{uuid}            ["research-scan"]
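The table above can be expressed as a small helper that assembles the arguments for root-span creation. This is a hypothetical sketch (the function name and return shape are assumptions, not the project's API):

```python
import uuid

def trace_params(flow: str, article_id: str = "", content_type: str = "",
                 language: str = "") -> dict:
    """Return the span name, session id, and tags for one flow's root span."""
    if flow == "produce_article":
        return {"name": "produce_article",
                "session_id": f"article:{article_id}",
                "tags": ["article", content_type, language]}
    if flow == "produce_digests":
        return {"name": "produce_digests",
                "session_id": f"digest:{uuid.uuid4()}",
                "tags": ["digest"]}
    if flow == "research_news_scan":
        return {"name": "research_news_scan",
                "session_id": f"scan:{uuid.uuid4()}",
                "tags": ["research-scan"]}
    raise ValueError(f"unknown flow: {flow}")
```

Article sessions reuse the stable article_id so reruns of the same article land in the same session, while digest and scan sessions get a fresh UUID per batch.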

Generation Observations

Every call_agent() invocation creates a generation observation (when Langfuse is configured):

Input recorded:

  • messages_count: number of conversation messages
  • system_prompt_len: character length of the system prompt

Output recorded:

  • First 500 characters of the response text

Usage recorded:

  • input: input tokens
  • output: output tokens

Cost recorded:

  • total: calculated USD cost (including cache savings)

Metadata recorded:

  • content_type: prompt content type
  • prompt_version: active prompt version number
  • schema_class: structured output schema title (if applicable)
  • duration_ms: wall-clock time
  • cache_read_tokens: tokens served from Anthropic's prompt cache
  • cache_creation_tokens: tokens written to cache
  • tool_iterations: number of tool use loop cycles

Session Grouping

Sessions group related traces in the Langfuse UI:

  • Article sessions: article:{article_id} -- all LLM calls for one article production run
  • Digest sessions: digest:{uuid} -- all calls for one digest production batch
  • Scan sessions: scan:{uuid} -- all calls for one research news scan

This allows filtering the Langfuse dashboard by article to see the complete trace of research, writing, quality gates, and synthesis.

Quality Scores

Quality gate results are posted as Langfuse scores on the article's root span. This happens after every quality gate evaluation (not just the final iteration), providing visibility into quality progression:

span.score(name="factuality_iter1", value=85.0, data_type="NUMERIC")
span.score(name="seo_passed_iter1", value=1.0, data_type="NUMERIC")
span.score(name="style_passed_iter1", value=0.0, data_type="NUMERIC")

Granular Sub-Scores

In addition to top-level pass/fail scores, granular sub-scores from each quality gate are posted:

  • Factuality: claims_checked, claims_verified, claims_flagged
  • SEO: keyword_density, secondary_keywords, word_count, heading_hierarchy, faq_present
  • Style: readability, structure, voice, humanizer, medical_language

Synthesis feedback is also posted as a string score (synthesis_feedback_iter1) containing the change log, enabling post-hoc analysis of what gates requested and how the writer responded.
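The score naming can be sketched as a flattening step. This is a hypothetical helper, assuming sub-scores share the same _iterN suffix as the top-level scores; each resulting (name, value) pair would then be posted via span.score(...):

```python
def gate_scores(iteration: int, factuality: float, seo_passed: bool,
                style_passed: bool, subscores: dict) -> list:
    """Flatten one quality-gate evaluation into named Langfuse scores."""
    suffix = f"_iter{iteration}"
    scores = [
        (f"factuality{suffix}", factuality),
        (f"seo_passed{suffix}", 1.0 if seo_passed else 0.0),
        (f"style_passed{suffix}", 1.0 if style_passed else 0.0),
    ]
    # Granular sub-scores (claims_checked, keyword_density, ...) get the
    # same iteration suffix so progression across iterations is comparable.
    scores += [(f"{name}{suffix}", float(value)) for name, value in subscores.items()]
    return scores

scores = gate_scores(1, 85.0, True, False,
                     {"claims_checked": 12, "claims_verified": 11, "claims_flagged": 1})
```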

Error Handling

All Langfuse operations are wrapped in try/except blocks. Failures are logged as warnings but never interrupt the pipeline. If Langfuse is down or misconfigured, the pipeline continues with degraded observability.

Each flow calls lf.flush() in its finally block to ensure pending observations are sent before the flow exits.
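The wrapping convention can be sketched as a single guard function. The real code uses inline try/except around each Langfuse call site; safe_langfuse here is a hypothetical name for illustration:

```python
import logging

logger = logging.getLogger(__name__)

def safe_langfuse(operation, *args, **kwargs):
    """Run one Langfuse call; log failures as warnings instead of raising."""
    try:
        return operation(*args, **kwargs)
    except Exception as exc:
        # Degraded observability is acceptable; a failed pipeline is not.
        logger.warning("langfuse_operation_failed: %s", exc)
        return None
```

A score post would then look like safe_langfuse(span.score, name="factuality_iter1", value=85.0), returning None rather than propagating if Langfuse is unreachable.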

Context Propagation

Langfuse context is propagated using langfuse.propagate_attributes():

from langfuse import propagate_attributes

with propagate_attributes(session_id="article:123", tags=["article", "satellite", "en"]):
    # All call_agent() invocations here inherit session_id and tags
    ...

This context manager is entered after the root span is created and ensures all nested generation observations inherit the session and tag metadata.

Sentry Integration

Sentry provides error tracking and exception monitoring:

  • Integrations: HttpxIntegration (HTTP client errors), SqlalchemyIntegration (database errors)
  • Traces sample rate: 0.0 (performance tracing disabled to avoid cost -- Langfuse handles LLM tracing)
  • Environment: set from settings.environment
  • Server name: set from SENTRY_APP_NAME (distinguishes worker from dashboard)
  • Release: set from SENTRY_RELEASE environment variable (typically set by CI/CD)
  • Filtered events: Prefect infrastructure exceptions (PrefectHTTPStatusError, ObjectAlreadyExists) are filtered via before_send to reduce Sentry noise from expected transient errors
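The filtering hook can be sketched as follows. This is a simplified stand-alone version of the before_send callback passed to sentry_sdk.init(); matching by class name (rather than importing the Prefect exception types) is an assumption of this sketch:

```python
# Exception class names treated as expected Prefect transients.
FILTERED_EXCEPTIONS = {"PrefectHTTPStatusError", "ObjectAlreadyExists"}

def before_send(event, hint):
    """Drop expected Prefect infrastructure errors before they reach Sentry.

    Returning None tells the Sentry SDK to discard the event; the hint's
    exc_info tuple (type, value, traceback) identifies the exception.
    """
    exc_info = hint.get("exc_info")
    if exc_info and exc_info[0].__name__ in FILTERED_EXCEPTIONS:
        return None
    return event
```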

Sentry context is enriched during agent validation failures:

sentry_sdk.set_context("agent_validation_error", {
    "agent_name": agent_name,
    "schema": model_class.__name__,
    "response_preview": response_content[:500],
})

Configuration

Environment Variables

Variable              Required   Default                      Description
LANGFUSE_PUBLIC_KEY   No         (disabled)                   Langfuse public key
LANGFUSE_SECRET_KEY   No         --                           Langfuse secret key
LANGFUSE_HOST         No         https://cloud.langfuse.com   Langfuse host URL
SENTRY_DSN            No         (disabled)                   Sentry DSN
SENTRY_APP_NAME       No         --                           Application name for Sentry server_name
SENTRY_RELEASE        No         --                           Release identifier

Dashboard

The dashboard also calls init_telemetry() after the Streamlit secrets bridge block. This enables structured logging and Sentry for the dashboard process (Langfuse is not used in the dashboard since it has no LLM calls).