From 7505f7fdebae8d7246f7815bc70ce1d42fa66749 Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 29 Apr 2026 15:49:50 +0000 Subject: [PATCH] huskies: merge 843 --- server/src/db/content_store.rs | 105 ++++++++ server/src/db/mod.rs | 430 ++------------------------------- server/src/db/ops.rs | 208 ++++++++++++++++ server/src/db/shadow_write.rs | 119 +++++++++ 4 files changed, 450 insertions(+), 412 deletions(-) create mode 100644 server/src/db/content_store.rs create mode 100644 server/src/db/ops.rs create mode 100644 server/src/db/shadow_write.rs diff --git a/server/src/db/content_store.rs b/server/src/db/content_store.rs new file mode 100644 index 00000000..92b440b4 --- /dev/null +++ b/server/src/db/content_store.rs @@ -0,0 +1,105 @@ +//! In-memory content store — fast synchronous reads for story markdown. +//! +//! Backed by a `HashMap` wrapped in a `Mutex`. In +//! non-test builds the store lives in a process-global `OnceLock`; in tests +//! each thread gets its own isolated copy via a `thread_local!` to avoid +//! cross-test pollution. +use std::collections::HashMap; +use std::sync::{Mutex, OnceLock}; + +static CONTENT_STORE: OnceLock>> = OnceLock::new(); + +#[cfg(test)] +thread_local! { + /// Per-thread isolated content store used in tests to prevent cross-test pollution. + pub(super) static CONTENT_STORE_TL: OnceLock>> = const { OnceLock::new() }; +} + +#[cfg(not(test))] +/// Return a reference to the process-global content store, or `None` if not yet initialised. +pub(super) fn get_content_store() -> Option<&'static Mutex>> { + CONTENT_STORE.get() +} + +#[cfg(test)] +/// Return the thread-local content store for tests, falling back to the global store. +pub(super) fn get_content_store() -> Option<&'static Mutex>> { + let tl = CONTENT_STORE_TL.with(|lock| { + if lock.get().is_some() { + Some(lock as *const OnceLock>>) + } else { + None + } + }); + if let Some(ptr) = tl { + // SAFETY: The thread-local lives as long as the thread, which outlives + // any test using it. We only need 'static for the return type. + let lock = unsafe { &*ptr }; + lock.get() + } else { + CONTENT_STORE.get() + } +} + +/// Read the full markdown content of a story from the in-memory store. +pub fn read_content(story_id: &str) -> Option { + let store = get_content_store()?; + let map = store.lock().ok()?; + map.get(story_id).cloned() +} + +/// Write (or overwrite) the full markdown content of a story. +/// +/// Updates the in-memory store immediately. +pub fn write_content(story_id: &str, content: &str) { + if let Some(store) = get_content_store() + && let Ok(mut map) = store.lock() + { + map.insert(story_id.to_string(), content.to_string()); + } +} + +/// Remove a story's content from the in-memory store. +pub fn delete_content(story_id: &str) { + if let Some(store) = get_content_store() + && let Ok(mut map) = store.lock() + { + map.remove(story_id); + } +} + +/// Ensure the in-memory content store is initialised. +/// +/// Safe to call multiple times — the `OnceLock` is set at most once. +pub fn ensure_content_store() { + #[cfg(not(test))] + { + let _ = CONTENT_STORE.set(Mutex::new(HashMap::new())); + } + + #[cfg(test)] + { + CONTENT_STORE_TL.with(|lock| { + if lock.get().is_none() { + let _ = lock.set(Mutex::new(HashMap::new())); + } + }); + crate::crdt_state::init_for_test(); + } +} + +/// Return all story IDs present in the content store. +pub fn all_content_ids() -> Vec { + match get_content_store() { + Some(store) => match store.lock() { + Ok(map) => map.keys().cloned().collect(), + Err(_) => Vec::new(), + }, + None => Vec::new(), + } +} + +/// Initialise the content store from a pre-loaded map (used during DB startup). +pub(super) fn init_content_store(map: HashMap) { + let _ = CONTENT_STORE.set(Mutex::new(map)); +} diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 7d876045..518a5143 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -14,417 +14,23 @@ /// /// On startup, existing content is loaded from the database into memory so /// no filesystem scan is needed after migration. -use crate::io::story_metadata::parse_front_matter; -use crate::slog; -use sqlx::SqlitePool; -use sqlx::sqlite::SqliteConnectOptions; -use std::collections::HashMap; -use std::path::Path; -use std::sync::{Mutex, OnceLock}; -use tokio::sync::mpsc; +pub mod content_store; +/// Write operations for the pipeline — content, stage transitions, and deletions. +pub mod ops; +/// Background shadow-write task — persists pipeline items to SQLite asynchronously. +pub mod shadow_write; -/// A pending shadow write for one pipeline item. -struct PipelineWriteMsg { - story_id: String, - stage: String, - name: Option, - agent: Option, - retry_count: Option, - blocked: Option, - depends_on: Option, - content: Option, -} - -/// Handle to the background shadow-write task. -pub struct PipelineDb { - tx: mpsc::UnboundedSender, -} - -static PIPELINE_DB: OnceLock = OnceLock::new(); - -// ── In-memory content store ───────────────────────────────────────── - -static CONTENT_STORE: OnceLock>> = OnceLock::new(); +pub use content_store::{all_content_ids, delete_content, read_content, write_content}; +pub use ops::{delete_item, move_item_stage, next_item_number, write_item_with_content}; +pub use shadow_write::init; #[cfg(test)] -thread_local! { - static CONTENT_STORE_TL: OnceLock>> = const { OnceLock::new() }; -} - -#[cfg(not(test))] -fn get_content_store() -> Option<&'static Mutex>> { - CONTENT_STORE.get() -} - -#[cfg(test)] -fn get_content_store() -> Option<&'static Mutex>> { - let tl = CONTENT_STORE_TL.with(|lock| { - if lock.get().is_some() { - Some(lock as *const OnceLock>>) - } else { - None - } - }); - if let Some(ptr) = tl { - // SAFETY: The thread-local lives as long as the thread, which outlives - // any test using it. We only need 'static for the return type. - let lock = unsafe { &*ptr }; - lock.get() - } else { - CONTENT_STORE.get() - } -} - -/// Read the full markdown content of a story from the in-memory store. -pub fn read_content(story_id: &str) -> Option { - let store = get_content_store()?; - let map = store.lock().ok()?; - map.get(story_id).cloned() -} - -/// Write (or overwrite) the full markdown content of a story. -/// -/// Updates the in-memory store immediately. -pub fn write_content(story_id: &str, content: &str) { - if let Some(store) = get_content_store() - && let Ok(mut map) = store.lock() - { - map.insert(story_id.to_string(), content.to_string()); - } -} - -/// Remove a story's content from the in-memory store. -pub fn delete_content(story_id: &str) { - if let Some(store) = get_content_store() - && let Ok(mut map) = store.lock() - { - map.remove(story_id); - } -} - -/// Ensure the in-memory content store is initialised. -/// -/// Safe to call multiple times — the `OnceLock` is set at most once. -pub fn ensure_content_store() { - #[cfg(not(test))] - { - let _ = CONTENT_STORE.set(Mutex::new(HashMap::new())); - } - - #[cfg(test)] - { - CONTENT_STORE_TL.with(|lock| { - if lock.get().is_none() { - let _ = lock.set(Mutex::new(HashMap::new())); - } - }); - crate::crdt_state::init_for_test(); - } -} - -/// Return all story IDs present in the content store. -pub fn all_content_ids() -> Vec { - match get_content_store() { - Some(store) => match store.lock() { - Ok(map) => map.keys().cloned().collect(), - Err(_) => Vec::new(), - }, - None => Vec::new(), - } -} - -// ── Initialisation ────────────────────────────────────────────────── - -/// Initialise the pipeline database. -/// -/// Opens (or creates) the SQLite file at `db_path`, runs embedded migrations, -/// loads existing story content into the in-memory store, and spawns the -/// background write task. Safe to call only once; subsequent calls are no-ops. -pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { - if PIPELINE_DB.get().is_some() { - return Ok(()); - } - - let options = SqliteConnectOptions::new() - .filename(db_path) - .create_if_missing(true); - - let pool = SqlitePool::connect_with(options).await?; - sqlx::migrate!("./migrations").run(&pool).await?; - - // Load existing content into the in-memory store. - let rows: Vec<(String, Option)> = - sqlx::query_as("SELECT id, content FROM pipeline_items WHERE content IS NOT NULL") - .fetch_all(&pool) - .await?; - - let mut content_map = HashMap::new(); - for (id, content) in rows { - if let Some(c) = content { - content_map.insert(id, c); - } - } - let _ = CONTENT_STORE.set(Mutex::new(content_map)); - - let (tx, mut rx) = mpsc::unbounded_channel::(); - - tokio::spawn(async move { - while let Some(msg) = rx.recv().await { - // The "deleted" sentinel means the caller wants the row gone. - // Issue a real DELETE so the shadow table stays clean and - // sync_crdt_stages_from_db cannot resurrect a tombstoned item on - // the next restart. - if msg.stage == "deleted" { - let result = sqlx::query("DELETE FROM pipeline_items WHERE id = ?1") - .bind(&msg.story_id) - .execute(&pool) - .await; - if let Err(e) = result { - slog!("[db] Shadow delete failed for '{}': {e}", msg.story_id); - } - continue; - } - - let now = chrono::Utc::now().to_rfc3339(); - let result = sqlx::query( - "INSERT INTO pipeline_items \ - (id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \ - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9) \ - ON CONFLICT(id) DO UPDATE SET \ - name = excluded.name, \ - stage = excluded.stage, \ - agent = excluded.agent, \ - retry_count = excluded.retry_count, \ - blocked = excluded.blocked, \ - depends_on = excluded.depends_on, \ - content = COALESCE(excluded.content, pipeline_items.content), \ - updated_at = excluded.updated_at", - ) - .bind(&msg.story_id) - .bind(&msg.name) - .bind(&msg.stage) - .bind(&msg.agent) - .bind(msg.retry_count) - .bind(msg.blocked.map(|b| b as i64)) - .bind(&msg.depends_on) - .bind(&msg.content) - .bind(&now) - .execute(&pool) - .await; - - if let Err(e) = result { - slog!("[db] Shadow write failed for '{}': {e}", msg.story_id); - } - } - }); - - let _ = PIPELINE_DB.set(PipelineDb { tx }); - Ok(()) -} - -// ── Write path ────────────────────────────────────────────────────── - -/// Write a pipeline item from in-memory content (no filesystem access). -/// -/// This is the primary write path for the DB-backed pipeline. It updates -/// the CRDT, the in-memory content store, and the SQLite shadow table. -pub fn write_item_with_content(story_id: &str, stage: &str, content: &str) { - let (name, agent, retry_count, blocked, depends_on) = match parse_front_matter(content) { - Ok(meta) => ( - meta.name, - meta.agent, - meta.retry_count.map(|r| r as i64), - meta.blocked, - meta.depends_on - .as_ref() - .and_then(|d| serde_json::to_string(d).ok()), - ), - Err(_) => (None, None, None, None, None), - }; - - // Update in-memory content store. - ensure_content_store(); - write_content(story_id, content); - - // Primary: CRDT ops. - let merged_at_ts = if crate::pipeline_state::Stage::from_dir(stage) - .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. })) - { - Some(chrono::Utc::now().timestamp() as f64) - } else { - None - }; - crate::crdt_state::write_item( - story_id, - stage, - name.as_deref(), - agent.as_deref(), - retry_count, - blocked, - depends_on.as_deref(), - None, - None, - merged_at_ts, - ); - - // Shadow: pipeline_items table (only when DB is initialised). - if let Some(db) = PIPELINE_DB.get() { - let msg = PipelineWriteMsg { - story_id: story_id.to_string(), - stage: stage.to_string(), - name, - agent, - retry_count, - blocked, - depends_on, - content: Some(content.to_string()), - }; - let _ = db.tx.send(msg); - } -} - -/// Update only the stage of an existing item (used by move operations). -/// -/// Reads current content from the in-memory store, updates the CRDT stage, -/// and persists the change. Optionally modifies the content (e.g. to clear -/// front-matter fields). -pub fn move_item_stage( - story_id: &str, - new_stage: &str, - content_transform: Option<&dyn Fn(&str) -> String>, -) { - let current_content = read_content(story_id); - - let content = match (¤t_content, content_transform) { - (Some(c), Some(transform)) => { - let new_content = transform(c); - write_content(story_id, &new_content); - Some(new_content) - } - (Some(c), None) => Some(c.clone()), - _ => None, - }; - - let (name, agent, _ignored_retry_count, blocked, depends_on) = content - .as_deref() - .or(current_content.as_deref()) - .and_then(|c| parse_front_matter(c).ok()) - .map(|meta| { - ( - meta.name, - meta.agent, - meta.retry_count.map(|r| r as i64), - meta.blocked, - meta.depends_on - .as_ref() - .and_then(|d| serde_json::to_string(d).ok()), - ) - }) - .unwrap_or((None, None, None, None, None)); - - // CRDT stage transition. - let merged_at_ts = if crate::pipeline_state::Stage::from_dir(new_stage) - .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. })) - { - Some(chrono::Utc::now().timestamp() as f64) - } else { - None - }; - crate::crdt_state::write_item( - story_id, - new_stage, - name.as_deref(), - agent.as_deref(), - None, - blocked, - depends_on.as_deref(), - None, - None, - merged_at_ts, - ); - // Bug 780: stage transitions reset retry_count to 0. retry_count tracks - // attempts at THIS stage's work (coding, merging, qa); a fresh attempt at - // a new stage is conceptually distinct from prior attempts at a different - // stage. `blocked` is preserved — that's a human-set signal that survives - // transitions. - crate::crdt_state::set_retry_count(story_id, 0); - - // Shadow table — always reset retry_count to 0 on stage transition. - let retry_count: Option = Some(0); - if let Some(db) = PIPELINE_DB.get() { - let msg = PipelineWriteMsg { - story_id: story_id.to_string(), - stage: new_stage.to_string(), - name, - agent, - retry_count, - blocked, - depends_on, - content, - }; - let _ = db.tx.send(msg); - } -} - -/// Delete a story from the shadow table (fire-and-forget). -pub fn delete_item(story_id: &str) { - delete_content(story_id); - - if let Some(db) = PIPELINE_DB.get() { - // Reuse the channel with a special "deleted" stage marker. - // The background task will handle it. - // Actually, we send a delete message by abusing the write — we'll - // just remove it by setting stage to "deleted". - let msg = PipelineWriteMsg { - story_id: story_id.to_string(), - stage: "deleted".to_string(), - name: None, - agent: None, - retry_count: None, - blocked: None, - depends_on: None, - content: None, - }; - let _ = db.tx.send(msg); - } -} - -/// Get the next available item number by scanning both the CRDT state -/// and the in-memory content store for the highest existing number. -pub fn next_item_number() -> u32 { - let mut max_num: u32 = 0; - - // Scan CRDT items via typed projection. - for item in crate::pipeline_state::read_all_typed() { - let num_str: String = item - .story_id - .0 - .chars() - .take_while(|c| c.is_ascii_digit()) - .collect(); - if let Ok(n) = num_str.parse::() - && n > max_num - { - max_num = n; - } - } - - // Also scan the content store (might have items not yet in CRDT). - for id in all_content_ids() { - let num_str: String = id.chars().take_while(|c| c.is_ascii_digit()).collect(); - if let Ok(n) = num_str.parse::() - && n > max_num - { - max_num = n; - } - } - - max_num + 1 -} +pub use content_store::ensure_content_store; #[cfg(test)] mod tests { use super::*; + use crate::io::story_metadata::parse_front_matter; use std::fs; /// Helper: write a minimal story .md file with front matter. @@ -439,10 +45,10 @@ mod tests { // Initialise the DB in an isolated pool (not the global singleton, to // keep tests hermetic). - let options = SqliteConnectOptions::new() + let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); - let pool = SqlitePool::connect_with(options).await.unwrap(); + let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); // Write a story file in a temp stage dir. @@ -520,10 +126,10 @@ mod tests { async fn pipeline_items_table_has_content_column() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); - let options = SqliteConnectOptions::new() + let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); - let pool = SqlitePool::connect_with(options).await.unwrap(); + let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); // Verify content column exists by inserting a full row. @@ -560,10 +166,10 @@ mod tests { async fn upsert_updates_stage_on_move() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); - let options = SqliteConnectOptions::new() + let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); - let pool = SqlitePool::connect_with(options).await.unwrap(); + let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); let now = chrono::Utc::now().to_rfc3339(); @@ -664,10 +270,10 @@ mod tests { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); - let options = SqliteConnectOptions::new() + let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); - let pool = SqlitePool::connect_with(options).await.unwrap(); + let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); let now = chrono::Utc::now().to_rfc3339(); diff --git a/server/src/db/ops.rs b/server/src/db/ops.rs new file mode 100644 index 00000000..b765224e --- /dev/null +++ b/server/src/db/ops.rs @@ -0,0 +1,208 @@ +//! Write operations for the pipeline — content, stage transitions, and deletions. +//! +//! Each function updates three layers atomically in order: the in-memory +//! content store, the CRDT (source of truth for metadata), and the SQLite +//! shadow table (via the background channel). +use super::content_store::{ + all_content_ids, delete_content, ensure_content_store, read_content, write_content, +}; +use super::shadow_write::{PIPELINE_DB, PipelineWriteMsg}; +use crate::io::story_metadata::parse_front_matter; + +/// Write a pipeline item from in-memory content (no filesystem access). +/// +/// This is the primary write path for the DB-backed pipeline. It updates +/// the CRDT, the in-memory content store, and the SQLite shadow table. +pub fn write_item_with_content(story_id: &str, stage: &str, content: &str) { + let (name, agent, retry_count, blocked, depends_on) = match parse_front_matter(content) { + Ok(meta) => ( + meta.name, + meta.agent, + meta.retry_count.map(|r| r as i64), + meta.blocked, + meta.depends_on + .as_ref() + .and_then(|d| serde_json::to_string(d).ok()), + ), + Err(_) => (None, None, None, None, None), + }; + + // Update in-memory content store. + ensure_content_store(); + write_content(story_id, content); + + // Primary: CRDT ops. + let merged_at_ts = if crate::pipeline_state::Stage::from_dir(stage) + .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. })) + { + Some(chrono::Utc::now().timestamp() as f64) + } else { + None + }; + crate::crdt_state::write_item( + story_id, + stage, + name.as_deref(), + agent.as_deref(), + retry_count, + blocked, + depends_on.as_deref(), + None, + None, + merged_at_ts, + ); + + // Shadow: pipeline_items table (only when DB is initialised). + if let Some(db) = PIPELINE_DB.get() { + let msg = PipelineWriteMsg { + story_id: story_id.to_string(), + stage: stage.to_string(), + name, + agent, + retry_count, + blocked, + depends_on, + content: Some(content.to_string()), + }; + let _ = db.tx.send(msg); + } +} + +/// Update only the stage of an existing item (used by move operations). +/// +/// Reads current content from the in-memory store, updates the CRDT stage, +/// and persists the change. Optionally modifies the content (e.g. to clear +/// front-matter fields). +pub fn move_item_stage( + story_id: &str, + new_stage: &str, + content_transform: Option<&dyn Fn(&str) -> String>, +) { + let current_content = read_content(story_id); + + let content = match (¤t_content, content_transform) { + (Some(c), Some(transform)) => { + let new_content = transform(c); + write_content(story_id, &new_content); + Some(new_content) + } + (Some(c), None) => Some(c.clone()), + _ => None, + }; + + let (name, agent, _ignored_retry_count, blocked, depends_on) = content + .as_deref() + .or(current_content.as_deref()) + .and_then(|c| parse_front_matter(c).ok()) + .map(|meta| { + ( + meta.name, + meta.agent, + meta.retry_count.map(|r| r as i64), + meta.blocked, + meta.depends_on + .as_ref() + .and_then(|d| serde_json::to_string(d).ok()), + ) + }) + .unwrap_or((None, None, None, None, None)); + + // CRDT stage transition. + let merged_at_ts = if crate::pipeline_state::Stage::from_dir(new_stage) + .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. })) + { + Some(chrono::Utc::now().timestamp() as f64) + } else { + None + }; + crate::crdt_state::write_item( + story_id, + new_stage, + name.as_deref(), + agent.as_deref(), + None, + blocked, + depends_on.as_deref(), + None, + None, + merged_at_ts, + ); + // Bug 780: stage transitions reset retry_count to 0. retry_count tracks + // attempts at THIS stage's work (coding, merging, qa); a fresh attempt at + // a new stage is conceptually distinct from prior attempts at a different + // stage. `blocked` is preserved — that's a human-set signal that survives + // transitions. + crate::crdt_state::set_retry_count(story_id, 0); + + // Shadow table — always reset retry_count to 0 on stage transition. + let retry_count: Option = Some(0); + if let Some(db) = PIPELINE_DB.get() { + let msg = PipelineWriteMsg { + story_id: story_id.to_string(), + stage: new_stage.to_string(), + name, + agent, + retry_count, + blocked, + depends_on, + content, + }; + let _ = db.tx.send(msg); + } +} + +/// Delete a story from the shadow table (fire-and-forget). +pub fn delete_item(story_id: &str) { + delete_content(story_id); + + if let Some(db) = PIPELINE_DB.get() { + // Reuse the channel with a special "deleted" stage marker. + // The background task will handle it. + // Actually, we send a delete message by abusing the write — we'll + // just remove it by setting stage to "deleted". + let msg = PipelineWriteMsg { + story_id: story_id.to_string(), + stage: "deleted".to_string(), + name: None, + agent: None, + retry_count: None, + blocked: None, + depends_on: None, + content: None, + }; + let _ = db.tx.send(msg); + } +} + +/// Get the next available item number by scanning both the CRDT state +/// and the in-memory content store for the highest existing number. +pub fn next_item_number() -> u32 { + let mut max_num: u32 = 0; + + // Scan CRDT items via typed projection. + for item in crate::pipeline_state::read_all_typed() { + let num_str: String = item + .story_id + .0 + .chars() + .take_while(|c| c.is_ascii_digit()) + .collect(); + if let Ok(n) = num_str.parse::() + && n > max_num + { + max_num = n; + } + } + + // Also scan the content store (might have items not yet in CRDT). + for id in all_content_ids() { + let num_str: String = id.chars().take_while(|c| c.is_ascii_digit()).collect(); + if let Ok(n) = num_str.parse::() + && n > max_num + { + max_num = n; + } + } + + max_num + 1 +} diff --git a/server/src/db/shadow_write.rs b/server/src/db/shadow_write.rs new file mode 100644 index 00000000..77a8f7a2 --- /dev/null +++ b/server/src/db/shadow_write.rs @@ -0,0 +1,119 @@ +//! Background shadow-write task — persists pipeline items to SQLite asynchronously. +//! +//! `init` opens the database, runs migrations, loads existing content into the +//! in-memory store, and spawns the write loop. All subsequent writes are sent +//! over an unbounded channel so callers never block on I/O. +use crate::slog; +use sqlx::SqlitePool; +use sqlx::sqlite::SqliteConnectOptions; +use std::collections::HashMap; +use std::path::Path; +use std::sync::OnceLock; +use tokio::sync::mpsc; + +/// A pending shadow write for one pipeline item. +pub(super) struct PipelineWriteMsg { + pub(super) story_id: String, + pub(super) stage: String, + pub(super) name: Option, + pub(super) agent: Option, + pub(super) retry_count: Option, + pub(super) blocked: Option, + pub(super) depends_on: Option, + pub(super) content: Option, +} + +/// Handle to the background shadow-write task. +pub struct PipelineDb { + pub(super) tx: mpsc::UnboundedSender, +} + +/// Process-global handle to the background shadow-write task, set once during `init`. +pub(super) static PIPELINE_DB: OnceLock = OnceLock::new(); + +/// Initialise the pipeline database. +/// +/// Opens (or creates) the SQLite file at `db_path`, runs embedded migrations, +/// loads existing story content into the in-memory store, and spawns the +/// background write task. Safe to call only once; subsequent calls are no-ops. +pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { + if PIPELINE_DB.get().is_some() { + return Ok(()); + } + + let options = SqliteConnectOptions::new() + .filename(db_path) + .create_if_missing(true); + + let pool = SqlitePool::connect_with(options).await?; + sqlx::migrate!("./migrations").run(&pool).await?; + + // Load existing content into the in-memory store. + let rows: Vec<(String, Option)> = + sqlx::query_as("SELECT id, content FROM pipeline_items WHERE content IS NOT NULL") + .fetch_all(&pool) + .await?; + + let mut content_map = HashMap::new(); + for (id, content) in rows { + if let Some(c) = content { + content_map.insert(id, c); + } + } + super::content_store::init_content_store(content_map); + + let (tx, mut rx) = mpsc::unbounded_channel::(); + + tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + // The "deleted" sentinel means the caller wants the row gone. + // Issue a real DELETE so the shadow table stays clean and + // sync_crdt_stages_from_db cannot resurrect a tombstoned item on + // the next restart. + if msg.stage == "deleted" { + let result = sqlx::query("DELETE FROM pipeline_items WHERE id = ?1") + .bind(&msg.story_id) + .execute(&pool) + .await; + if let Err(e) = result { + slog!("[db] Shadow delete failed for '{}': {e}", msg.story_id); + } + continue; + } + + let now = chrono::Utc::now().to_rfc3339(); + let result = sqlx::query( + "INSERT INTO pipeline_items \ + (id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \ + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9) \ + ON CONFLICT(id) DO UPDATE SET \ + name = excluded.name, \ + stage = excluded.stage, \ + agent = excluded.agent, \ + retry_count = excluded.retry_count, \ + blocked = excluded.blocked, \ + depends_on = excluded.depends_on, \ + content = COALESCE(excluded.content, pipeline_items.content), \ + updated_at = excluded.updated_at", + ) + .bind(&msg.story_id) + .bind(&msg.name) + .bind(&msg.stage) + .bind(&msg.agent) + .bind(msg.retry_count) + .bind(msg.blocked.map(|b| b as i64)) + .bind(&msg.depends_on) + .bind(&msg.content) + .bind(&now) + .execute(&pool) + .await; + + if let Err(e) = result { + slog!("[db] Shadow write failed for '{}': {e}", msg.story_id); + } + } + }); + + let _ = PIPELINE_DB.set(PipelineDb { tx }); + Ok(()) +}