
Database API

db

Async database layer backed by Neon Postgres with pgvector.

Uses SQLAlchemy 2.0 Core (no ORM) with asyncpg. All connections go through Neon's PgBouncer pooler endpoint with prepared_statement_cache_size=0.

Modules:

  • connection: Engine and session factory, URL normalisation for asyncpg.
  • tables: SQLAlchemy Table definitions and MetaData.
  • models: Pydantic models bridging DB rows to application types.
  • queries: Reusable SELECT/INSERT/UPDATE query builders.
  • migrations/: Alembic migration scripts (schema + data seeds).

Connection Management

connection

Async database connection management.

normalise_url_for_asyncpg(raw_url)

Convert a standard Postgres URL to one asyncpg can connect with.

  • Swaps the dialect prefix to postgresql+asyncpg://
  • Strips query params asyncpg doesn't support (e.g. channel_binding)
  • Renames sslmode to ssl
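The three rewrites above can be sketched with the standard library's `urllib.parse`. This is a minimal illustration, not the module's actual code. The set of stripped parameters is an assumption: only `channel_binding` is named above, and the real function may strip more.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

# Assumption: only channel_binding is listed here; the real module may strip more params.
_UNSUPPORTED_PARAMS = {"channel_binding"}


def normalise_url_for_asyncpg(raw_url: str) -> str:
    parts = urlsplit(raw_url)
    params = []
    for key, value in parse_qsl(parts.query, keep_blank_values=True):
        if key in _UNSUPPORTED_PARAMS:
            continue  # asyncpg would reject this parameter
        if key == "sslmode":
            key = "ssl"  # asyncpg's name for the libpq-style sslmode
        params.append((key, value))
    # Whatever the incoming scheme (postgres://, postgresql://), force the asyncpg dialect
    return urlunsplit(("postgresql+asyncpg", parts.netloc, parts.path, urlencode(params), parts.fragment))


print(normalise_url_for_asyncpg("postgres://u:p@ep-x.neon.tech/db?sslmode=require&channel_binding=require"))
# postgresql+asyncpg://u:p@ep-x.neon.tech/db?ssl=require
```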

get_engine() cached

Create or return the cached async engine.

get_session_factory()

Create an async session factory.

get_session() async

Async context manager for database sessions.

Table Definitions

tables

SQLAlchemy table definitions for all 14 database tables.

Query Builders

queries

Async database query functions for common operations.

All functions take an AsyncSession and return Pydantic models. No business logic — pure data access.

get_backlog_items(session, *, status=None, pillar=None, language=None, content_type=None) async

Get backlog items with optional filters.

get_backlog_item_by_keyword(session, keyword, language) async

Get a backlog item by primary keyword and language.

update_backlog_status(session, item_id, status) async

Update a backlog item's status.

bulk_import_backlog(session, items) async

Bulk insert backlog items from CSV import. Returns count inserted.

get_backlog_item(session, item_id) async

Get a single backlog item by ID.

get_backlog_ids_with_articles(session) async

Return backlog IDs that have a linked non-failed article.

Failed articles (e.g. from brief rejection) are excluded so their backlog items can be re-queued for brief generation.

clear_article_backlog_id(session, article_id) async

Clear the backlog_id on a failed article to fully sever the backlog link.

fail_generating_article(session, backlog_id) async

Transition any brief_generating article for this backlog item to failed.

create_article(session, *, backlog_id, content_type, language, pillar, primary_keyword, title, pipeline_run_id=None, status='brief_generating') async

Create a new article and return it.

get_article(session, article_id) async

Get a single article by ID.

get_articles_by_backlog_id(session, backlog_id) async

Get all articles linked to a backlog item, newest first.

get_article_for_update(session, article_id) async

Get a single article by ID with a row-level lock (FOR UPDATE).

Use this when the caller will update the row based on its current state to prevent TOCTOU races under concurrent access.

update_article_status(session, article_id, status) async

Update an article's status.

reset_article_for_requeue(session, article_id, target_status) async

Clear downstream content fields when requeuing an article backward.

Uses _REQUEUE_RESET_RULES to determine which article columns to NULL, whether to reset quality_gates_passed, and which child rows to delete.

Raises ValueError for target statuses not in the reset rules.
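One plausible shape for a rule table like `_REQUEUE_RESET_RULES` is sketched below. The rule entries, the `ResetRule` class, the child table names and the `plan_requeue_reset` helper are all hypothetical, since the real contents are not documented in this reference. The sketch shows only how a rule could translate into UPDATE values plus a list of child tables to clear, and how an unknown target status raises `ValueError`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ResetRule:
    null_columns: tuple[str, ...]         # article columns to set to NULL
    reset_quality_gates: bool             # whether quality_gates_passed is cleared
    delete_child_tables: tuple[str, ...]  # child tables whose rows are deleted


# Hypothetical entries; the real rule table is not shown in this reference.
_REQUEUE_RESET_RULES: dict[str, ResetRule] = {
    "writing": ResetRule(
        null_columns=("draft", "writer_output", "faq_items", "final_content"),
        reset_quality_gates=True,
        delete_child_tables=("quality_scores", "article_images"),
    ),
    "quality_check": ResetRule(
        null_columns=("final_content",),
        reset_quality_gates=True,
        delete_child_tables=("quality_scores",),
    ),
}


def plan_requeue_reset(target_status: str) -> tuple[dict[str, None], tuple[str, ...]]:
    """Return (UPDATE values, child tables to clear) for a backward requeue."""
    try:
        rule = _REQUEUE_RESET_RULES[target_status]
    except KeyError:
        raise ValueError(f"no requeue reset rule for status {target_status!r}") from None
    values = {column: None for column in rule.null_columns}
    if rule.reset_quality_gates:
        values["quality_gates_passed"] = None  # assumption: "reset" means NULL
    return values, rule.delete_child_tables
```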

set_quality_gates_passed(session, article_id, passed) async

Record whether an article passed quality gates normally or via gate exhaustion.

get_articles_by_pillar(session, pillar, language, content_type=None) async

Get articles by pillar and language, optionally filtered by content type.

get_articles_by_status(session, status) async

Get all articles with a given status.

update_article_draft(session, article_id, draft) async

Update article draft content.

save_writer_output(session, article_id, writer_output_data) async

Persist the full WriterOutput JSON in the writer_output column.

update_article_faq_items(session, article_id, faq_items) async

Set an article's structured FAQ items.

save_research_dossier(session, article_id, dossier) async

Persist a ResearchDossier as JSONB on the articles row.

set_wordpress_ids(session, article_id, wordpress_post_id, wordpress_url) async

Set WordPress post ID and URL after publishing.

set_taxonomy_assignments(session, article_id, taxonomy_assignments) async

Persist LLM-selected taxonomy assignments on the article record.

get_translation_counterpart(session, article_id, target_language) async

Get the translation counterpart of an article in the target language.

Finds the article sharing the same backlog_id but in a different language.

set_published_at(session, article_id) async

Set published_at to current UTC timestamp.

update_article_brief(session, article_id, brief) async

Update an article's brief JSONB field.

update_article_title(session, article_id, title) async

Update an article's title.

link_backlog_to_article(session, backlog_id, article_id) async

Set the article_id on a backlog item to link it to an article.

update_article_final_content(session, article_id, content) async

Set an article's final_content text.

increment_article_cost(session, article_id, cost_usd) async

Atomically add cost_usd to an article's total_cost_usd.
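"Atomically" here means the database performs the addition inside a single UPDATE. Python never reads the value, adds to it, and writes it back, so two concurrent increments cannot overwrite each other. The sketch below shows that pattern with the standard library's `sqlite3` as a stand-in for Postgres. The table layout is a simplified assumption. The same pattern fits `increment_run_completed` and `increment_run_failed`.

```python
import sqlite3


def increment_cost(conn: sqlite3.Connection, article_id: int, cost_usd: float) -> None:
    # The addition is evaluated by the database inside the UPDATE,
    # so there is no read-modify-write window in application code.
    conn.execute(
        "UPDATE articles SET total_cost_usd = total_cost_usd + ? WHERE id = ?",
        (cost_usd, article_id),
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE articles (id INTEGER PRIMARY KEY, total_cost_usd REAL NOT NULL DEFAULT 0)")
conn.execute("INSERT INTO articles (id) VALUES (1)")
increment_cost(conn, 1, 0.25)
increment_cost(conn, 1, 0.50)
total = conn.execute("SELECT total_cost_usd FROM articles WHERE id = 1").fetchone()[0]
print(total)  # 0.75
```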

delete_article_images(session, article_id) async

Delete all images for an article (used before re-generating).

insert_article_image(session, article_id, ordinal, r2_key, prompt_used, revised_prompt) async

Insert a generated image record with its R2 storage key.

get_article_images(session, article_id) async

Get image metadata for an article, ordered by ordinal.

get_article_image_r2_key(session, article_id, ordinal) async

Load one image's R2 object key.

get_selected_article_image_r2_key(session, article_id) async

Load the R2 key of the selected image (is_selected=True) for publishing.

select_article_image(session, article_id, ordinal) async

Mark one image as selected and deselect the other.
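Selecting one image and deselecting the rest can be done in a single UPDATE, because the comparison `ordinal = ?` is true only for the chosen row. Below is a minimal sketch using `sqlite3` as a stand-in for Postgres. The `article_images` table and its columns are assumed names, and this is not necessarily how `select_article_image` is written.

```python
import sqlite3


def select_image(conn: sqlite3.Connection, article_id: int, ordinal: int) -> None:
    # Every image of the article is updated; is_selected becomes 1 only
    # where the ordinal matches and 0 everywhere else.
    conn.execute(
        "UPDATE article_images SET is_selected = (ordinal = ?) WHERE article_id = ?",
        (ordinal, article_id),
    )


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE article_images (article_id INTEGER, ordinal INTEGER, is_selected INTEGER NOT NULL DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO article_images (article_id, ordinal, is_selected) VALUES (?, ?, ?)",
    [(1, 0, 1), (1, 1, 0), (2, 0, 1)],
)
select_image(conn, 1, 1)
```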

has_selected_image(session, article_id) async

Check whether an article has a selected image (for publish guard).

save_quality_score(session, score_data) async

Insert a quality score and return it.

get_quality_scores(session, article_id) async

Get all quality scores for an article.

get_latest_scores(session, article_id) async

Get the latest iteration quality scores for an article.

get_active_sources(session) async

Get all active source registry entries.

save_source_items(session, items) async

Bulk insert source items. Returns count inserted.

get_pending_source_items(session, *, limit=50) async

Get pending source items for triage.

update_triage_status(session, item_id, status, reasoning='', *, relevance_score=None, novelty_score=None, accessibility_score=None, suggested_angle=None, patient_relevance=None) async

Update triage status, reasoning, and optional evaluation fields.

get_newsworthy_items(session, *, limit=100, offset=0) async

Get newsworthy source items not yet assigned to an article (FIFO order).

count_newsworthy_items(session) async

Return the count of newsworthy source items not yet assigned to an article.

get_recent_digest_articles(session, *, weeks=4, statuses=None) async

Get research_news articles created within the last N weeks.

Args:

  • statuses: Optional list of status values to filter by. If None, all statuses are returned.

Unlink source items from failed or draft research_news articles.

When produce_digests partially executes and then fails, source items are linked to articles via article_id but the articles never complete. This function resets article_id = NULL on those items so they return to the newsworthy pool for the next digest run.

Returns the count of unlinked items.
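The unlink step described above can be expressed as one UPDATE with a subquery. The sketch below uses `sqlite3` as a stand-in for Postgres. The function name `unlink_stale_digest_items` is hypothetical because the real function's name is not shown in this reference. The table and column names, and the `'draft'` status value, are also assumptions.

```python
import sqlite3


def unlink_stale_digest_items(conn: sqlite3.Connection) -> int:
    """Hypothetical stand-in: return source items of failed/draft digests to the pool."""
    cur = conn.execute(
        """
        UPDATE source_items
        SET article_id = NULL
        WHERE article_id IN (
            SELECT id FROM articles
            WHERE content_type = 'research_news' AND status IN ('failed', 'draft')
        )
        """
    )
    return cur.rowcount  # number of source items unlinked


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE articles (id INTEGER PRIMARY KEY, content_type TEXT, status TEXT);
    CREATE TABLE source_items (id INTEGER PRIMARY KEY, article_id INTEGER);
    INSERT INTO articles VALUES (1, 'research_news', 'failed'),
                                (2, 'research_news', 'published'),
                                (3, 'research_news', 'draft');
    INSERT INTO source_items VALUES (10, 1), (11, 1), (12, 2), (13, 3), (14, NULL);
    """
)
unlinked = unlink_stale_digest_items(conn)
print(unlinked)  # 3
```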

link_source_items_to_article(session, source_item_ids, article_id) async

Associate newsworthy source items with a digest article.

update_source_last_scanned(session, source_id) async

Update last_scanned_at to current UTC time for a source registry entry.

query_similar_chunks(session, query_embedding, limit=15) async

Return vault chunks most similar to query_embedding via pgvector cosine distance.

The caller is responsible for obtaining the embedding (via OpenAI or another provider) and passing it in. This keeps the queries module IO-free.

Returns a list of dicts with keys: chunk_text, note_path, note_title, tags, source_url.
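pgvector's `<=>` operator returns cosine distance, which is 1 minus cosine similarity. Ordering by it ascending and applying a LIMIT therefore returns the nearest chunks first. The pure-Python model below shows what that ranking computes. The `embedding` key is an assumed field name, and the sketch assumes non-zero vectors.

```python
import math
from typing import Sequence


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    # Same quantity as pgvector's <=>: 1 - cos(angle between a and b)
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


def rank_chunks(query_embedding: Sequence[float], chunks: list[dict], limit: int = 15) -> list[str]:
    """Model of ORDER BY embedding <=> :query LIMIT :limit."""
    ordered = sorted(chunks, key=lambda c: cosine_distance(query_embedding, c["embedding"]))
    return [c["chunk_text"] for c in ordered[:limit]]


chunks = [
    {"chunk_text": "orthogonal", "embedding": [0.0, 1.0]},
    {"chunk_text": "same", "embedding": [2.0, 0.0]},
    {"chunk_text": "diagonal", "embedding": [1.0, 1.0]},
]
print(rank_chunks([1.0, 0.0], chunks, limit=2))  # ['same', 'diagonal']
```

Cosine distance ignores vector magnitude, so `[2.0, 0.0]` ranks as an exact match for `[1.0, 0.0]`.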

query_vault_text_only(session, query_text, limit=15) async

Return vault chunks matching query_text via full-text search only.

Fallback for when embedding generation is unavailable. Uses the simple text search config which is language-agnostic (no stemming) — suitable for mixed EN/DE vault content.

Returns a list of dicts with keys: chunk_text, note_path, note_title, tags, source_url.

get_dossier_by_source_item(session, source_item_id) async

Return the research dossier from a completed article sharing source_item_id.

Looks for any article with the same source_item_id that has progressed past the RESEARCHING stage (status >= WRITING) and has a non-null dossier. Returns the raw JSONB dict or None if no dossier is available.

create_research_news_article(session, *, language, title, pipeline_run_id=None, digest_batch_id=None) async

Create a research news article with backlog_id=None and status=writing.

get_source_items_for_article(session, article_id) async

Return all source items linked to a given article.

For digest articles sharing a digest_batch_id, returns source items linked to any article in the same batch — handles the case where the linking loop leaves article_id pointing to only the last language.

find_stuck_articles(session, *, threshold_minutes=30) async

Find articles stuck in transitional statuses for longer than threshold_minutes.

Transitional statuses are those where the article is mid-pipeline (researching, writing, quality_check, synthesizing) and should normally complete within minutes. Articles in these statuses with updated_at older than the threshold are considered stuck.
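The stuck-article rule combines two conditions: the status is transitional, and `updated_at` falls before `now - threshold`. The sketch below is a minimal Python predicate for that rule. The helper name `is_stuck` is hypothetical. In SQL, the same check would be a `status IN (...)` filter plus an `updated_at <` comparison against the cutoff.

```python
from datetime import datetime, timedelta, timezone

# Transitional statuses as listed above
TRANSITIONAL_STATUSES = frozenset({"researching", "writing", "quality_check", "synthesizing"})


def is_stuck(status: str, updated_at: datetime, *, now: datetime, threshold_minutes: int = 30) -> bool:
    cutoff = now - timedelta(minutes=threshold_minutes)
    # Stuck = mid-pipeline and not touched since before the cutoff
    return status in TRANSITIONAL_STATUSES and updated_at < cutoff


now = datetime.now(timezone.utc)
print(is_stuck("writing", now - timedelta(minutes=45), now=now))  # True
```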

get_digest_translation_counterpart(session, article_id, target_language) async

Get the translation counterpart of a digest via digest_batch_id.

Finds the article sharing the same digest_batch_id but in a different language. Returns None when no counterpart exists or when digest_batch_id is NULL (legacy digests).
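A self-join on `digest_batch_id` expresses this lookup. It also handles legacy digests without extra code: in SQL, `NULL = NULL` is not true, so a digest with no batch ID never joins to anything and the result is None. The sketch below uses `sqlite3` as a stand-in for Postgres, with assumed table and column names.

```python
import sqlite3


def digest_counterpart_id(conn: sqlite3.Connection, article_id: int, target_language: str):
    row = conn.execute(
        """
        SELECT other.id
        FROM articles AS src
        JOIN articles AS other ON other.digest_batch_id = src.digest_batch_id
        WHERE src.id = ? AND other.id <> src.id AND other.language = ?
        """,
        (article_id, target_language),
    ).fetchone()
    return row[0] if row else None  # None: no counterpart, or NULL batch ID


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE articles (id INTEGER PRIMARY KEY, language TEXT, digest_batch_id TEXT)")
conn.executemany(
    "INSERT INTO articles VALUES (?, ?, ?)",
    [(1, "en", "b1"), (2, "de", "b1"), (3, "en", None), (4, "de", None)],
)
print(digest_counterpart_id(conn, 1, "de"))  # 2
```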

create_pipeline_run(session, *, run_type, articles_total) async

Create a new pipeline run and return it.

update_pipeline_run(session, run_id, **values) async

Update pipeline run fields.

get_active_runs(session) async

Get all currently running pipeline runs.

get_recently_completed_runs(session, limit=3) async

Get the most recently completed or failed pipeline runs.

get_pipeline_run(session, run_id) async

Get a single pipeline run by ID.

get_pipeline_run_by_prefect_id(session, prefect_flow_run_id) async

Get a pipeline run by its linked Prefect flow run ID.

get_articles_by_ids(session, ids) async

Get articles by a list of IDs, ordered by created_at.

increment_run_completed(session, run_id) async

Atomically increment the articles_completed counter on a pipeline run.

increment_run_failed(session, run_id) async

Atomically increment the articles_failed counter on a pipeline run.

set_articles_pipeline_run(session, article_ids, run_id) async

Link articles to a pipeline run.

set_source_items_pipeline_run(session, item_ids, run_id) async

Link source items to a pipeline run.

get_content_template(session, content_type, pillar=None) async

Get an active content template, preferring pillar-specific over cross-pillar.

First tries to find a template matching the given pillar. If none is found (or no pillar is given), falls back to a cross-pillar template (pillar IS NULL).
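The two-step preference can also be collapsed into one query. The query matches both pillar-specific and cross-pillar rows, then sorts on `pillar IS NULL` so that pillar-specific rows come first. This single-query variant is a swapped-in alternative, not necessarily how `get_content_template` is implemented. The sketch uses `sqlite3` as a stand-in for Postgres, with assumed table, column and value names.

```python
import sqlite3


def get_template_id(conn: sqlite3.Connection, content_type: str, pillar=None):
    # pillar IS NULL is 0 for pillar-specific rows and 1 for cross-pillar rows,
    # so ascending order prefers the specific template. When pillar is None,
    # "pillar = NULL" is never true and only cross-pillar rows match.
    row = conn.execute(
        """
        SELECT id FROM content_templates
        WHERE content_type = ? AND is_active = 1
          AND (pillar = ? OR pillar IS NULL)
        ORDER BY pillar IS NULL
        LIMIT 1
        """,
        (content_type, pillar),
    ).fetchone()
    return row[0] if row else None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE content_templates (id INTEGER PRIMARY KEY, content_type TEXT, pillar TEXT, is_active INTEGER)")
conn.executemany(
    "INSERT INTO content_templates VALUES (?, ?, ?, ?)",
    [(1, "guide", None, 1), (2, "guide", "nutrition", 1), (3, "guide", "sleep", 0)],
)
print(get_template_id(conn, "guide", "nutrition"))  # 2
```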

get_active_prompt(session, agent_name, content_type) async

Get the active prompt for an agent and content type.

Falls back to content_type='all' if no prompt exists for the specific content type.

create_prompt_version(session, prompt_data) async

Create a new prompt version.

list_prompt_versions(session, agent_name, content_type) async

List all prompt versions for an agent and content type.

get_articles_count_by_status(session) async

Count articles grouped by status.

get_published_count(session, *, since) async

Count published articles since a given date.

get_total_cost(session, *, since=None) async

Sum total_cost_usd across articles, optionally since a date.

get_recent_activity(session, *, limit=50) async

Get the most recently updated articles.

create_backlog_item(session, item_data) async

Insert a new backlog item and return it.

update_backlog_item(session, item_id, **values) async

Update specified fields on a backlog item.

get_all_sources(session) async

Get all source registry entries (active and inactive).

create_source(session, source_data) async

Insert a new source registry entry and return it.

update_source(session, source_id, **values) async

Update specified fields on a source registry entry.

get_active_trusted_sources(session, *, language=None) async

Get active trusted sources, optionally filtered by language.

When language is provided, returns sources matching that language OR sources with language='both'.

get_all_trusted_sources(session) async

Get all trusted sources (active and inactive) for dashboard management.

create_trusted_source(session, source_data) async

Insert a new trusted source and return it.

update_trusted_source(session, source_id, **values) async

Update specified fields on a trusted source, auto-setting updated_at.

get_source_items_filtered(session, *, source_id=None, triage_status=None, date_from=None, date_to=None, limit=100, offset=0) async

Get source items with optional filters.

count_source_items_filtered(session, *, source_id=None, triage_status=None, date_from=None, date_to=None) async

Return the count of source items matching the given filters.

activate_prompt_version(session, prompt_id) async

Activate a prompt version and deactivate others for the same agent+content_type.

Looks up the target prompt's agent_name and content_type, sets all matching prompts to active=False, then sets the target to active=True.
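The deactivate-then-activate sequence should run in one transaction, so that readers never see a state where every version is inactive. The sketch below uses `sqlite3` as a stand-in, where the `with conn:` block commits both UPDATEs together or rolls them back. The table name `prompt_versions` and raising `LookupError` for an unknown ID are assumptions.

```python
import sqlite3


def activate_version(conn: sqlite3.Connection, prompt_id: int) -> None:
    with conn:  # single transaction: commit on success, roll back on error
        row = conn.execute(
            "SELECT agent_name, content_type FROM prompt_versions WHERE id = ?", (prompt_id,)
        ).fetchone()
        if row is None:
            raise LookupError(f"prompt {prompt_id} not found")  # assumed behaviour
        # Deactivate every version for the same agent + content_type ...
        conn.execute("UPDATE prompt_versions SET active = 0 WHERE agent_name = ? AND content_type = ?", row)
        # ... then activate the requested one.
        conn.execute("UPDATE prompt_versions SET active = 1 WHERE id = ?", (prompt_id,))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE prompt_versions (id INTEGER PRIMARY KEY, agent_name TEXT, content_type TEXT, active INTEGER)")
conn.executemany(
    "INSERT INTO prompt_versions VALUES (?, ?, ?, ?)",
    [(1, "writer", "guide", 1), (2, "writer", "guide", 0), (3, "writer", "faq", 1)],
)
activate_version(conn, 2)
```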

get_all_articles(session) async

Get all articles ordered by updated_at descending.

get_articles_summary(session) async

Get all articles with only lightweight columns (no heavy JSONB).

Returns plain dicts — avoids Article model validation overhead for dashboard list views that only need summary fields.

get_all_quality_scores(session) async

Get all quality scores ordered by created_at descending.

get_pipeline_runs_history(session, *, limit=20, offset=0, run_type=None, status=None, since=None) async

Get paginated pipeline runs with optional filters, newest first.

get_pipeline_runs_count(session, *, run_type=None, status=None, since=None) async

Count pipeline runs matching optional filters.

get_articles_in_run_window(session, *, started_at, completed_at, run_id=None) async

Get articles linked to a pipeline run.

Uses the pipeline_run_id FK when run_id is provided and matches at least one article; otherwise falls back to a temporal correlation on updated_at (needed for old runs created before the FK existed).

Returns lightweight dicts with id, title, language, status, boolean flags for draft/final content, wordpress_url, and updated_at.
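The FK-first, time-window-fallback selection can be modelled in a few lines of Python. In the sketch below, the dict keys and the helper name `articles_for_run` are hypothetical, and inclusive window bounds are an assumption. The real query may differ on both points and on how it treats a run that has not completed yet.

```python
from datetime import datetime


def articles_for_run(articles: list[dict], *, started_at: datetime, completed_at: datetime, run_id=None) -> list[dict]:
    if run_id is not None:
        linked = [a for a in articles if a["pipeline_run_id"] == run_id]
        if linked:
            return linked  # FK path: at least one article carries this run ID
    # Legacy fallback: correlate on updated_at within the run's time window
    return [a for a in articles if started_at <= a["updated_at"] <= completed_at]


articles = [
    {"id": 1, "pipeline_run_id": 7, "updated_at": datetime(2024, 5, 1, 10, 5)},
    {"id": 2, "pipeline_run_id": None, "updated_at": datetime(2024, 5, 1, 10, 10)},
    {"id": 3, "pipeline_run_id": None, "updated_at": datetime(2024, 5, 1, 11, 30)},
]
window = {"started_at": datetime(2024, 5, 1, 10, 0), "completed_at": datetime(2024, 5, 1, 10, 30)}
print([a["id"] for a in articles_for_run(articles, **window, run_id=7)])  # [1]
print([a["id"] for a in articles_for_run(articles, **window, run_id=8)])  # [1, 2]
```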

get_source_items_in_run_window(session, *, started_at, completed_at, run_id=None) async

Get source items linked to a pipeline run.

Uses the pipeline_run_id FK when run_id is provided and matches; otherwise falls back to temporal correlation on discovered_at.

get_quality_summary_for_articles(session, *, article_ids) async

Get latest-iteration quality scores grouped by article.

For each article returns the scores from only the highest iteration, keyed by str(article_id).
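Keeping only each article's highest iteration is a two-pass grouping: first find the maximum iteration per article, then keep the rows that match it. The pure-Python sketch below illustrates the shape of the result. The row keys `dimension` and `score` are assumed names.

```python
from collections import defaultdict


def latest_iteration_scores(rows: list[dict]) -> dict[str, list[dict]]:
    # Pass 1: highest iteration seen for each article
    max_iteration: dict = {}
    for row in rows:
        aid = row["article_id"]
        max_iteration[aid] = max(max_iteration.get(aid, row["iteration"]), row["iteration"])
    # Pass 2: keep only rows from that iteration, keyed by str(article_id)
    summary: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        if row["iteration"] == max_iteration[row["article_id"]]:
            summary[str(row["article_id"])].append(row)
    return dict(summary)


rows = [
    {"article_id": 1, "iteration": 1, "dimension": "accuracy", "score": 6},
    {"article_id": 1, "iteration": 2, "dimension": "accuracy", "score": 8},
    {"article_id": 1, "iteration": 2, "dimension": "readability", "score": 7},
    {"article_id": 2, "iteration": 1, "dimension": "accuracy", "score": 9},
]
summary = latest_iteration_scores(rows)
print({k: [r["score"] for r in v] for k, v in summary.items()})  # {'1': [8, 7], '2': [9]}
```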

get_all_dashboard_users(session) async

Get all dashboard users ordered by username.

get_active_dashboard_users(session) async

Get all active dashboard users.

get_dashboard_user_by_username(session, username) async

Get a dashboard user by username.

get_dashboard_user_by_email(session, email) async

Get an active dashboard user by email address.

create_dashboard_user(session, user_data) async

Insert a new dashboard user and return it.

update_dashboard_user(session, user_id, **values) async

Update specified fields on a dashboard user.

count_dashboard_users(session) async

Count total dashboard users (for bootstrap detection).

create_audit_entry(session, entry_data) async

Insert an audit log entry.

get_audit_log(session, *, user_id=None, username=None, action=None, date_from=None, date_to=None, limit=100, offset=0) async

Get audit log entries with optional filters.

count_audit_log(session, *, user_id=None, username=None, action=None, date_from=None, date_to=None) async

Return the count of audit log entries matching the given filters.