# Pipeline Flows
All flows are orchestrated with Prefect 3.x and defined in `src/pipeline/`.
## Flow Overview

| Flow | Module | Trigger | Description |
|---|---|---|---|
| `generate_briefs` | `brief_flow.py` | Manual | Creates article briefs from backlog items |
| `produce_article` | `article_flow.py` | Manual | Full lifecycle: research, write, quality gates, review |
| `batch_produce` | `batch_flow.py` | Manual | Parallel article production with bounded concurrency |
| `resume_article` | `resume_flow.py` | Manual | Resumes article production from current status |
| `recover_stuck_articles` | `resume_flow.py` | Every 30 min | Periodic sweep: finds stuck articles and triggers resume |
| `publish_articles` | `publish_flow.py` | Manual | Publishes approved articles to WordPress as drafts |
| `republish_articles` | `publish_flow.py` | Manual | Re-publishes already-published articles (partial publish recovery) |
| `research_news_scan` | `research_news_flow.py` | Weekly (Sun 6AM UTC) | Scans RSS feeds, triages relevance, triggers digest production |
| `produce_digests` | `research_news_flow.py` | Sub-flow / Manual | Produces per-language digest articles from newsworthy items |
## Article Lifecycle Status Machine

Articles progress through a defined state machine enforced by `src/pipeline/status.py`:

```mermaid
stateDiagram-v2
    [*] --> brief_generating
    brief_generating --> brief_pending
    brief_pending --> brief_approved
    brief_approved --> researching
    researching --> writing
    writing --> quality_check
    quality_check --> synthesizing
    synthesizing --> ready_for_review
    synthesizing --> writing: rewrite loop
    ready_for_review --> approved
    approved --> published
    brief_generating --> failed
    brief_pending --> failed
    brief_approved --> failed
    researching --> failed
    writing --> failed
    quality_check --> failed
    synthesizing --> failed
    ready_for_review --> failed
    approved --> failed
```
The `brief_generating` status is set when the brief generator agent begins work, preventing premature UI actions (approve/reject) on articles that do not yet have a brief.
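A minimal sketch of how such a transition table might look (the real `VALID_TRANSITIONS` dict and `transition_article()` in `status.py` may differ in shape; this simply encodes the state diagram above):

```python
# Sketch of the status state machine; encodes the diagram, not the repo's exact code.
VALID_TRANSITIONS = {
    "brief_generating": {"brief_pending", "failed"},
    "brief_pending": {"brief_approved", "failed"},
    "brief_approved": {"researching", "failed"},
    "researching": {"writing", "failed"},
    "writing": {"quality_check", "failed"},
    "quality_check": {"synthesizing", "failed"},
    "synthesizing": {"ready_for_review", "writing", "failed"},  # rewrite loop
    "ready_for_review": {"approved", "failed"},
    "approved": {"published", "failed"},
    "published": set(),   # terminal
    "failed": set(),      # terminal
}

def transition_article(current: str, target: str) -> str:
    """Return the new status, or raise if the transition is not allowed."""
    if target not in VALID_TRANSITIONS.get(current, set()):
        raise ValueError(f"Invalid transition: {current} -> {target}")
    return target
```

Guarding every write through one function like this keeps illegal jumps (e.g. `published` back to `writing`) impossible by construction.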
## Article Production Flow

The `produce_article` flow is the core pipeline. It chains the following steps:

```mermaid
graph TD
    A[Brief Approved] --> B[Researcher]
    B --> C[Writer]
    C --> D[Quality Gates]
    D --> E{All pass?}
    E -->|No| F[Synthesis]
    F --> G[Rewrite]
    G --> D
    E -->|Yes| H[Image Generation]
    H --> I[Ready for Review]
```
Quality gates run in parallel (config-driven per content type):

- Factuality Checker -- validates medical claims against research dossier (mandatory)
- SEO Optimizer -- deterministic checks + LLM evaluation of keyword usage, heading structure, meta descriptions (skipped for `research_news`)
- Style Checker -- readability, structure compliance, voice/tone, AI-pattern detection, medical language (mandatory)
The synthesis agent reconciles feedback from all quality gates and produces a revised draft. If any gate fails, the writer rewrites the article (up to 3 iterations). After all quality gates pass, image generation runs for satellite and cornerstone articles (skipped for research news).
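The gate / synthesis / rewrite loop can be sketched as plain control flow. `run_gates`, `synthesize`, and `rewrite` here are hypothetical stand-ins for the real agents; only the loop shape (up to 3 iterations) reflects the description above:

```python
MAX_ITERATIONS = 3  # rewrite budget per article

def gate_loop(draft, run_gates, synthesize, rewrite):
    """Run quality gates; on failure, synthesize feedback and rewrite.

    run_gates/synthesize/rewrite are hypothetical callables standing in
    for the real agents; the repo's orchestration may differ.
    """
    for attempt in range(1, MAX_ITERATIONS + 1):
        results = run_gates(draft)           # gates run in parallel in the real flow
        if all(r["passed"] for r in results):
            return draft, attempt            # all gates pass -> proceed to images
        feedback = synthesize(results)       # reconcile feedback from all gates
        draft = rewrite(draft, feedback)     # writer revises the article
    return draft, MAX_ITERATIONS             # budget exhausted; flow marks failure
```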
## Quality Gate Configuration

Active gates are configured per content type in `src/config.py`:

| Content Type | Active Gates |
|---|---|
| `satellite` | factuality, seo, style |
| `cornerstone` | factuality, seo, style |
| `research_news` | factuality, style |

Score thresholds are also content-type-specific:

| Content Type | Pass Threshold |
|---|---|
| `satellite` | 70 |
| `cornerstone` | 85 |
| `research_news` | 70 |
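The two tables above might translate into configuration along these lines (a hypothetical shape; `ACTIVE_GATES`, `PASS_THRESHOLDS`, and `gates_for` are illustrative names, not necessarily those used in `src/config.py`):

```python
# Hypothetical shape of the per-content-type config; values mirror the tables above.
ACTIVE_GATES = {
    "satellite": ["factuality", "seo", "style"],
    "cornerstone": ["factuality", "seo", "style"],
    "research_news": ["factuality", "style"],  # SEO gate skipped for digests
}

PASS_THRESHOLDS = {
    "satellite": 70,
    "cornerstone": 85,  # cornerstone articles are held to a higher bar
    "research_news": 70,
}

def gates_for(content_type: str) -> list[str]:
    """Return the active gate names for a content type."""
    return ACTIVE_GATES[content_type]
```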
## Langfuse Trace Integration

Each `produce_article` run creates a Langfuse root span with:

- Session ID: `article:{article_id}` -- groups all LLM calls for one article
- Tags: `["article", content_type, language]` -- for filtering in the Langfuse UI
- Quality scores: factuality, SEO, and style scores posted per iteration
- Input/output: article context at start, pass/fail result at end
See Observability for the full Langfuse integration details.
## Brief Generation Flow

The `generate_briefs` flow processes backlog items in parallel (bounded by a semaphore of 10):

1. Load backlog item and content template
2. Create article record in `brief_generating` status
3. Run the brief generator agent
4. Store brief and transition to `brief_pending`
5. Link backlog item to article and update backlog status
On failure, the article transitions to `failed` status so it does not remain stuck in `brief_generating`.
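The semaphore-bounded parallelism can be sketched with plain asyncio (illustrative only; the real flow orchestrates Prefect tasks, and `generate_one` is a hypothetical stand-in for the per-item work):

```python
import asyncio

SEMAPHORE_LIMIT = 10  # at most 10 briefs generated concurrently

async def generate_one(item, sem):
    """Stand-in for: create article record, run brief agent, store brief."""
    async with sem:
        await asyncio.sleep(0)  # placeholder for the actual agent call
        return f"brief:{item}"

async def generate_briefs(items):
    """Fan out over the backlog, never exceeding SEMAPHORE_LIMIT in flight."""
    sem = asyncio.Semaphore(SEMAPHORE_LIMIT)
    return await asyncio.gather(*(generate_one(i, sem) for i in items))

briefs = asyncio.run(generate_briefs(range(3)))
```

`asyncio.gather` preserves input order, so results line up with the backlog items even though execution interleaves.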
## Research News Flow

The `research_news_scan` flow runs weekly and delegates to a `produce_digests` sub-flow:

1. Source scanning -- fetches new items from all active RSS/PubMed sources
2. Triage -- AI classifies each item for newsworthiness (batches capped at 20 items to prevent structured-output truncation)
3. Pool-aware gate check -- counts the full unassigned newsworthy pool in the DB (cross-week) and skips if fewer than 3 items exist in total, not just in this week's triage output
4. Digest production -- calls the `produce_digests` sub-flow
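The 20-item triage cap amounts to simple chunking; a sketch with a hypothetical `triage_batches` helper:

```python
TRIAGE_BATCH_SIZE = 20  # cap to avoid structured-output truncation

def triage_batches(items):
    """Split scanned items into batches of at most 20 for AI triage."""
    return [items[i:i + TRIAGE_BATCH_SIZE]
            for i in range(0, len(items), TRIAGE_BATCH_SIZE)]
```

For example, 45 scanned items become three triage calls of 20, 20, and 5 items.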
### Digest Batching (FIFO Pool)

The `produce_digests` sub-flow (also deployable standalone) batches the newsworthy pool into digests of 3-5 items each:

1. Loads all newsworthy items from the database in FIFO order (`discovered_at ASC`, limit 100)
2. Reconstructs triage metadata from persisted `SourceItem` fields for cross-week items (in-memory triage results from the current scan take precedence via a merge dict)
3. Batches the pool: while `remaining >= 3`, slice up to 5 items per batch
4. For each batch, produces one digest per language via the digest writer agent
5. Runs quality gates (factuality + style) with a synthesis rewrite loop (up to 3 iterations)
6. Items remaining below the 3-item minimum stay in the pool for the next scan

A single scan run may produce multiple digests per language if the pool is large enough. For example, 8 items produce 2 digests (5 + 3) per language.

Constants: `MIN_NEWSWORTHY_ITEMS = 3`, `MAX_ITEMS_PER_DIGEST = 5`.
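Given those constants, the batching rule can be sketched directly; with 8 items this yields the 5 + 3 split described above (`batch_pool` is an illustrative name):

```python
MIN_NEWSWORTHY_ITEMS = 3
MAX_ITEMS_PER_DIGEST = 5

def batch_pool(pool):
    """Slice the FIFO pool into digests of 3-5 items; return leftovers separately."""
    batches = []
    remaining = list(pool)
    while len(remaining) >= MIN_NEWSWORTHY_ITEMS:
        batches.append(remaining[:MAX_ITEMS_PER_DIGEST])
        remaining = remaining[MAX_ITEMS_PER_DIGEST:]
    return batches, remaining  # leftovers stay in the pool for the next scan
```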
## Resume Flow

The `resume_article` flow resumes article production from whatever status the article is currently in. It reads the article's status and dispatches to the appropriate pipeline step:

- `brief_approved` / `researching` -- delegates to the full `produce_article` flow
- `writing` -- loads brief and dossier from DB, runs writer → quality gates → synthesis
- `quality_check` / `synthesizing` -- loads existing draft, runs quality gates → synthesis
- `ready_for_review` -- no-op (already complete)
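The dispatch table above can be sketched as a status-to-step mapping (`resume_step` and the step names are hypothetical; the real `resume_flow.py` dispatches to Prefect tasks rather than returning labels):

```python
def resume_step(status: str) -> str:
    """Map an article's current status to the pipeline step to resume from."""
    if status in ("brief_approved", "researching"):
        return "produce_article"      # rerun the full flow
    if status == "writing":
        return "writer_and_gates"     # load brief + dossier, write, gate
    if status in ("quality_check", "synthesizing"):
        return "gates_and_synthesis"  # load existing draft, gate, synthesize
    if status == "ready_for_review":
        return "noop"                 # already complete
    raise ValueError(f"Cannot resume from status: {status}")
```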
For research news / digest articles, the resume flow reconstructs the digest brief and dossier from the database and runs the digest-specific gate loop.
## Stuck Article Recovery

The `recover_stuck_articles` flow runs on a 30-minute schedule. It finds articles stuck in transitional statuses (`researching`, `writing`, `quality_check`, `synthesizing`) whose `updated_at` is older than a threshold (default: 30 minutes) and triggers the resume-article deployment for each.
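The staleness check can be sketched as a filter over `status` and `updated_at` (illustrative only; the real implementation queries the database rather than filtering in memory):

```python
from datetime import datetime, timedelta, timezone

STUCK_STATUSES = {"researching", "writing", "quality_check", "synthesizing"}
STUCK_THRESHOLD = timedelta(minutes=30)  # default staleness threshold

def find_stuck(articles, now=None):
    """Return articles in a transitional status whose updated_at is stale."""
    now = now or datetime.now(timezone.utc)
    return [
        a for a in articles
        if a["status"] in STUCK_STATUSES and now - a["updated_at"] > STUCK_THRESHOLD
    ]
```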
This uses a dedicated recovery work pool (`recovery-pool`) so recovery flows are never blocked by production work in the main `content-pool`.
## Draft Validation

Before running quality gates, `draft_validation.py` performs deterministic structure checks (zero LLM cost):
- Detects truncated drafts (ends mid-sentence)
- Checks word count against minimum thresholds
- Validates required structural elements from the content template
Failed drafts trigger an immediate rewrite without wasting expensive gate compute.
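Two of these checks can be sketched deterministically. This is a simplification: the terminal-punctuation heuristic and the 300-word default are assumptions, and the real module also validates template-required elements:

```python
def validate_draft(text: str, min_words: int = 300) -> list[str]:
    """Cheap structure checks run before any LLM quality gate.

    Sketch only; the heuristic and default threshold are assumptions,
    not the repo's actual draft_validation.py logic.
    """
    problems = []
    stripped = text.rstrip()
    # A draft that does not end with terminal punctuation is likely truncated.
    if stripped and stripped[-1] not in ".!?\"'":
        problems.append("possibly truncated (ends mid-sentence)")
    if len(text.split()) < min_words:
        problems.append(f"below minimum word count ({min_words})")
    return problems
```

An empty problem list means the draft proceeds to the (expensive) quality gates; anything else triggers an immediate rewrite.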
## Republish Flow

The `republish_articles` flow re-publishes already-published articles to WordPress. It shares the same core publish logic as `publish_articles` but filters for `PUBLISHED` status and skips the atomic claim transition. Use this to fix partial publishes where some WordPress operations (taxonomy, Polylang, ACF) failed on the initial publish.
## Supporting Modules

| Module | Purpose |
|---|---|
| `status.py` | Article status state machine with `VALID_TRANSITIONS` dict and `transition_article()` |
| `quality_gates.py` | Shared `map_gate_results()` helper for mapping parallel gate results to typed outputs |
| `run_tracker.py` | Pipeline run lifecycle tracking (start, complete, finish, stale detection) |
| `draft_validation.py` | Deterministic draft structure validation (truncation, word count, required elements) |
| `resume_flow.py` | Resume article production from current status + stuck-article recovery |
## Deployment Configuration

Flows are deployed via `prefect.yaml` at the repository root. Nine deployments are registered across two work pools:

**Content Pool** (production work):

- `research-news-scan` -- weekly Sunday 06:00 UTC (concurrency: 1)
- `produce-digests` -- manual trigger (concurrency: 1)
- `generate-briefs` -- manual trigger (concurrency: 1)
- `batch-produce` -- manual trigger (concurrency: 1)
- `produce-article` -- manual trigger
- `resume-article` -- manual trigger (concurrency: 1)
- `publish-articles` -- manual trigger (concurrency: 1)
- `republish-articles` -- manual trigger (concurrency: 1)

**Recovery Pool** (never blocked by production):

- `recover-stuck-articles` -- every 30 minutes (concurrency: 1)
The recovery pool is separate so that stuck article recovery is never blocked by production flows filling the content pool.
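For reference, one such deployment entry in `prefect.yaml` might look like this (an illustrative fragment following Prefect 3.x conventions; the entrypoint path and field layout are assumptions, and the repo's actual file is authoritative):

```yaml
deployments:
  - name: recover-stuck-articles
    entrypoint: src/pipeline/resume_flow.py:recover_stuck_articles  # assumed path
    work_pool:
      name: recovery-pool   # isolated so recovery is never queued behind production
    schedules:
      - cron: "*/30 * * * *"  # every 30 minutes
```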
See Deployment for setup instructions.