Database API
db
Async database layer backed by Neon Postgres with pgvector.
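Uses SQLAlchemy 2.0 Core (no ORM) with asyncpg. All connections go through
Neon's PgBouncer pooler endpoint, so the engine sets prepared_statement_cache_size=0 to disable the asyncpg dialect's prepared statement cache, which can conflict with PgBouncer's transaction pooling.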
Modules:
- connection: Engine and session factory, URL normalisation for asyncpg.
- tables: SQLAlchemy Table definitions and MetaData.
- models: Pydantic models bridging DB rows to application types.
- queries: Reusable SELECT/INSERT/UPDATE query builders.
- migrations/: Alembic migration scripts (schema + data seeds).
Connection Management
connection
Async database connection management.
normalise_url_for_asyncpg(raw_url)
Convert a standard Postgres URL to one asyncpg can connect with.
- Swaps the dialect prefix to postgresql+asyncpg://
- Strips query params asyncpg doesn't support (e.g. channel_binding)
- Renames sslmode→ssl
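The three rewrites above can be sketched with the standard library's urllib.parse. This is a minimal illustration, not the module's implementation: the name normalise_url_sketch is hypothetical, and the sketch assumes channel_binding is the only option that must be dropped and that postgres://, postgresql:// and already-normalised postgresql+asyncpg:// prefixes should all be accepted.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

# Assumption: channel_binding is the only option to drop; the real helper may strip more.
UNSUPPORTED_PARAMS = {"channel_binding"}


def normalise_url_sketch(raw_url: str) -> str:
    """Rewrite a libpq-style Postgres URL so SQLAlchemy's asyncpg dialect accepts it."""
    parts = urlsplit(raw_url)
    if parts.scheme not in ("postgres", "postgresql", "postgresql+asyncpg"):
        raise ValueError(f"not a Postgres URL: {raw_url!r}")

    params = []
    for key, value in parse_qsl(parts.query, keep_blank_values=True):
        if key in UNSUPPORTED_PARAMS:
            continue  # libpq-only option that asyncpg rejects
        if key == "sslmode":
            key = "ssl"  # asyncpg names this option "ssl"
        params.append((key, value))

    # Swap the dialect prefix and rebuild the query string.
    return urlunsplit(
        ("postgresql+asyncpg", parts.netloc, parts.path, urlencode(params), parts.fragment)
    )


print(normalise_url_sketch(
    "postgresql://user:secret@ep-example.neon.tech/neondb?sslmode=require&channel_binding=require"
))
# postgresql+asyncpg://user:secret@ep-example.neon.tech/neondb?ssl=require
```

Accepting the postgresql+asyncpg:// prefix makes the sketch idempotent, so calling it twice on the same URL is harmless.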
get_engine()
cached
Create or return the cached async engine.
get_session_factory()
Create an async session factory.
get_session()
async
Async context manager for database sessions.
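A minimal sketch of the commit-or-rollback pattern such a context manager commonly follows, assuming get_session commits when the block exits cleanly and rolls back on an exception (the real helper may instead leave commits to the caller). FakeSession and get_session_sketch are hypothetical stand-ins so the example runs without a database; in the real module the session would come from get_session_factory().

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records what happened to it."""

    def __init__(self):
        self.events = []

    async def commit(self):
        self.events.append("commit")

    async def rollback(self):
        self.events.append("rollback")

    async def close(self):
        self.events.append("close")


@asynccontextmanager
async def get_session_sketch(factory=FakeSession):
    """Yield a session; commit on success, roll back on error, always close."""
    session = factory()
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()


async def demo():
    async with get_session_sketch() as session:
        pass  # queries.* functions would be awaited here with `session`
    print(session.events)  # ['commit', 'close']

    try:
        async with get_session_sketch() as failed:
            raise RuntimeError("query failed")
    except RuntimeError:
        pass
    print(failed.events)  # ['rollback', 'close']


asyncio.run(demo())
```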
Table Definitions
tables
SQLAlchemy table definitions for all 14 database tables.
Query Builders
queries
Async database query functions for common operations.
All functions take an AsyncSession. Most return Pydantic models; others return counts, plain dicts, or nothing. No business logic, only pure data access.
get_backlog_items(session, *, status=None, pillar=None, language=None, content_type=None)
async
Get backlog items with optional filters.
get_backlog_item_by_keyword(session, keyword, language)
async
Get a backlog item by primary keyword and language.
update_backlog_status(session, item_id, status)
async
Update a backlog item's status.
bulk_import_backlog(session, items)
async
Bulk insert backlog items from a CSV import. Returns the number of items inserted.
get_backlog_item(session, item_id)
async
Get a single backlog item by ID.
get_backlog_ids_with_articles(session)
async
Return backlog IDs that have a linked non-failed article.
Failed articles (e.g. from brief rejection) are excluded so their backlog items can be re-queued for brief generation.
clear_article_backlog_id(session, article_id)
async
Clear the backlog_id on a failed article to fully sever the backlog link.
fail_generating_article(session, backlog_id)
async
Transition any brief_generating article for this backlog item to failed.
create_article(session, *, backlog_id, content_type, language, pillar, primary_keyword, title, pipeline_run_id=None, status='brief_generating')
async
Create a new article and return it.
get_article(session, article_id)
async
Get a single article by ID.
get_articles_by_backlog_id(session, backlog_id)
async
Get all articles linked to a backlog item, newest first.
get_article_for_update(session, article_id)
async
Get a single article by ID with a row-level lock (FOR UPDATE).
Use this when the caller will update the row based on its current state to prevent TOCTOU races under concurrent access.
update_article_status(session, article_id, status)
async
Update an article's status.
reset_article_for_requeue(session, article_id, target_status)
async
Clear downstream content fields when requeuing an article backward.
Uses _REQUEUE_RESET_RULES to determine which article columns to NULL,
whether to reset quality_gates_passed, and which child rows to delete.
Raises ValueError for target statuses not in the reset rules.
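One way to express such a rules table, as a sketch: ResetRule, REQUEUE_RESET_RULES and plan_requeue_reset are hypothetical names, and the statuses, column names and child tables in the mapping are illustrative guesses, not the contents of the real _REQUEUE_RESET_RULES. Whether quality_gates_passed is reset to NULL or to false is also an assumption (NULL here).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ResetRule:
    null_columns: tuple[str, ...]     # article columns set to NULL
    reset_quality_gates: bool         # whether quality_gates_passed is cleared
    delete_children: tuple[str, ...]  # child tables whose rows are deleted


# Hypothetical rules for illustration only; the real _REQUEUE_RESET_RULES may differ.
REQUEUE_RESET_RULES = {
    "writing": ResetRule(
        null_columns=("draft", "writer_output", "faq_items", "final_content"),
        reset_quality_gates=True,
        delete_children=("quality_scores", "article_images"),
    ),
    "quality_check": ResetRule(
        null_columns=("final_content",),
        reset_quality_gates=True,
        delete_children=("quality_scores",),
    ),
}


def plan_requeue_reset(target_status):
    """Return (values for the article UPDATE, child tables to purge)."""
    rule = REQUEUE_RESET_RULES.get(target_status)
    if rule is None:
        raise ValueError(f"no requeue reset rule for status {target_status!r}")
    values = {column: None for column in rule.null_columns}
    if rule.reset_quality_gates:
        values["quality_gates_passed"] = None  # assumption: cleared to NULL
    return values, rule.delete_children


values, children = plan_requeue_reset("quality_check")
print(values)    # {'final_content': None, 'quality_gates_passed': None}
print(children)  # ('quality_scores',)
```

Keeping the rules as data means a caller can apply the column values in one UPDATE and issue one DELETE per child table in the same transaction, and adding a new requeue target only touches the mapping.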
set_quality_gates_passed(session, article_id, passed)
async
Record whether an article passed quality gates normally or via gate exhaustion.
get_articles_by_pillar(session, pillar, language, content_type=None)
async
Get articles by pillar and language, optionally filtered by content type.
get_articles_by_status(session, status)
async
Get all articles with a given status.
update_article_draft(session, article_id, draft)
async
Update article draft content.
save_writer_output(session, article_id, writer_output_data)
async
Persist the full WriterOutput JSON in the writer_output column.
update_article_faq_items(session, article_id, faq_items)
async
Set an article's structured FAQ items.
save_research_dossier(session, article_id, dossier)
async
Persist a ResearchDossier as JSONB on the articles row.
set_wordpress_ids(session, article_id, wordpress_post_id, wordpress_url)
async
Set WordPress post ID and URL after publishing.
set_taxonomy_assignments(session, article_id, taxonomy_assignments)
async
Persist LLM-selected taxonomy assignments on the article record.
get_translation_counterpart(session, article_id, target_language)
async
Get the translation counterpart of an article in the target language.
Finds the article sharing the same backlog_id whose language is target_language.
set_published_at(session, article_id)
async
Set published_at to current UTC timestamp.
update_article_brief(session, article_id, brief)
async
Update an article's brief JSONB field.
update_article_title(session, article_id, title)
async
Update an article's title.
link_backlog_to_article(session, backlog_id, article_id)
async
Set the article_id on a backlog item to link it to an article.
update_article_final_content(session, article_id, content)
async
Set an article's final_content text.
increment_article_cost(session, article_id, cost_usd)
async
Atomically add cost_usd to an article's total_cost_usd.
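"Atomically" usually means doing the addition inside a single UPDATE rather than reading the total in Python and writing it back, which would let two concurrent calls overwrite each other. The sketch below shows that pattern with the standard library's sqlite3 standing in for Postgres; the table layout and the name increment_article_cost_sketch are assumptions, and COALESCE covers a NULL starting total.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE articles (id INTEGER PRIMARY KEY, total_cost_usd REAL)")
conn.execute("INSERT INTO articles (id, total_cost_usd) VALUES (1, NULL)")


def increment_article_cost_sketch(conn, article_id, cost_usd):
    # The database performs the read-modify-write inside one statement, so two
    # concurrent calls cannot both read the old total and lose an increment.
    # COALESCE treats a NULL starting total as 0 (assumption about the column).
    conn.execute(
        "UPDATE articles SET total_cost_usd = COALESCE(total_cost_usd, 0) + ? WHERE id = ?",
        (cost_usd, article_id),
    )


increment_article_cost_sketch(conn, 1, 0.25)
increment_article_cost_sketch(conn, 1, 0.5)
print(conn.execute("SELECT total_cost_usd FROM articles WHERE id = 1").fetchone()[0])  # 0.75
```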
delete_article_images(session, article_id)
async
Delete all images for an article (used before re-generating).
insert_article_image(session, article_id, ordinal, r2_key, prompt_used, revised_prompt)
async
Insert a generated image record with its R2 storage key.
get_article_images(session, article_id)
async
Get image metadata for an article, ordered by ordinal.
get_article_image_r2_key(session, article_id, ordinal)
async
Load one image's R2 object key.
get_selected_article_image_r2_key(session, article_id)
async
Load the R2 key of the selected image (is_selected=True) for publishing.
select_article_image(session, article_id, ordinal)
async
Mark one image as selected and deselect all other images of the article.
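Selecting one image and deselecting the rest can be done in a single statement by assigning the boolean result of a comparison. The sketch uses sqlite3 as a stand-in for Postgres (where the same UPDATE works with a real boolean column); the article_images table name and select_article_image_sketch are assumptions, while is_selected and ordinal come from the functions above. The actual select_article_image may use two statements instead.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE article_images (article_id INTEGER, ordinal INTEGER, is_selected BOOLEAN)"
)
conn.executemany(
    "INSERT INTO article_images VALUES (?, ?, 0)", [(7, 1), (7, 2), (8, 1)]
)


def select_article_image_sketch(conn, article_id, ordinal):
    # One statement rewrites every image of the article: the chosen ordinal
    # gets true, all others false; other articles' images are untouched.
    conn.execute(
        "UPDATE article_images SET is_selected = (ordinal = ?) WHERE article_id = ?",
        (ordinal, article_id),
    )


select_article_image_sketch(conn, 7, 2)
rows = conn.execute(
    "SELECT article_id, ordinal, is_selected FROM article_images ORDER BY article_id, ordinal"
).fetchall()
print(rows)  # [(7, 1, 0), (7, 2, 1), (8, 1, 0)]
```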
has_selected_image(session, article_id)
async
Check whether an article has a selected image (for publish guard).
save_quality_score(session, score_data)
async
Insert a quality score and return it.
get_quality_scores(session, article_id)
async
Get all quality scores for an article.
get_latest_scores(session, article_id)
async
Get the latest iteration quality scores for an article.
get_active_sources(session)
async
Get all active source registry entries.
save_source_items(session, items)
async
Bulk insert source items. Returns the number of items inserted.
get_pending_source_items(session, *, limit=50)
async
Get pending source items for triage.
update_triage_status(session, item_id, status, reasoning='', *, relevance_score=None, novelty_score=None, accessibility_score=None, suggested_angle=None, patient_relevance=None)
async
Update triage status, reasoning, and optional evaluation fields.
get_newsworthy_items(session, *, limit=100, offset=0)
async
Get newsworthy source items not yet assigned to an article (FIFO order).
count_newsworthy_items(session)
async
Return the count of newsworthy source items not yet assigned to an article.
get_recent_digest_articles(session, *, weeks=4, statuses=None)
async
Get research_news articles created within the last N weeks.
Args:
    statuses: Optional list of status values to filter by. If None, articles of every status are returned.
unlink_orphaned_source_items(session)
async
Unlink source items from failed or draft research_news articles.
When produce_digests partially executes and then fails, source items
are linked to articles via article_id but the articles never complete.
This function resets article_id = NULL on those items so they return
to the newsworthy pool for the next digest run.
Returns the count of unlinked items.
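The reset described above can be a single UPDATE with a subquery. This sqlite3 sketch assumes articles carries content_type and status columns holding the values 'research_news', 'failed' and 'draft'; the table layout and unlink_orphaned_source_items_sketch are simplified guesses, and cursor.rowcount supplies the returned count.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE articles (id INTEGER PRIMARY KEY, content_type TEXT, status TEXT);
    CREATE TABLE source_items (id INTEGER PRIMARY KEY, article_id INTEGER);
    INSERT INTO articles VALUES (1, 'research_news', 'failed'),
                                (2, 'research_news', 'published'),
                                (3, 'research_news', 'draft');
    INSERT INTO source_items VALUES (10, 1), (11, 2), (12, 3), (13, NULL);
    """
)


def unlink_orphaned_source_items_sketch(conn):
    cur = conn.execute(
        """
        UPDATE source_items SET article_id = NULL
        WHERE article_id IN (
            SELECT id FROM articles
            WHERE content_type = 'research_news' AND status IN ('failed', 'draft')
        )
        """
    )
    return cur.rowcount  # items returned to the newsworthy pool


print(unlink_orphaned_source_items_sketch(conn))  # 2
```

Because already-unlinked items have article_id NULL, running the function again is a no-op that returns 0.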
link_source_items_to_article(session, source_item_ids, article_id)
async
Associate newsworthy source items with a digest article.
update_source_last_scanned(session, source_id)
async
Update last_scanned_at to current UTC time for a source registry entry.
query_similar_chunks(session, query_embedding, limit=15)
async
Return vault chunks most similar to query_embedding via pgvector cosine distance.
The caller is responsible for obtaining the embedding (via OpenAI or other provider) and passing it in. This keeps the queries module IO-free.
Returns a list of dicts with keys: chunk_text, note_path, note_title, tags, source_url.
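pgvector's <=> operator returns cosine distance, defined as 1 minus cosine similarity, so ordering by it ascending and applying LIMIT yields the nearest chunks. The pure-Python sketch below computes the same ranking in memory to show what the SQL (roughly ORDER BY embedding <=> :query_embedding LIMIT :limit) does; the embedding key, the helper names and the toy three-dimensional vectors are assumptions for illustration, and zero-length vectors are not handled.

```python
import math


def cosine_distance(a, b):
    """Same quantity as pgvector's <=> operator: 1 - cosine similarity."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


def rank_chunks(query_embedding, chunks, limit=15):
    """In-memory stand-in for ORDER BY embedding <=> :query LIMIT :limit."""
    return sorted(chunks, key=lambda c: cosine_distance(c["embedding"], query_embedding))[:limit]


chunks = [
    {"note_title": "Sleep", "embedding": [0.9, 0.1, 0.0]},
    {"note_title": "Diet", "embedding": [0.0, 1.0, 0.0]},
    {"note_title": "Exercise", "embedding": [0.7, 0.7, 0.0]},
]
print([c["note_title"] for c in rank_chunks([1.0, 0.0, 0.0], chunks, limit=2)])
# ['Sleep', 'Exercise']
```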
query_vault_text_only(session, query_text, limit=15)
async
Return vault chunks matching query_text via full-text search only.
Fallback for when embedding generation is unavailable. Uses the simple
text search config which is language-agnostic (no stemming) — suitable for
mixed EN/DE vault content.
Returns a list of dicts with keys: chunk_text, note_path, note_title, tags, source_url.
get_dossier_by_source_item(session, source_item_id)
async
Return the research dossier from an article sharing source_item_id that has finished research.
Looks for any article with the same source_item_id that has progressed
past the RESEARCHING stage (status >= WRITING) and has a non-null dossier.
Returns the raw JSONB dict or None if no dossier is available.
create_research_news_article(session, *, language, title, pipeline_run_id=None, digest_batch_id=None)
async
Create a research news article with backlog_id=None and status=writing.
get_source_items_for_article(session, article_id)
async
Return all source items linked to a given article.
For digest articles sharing a digest_batch_id, returns source items
linked to any article in the same batch — handles the case where the
linking loop leaves article_id pointing to only the last language.
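A sketch of how the batch-wide lookup can be written as one query, using sqlite3 as a stand-in for Postgres. The table layout and source_item_ids_for_article are simplified assumptions; the self-join on digest_batch_id reaches every sibling article in the batch, and because NULL never compares equal, legacy articles without a batch fall back to the direct article_id match.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE articles (id INTEGER PRIMARY KEY, language TEXT, digest_batch_id TEXT);
    CREATE TABLE source_items (id INTEGER PRIMARY KEY, article_id INTEGER);
    -- One batch produced an EN and a DE article; the linking loop left
    -- every source item pointing at the last one (the DE article, id 2).
    INSERT INTO articles VALUES (1, 'en', 'batch-a'), (2, 'de', 'batch-a'), (3, 'en', NULL);
    INSERT INTO source_items VALUES (10, 2), (11, 2), (12, 3);
    """
)

SOURCE_ITEMS_FOR_ARTICLE = """
SELECT si.id FROM source_items AS si
WHERE si.article_id = :article_id
   OR si.article_id IN (
        SELECT sibling.id
        FROM articles AS target
        JOIN articles AS sibling ON sibling.digest_batch_id = target.digest_batch_id
        WHERE target.id = :article_id
   )
ORDER BY si.id
"""


def source_item_ids_for_article(conn, article_id):
    rows = conn.execute(SOURCE_ITEMS_FOR_ARTICLE, {"article_id": article_id})
    return [row[0] for row in rows]


print(source_item_ids_for_article(conn, 1))  # [10, 11]
print(source_item_ids_for_article(conn, 3))  # [12]
```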
find_stuck_articles(session, *, threshold_minutes=30)
async
Find articles stuck in transitional statuses for longer than threshold_minutes.
Transitional statuses are those where the article is mid-pipeline
(researching, writing, quality_check, synthesizing) and should normally
complete within minutes. Articles in these statuses with
updated_at older than the threshold are considered stuck.
get_digest_translation_counterpart(session, article_id, target_language)
async
Get the translation counterpart of a digest via digest_batch_id.
Finds the article sharing the same digest_batch_id but in a different
language. Returns None when no counterpart exists or when
digest_batch_id is NULL (legacy digests).
create_pipeline_run(session, *, run_type, articles_total)
async
Create a new pipeline run and return it.
update_pipeline_run(session, run_id, **values)
async
Update pipeline run fields.
get_active_runs(session)
async
Get all currently running pipeline runs.
get_recently_completed_runs(session, limit=3)
async
Get the most recently completed or failed pipeline runs.
get_pipeline_run(session, run_id)
async
Get a single pipeline run by ID.
get_pipeline_run_by_prefect_id(session, prefect_flow_run_id)
async
Get a pipeline run by its linked Prefect flow run ID.
get_articles_by_ids(session, ids)
async
Get articles by a list of IDs, ordered by created_at.
increment_run_completed(session, run_id)
async
Atomically increment the articles_completed counter on a pipeline run.
increment_run_failed(session, run_id)
async
Atomically increment the articles_failed counter on a pipeline run.
set_articles_pipeline_run(session, article_ids, run_id)
async
Link articles to a pipeline run.
set_source_items_pipeline_run(session, item_ids, run_id)
async
Link source items to a pipeline run.
get_content_template(session, content_type, pillar=None)
async
Get an active content template, preferring pillar-specific over cross-pillar.
First tries to find a template matching the given pillar. If none found (or no pillar given), falls back to a cross-pillar template (pillar IS NULL).
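The documented behaviour is a two-step lookup; the sketch below collapses it into a single equivalent query by sorting pillar-specific rows ahead of the cross-pillar row. It runs on sqlite3 as a stand-in for Postgres, and the content_templates table name, the active column, the helper name and the sample pillars are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE content_templates (
        id INTEGER PRIMARY KEY, content_type TEXT, pillar TEXT, active BOOLEAN
    );
    INSERT INTO content_templates VALUES
        (1, 'guide', NULL, 1),         -- cross-pillar fallback
        (2, 'guide', 'nutrition', 1),  -- pillar-specific
        (3, 'guide', 'sleep', 0);      -- inactive, never chosen
    """
)


def get_content_template_id(conn, content_type, pillar=None):
    row = conn.execute(
        """
        SELECT id FROM content_templates
        WHERE content_type = ? AND active AND (pillar = ? OR pillar IS NULL)
        -- false (0) sorts before true (1), so a pillar-specific row wins
        ORDER BY pillar IS NULL
        LIMIT 1
        """,
        (content_type, pillar),
    ).fetchone()
    return row[0] if row else None


print(get_content_template_id(conn, "guide", "nutrition"))  # 2
print(get_content_template_id(conn, "guide", "sleep"))      # 1
print(get_content_template_id(conn, "guide"))               # 1
```

When pillar is None the comparison pillar = NULL is never true, so only the cross-pillar row can match, which mirrors the "no pillar given" branch.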
get_active_prompt(session, agent_name, content_type)
async
Get the active prompt for an agent and content type.
Falls back to content_type='all' if specific type not found.
create_prompt_version(session, prompt_data)
async
Create a new prompt version.
list_prompt_versions(session, agent_name, content_type)
async
List all prompt versions for an agent and content type.
get_articles_count_by_status(session)
async
Count articles grouped by status.
get_published_count(session, *, since)
async
Count published articles since a given date.
get_total_cost(session, *, since=None)
async
Sum total_cost_usd across articles, optionally since a date.
get_recent_activity(session, *, limit=50)
async
Get the most recently updated articles.
create_backlog_item(session, item_data)
async
Insert a new backlog item and return it.
update_backlog_item(session, item_id, **values)
async
Update specified fields on a backlog item.
get_all_sources(session)
async
Get all source registry entries (active and inactive).
create_source(session, source_data)
async
Insert a new source registry entry and return it.
update_source(session, source_id, **values)
async
Update specified fields on a source registry entry.
get_active_trusted_sources(session, *, language=None)
async
Get active trusted sources, optionally filtered by language.
When language is provided, returns sources matching that language
OR sources with language='both'.
get_all_trusted_sources(session)
async
Get all trusted sources (active and inactive) for dashboard management.
create_trusted_source(session, source_data)
async
Insert a new trusted source and return it.
update_trusted_source(session, source_id, **values)
async
Update specified fields on a trusted source, auto-setting updated_at.
get_source_items_filtered(session, *, source_id=None, triage_status=None, date_from=None, date_to=None, limit=100, offset=0)
async
Get source items with optional filters.
count_source_items_filtered(session, *, source_id=None, triage_status=None, date_from=None, date_to=None)
async
Return the count of source items matching the given filters.
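Paired list/count functions like these are commonly kept in sync by building the WHERE clause once and reusing it. A sketch with sqlite3 and hypothetical helper names (build_source_item_filters, count_source_items, list_source_item_ids); it assumes the date filters apply to discovered_at, that date_to is an exclusive upper bound, and that results are ordered newest first.

```python
import sqlite3
from datetime import datetime


def build_source_item_filters(*, source_id=None, triage_status=None, date_from=None, date_to=None):
    """Return (WHERE clause, params) shared by the list and count queries."""
    clauses, params = [], []
    if source_id is not None:
        clauses.append("source_id = ?")
        params.append(source_id)
    if triage_status is not None:
        clauses.append("triage_status = ?")
        params.append(triage_status)
    if date_from is not None:
        clauses.append("discovered_at >= ?")
        params.append(date_from.isoformat())
    if date_to is not None:
        clauses.append("discovered_at < ?")  # assumption: exclusive upper bound
        params.append(date_to.isoformat())
    where = (" WHERE " + " AND ".join(clauses)) if clauses else ""
    return where, params


def count_source_items(conn, **filters):
    where, params = build_source_item_filters(**filters)
    return conn.execute("SELECT COUNT(*) FROM source_items" + where, params).fetchone()[0]


def list_source_item_ids(conn, *, limit=100, offset=0, **filters):
    where, params = build_source_item_filters(**filters)
    sql = "SELECT id FROM source_items" + where + " ORDER BY discovered_at DESC LIMIT ? OFFSET ?"
    return [row[0] for row in conn.execute(sql, [*params, limit, offset])]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE source_items (id INTEGER PRIMARY KEY, source_id INTEGER,"
    " triage_status TEXT, discovered_at TEXT)"
)
conn.executemany(
    "INSERT INTO source_items VALUES (?, ?, ?, ?)",
    [
        (1, 1, "newsworthy", "2024-05-01T09:00:00"),
        (2, 1, "pending", "2024-05-02T09:00:00"),
        (3, 2, "newsworthy", "2024-05-03T09:00:00"),
    ],
)
print(count_source_items(conn, triage_status="newsworthy"))      # 2
print(list_source_item_ids(conn, triage_status="newsworthy"))    # [3, 1]
print(count_source_items(conn, date_from=datetime(2024, 5, 2)))  # 2
```

Only user-supplied values go through placeholders; the clause fragments are fixed strings, so the dynamic SQL stays injection-safe.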
activate_prompt_version(session, prompt_id)
async
Activate a prompt version and deactivate others for the same agent+content_type.
Looks up the target prompt's agent_name and content_type, sets all
matching prompts to active=False, then sets the target to active=True.
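The deactivate-then-activate sequence only guarantees a single active version if both UPDATEs run in one transaction. A sqlite3 sketch, assuming a prompts table with an active column; activate_prompt_version_sketch is a hypothetical name, and raising LookupError for an unknown ID is an assumption about error handling, not documented behaviour.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE prompts (
        id INTEGER PRIMARY KEY, agent_name TEXT, content_type TEXT, active BOOLEAN
    );
    INSERT INTO prompts VALUES
        (1, 'writer', 'guide', 1),
        (2, 'writer', 'guide', 0),
        (3, 'writer', 'news', 1);
    """
)


def activate_prompt_version_sketch(conn, prompt_id):
    with conn:  # one transaction: commits on success, rolls back on error
        row = conn.execute(
            "SELECT agent_name, content_type FROM prompts WHERE id = ?", (prompt_id,)
        ).fetchone()
        if row is None:
            raise LookupError(f"prompt {prompt_id} not found")
        agent_name, content_type = row
        # Deactivate every version for this agent + content type, then
        # re-activate the target, so exactly one version ends up active.
        conn.execute(
            "UPDATE prompts SET active = 0 WHERE agent_name = ? AND content_type = ?",
            (agent_name, content_type),
        )
        conn.execute("UPDATE prompts SET active = 1 WHERE id = ?", (prompt_id,))


activate_prompt_version_sketch(conn, 2)
print(conn.execute("SELECT id, active FROM prompts ORDER BY id").fetchall())
# [(1, 0), (2, 1), (3, 1)]
```

The 'news' prompt stays active because the deactivation is scoped to the target's agent_name and content_type.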
get_all_articles(session)
async
Get all articles ordered by updated_at descending.
get_articles_summary(session)
async
Get all articles with only lightweight columns (no heavy JSONB).
Returns plain dicts — avoids Article model validation overhead for
dashboard list views that only need summary fields.
get_all_quality_scores(session)
async
Get all quality scores ordered by created_at descending.
get_pipeline_runs_history(session, *, limit=20, offset=0, run_type=None, status=None, since=None)
async
Get paginated pipeline runs with optional filters, newest first.
get_pipeline_runs_count(session, *, run_type=None, status=None, since=None)
async
Count pipeline runs matching optional filters.
get_articles_in_run_window(session, *, started_at, completed_at, run_id=None)
async
Get articles linked to a pipeline run.
Uses the pipeline_run_id FK when run_id is provided and matches at
least one article; otherwise falls back to a temporal correlation on
updated_at (needed for old runs created before the FK existed).
Returns lightweight dicts with id, title, language, status, boolean flags for draft/final content, wordpress_url, and updated_at.
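A pure-Python sketch of the FK-first, timestamp-fallback logic over in-memory dicts; articles_in_run_window is a hypothetical name. The inclusive window bounds and the treatment of a missing completed_at as an open-ended window are assumptions, not documented behaviour.

```python
from datetime import datetime


def articles_in_run_window(articles, *, started_at, completed_at, run_id=None):
    """Return articles for a run: FK match first, else an updated_at time window."""
    if run_id is not None:
        linked = [a for a in articles if a["pipeline_run_id"] == run_id]
        if linked:
            return linked
    # Fallback for runs created before pipeline_run_id existed.
    # Assumption: a run without completed_at is treated as still open.
    end = completed_at if completed_at is not None else datetime.max
    return [a for a in articles if started_at <= a["updated_at"] <= end]


articles = [
    {"id": 1, "pipeline_run_id": "run-1", "updated_at": datetime(2024, 5, 1, 10, 5)},
    {"id": 2, "pipeline_run_id": None, "updated_at": datetime(2024, 5, 1, 10, 20)},
    {"id": 3, "pipeline_run_id": None, "updated_at": datetime(2024, 5, 2, 8, 0)},
]
window = {"started_at": datetime(2024, 5, 1, 10, 0), "completed_at": datetime(2024, 5, 1, 11, 0)}
print([a["id"] for a in articles_in_run_window(articles, run_id="run-1", **window)])    # [1]
print([a["id"] for a in articles_in_run_window(articles, run_id="old-run", **window)])  # [1, 2]
```

The second call shows the trade-off of the fallback: a temporal match can also pick up articles that another run touched during the same window.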
get_source_items_in_run_window(session, *, started_at, completed_at, run_id=None)
async
Get source items linked to a pipeline run.
Uses the pipeline_run_id FK when run_id is provided and matches at least one source item;
otherwise falls back to temporal correlation on discovered_at.
get_quality_summary_for_articles(session, *, article_ids)
async
Get latest-iteration quality scores grouped by article.
For each article returns the scores from only the highest iteration,
keyed by str(article_id).
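The grouping can be pictured as two passes over the score rows: find each article's highest iteration, then keep only the rows from it. This in-memory sketch uses a hypothetical helper name, hypothetical field names (iteration, gate, score) and illustrative values; a SQL version would more likely compare against a MAX(iteration) subquery per article.

```python
from collections import defaultdict


def latest_iteration_scores(score_rows):
    """Group scores by str(article_id), keeping only each article's highest iteration."""
    max_iteration = {}
    for row in score_rows:
        key = str(row["article_id"])
        max_iteration[key] = max(max_iteration.get(key, row["iteration"]), row["iteration"])

    summary = defaultdict(list)
    for row in score_rows:
        key = str(row["article_id"])
        if row["iteration"] == max_iteration[key]:
            summary[key].append(row)
    return dict(summary)


rows = [
    {"article_id": 1, "iteration": 1, "gate": "readability", "score": 6},
    {"article_id": 1, "iteration": 2, "gate": "readability", "score": 8},
    {"article_id": 1, "iteration": 2, "gate": "accuracy", "score": 9},
    {"article_id": 2, "iteration": 1, "gate": "accuracy", "score": 7},
]
summary = latest_iteration_scores(rows)
print({k: [r["score"] for r in v] for k, v in summary.items()})
# {'1': [8, 9], '2': [7]}
```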
get_all_dashboard_users(session)
async
Get all dashboard users ordered by username.
get_active_dashboard_users(session)
async
Get all active dashboard users.
get_dashboard_user_by_username(session, username)
async
Get a dashboard user by username.
get_dashboard_user_by_email(session, email)
async
Get an active dashboard user by email address.
create_dashboard_user(session, user_data)
async
Insert a new dashboard user and return it.
update_dashboard_user(session, user_id, **values)
async
Update specified fields on a dashboard user.
count_dashboard_users(session)
async
Count total dashboard users (for bootstrap detection).
create_audit_entry(session, entry_data)
async
Insert an audit log entry.
get_audit_log(session, *, user_id=None, username=None, action=None, date_from=None, date_to=None, limit=100, offset=0)
async
Get audit log entries with optional filters.
count_audit_log(session, *, user_id=None, username=None, action=None, date_from=None, date_to=None)
async
Return the count of audit log entries matching the given filters.