//! SQLite storage layer — content store, shadow writes, and CRDT op persistence. /// SQLite storage layer for pipeline state and story content. /// /// The CRDT layer (`crdt_state`) is the primary source of truth for pipeline /// metadata (stage, name, agent, etc.). This module provides: /// /// 1. **Content store** — an in-memory `HashMap` backed /// by the `pipeline_items.content` column. Provides fast synchronous /// reads for MCP tools and other callers. /// /// 2. **Shadow-write channel** — a fire-and-forget background task that /// upserts `pipeline_items` rows so the database always has a full copy /// of story content plus metadata. /// /// On startup, existing content is loaded from the database into memory so /// no filesystem scan is needed after migration. use crate::io::story_metadata::parse_front_matter; use crate::slog; use sqlx::SqlitePool; use sqlx::sqlite::SqliteConnectOptions; use std::collections::HashMap; use std::path::Path; use std::sync::{Mutex, OnceLock}; use tokio::sync::mpsc; /// A pending shadow write for one pipeline item. struct PipelineWriteMsg { story_id: String, stage: String, name: Option, agent: Option, retry_count: Option, blocked: Option, depends_on: Option, content: Option, } /// Handle to the background shadow-write task. pub struct PipelineDb { tx: mpsc::UnboundedSender, } static PIPELINE_DB: OnceLock = OnceLock::new(); // ── In-memory content store ───────────────────────────────────────── static CONTENT_STORE: OnceLock>> = OnceLock::new(); #[cfg(test)] thread_local! { static CONTENT_STORE_TL: OnceLock>> = const { OnceLock::new() }; } #[cfg(not(test))] fn get_content_store() -> Option<&'static Mutex>> { CONTENT_STORE.get() } #[cfg(test)] fn get_content_store() -> Option<&'static Mutex>> { let tl = CONTENT_STORE_TL.with(|lock| { if lock.get().is_some() { Some(lock as *const OnceLock>>) } else { None } }); if let Some(ptr) = tl { // SAFETY: The thread-local lives as long as the thread, which outlives // any test using it. We only need 'static for the return type. let lock = unsafe { &*ptr }; lock.get() } else { CONTENT_STORE.get() } } /// Read the full markdown content of a story from the in-memory store. pub fn read_content(story_id: &str) -> Option { let store = get_content_store()?; let map = store.lock().ok()?; map.get(story_id).cloned() } /// Write (or overwrite) the full markdown content of a story. /// /// Updates the in-memory store immediately. pub fn write_content(story_id: &str, content: &str) { if let Some(store) = get_content_store() && let Ok(mut map) = store.lock() { map.insert(story_id.to_string(), content.to_string()); } } /// Remove a story's content from the in-memory store. pub fn delete_content(story_id: &str) { if let Some(store) = get_content_store() && let Ok(mut map) = store.lock() { map.remove(story_id); } } /// Ensure the in-memory content store is initialised. /// /// Safe to call multiple times — the `OnceLock` is set at most once. pub fn ensure_content_store() { #[cfg(not(test))] { let _ = CONTENT_STORE.set(Mutex::new(HashMap::new())); } #[cfg(test)] { CONTENT_STORE_TL.with(|lock| { if lock.get().is_none() { let _ = lock.set(Mutex::new(HashMap::new())); } }); crate::crdt_state::init_for_test(); } } /// Return all story IDs present in the content store. pub fn all_content_ids() -> Vec { match get_content_store() { Some(store) => match store.lock() { Ok(map) => map.keys().cloned().collect(), Err(_) => Vec::new(), }, None => Vec::new(), } } // ── Initialisation ────────────────────────────────────────────────── /// Initialise the pipeline database. /// /// Opens (or creates) the SQLite file at `db_path`, runs embedded migrations, /// loads existing story content into the in-memory store, and spawns the /// background write task. Safe to call only once; subsequent calls are no-ops. pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { if PIPELINE_DB.get().is_some() { return Ok(()); } let options = SqliteConnectOptions::new() .filename(db_path) .create_if_missing(true); let pool = SqlitePool::connect_with(options).await?; sqlx::migrate!("./migrations").run(&pool).await?; // Load existing content into the in-memory store. let rows: Vec<(String, Option)> = sqlx::query_as("SELECT id, content FROM pipeline_items WHERE content IS NOT NULL") .fetch_all(&pool) .await?; let mut content_map = HashMap::new(); for (id, content) in rows { if let Some(c) = content { content_map.insert(id, c); } } let _ = CONTENT_STORE.set(Mutex::new(content_map)); let (tx, mut rx) = mpsc::unbounded_channel::(); tokio::spawn(async move { while let Some(msg) = rx.recv().await { // The "deleted" sentinel means the caller wants the row gone. // Issue a real DELETE so the shadow table stays clean and // sync_crdt_stages_from_db cannot resurrect a tombstoned item on // the next restart. if msg.stage == "deleted" { let result = sqlx::query("DELETE FROM pipeline_items WHERE id = ?1") .bind(&msg.story_id) .execute(&pool) .await; if let Err(e) = result { slog!("[db] Shadow delete failed for '{}': {e}", msg.story_id); } continue; } let now = chrono::Utc::now().to_rfc3339(); let result = sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9) \ ON CONFLICT(id) DO UPDATE SET \ name = excluded.name, \ stage = excluded.stage, \ agent = excluded.agent, \ retry_count = excluded.retry_count, \ blocked = excluded.blocked, \ depends_on = excluded.depends_on, \ content = COALESCE(excluded.content, pipeline_items.content), \ updated_at = excluded.updated_at", ) .bind(&msg.story_id) .bind(&msg.name) .bind(&msg.stage) .bind(&msg.agent) .bind(msg.retry_count) .bind(msg.blocked.map(|b| b as i64)) .bind(&msg.depends_on) .bind(&msg.content) .bind(&now) .execute(&pool) .await; if let Err(e) = result { slog!("[db] Shadow write failed for '{}': {e}", msg.story_id); } } }); let _ = PIPELINE_DB.set(PipelineDb { tx }); Ok(()) } // ── Write path ────────────────────────────────────────────────────── /// Write a pipeline item from in-memory content (no filesystem access). /// /// This is the primary write path for the DB-backed pipeline. It updates /// the CRDT, the in-memory content store, and the SQLite shadow table. pub fn write_item_with_content(story_id: &str, stage: &str, content: &str) { let (name, agent, retry_count, blocked, depends_on) = match parse_front_matter(content) { Ok(meta) => ( meta.name, meta.agent, meta.retry_count.map(|r| r as i64), meta.blocked, meta.depends_on .as_ref() .and_then(|d| serde_json::to_string(d).ok()), ), Err(_) => (None, None, None, None, None), }; // Update in-memory content store. ensure_content_store(); write_content(story_id, content); // Primary: CRDT ops. let merged_at_ts = if crate::pipeline_state::Stage::from_dir(stage) .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. })) { Some(chrono::Utc::now().timestamp() as f64) } else { None }; crate::crdt_state::write_item( story_id, stage, name.as_deref(), agent.as_deref(), retry_count, blocked, depends_on.as_deref(), None, None, merged_at_ts, ); // Shadow: pipeline_items table (only when DB is initialised). if let Some(db) = PIPELINE_DB.get() { let msg = PipelineWriteMsg { story_id: story_id.to_string(), stage: stage.to_string(), name, agent, retry_count, blocked, depends_on, content: Some(content.to_string()), }; let _ = db.tx.send(msg); } } /// Update only the stage of an existing item (used by move operations). /// /// Reads current content from the in-memory store, updates the CRDT stage, /// and persists the change. Optionally modifies the content (e.g. to clear /// front-matter fields). pub fn move_item_stage( story_id: &str, new_stage: &str, content_transform: Option<&dyn Fn(&str) -> String>, ) { let current_content = read_content(story_id); let content = match (¤t_content, content_transform) { (Some(c), Some(transform)) => { let new_content = transform(c); write_content(story_id, &new_content); Some(new_content) } (Some(c), None) => Some(c.clone()), _ => None, }; let (name, agent, retry_count, blocked, depends_on) = content .as_deref() .or(current_content.as_deref()) .and_then(|c| parse_front_matter(c).ok()) .map(|meta| { ( meta.name, meta.agent, meta.retry_count.map(|r| r as i64), meta.blocked, meta.depends_on .as_ref() .and_then(|d| serde_json::to_string(d).ok()), ) }) .unwrap_or((None, None, None, None, None)); // CRDT stage transition. let merged_at_ts = if crate::pipeline_state::Stage::from_dir(new_stage) .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Done { .. })) { Some(chrono::Utc::now().timestamp() as f64) } else { None }; crate::crdt_state::write_item( story_id, new_stage, name.as_deref(), agent.as_deref(), retry_count, blocked, depends_on.as_deref(), None, None, merged_at_ts, ); // Shadow table. if let Some(db) = PIPELINE_DB.get() { let msg = PipelineWriteMsg { story_id: story_id.to_string(), stage: new_stage.to_string(), name, agent, retry_count, blocked, depends_on, content, }; let _ = db.tx.send(msg); } } /// Delete a story from the shadow table (fire-and-forget). pub fn delete_item(story_id: &str) { delete_content(story_id); if let Some(db) = PIPELINE_DB.get() { // Reuse the channel with a special "deleted" stage marker. // The background task will handle it. // Actually, we send a delete message by abusing the write — we'll // just remove it by setting stage to "deleted". let msg = PipelineWriteMsg { story_id: story_id.to_string(), stage: "deleted".to_string(), name: None, agent: None, retry_count: None, blocked: None, depends_on: None, content: None, }; let _ = db.tx.send(msg); } } /// Get the next available item number by scanning both the CRDT state /// and the in-memory content store for the highest existing number. pub fn next_item_number() -> u32 { let mut max_num: u32 = 0; // Scan CRDT items via typed projection. for item in crate::pipeline_state::read_all_typed() { let num_str: String = item .story_id .0 .chars() .take_while(|c| c.is_ascii_digit()) .collect(); if let Ok(n) = num_str.parse::() && n > max_num { max_num = n; } } // Also scan the content store (might have items not yet in CRDT). for id in all_content_ids() { let num_str: String = id.chars().take_while(|c| c.is_ascii_digit()).collect(); if let Ok(n) = num_str.parse::() && n > max_num { max_num = n; } } max_num + 1 } #[cfg(test)] mod tests { use super::*; use std::fs; /// Helper: write a minimal story .md file with front matter. fn write_story(dir: &std::path::Path, filename: &str, content: &str) { fs::write(dir.join(filename), content).unwrap(); } #[tokio::test] async fn shadow_write_inserts_row_into_sqlite() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); // Initialise the DB in an isolated pool (not the global singleton, to // keep tests hermetic). let options = SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); // Write a story file in a temp stage dir. let stage_dir = tmp.path().join("2_current"); fs::create_dir_all(&stage_dir).unwrap(); let story_path = stage_dir.join("10_story_shadow_test.md"); write_story( &stage_dir, "10_story_shadow_test.md", "---\nname: Shadow Test\nagent: coder-opus\nretry_count: 2\nblocked: false\n---\n# Story\n", ); // Perform the upsert directly (bypass the global singleton). let now = chrono::Utc::now().to_rfc3339(); sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9) \ ON CONFLICT(id) DO UPDATE SET \ name = excluded.name, \ stage = excluded.stage, \ agent = excluded.agent, \ retry_count = excluded.retry_count, \ blocked = excluded.blocked, \ depends_on = excluded.depends_on, \ content = COALESCE(excluded.content, pipeline_items.content), \ updated_at = excluded.updated_at", ) .bind("10_story_shadow_test") .bind("Shadow Test") .bind("2_current") .bind("coder-opus") .bind(2_i64) .bind(0_i64) .bind(Option::::None) .bind("---\nname: Shadow Test\n---\n# Story\n") .bind(&now) .execute(&pool) .await .unwrap(); // Query back and verify. let row: (String, Option, String) = sqlx::query_as("SELECT id, name, stage FROM pipeline_items WHERE id = ?1") .bind("10_story_shadow_test") .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "10_story_shadow_test"); assert_eq!(row.1.as_deref(), Some("Shadow Test")); assert_eq!(row.2, "2_current"); // Verify metadata was parsed correctly from the story file. let (name, _agent, retry_count, _blocked, _depends_on) = match std::fs::read_to_string(&story_path) { Ok(contents) => match parse_front_matter(&contents) { Ok(meta) => ( meta.name, meta.agent, meta.retry_count.map(|r| r as i64), meta.blocked, meta.depends_on, ), Err(_) => (None, None, None, None, None), }, Err(_) => (None, None, None, None, None), }; assert_eq!(name.as_deref(), Some("Shadow Test")); assert_eq!(retry_count, Some(2)); } #[tokio::test] async fn pipeline_items_table_has_content_column() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); let options = SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); // Verify content column exists by inserting a full row. let now = chrono::Utc::now().to_rfc3339(); let content = "---\nname: Test\n---\n# Story\n"; sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)", ) .bind("99_story_col_test") .bind(Option::::None) .bind("1_backlog") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(content) .bind(&now) .execute(&pool) .await .unwrap(); let row: (Option,) = sqlx::query_as("SELECT content FROM pipeline_items WHERE id = ?1") .bind("99_story_col_test") .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0.as_deref(), Some(content)); } #[tokio::test] async fn upsert_updates_stage_on_move() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); let options = SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); let now = chrono::Utc::now().to_rfc3339(); // Insert initial row in backlog. sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)", ) .bind("5_story_move") .bind("Move Me") .bind("1_backlog") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind("---\nname: Move Me\n---\n") .bind(&now) .execute(&pool) .await .unwrap(); // Upsert with new stage (simulating move to current). sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9) \ ON CONFLICT(id) DO UPDATE SET \ name = excluded.name, \ stage = excluded.stage, \ agent = excluded.agent, \ retry_count = excluded.retry_count, \ blocked = excluded.blocked, \ depends_on = excluded.depends_on, \ content = COALESCE(excluded.content, pipeline_items.content), \ updated_at = excluded.updated_at", ) .bind("5_story_move") .bind("Move Me") .bind("2_current") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) // content NULL → COALESCE preserves existing .bind(&now) .execute(&pool) .await .unwrap(); let row: (String,) = sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1") .bind("5_story_move") .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "2_current"); } #[test] fn content_store_read_write_delete() { ensure_content_store(); let story_id = "100_story_content_test"; let markdown = "---\nname: Content Test\n---\n# Story\n"; // Write. write_content(story_id, markdown); assert_eq!(read_content(story_id).as_deref(), Some(markdown)); // Overwrite. let updated = "---\nname: Updated\n---\n# Updated Story\n"; write_content(story_id, updated); assert_eq!(read_content(story_id).as_deref(), Some(updated)); // Delete. delete_content(story_id); assert!(read_content(story_id).is_none()); } #[test] fn next_item_number_returns_1_when_empty() { // When no items exist, should return 1. // Note: in test context the global CRDT/content store may or may not // be initialised, so the function falls back gracefully. let n = next_item_number(); assert!(n >= 1); } /// Regression test for bug 537: `delete_item` must issue a real SQL DELETE /// rather than upserting stage = "deleted". A "deleted" shadow row that /// survives a restart would be picked up by `sync_crdt_stages_from_db` and /// re-inserted into the CRDT with stage "deleted" — resurrecting a /// tombstoned story. #[tokio::test] async fn delete_item_removes_row_not_sets_deleted_stage() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); let options = SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); let now = chrono::Utc::now().to_rfc3339(); // Insert a live row. sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)", ) .bind("42_story_to_delete") .bind("Delete Me") .bind("2_current") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind("---\nname: Delete Me\n---\n") .bind(&now) .execute(&pool) .await .unwrap(); // Simulate what the background task does when it receives a "deleted" // sentinel message — it must DELETE the row, not upsert it. sqlx::query("DELETE FROM pipeline_items WHERE id = ?1") .bind("42_story_to_delete") .execute(&pool) .await .unwrap(); // The row must be gone — not present with stage = "deleted". let row: Option<(String,)> = sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1") .bind("42_story_to_delete") .fetch_optional(&pool) .await .unwrap(); assert!( row.is_none(), "delete_item must remove the row; found stage = {:?}", row.map(|r| r.0) ); } }