
Run the Pipeline

The pipeline is orchestrated with Prefect. Flows can be triggered manually via the Prefect UI/CLI or run on a schedule.

Prerequisites

  • Database migrations applied (uv run alembic upgrade head)
  • Environment variables configured (see Configuration)
  • Prefect worker running (for deployed flows), or a local environment for running flows directly

Triggering Flows

Via Prefect UI

  1. Proxy the Prefect server locally: fly proxy 4200:4200
  2. Navigate to Deployments at http://localhost:4200
  3. Select the flow (e.g., batch-produce)
  4. Click Run and provide parameters

Via Prefect CLI

# Run a single article through the full pipeline
prefect deployment run 'produce-article/produce-article' \
  --param article_id=<UUID>

# Generate briefs for backlog items
prefect deployment run 'generate-briefs/generate-briefs' \
  --param backlog_item_ids='["<UUID1>", "<UUID2>"]'

# Batch produce multiple articles
prefect deployment run 'batch-produce/batch-produce' \
  --param article_ids='["<UUID1>", "<UUID2>"]' \
  --param concurrency=5

# Publish approved articles to WordPress
prefect deployment run 'publish-articles/publish-articles' \
  --param article_ids='["<UUID1>"]'
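List-valued parameters are passed as JSON strings, which makes hand-quoting error-prone. The helper below is a minimal sketch for generating these commands from Python. It assumes Prefect's documented behavior of interpreting each `--param key=value` value as JSON (falling back to a plain string); `build_run_command` itself is a hypothetical helper, not part of Prefect or this project.

```python
import json
import shlex
from uuid import UUID


def build_run_command(deployment: str, **params: object) -> str:
    """Hypothetical helper: return a shell-safe `prefect deployment run` command.

    UUIDs are converted to strings first (json cannot encode UUID objects),
    and non-string values such as lists and ints are JSON-encoded.
    """

    def to_jsonable(value: object) -> object:
        if isinstance(value, UUID):
            return str(value)
        if isinstance(value, (list, tuple)):
            return [to_jsonable(v) for v in value]
        return value

    args = ["prefect", "deployment", "run", deployment]
    for key, value in params.items():
        jsonable = to_jsonable(value)
        # Plain strings (e.g. a single UUID) pass through unchanged; others become JSON.
        encoded = jsonable if isinstance(jsonable, str) else json.dumps(jsonable)
        args += ["--param", f"{key}={encoded}"]
    # shlex.join adds the single quotes needed around JSON arrays.
    return shlex.join(args)


if __name__ == "__main__":
    ids = [UUID("11111111-1111-1111-1111-111111111111"),
           UUID("22222222-2222-2222-2222-222222222222")]
    print(build_run_command("batch-produce/batch-produce", article_ids=ids, concurrency=5))
```

The printed command has the same shape as the batch-produce example above and can be pasted into a shell.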

Via Python (Local)

For development, you can run flows directly:

import asyncio
from uuid import UUID

from src.pipeline.article_flow import produce_article

# Use the ID of an article in brief_approved status
asyncio.run(produce_article(article_id=UUID("<UUID>")))

Flow Parameters

produce_article

| Parameter | Type | Description |
|-----------|------|-------------|
| article_id | UUID | The article to process (must be in brief_approved status) |

generate_briefs

| Parameter | Type | Description |
|-----------|------|-------------|
| backlog_item_ids | list[UUID] | Backlog items to generate briefs for |

batch_produce

| Parameter | Type | Description |
|-----------|------|-------------|
| article_ids | list[UUID] | Articles to produce |
| concurrency | int | Max parallel articles (default: 5) |

publish_articles

| Parameter | Type | Description |
|-----------|------|-------------|
| article_ids | list[UUID] | Articles to publish to WordPress |
| concurrency | int | Max parallel publishes (default: 5) |

republish_articles

| Parameter | Type | Description |
|-----------|------|-------------|
| article_ids | list[UUID] | Articles to republish (must be in published status) |
| concurrency | int | Max parallel republishes (default: 1) |

Re-publishes already-published articles to WordPress. Use this to fix partial publishes where some WordPress operations failed.
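Because republish_articles only accepts articles in published status, it can help to split a candidate list before triggering the flow. A minimal sketch, assuming a hypothetical `Article` record with just `id` and `status` fields; `split_republishable` is illustrative and not part of the pipeline.

```python
from dataclasses import dataclass
from uuid import UUID, uuid4


@dataclass
class Article:
    """Hypothetical minimal record; only the fields this check needs."""
    id: UUID
    status: str


def split_republishable(articles: list[Article]) -> tuple[list[UUID], list[UUID]]:
    """Return (republishable_ids, rejected_ids) based on the published-status rule."""
    republishable: list[UUID] = []
    rejected: list[UUID] = []
    for article in articles:
        target = republishable if article.status == "published" else rejected
        target.append(article.id)
    return republishable, rejected


if __name__ == "__main__":
    batch = [Article(uuid4(), "published"), Article(uuid4(), "brief_approved")]
    ok, skipped = split_republishable(batch)
    print(f"{len(ok)} to republish, {len(skipped)} skipped")
```

Only the first list would be passed as article_ids; the rejected IDs point at articles that should go through publish_articles or resume_article instead.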

resume_article

| Parameter | Type | Description |
|-----------|------|-------------|
| article_id | UUID | The article to resume production for |

Resumes article production from whatever status the article is currently in. Dispatches to the appropriate pipeline step based on what the article already has in the database.
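One way to picture "dispatch based on what the article already has" is an ordered list of pipeline stages where the first missing artifact decides the next step. This is a minimal sketch under assumptions: the field names (`outline`, `draft`, `edited`) and step names are hypothetical, not the pipeline's real ones.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StoredArticle:
    """Hypothetical snapshot of what the database already holds for an article."""
    outline: Optional[str] = None
    draft: Optional[str] = None
    edited: Optional[str] = None


# Ordered (field, step) pairs: the first missing field decides where to resume.
# Field and step names are illustrative only.
PIPELINE_ORDER = [
    ("outline", "build_outline"),
    ("draft", "write_draft"),
    ("edited", "edit_draft"),
]


def next_step(article: StoredArticle) -> Optional[str]:
    for field, step in PIPELINE_ORDER:
        if getattr(article, field) is None:
            return step
    return None  # every artifact exists: nothing left to resume


if __name__ == "__main__":
    print(next_step(StoredArticle(outline="...")))
```

Keying on stored artifacts rather than on the status string alone lets a resume skip work that finished before a crash but whose status update was never written.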

recover_stuck_articles

| Parameter | Type | Description |
|-----------|------|-------------|
| threshold_minutes | int | Minutes of inactivity before an article is considered stuck (default: 30) |

Periodic sweep (every 30 min) that finds articles stuck in transitional statuses and triggers resume-article for each. Runs on the recovery work pool.
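The stuck-article test reduces to two conditions: the article is in a transitional status, and its last update is older than threshold_minutes. A minimal sketch, assuming a hypothetical `ArticleRow` with an `updated_at` timestamp and a hypothetical set of transitional statuses (the pipeline defines the real set).

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

# Hypothetical transitional statuses; the pipeline defines the real set.
TRANSITIONAL_STATUSES = {"drafting", "reviewing", "publishing"}


@dataclass
class ArticleRow:
    id: str
    status: str
    updated_at: datetime  # timezone-aware UTC timestamp of the last change


def find_stuck(rows: list[ArticleRow], now: datetime, threshold_minutes: int = 30) -> list[str]:
    """IDs of articles sitting in a transitional status longer than the threshold."""
    cutoff = now - timedelta(minutes=threshold_minutes)
    return [r.id for r in rows if r.status in TRANSITIONAL_STATUSES and r.updated_at < cutoff]


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    rows = [ArticleRow("a", "drafting", now - timedelta(minutes=45))]
    print(find_stuck(rows, now))
```

Each returned ID would then be handed to resume-article. Passing `now` explicitly keeps the check deterministic and easy to test.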

research_news_scan

| Parameter | Type | Description |
|-----------|------|-------------|
| languages | list[str] \| None | Language codes (default: ["en", "de"]) |

Runs weekly on Sundays at 06:00 UTC. If fewer than 3 newsworthy items are found, the week is skipped; otherwise the flow calls produce_digests as a sub-flow.
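The schedule and the skip rule can be expressed in a few lines. In cron terms the schedule corresponds to `0 6 * * 0` evaluated in UTC (an assumption about how the deployment expresses it). The sketch below shows the 3-item threshold and computes the next Sunday 06:00 UTC run; both function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

MIN_NEWSWORTHY_ITEMS = 3  # from the docs: fewer than this and the week is skipped
SUNDAY = 6  # datetime.weekday(): Monday is 0, Sunday is 6


def should_produce_digests(newsworthy_count: int) -> bool:
    return newsworthy_count >= MIN_NEWSWORTHY_ITEMS


def next_weekly_run(now: datetime) -> datetime:
    """Next Sunday 06:00 UTC strictly after `now` (now must be timezone-aware)."""
    now = now.astimezone(timezone.utc)
    days_ahead = (SUNDAY - now.weekday()) % 7
    candidate = (now + timedelta(days=days_ahead)).replace(
        hour=6, minute=0, second=0, microsecond=0
    )
    # Already past 06:00 on a Sunday: roll over to next week.
    if candidate <= now:
        candidate += timedelta(days=7)
    return candidate


if __name__ == "__main__":
    print(next_weekly_run(datetime.now(timezone.utc)))
```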

produce_digests

| Parameter | Type | Description |
|-----------|------|-------------|
| languages | list[str] \| None | Language codes (default: ["en", "de"]) |
| triage_results | list[TriageResult] \| None | Pre-computed triage results (auto-provided when called as sub-flow) |

Can run standalone (loads newsworthy items from DB) or as a sub-flow of research_news_scan.
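The two entry modes differ only in where the triage results come from. A minimal sketch of that branching, assuming a hypothetical `TriageResult` shape and a hypothetical `load_newsworthy_from_db` loader; the `None` defaults mirror the parameter table and avoid sharing a mutable list default between calls.

```python
from dataclasses import dataclass
from typing import Optional

DEFAULT_LANGUAGES = ["en", "de"]


@dataclass
class TriageResult:
    """Hypothetical shape; the real TriageResult fields are not documented here."""
    title: str
    language: str


def load_newsworthy_from_db(languages: list[str]) -> list[TriageResult]:
    """Hypothetical stand-in for the database query used in standalone runs."""
    return [TriageResult(title="stored item", language=lang) for lang in languages]


def resolve_digest_inputs(
    languages: Optional[list[str]] = None,
    triage_results: Optional[list[TriageResult]] = None,
) -> tuple[list[str], list[TriageResult]]:
    # A fresh copy of the defaults, so callers never mutate the module-level list.
    langs = list(DEFAULT_LANGUAGES) if languages is None else languages
    if triage_results is None:
        # Standalone run: research_news_scan handed nothing over, so load from the DB.
        triage_results = load_newsworthy_from_db(langs)
    return langs, triage_results
```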

Monitoring

  • Prefect UI: flow run status, logs, and task timelines (self-hosted; access via fly proxy 4200:4200)
  • Dashboard Pipeline page: overview of recent runs, status counts, and cost tracking
  • Langfuse: LLM trace details, session grouping, and quality scores per iteration (see Observability)
  • Slack alerts: failure notifications via webhook (if SLACK_WEBHOOK_URL is set)
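Slack incoming webhooks accept an HTTP POST with a JSON body containing a `text` field. The sketch below shows the opt-in behavior (no alert when SLACK_WEBHOOK_URL is unset) using only the standard library; `build_slack_request` and `send_failure_alert` are hypothetical names, and the project's actual alerting code may be structured differently (for example, via Prefect notifications).

```python
import json
import os
import urllib.request
from typing import Optional


def build_slack_request(message: str) -> Optional[urllib.request.Request]:
    """Build the webhook POST, or return None when alerts are not configured."""
    url = os.environ.get("SLACK_WEBHOOK_URL")
    if not url:
        return None  # alerts are disabled when the variable is unset
    body = json.dumps({"text": message}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


def send_failure_alert(flow_name: str, error: str) -> bool:
    """Post a failure notification; returns False if alerts are disabled."""
    request = build_slack_request(f"Flow {flow_name} failed: {error}")
    if request is None:
        return False
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status == 200
```

Separating request construction from sending keeps the payload testable without network access.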

Cost Limits

Each of the following flows has a per-run cost ceiling:

| Flow | Limit (USD) |
|------|-------------|
| produce_article | $5.00 |
| batch_produce | $100.00 |
| research_news_scan | $2.00 |
| produce_digests | $9.00 |

If a flow exceeds its budget, it raises CostLimitExceeded and stops. See Cost Management for details.
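A per-run ceiling amounts to accumulating spend and raising once the total passes the limit. A minimal sketch, assuming a hypothetical `RunCostTracker` and a local `CostLimitExceeded` stand-in (the project's own exception may carry different fields); the limits are the ones from the table above, held as `Decimal` to avoid floating-point drift in currency sums.

```python
from decimal import Decimal

# Per-run limits in USD, taken from the cost table.
COST_LIMITS_USD = {
    "produce_article": Decimal("5.00"),
    "batch_produce": Decimal("100.00"),
    "research_news_scan": Decimal("2.00"),
    "produce_digests": Decimal("9.00"),
}


class CostLimitExceeded(Exception):
    """Illustrative stand-in for the project's exception."""


class RunCostTracker:
    """Hypothetical tracker: one instance per flow run."""

    def __init__(self, flow_name: str) -> None:
        self.flow_name = flow_name
        self.limit = COST_LIMITS_USD[flow_name]
        self.spent = Decimal("0")

    def record(self, cost_usd: float) -> None:
        # str() first so 0.1 becomes Decimal("0.1"), not its binary approximation.
        self.spent += Decimal(str(cost_usd))
        if self.spent > self.limit:
            raise CostLimitExceeded(
                f"{self.flow_name} spent ${self.spent}, limit ${self.limit}"
            )
```

Because the check runs after each cost is recorded, a run can overshoot its limit by at most the cost of the call that crossed it; a stricter design would estimate and check before each LLM call.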