//! SQLite storage layer — content store, shadow writes, and CRDT op persistence. /// SQLite storage layer for pipeline state and story content. /// /// The CRDT layer (`crdt_state`) is the primary source of truth for pipeline /// metadata (stage, name, agent, etc.). This module provides: /// /// 1. **Content store** — an in-memory `HashMap` backed /// by the `pipeline_items.content` column. Provides fast synchronous /// reads for MCP tools and other callers. /// /// 2. **Shadow-write channel** — a fire-and-forget background task that /// upserts `pipeline_items` rows so the database always has a full copy /// of story content plus metadata. /// /// On startup, existing content is loaded from the database into memory so /// no filesystem scan is needed after migration. pub mod content_store; /// Content-store garbage collection: TransitionFired subscriber and startup sweep. pub mod gc; /// Write operations for the pipeline — content, stage transitions, and deletions. pub mod ops; /// Recovery for half-written pipeline items (bug 1001 backfill). /// /// Exposed via `diagnostics::tool_find_orphaned_items` and /// `diagnostics::tool_recover_half_written_items` MCP tools. pub mod recover; /// Background shadow-write task — persists pipeline items to SQLite asynchronously. pub mod shadow_write; pub use content_store::{ContentKey, all_content_ids, delete_content, read_content, write_content}; pub use ops::{ItemMeta, delete_item, move_item_stage, next_item_number, write_item_with_content}; pub use shadow_write::init; #[cfg(test)] pub use content_store::ensure_content_store; #[cfg(test)] mod tests { use super::*; use std::fs; /// Helper: write a minimal story .md file with front matter. fn write_story(dir: &std::path::Path, filename: &str, content: &str) { fs::write(dir.join(filename), content).unwrap(); } #[tokio::test] async fn shadow_write_inserts_row_into_sqlite() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); // Initialise the DB in an isolated pool (not the global singleton, to // keep tests hermetic). let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); // Write a story file in a temp stage dir. let stage_dir = tmp.path().join("2_current"); fs::create_dir_all(&stage_dir).unwrap(); let story_path = stage_dir.join("10_story_shadow_test.md"); write_story( &stage_dir, "10_story_shadow_test.md", "---\nname: Shadow Test\nagent: coder-opus\nretry_count: 2\n---\n# Story\n", ); // Perform the upsert directly (bypass the global singleton). let now = chrono::Utc::now().to_rfc3339(); sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \ ON CONFLICT(id) DO UPDATE SET \ name = excluded.name, \ stage = excluded.stage, \ agent = excluded.agent, \ retry_count = excluded.retry_count, \ depends_on = excluded.depends_on, \ content = COALESCE(excluded.content, pipeline_items.content), \ updated_at = excluded.updated_at", ) .bind("10_story_shadow_test") .bind("Shadow Test") .bind("2_current") .bind("coder-opus") .bind(2_i64) .bind(Option::::None) .bind("---\nname: Shadow Test\n---\n# Story\n") .bind(&now) .execute(&pool) .await .unwrap(); // Query back and verify. let row: (String, Option, String) = sqlx::query_as("SELECT id, name, stage FROM pipeline_items WHERE id = ?1") .bind("10_story_shadow_test") .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "10_story_shadow_test"); assert_eq!(row.1.as_deref(), Some("Shadow Test")); assert_eq!(row.2, "2_current"); // The shadow row's name + retry_count came through from the INSERT // params above; that's what the test exercises. Story 929 dropped // the redundant "re-parse the YAML body to double-check" step that // used to live here. let _ = story_path; } #[tokio::test] async fn pipeline_items_table_has_content_column() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); // Verify content column exists by inserting a full row. let now = chrono::Utc::now().to_rfc3339(); let content = "---\nname: Test\n---\n# Story\n"; sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)", ) .bind("99_story_col_test") .bind(Option::::None) .bind("1_backlog") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(content) .bind(&now) .execute(&pool) .await .unwrap(); let row: (Option,) = sqlx::query_as("SELECT content FROM pipeline_items WHERE id = ?1") .bind("99_story_col_test") .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0.as_deref(), Some(content)); } #[tokio::test] async fn upsert_updates_stage_on_move() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); let now = chrono::Utc::now().to_rfc3339(); // Insert initial row in backlog. sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)", ) .bind("5_story_move") .bind("Move Me") .bind("1_backlog") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind("---\nname: Move Me\n---\n") .bind(&now) .execute(&pool) .await .unwrap(); // Upsert with new stage (simulating move to current). sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \ ON CONFLICT(id) DO UPDATE SET \ name = excluded.name, \ stage = excluded.stage, \ agent = excluded.agent, \ retry_count = excluded.retry_count, \ depends_on = excluded.depends_on, \ content = COALESCE(excluded.content, pipeline_items.content), \ updated_at = excluded.updated_at", ) .bind("5_story_move") .bind("Move Me") .bind("2_current") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) // content NULL → COALESCE preserves existing .bind(&now) .execute(&pool) .await .unwrap(); let row: (String,) = sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1") .bind("5_story_move") .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "2_current"); } #[test] fn content_store_read_write_delete() { ensure_content_store(); let story_id = "100_story_content_test"; let markdown = "---\nname: Content Test\n---\n# Story\n"; // Write. write_content(ContentKey::Story(story_id), markdown); assert_eq!( read_content(ContentKey::Story(story_id)).as_deref(), Some(markdown) ); // Overwrite. let updated = "---\nname: Updated\n---\n# Updated Story\n"; write_content(ContentKey::Story(story_id), updated); assert_eq!( read_content(ContentKey::Story(story_id)).as_deref(), Some(updated) ); // Delete. delete_content(ContentKey::Story(story_id)); assert!(read_content(ContentKey::Story(story_id)).is_none()); } #[test] fn next_item_number_returns_1_when_empty() { // When no items exist, should return 1. // Note: in test context the global CRDT/content store may or may not // be initialised, so the function falls back gracefully. let n = next_item_number(); assert!(n >= 1); } /// Regression test for bug 1001: `next_item_number` must not hand out a /// tombstoned ID. Without this fix, an `evict_item` followed by a fresh /// create reuses the tombstoned numeric slot, producing a half-written /// item (content store + shadow DB accept; CRDT silently rejects). #[test] fn next_item_number_skips_tombstoned_ids() { crate::crdt_state::init_for_test(); ensure_content_store(); // Seed a high-numbered item via the normal write path so it lands in // the CRDT index. let high_id = "9990"; write_item_with_content( high_id, "1_backlog", "---\nname: To Be Tombstoned\n---\n", ItemMeta::named("To Be Tombstoned"), ); // Tombstone it. This adds 9990 to state.tombstones and clears the // content-store row, so without the fix `next_item_number` would // return 9990 again because it's invisible to both visible-index and // content-id scans. crate::crdt_state::evict_item(high_id).expect("evict should succeed"); assert!(crate::crdt_state::is_tombstoned(high_id)); let next = next_item_number(); assert!( next > 9990, "next_item_number must skip past tombstoned id 9990, got {next}" ); } /// is_tombstoned reflects the post-evict state. #[test] fn is_tombstoned_returns_true_after_evict() { crate::crdt_state::init_for_test(); ensure_content_store(); let id = "9991"; write_item_with_content( id, "1_backlog", "---\nname: Soon To Vanish\n---\n", ItemMeta::named("Soon To Vanish"), ); assert!(!crate::crdt_state::is_tombstoned(id)); crate::crdt_state::evict_item(id).expect("evict should succeed"); assert!(crate::crdt_state::is_tombstoned(id)); } /// Regression test for bug 537: `delete_item` must issue a real SQL DELETE /// rather than upserting stage = "deleted". A "deleted" shadow row that /// survives a restart would be picked up by `sync_crdt_stages_from_db` and /// re-inserted into the CRDT with stage "deleted" — resurrecting a /// tombstoned story. #[tokio::test] async fn delete_item_removes_row_not_sets_deleted_stage() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); let now = chrono::Utc::now().to_rfc3339(); // Insert a live row. sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)", ) .bind("42_story_to_delete") .bind("Delete Me") .bind("2_current") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind("---\nname: Delete Me\n---\n") .bind(&now) .execute(&pool) .await .unwrap(); // Simulate what the background task does when it receives a "deleted" // sentinel message — it must DELETE the row, not upsert it. sqlx::query("DELETE FROM pipeline_items WHERE id = ?1") .bind("42_story_to_delete") .execute(&pool) .await .unwrap(); // The row must be gone — not present with stage = "deleted". let row: Option<(String,)> = sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1") .bind("42_story_to_delete") .fetch_optional(&pool) .await .unwrap(); assert!( row.is_none(), "delete_item must remove the row; found stage = {:?}", row.map(|r| r.0) ); } /// Story 864: `write_item_with_content` no longer parses YAML front-matter /// from `content`. The CRDT metadata reflects ONLY what the caller passes /// via `ItemMeta`. This test writes a body without any front-matter at /// all, sets metadata explicitly, and asserts the CRDT picks up the typed /// values, not anything derived from `content`. #[test] fn write_item_typed_meta_takes_precedence_over_content() { crate::crdt_state::init_for_test(); ensure_content_store(); let story_id = "9864_story_typed_meta"; // Body has NO YAML header — just plain markdown. let content = "# Just a heading\n\nNo front matter here.\n"; let meta = ItemMeta { name: Some("Typed Name".into()), agent: Some("coder-1".into()), retry_count: Some(2), depends_on: Some(vec![100, 200]), }; write_item_with_content(story_id, "2_current", content, meta); let view = crate::crdt_state::read_item(story_id).expect("story exists in CRDT"); assert_eq!(view.stage().dir_name(), "coding"); assert_eq!(view.name(), "Typed Name"); assert_eq!(view.agent(), Some(crate::config::AgentName::Coder1)); assert_eq!(view.retry_count(), 2); assert_eq!(view.depends_on(), &[100, 200]); // Content is stored verbatim (no parsing, no rewrite). assert_eq!( read_content(ContentKey::Story(story_id)).as_deref(), Some(content) ); } /// Story 864: passing `ItemMeta::default()` against a content blob that /// LOOKS like front-matter must NOT silently extract metadata into the /// CRDT. The whole point of removing the implicit YAML round-trip is /// that metadata only flows in through the typed `ItemMeta` arg. #[test] fn write_item_default_meta_ignores_yaml_in_content() { crate::crdt_state::init_for_test(); ensure_content_store(); let story_id = "9864_story_yaml_ignored"; let content = "---\nname: Should Not Appear\nagent: ghost\n---\n# Body\n"; write_item_with_content(story_id, "2_current", content, ItemMeta::default()); // Nameless items are filtered out by read_item (AC 5: nameless = malformed). assert!( crate::crdt_state::read_item(story_id).is_none(), "name must come from typed meta, not parsed YAML — nameless items must not be surfaced" ); } /// Regression (story 894): startup with no YAML migration call leaves /// existing clean content readable and unmodified. Verifies that removing /// `db::yaml_migration::run()` from the startup path does not break reads. #[test] fn startup_reads_clean_content_unchanged() { crate::crdt_state::init_for_test(); ensure_content_store(); let story_id = "9894_story_clean_content"; // Plain body — no YAML block, representing post-migration state. let body = "# Story Heading\n\nSome content.\n"; write_item_with_content( story_id, "2_current", body, ItemMeta { name: Some("Clean".into()), ..ItemMeta::default() }, ); let read_back = read_content(ContentKey::Story(story_id)).expect("content present"); assert_eq!(read_back, body, "plain content must be readable as-is"); assert!( !read_back.trim_start().starts_with("---"), "no YAML header should appear" ); } /// Bug 780: stage transitions must reset retry_count to 0 in the CRDT. /// Carryover from prior-stage retries was tripping the auto-assigner's /// deterministic-merge skip logic. #[test] fn move_item_stage_resets_retry_count_to_zero() { crate::crdt_state::init_for_test(); ensure_content_store(); let story_id = "9870_story_780_retry_reset"; // Seed the story in 2_current with retry_count = 3 (a coder that // burned all its retries). crate::crdt_state::write_item_str( story_id, "2_current", Some("Retry reset test"), None, Some(3), None, None, ); write_content( ContentKey::Story(story_id), "---\nname: Retry reset test\nretry_count: 3\n---\n", ); let typed = crate::pipeline_state::read_typed(story_id) .expect("read should succeed") .expect("story exists in CRDT"); assert_eq!(typed.retry_count, 3); // Promote to 4_merge. retry_count must reset. move_item_stage(story_id, "4_merge", None); let typed_after = crate::pipeline_state::read_typed(story_id) .expect("read should succeed") .expect("story exists in CRDT"); assert_eq!(typed_after.stage.dir_name(), "merge"); assert_eq!( typed_after.retry_count, 0, "retry_count must reset to 0 on stage transition" ); } }