//! SQLite storage layer — content store, shadow writes, and CRDT op persistence. /// SQLite storage layer for pipeline state and story content. /// /// The CRDT layer (`crdt_state`) is the primary source of truth for pipeline /// metadata (stage, name, agent, etc.). This module provides: /// /// 1. **Content store** — an in-memory `HashMap` backed /// by the `pipeline_items.content` column. Provides fast synchronous /// reads for MCP tools and other callers. /// /// 2. **Shadow-write channel** — a fire-and-forget background task that /// upserts `pipeline_items` rows so the database always has a full copy /// of story content plus metadata. /// /// On startup, existing content is loaded from the database into memory so /// no filesystem scan is needed after migration. pub mod content_store; /// Write operations for the pipeline — content, stage transitions, and deletions. pub mod ops; /// Background shadow-write task — persists pipeline items to SQLite asynchronously. pub mod shadow_write; pub use content_store::{all_content_ids, delete_content, read_content, write_content}; pub use ops::{delete_item, move_item_stage, next_item_number, write_item_with_content}; pub use shadow_write::init; #[cfg(test)] pub use content_store::ensure_content_store; #[cfg(test)] mod tests { use super::*; use crate::io::story_metadata::parse_front_matter; use std::fs; /// Helper: write a minimal story .md file with front matter. fn write_story(dir: &std::path::Path, filename: &str, content: &str) { fs::write(dir.join(filename), content).unwrap(); } #[tokio::test] async fn shadow_write_inserts_row_into_sqlite() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); // Initialise the DB in an isolated pool (not the global singleton, to // keep tests hermetic). let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); // Write a story file in a temp stage dir. let stage_dir = tmp.path().join("2_current"); fs::create_dir_all(&stage_dir).unwrap(); let story_path = stage_dir.join("10_story_shadow_test.md"); write_story( &stage_dir, "10_story_shadow_test.md", "---\nname: Shadow Test\nagent: coder-opus\nretry_count: 2\nblocked: false\n---\n# Story\n", ); // Perform the upsert directly (bypass the global singleton). let now = chrono::Utc::now().to_rfc3339(); sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9) \ ON CONFLICT(id) DO UPDATE SET \ name = excluded.name, \ stage = excluded.stage, \ agent = excluded.agent, \ retry_count = excluded.retry_count, \ blocked = excluded.blocked, \ depends_on = excluded.depends_on, \ content = COALESCE(excluded.content, pipeline_items.content), \ updated_at = excluded.updated_at", ) .bind("10_story_shadow_test") .bind("Shadow Test") .bind("2_current") .bind("coder-opus") .bind(2_i64) .bind(0_i64) .bind(Option::::None) .bind("---\nname: Shadow Test\n---\n# Story\n") .bind(&now) .execute(&pool) .await .unwrap(); // Query back and verify. let row: (String, Option, String) = sqlx::query_as("SELECT id, name, stage FROM pipeline_items WHERE id = ?1") .bind("10_story_shadow_test") .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "10_story_shadow_test"); assert_eq!(row.1.as_deref(), Some("Shadow Test")); assert_eq!(row.2, "2_current"); // Verify metadata was parsed correctly from the story file. let (name, _agent, retry_count, _blocked, _depends_on) = match std::fs::read_to_string(&story_path) { Ok(contents) => match parse_front_matter(&contents) { Ok(meta) => ( meta.name, meta.agent, meta.retry_count.map(|r| r as i64), meta.blocked, meta.depends_on, ), Err(_) => (None, None, None, None, None), }, Err(_) => (None, None, None, None, None), }; assert_eq!(name.as_deref(), Some("Shadow Test")); assert_eq!(retry_count, Some(2)); } #[tokio::test] async fn pipeline_items_table_has_content_column() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); // Verify content column exists by inserting a full row. let now = chrono::Utc::now().to_rfc3339(); let content = "---\nname: Test\n---\n# Story\n"; sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)", ) .bind("99_story_col_test") .bind(Option::::None) .bind("1_backlog") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(content) .bind(&now) .execute(&pool) .await .unwrap(); let row: (Option,) = sqlx::query_as("SELECT content FROM pipeline_items WHERE id = ?1") .bind("99_story_col_test") .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0.as_deref(), Some(content)); } #[tokio::test] async fn upsert_updates_stage_on_move() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); let now = chrono::Utc::now().to_rfc3339(); // Insert initial row in backlog. sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)", ) .bind("5_story_move") .bind("Move Me") .bind("1_backlog") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind("---\nname: Move Me\n---\n") .bind(&now) .execute(&pool) .await .unwrap(); // Upsert with new stage (simulating move to current). sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9) \ ON CONFLICT(id) DO UPDATE SET \ name = excluded.name, \ stage = excluded.stage, \ agent = excluded.agent, \ retry_count = excluded.retry_count, \ blocked = excluded.blocked, \ depends_on = excluded.depends_on, \ content = COALESCE(excluded.content, pipeline_items.content), \ updated_at = excluded.updated_at", ) .bind("5_story_move") .bind("Move Me") .bind("2_current") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) // content NULL → COALESCE preserves existing .bind(&now) .execute(&pool) .await .unwrap(); let row: (String,) = sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1") .bind("5_story_move") .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "2_current"); } #[test] fn content_store_read_write_delete() { ensure_content_store(); let story_id = "100_story_content_test"; let markdown = "---\nname: Content Test\n---\n# Story\n"; // Write. write_content(story_id, markdown); assert_eq!(read_content(story_id).as_deref(), Some(markdown)); // Overwrite. let updated = "---\nname: Updated\n---\n# Updated Story\n"; write_content(story_id, updated); assert_eq!(read_content(story_id).as_deref(), Some(updated)); // Delete. delete_content(story_id); assert!(read_content(story_id).is_none()); } #[test] fn next_item_number_returns_1_when_empty() { // When no items exist, should return 1. // Note: in test context the global CRDT/content store may or may not // be initialised, so the function falls back gracefully. let n = next_item_number(); assert!(n >= 1); } /// Regression test for bug 537: `delete_item` must issue a real SQL DELETE /// rather than upserting stage = "deleted". A "deleted" shadow row that /// survives a restart would be picked up by `sync_crdt_stages_from_db` and /// re-inserted into the CRDT with stage "deleted" — resurrecting a /// tombstoned story. #[tokio::test] async fn delete_item_removes_row_not_sets_deleted_stage() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); let now = chrono::Utc::now().to_rfc3339(); // Insert a live row. sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, blocked, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?9)", ) .bind("42_story_to_delete") .bind("Delete Me") .bind("2_current") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind("---\nname: Delete Me\n---\n") .bind(&now) .execute(&pool) .await .unwrap(); // Simulate what the background task does when it receives a "deleted" // sentinel message — it must DELETE the row, not upsert it. sqlx::query("DELETE FROM pipeline_items WHERE id = ?1") .bind("42_story_to_delete") .execute(&pool) .await .unwrap(); // The row must be gone — not present with stage = "deleted". let row: Option<(String,)> = sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1") .bind("42_story_to_delete") .fetch_optional(&pool) .await .unwrap(); assert!( row.is_none(), "delete_item must remove the row; found stage = {:?}", row.map(|r| r.0) ); } /// Bug 780: stage transitions must reset retry_count to 0 in the CRDT. /// Carryover from prior-stage retries was tripping the auto-assigner's /// deterministic-merge skip logic. #[test] fn move_item_stage_resets_retry_count_to_zero() { crate::crdt_state::init_for_test(); ensure_content_store(); let story_id = "9870_story_780_retry_reset"; // Seed the story in 2_current with retry_count = 3 (a coder that // burned all its retries). crate::crdt_state::write_item( story_id, "2_current", Some("Retry reset test"), None, Some(3), None, None, None, None, None, ); write_content( story_id, "---\nname: Retry reset test\nretry_count: 3\n---\n", ); let typed = crate::pipeline_state::read_typed(story_id) .expect("read should succeed") .expect("story exists in CRDT"); assert_eq!(typed.retry_count, 3); // Promote to 4_merge. retry_count must reset. move_item_stage(story_id, "4_merge", None); let typed_after = crate::pipeline_state::read_typed(story_id) .expect("read should succeed") .expect("story exists in CRDT"); assert_eq!(typed_after.stage.dir_name(), "4_merge"); assert_eq!( typed_after.retry_count, 0, "retry_count must reset to 0 on stage transition" ); } }