Pipeline API

pipeline

Prefect flow orchestration for the content pipeline.

Flows handle scheduling, retries, and status transitions. Business logic lives in agents and tools — flows are thin orchestration wrappers.

Modules:
- article_flow: Main article production flow (research → draft → gates → publish).
- brief_flow: Article brief generation from backlog items.
- batch_flow: Batch processing of multiple articles.
- publish_flow: WordPress publishing flow.
- research_news_flow: Research news digest pipeline.
- quality_gates: Gate dispatch, score posting, and consistency enforcement.
- draft_validation: Deterministic draft structure validation before quality gates.
- run_tracker: Pipeline run lifecycle tracking.
- status: Article status-transition state machine.

produce_article(article_id) async

Produce a single article through the full pipeline.

Steps:
1. Load the article and validate it is in brief_approved status.
2. Optionally load the parent cornerstone draft.
3. Research (brief_approved -> researching).
4. Write (researching -> writing).
5. Quality gates in parallel (writing -> quality_check -> synthesizing).
6. If gates pass: skip synthesis, humanize the draft, and advance to ready_for_review.
7. If gates fail: run synthesis with the truncation guard and loop (up to 3 iterations). On exhaustion: humanize the draft and advance to ready_for_review with quality_gates_passed=False.

On any unhandled exception the article is marked as failed.

Returns: True if the article reached ready_for_review, either via gates passing or via gate exhaustion for human review.

Raises: Unexpected errors are re-raised after the article is marked failed.
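The gate/synthesis loop in steps 5–7 can be sketched as a small pure function. This is a minimal illustration with hypothetical callables (`evaluate_gates`, `synthesize`); the real flow runs Prefect tasks and persists status transitions between iterations.

```python
MAX_GATE_ITERATIONS = 3  # assumption: matches "up to 3 iterations" above

def run_gate_loop(evaluate_gates, synthesize):
    """Run quality gates, synthesizing between failed attempts.

    Returns (quality_gates_passed, iterations_used).
    """
    for iteration in range(1, MAX_GATE_ITERATIONS + 1):
        if evaluate_gates(iteration):
            return True, iteration          # gates passed: skip synthesis
        if iteration < MAX_GATE_ITERATIONS:
            synthesize(iteration)           # revise draft, then re-run gates
    # Exhausted: the article still advances to ready_for_review,
    # but with quality_gates_passed=False for human review.
    return False, MAX_GATE_ITERATIONS
```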

batch_produce(article_ids, concurrency=1) async

Produce a batch of articles through the full pipeline.

Creates a pipeline run, fans out produce_article calls with a concurrency semaphore, and finalises the run when all articles have been processed.

Args:
- article_ids: Article UUIDs to produce.
- concurrency: Maximum number of articles processed in parallel.

Returns: The UUID of the pipeline run created for this batch.
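The fan-out-with-a-semaphore pattern described above can be sketched as follows; `produce_one` stands in for the real produce_article call, and run creation/finalisation is omitted.

```python
import asyncio

async def batch_produce_sketch(article_ids, produce_one, concurrency=1):
    """Fan out per-article coroutines bounded by a concurrency semaphore."""
    sem = asyncio.Semaphore(concurrency)

    async def bounded(article_id):
        async with sem:                      # at most `concurrency` in flight
            return await produce_one(article_id)

    # gather preserves input order, so results line up with article_ids
    return await asyncio.gather(*(bounded(a) for a in article_ids))
```

The same shape underlies generate_briefs and publish_articles, which the docs below describe as following this pattern.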

generate_briefs(backlog_item_ids) async

Generate briefs for a batch of backlog items.

Creates a pipeline run, processes all items sequentially (bounded by a semaphore), and finalises the run with the appropriate status.

Args:
- backlog_item_ids: List of backlog item UUIDs to generate briefs for.

Returns: The UUID of the pipeline run created for this batch.

publish_articles(article_ids, concurrency=1) async

Batch publish approved articles to WordPress.

Same pattern as batch_produce: creates a pipeline run, fans out publish tasks with a concurrency semaphore, and finalises the run when all articles have been processed.

Args:
- article_ids: Article UUIDs to publish.
- concurrency: Maximum number of articles published in parallel.

Returns: The UUID of the pipeline run created for this batch.

produce_digests(languages=None) async

Produce per-language digest articles from newsworthy source items.

Loads the newsworthy pool from the database in FIFO order, batches it into groups of 3–5 items, and produces one digest per batch per language. Creates its own pipeline run with run_type=digest_production.

Always runs standalone (triggered as a Prefect deployment). Triage metadata is reconstructed from persisted source item fields.

Args:
- languages: Language codes to produce digests for. Defaults to ["en", "de"].

Returns: List of article UUIDs produced across all batches and languages.
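One way to batch a FIFO-ordered pool into groups of 3–5 is a greedy chunker that folds an undersized tail into the previous batch. This is a sketch under that assumption; the real batching policy may differ (for example, it might rebalance instead of folding).

```python
def batch_newsworthy(items, min_size=3, max_size=5):
    """Split FIFO-ordered items into batches of roughly min_size..max_size.

    Greedy chunks of max_size; a trailing remainder smaller than min_size
    is folded into the previous batch (which may then exceed max_size).
    """
    batches = [items[i:i + max_size] for i in range(0, len(items), max_size)]
    if len(batches) > 1 and len(batches[-1]) < min_size:
        batches[-2].extend(batches.pop())
    return batches
```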

research_news_scan(languages=None) async

Scan sources, triage items, and trigger digest production.

Steps:
1. Scan all active sources for new items.
2. Triage pending items for newsworthiness.
3. Gate check: skip if fewer than 3 newsworthy items.
4. Trigger the produce-digests deployment (fire-and-forget).

The scan flow tracks source items triaged (not articles). Digest article production runs as a separate Prefect deployment with its own pipeline run tracking.

Args:
- languages: Language codes to produce digests for. Defaults to ["en", "de"].

Returns: List of newsworthy TriageResult objects from this scan.
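The gate check in step 3 amounts to a minimum-count threshold. A trivial sketch (the constant and helper name are illustrative, not from the source):

```python
MIN_NEWSWORTHY = 3  # assumption: matches "fewer than 3 newsworthy items"

def should_trigger_digests(newsworthy_items):
    """Step 3 gate check: only trigger digest production at or above the minimum."""
    return len(newsworthy_items) >= MIN_NEWSWORTHY
```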

Quality Gates

quality_gates

Shared quality gate dispatch and score-posting helpers.

map_gate_results(gate_keys, results)

Map parallel gate results back to typed outputs by gate key.

Both research_news_flow and article_flow run quality gates via asyncio.gather with a dynamic list of coroutines. This helper maps the positional results back to their typed variables so each flow doesn't repeat the same mapping logic.

When asyncio.gather is called with return_exceptions=True, failed coroutines return BaseException instances instead of raising. Those are collected in the returned errors list so callers can persist error scores before deciding whether to re-raise.
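A sketch of the key-to-result mapping described above, assuming a dict-shaped return (the real helper's exact return types are not specified here):

```python
def map_gate_results_sketch(gate_keys, results):
    """Map positional asyncio.gather results back to their gate keys.

    With return_exceptions=True, failed coroutines yield BaseException
    instances; those are split into `errors` so the caller can persist
    error scores before deciding whether to re-raise.
    """
    outputs, errors = {}, {}
    for key, result in zip(gate_keys, results):
        if isinstance(result, BaseException):
            errors[key] = result
        else:
            outputs[key] = result
    return outputs, errors
```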

enforce_score_passed_consistency(output, threshold)

Correct passed flag when it disagrees with the score vs threshold.
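The consistency rule is simply that `passed` must equal `score >= threshold`. A minimal sketch, assuming a dict-shaped gate output and that scores at the threshold count as passing:

```python
def enforce_consistency_sketch(output, threshold):
    """Force `passed` to agree with the score/threshold comparison."""
    expected = output["score"] >= threshold
    if output["passed"] != expected:
        output["passed"] = expected  # correct the inconsistent flag
    return output
```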

post_span_scores(span, fact, seo, style, iteration)

Post quality gate scores to the active Langfuse span.

Called after every gate evaluation (pass or fail) so all iterations are visible in Langfuse. Silently no-ops on any Langfuse exception.

Status Machine

status

Article status transition validator.

Defines the valid state machine for article lifecycle transitions and provides a validation function that raises on illegal moves.

InvalidStatusTransitionError(current, target)

Bases: Exception

Raised when an article status transition is not allowed.

DossierQualityError

Bases: Exception

Raised when research dossier fails quality validation twice.

This exception SHALL NOT be retried by Prefect's task retry mechanism. The research_task catches ValueError from quality validation and retries internally once; on the second failure it raises this exception to prevent Prefect from compounding retries.
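The retry-once-then-escalate pattern can be sketched as below; `validate` stands in for the quality validation step inside research_task, which the source says raises ValueError on failure.

```python
class DossierQualityError(Exception):
    """Raised after the second quality-validation failure (non-retryable)."""

def validate_with_internal_retry(validate):
    """Retry quality validation once internally, then escalate.

    The first ValueError is swallowed and validation retried; a second
    failure raises DossierQualityError so Prefect's task retry mechanism
    does not compound retries on top of the internal one.
    """
    for attempt in (1, 2):
        try:
            return validate(attempt)
        except ValueError as exc:
            if attempt == 2:
                raise DossierQualityError(str(exc)) from exc
```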

is_backward_requeue(current_status, target_status)

Return True if target_status is an earlier pipeline stage than current_status.

Requeue from failed is always considered backward because the article restarts from scratch at the target stage.
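A sketch of the "earlier stage" comparison, using an assumed stage ordering built from the statuses mentioned elsewhere on this page (the real pipeline's status enum may contain more states):

```python
# Assumed stage ordering; illustrative only.
STAGE_ORDER = [
    "brief_approved", "researching", "writing",
    "quality_check", "synthesizing", "ready_for_review",
]

def is_backward_requeue_sketch(current_status, target_status):
    """True when target is an earlier stage, or current is 'failed'."""
    if current_status == "failed":
        return True  # requeue from failed always restarts from scratch
    return STAGE_ORDER.index(target_status) < STAGE_ORDER.index(current_status)
```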

validate_transition(current, target)

Validate that current -> target is a legal article status transition.

Raises: InvalidStatusTransitionError: If the transition is not in VALID_TRANSITIONS.
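The validator reduces to a set-membership check against the transition map. A sketch with an illustrative subset of VALID_TRANSITIONS (the real map covers the full lifecycle):

```python
class InvalidStatusTransitionError(Exception):
    """Raised when an article status transition is not allowed."""
    def __init__(self, current, target):
        super().__init__(f"illegal transition {current} -> {target}")

# Illustrative subset only; the real VALID_TRANSITIONS map is larger.
VALID_TRANSITIONS = {
    "brief_approved": {"researching"},
    "researching": {"writing"},
    "writing": {"quality_check"},
    "quality_check": {"synthesizing", "ready_for_review"},
}

def validate_transition_sketch(current, target):
    """Raise unless current -> target appears in the transition map."""
    if target not in VALID_TRANSITIONS.get(current, set()):
        raise InvalidStatusTransitionError(current, target)
```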

transition_article(session, article_id, expected, target) async

Load current status, validate the transition, and update.

Raises InvalidStatusTransitionError if current != expected or the transition is not in VALID_TRANSITIONS.

Run Tracking

run_tracker

Pipeline run lifecycle management.

Provides async helpers to create, track, and finalise pipeline runs. All business logic for determining final run status lives here; the underlying DB mutations are delegated to src.db.queries.

start_run(session, run_type, articles_total) async

Create a new pipeline run and return its ID.

Args:
- session: Active async database session.
- run_type: The kind of pipeline execution (batch, single, etc.).
- articles_total: Number of articles queued for this run.

Returns: The UUID of the newly created pipeline run.

complete_article(session, run_id, *, success) async

Record the outcome of a single article within a pipeline run.

Args:
- session: Active async database session.
- run_id: The pipeline run to update.
- success: True if the article completed successfully.

finish_run(session, run_id) async

Determine the final status of a pipeline run and mark it as finished.

Status logic:
- FAILED: no articles were processed (crash before processing started).
- COMPLETED: all articles succeeded (no failures).
- PARTIAL: some articles succeeded and some failed.
- FAILED: no articles succeeded and at least one failed.

Args:
- session: Active async database session.
- run_id: The pipeline run to finalise.
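The status rules above map directly onto per-article success/failure counts. A sketch with string statuses (the real code presumably uses an enum):

```python
def final_run_status(succeeded, failed):
    """Map per-article outcome counts to a final pipeline run status."""
    if succeeded == 0 and failed == 0:
        return "FAILED"      # crash before any article was processed
    if failed == 0:
        return "COMPLETED"   # all articles succeeded
    if succeeded > 0:
        return "PARTIAL"     # mixed outcomes
    return "FAILED"          # nothing succeeded, at least one failure
```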

Associate a Prefect flow run ID with a pipeline run.

Args:
- session: Active async database session.
- run_id: The pipeline run to update.
- prefect_flow_run_id: The Prefect flow run identifier.

mark_stale_runs(session, *, threshold_hours=2) async

Detect running pipeline runs older than threshold_hours and mark them stale.

Also cancels the corresponding Prefect flow runs (best-effort) to release concurrency slots.

Returns the number of runs updated.
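The staleness test itself is a simple age comparison against the threshold. A sketch of that predicate (the DB query and Prefect cancellation are omitted; the `now` parameter is added here for testability):

```python
from datetime import datetime, timedelta, timezone

def is_stale(started_at, *, threshold_hours=2, now=None):
    """True when a still-running pipeline run exceeds the staleness threshold."""
    now = now or datetime.now(timezone.utc)
    return now - started_at > timedelta(hours=threshold_hours)
```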