//! SQLite storage layer — content store, shadow writes, and CRDT op persistence. /// SQLite storage layer for pipeline state and story content. /// /// The CRDT layer (`crdt_state`) is the primary source of truth for pipeline /// metadata (stage, name, agent, etc.). This module provides: /// /// 1. **Content store** — an in-memory `HashMap` backed /// by the `pipeline_items.content` column. Provides fast synchronous /// reads for MCP tools and other callers. /// /// 2. **Shadow-write channel** — a fire-and-forget background task that /// upserts `pipeline_items` rows so the database always has a full copy /// of story content plus metadata. /// /// On startup, existing content is loaded from the database into memory so /// no filesystem scan is needed after migration. pub mod content_store; /// Content-store garbage collection: TransitionFired subscriber and startup sweep. pub mod gc; /// Write operations for the pipeline — content, stage transitions, and deletions. pub mod ops; /// Recovery for half-written pipeline items (bug 1001 backfill). /// /// Exposed via `diagnostics::tool_find_orphaned_items` and /// `diagnostics::tool_recover_half_written_items` MCP tools. pub mod recover; /// Background shadow-write task — persists pipeline items to SQLite asynchronously. pub mod shadow_write; pub use content_store::{ContentKey, all_content_ids, delete_content, read_content, write_content}; pub use ops::{ItemMeta, delete_item, move_item_stage, next_item_number, write_item_with_content}; pub use shadow_write::{check_schema_drift, get_shared_pool, init}; #[cfg(test)] pub use content_store::ensure_content_store; #[cfg(test)] mod tests { use super::*; use std::fs; /// Helper: write a minimal story .md file with front matter. fn write_story(dir: &std::path::Path, filename: &str, content: &str) { fs::write(dir.join(filename), content).unwrap(); } #[tokio::test] async fn shadow_write_inserts_row_into_sqlite() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); // Initialise the DB in an isolated pool (not the global singleton, to // keep tests hermetic). let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); // Write a story file in a temp stage dir. let stage_dir = tmp.path().join("2_current"); fs::create_dir_all(&stage_dir).unwrap(); let story_path = stage_dir.join("10_story_shadow_test.md"); write_story( &stage_dir, "10_story_shadow_test.md", "---\nname: Shadow Test\nagent: coder-opus\nretry_count: 2\n---\n# Story\n", ); // Perform the upsert directly (bypass the global singleton). let now = chrono::Utc::now().to_rfc3339(); sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \ ON CONFLICT(id) DO UPDATE SET \ name = excluded.name, \ stage = excluded.stage, \ agent = excluded.agent, \ retry_count = excluded.retry_count, \ depends_on = excluded.depends_on, \ content = COALESCE(excluded.content, pipeline_items.content), \ updated_at = excluded.updated_at", ) .bind("10_story_shadow_test") .bind("Shadow Test") .bind("2_current") .bind("coder-opus") .bind(2_i64) .bind(Option::::None) .bind("---\nname: Shadow Test\n---\n# Story\n") .bind(&now) .execute(&pool) .await .unwrap(); // Query back and verify. let row: (String, Option, String) = sqlx::query_as("SELECT id, name, stage FROM pipeline_items WHERE id = ?1") .bind("10_story_shadow_test") .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "10_story_shadow_test"); assert_eq!(row.1.as_deref(), Some("Shadow Test")); assert_eq!(row.2, "2_current"); // The shadow row's name + retry_count came through from the INSERT // params above; that's what the test exercises. Story 929 dropped // the redundant "re-parse the YAML body to double-check" step that // used to live here. let _ = story_path; } #[tokio::test] async fn pipeline_items_table_has_content_column() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); // Verify content column exists by inserting a full row. let now = chrono::Utc::now().to_rfc3339(); let content = "---\nname: Test\n---\n# Story\n"; sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)", ) .bind("99_story_col_test") .bind(Option::::None) .bind("1_backlog") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(content) .bind(&now) .execute(&pool) .await .unwrap(); let row: (Option,) = sqlx::query_as("SELECT content FROM pipeline_items WHERE id = ?1") .bind("99_story_col_test") .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0.as_deref(), Some(content)); } #[tokio::test] async fn upsert_updates_stage_on_move() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); let now = chrono::Utc::now().to_rfc3339(); // Insert initial row in backlog. sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)", ) .bind("5_story_move") .bind("Move Me") .bind("1_backlog") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind("---\nname: Move Me\n---\n") .bind(&now) .execute(&pool) .await .unwrap(); // Upsert with new stage (simulating move to current). sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8) \ ON CONFLICT(id) DO UPDATE SET \ name = excluded.name, \ stage = excluded.stage, \ agent = excluded.agent, \ retry_count = excluded.retry_count, \ depends_on = excluded.depends_on, \ content = COALESCE(excluded.content, pipeline_items.content), \ updated_at = excluded.updated_at", ) .bind("5_story_move") .bind("Move Me") .bind("2_current") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) // content NULL → COALESCE preserves existing .bind(&now) .execute(&pool) .await .unwrap(); let row: (String,) = sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1") .bind("5_story_move") .fetch_one(&pool) .await .unwrap(); assert_eq!(row.0, "2_current"); } #[test] fn content_store_read_write_delete() { ensure_content_store(); let story_id = "100_story_content_test"; let markdown = "---\nname: Content Test\n---\n# Story\n"; // Write. write_content(ContentKey::Story(story_id), markdown); assert_eq!( read_content(ContentKey::Story(story_id)).as_deref(), Some(markdown) ); // Overwrite. let updated = "---\nname: Updated\n---\n# Updated Story\n"; write_content(ContentKey::Story(story_id), updated); assert_eq!( read_content(ContentKey::Story(story_id)).as_deref(), Some(updated) ); // Delete. delete_content(ContentKey::Story(story_id)); assert!(read_content(ContentKey::Story(story_id)).is_none()); } #[test] fn next_item_number_returns_1_when_empty() { // When no items exist, should return 1. // Note: in test context the global CRDT/content store may or may not // be initialised, so the function falls back gracefully. let n = next_item_number(); assert!(n >= 1); } /// Regression test for bug 1001: `next_item_number` must not hand out a /// tombstoned ID. Without this fix, an `evict_item` followed by a fresh /// create reuses the tombstoned numeric slot, producing a half-written /// item (content store + shadow DB accept; CRDT silently rejects). #[test] fn next_item_number_skips_tombstoned_ids() { crate::crdt_state::init_for_test(); ensure_content_store(); // Seed a high-numbered item via the normal write path so it lands in // the CRDT index. let high_id = "9990"; write_item_with_content( high_id, "1_backlog", "---\nname: To Be Tombstoned\n---\n", ItemMeta::named("To Be Tombstoned"), ); // Tombstone it. This adds 9990 to state.tombstones and clears the // content-store row, so without the fix `next_item_number` would // return 9990 again because it's invisible to both visible-index and // content-id scans. crate::crdt_state::evict_item(high_id).expect("evict should succeed"); assert!(crate::crdt_state::is_tombstoned(high_id)); let next = next_item_number(); assert!( next > 9990, "next_item_number must skip past tombstoned id 9990, got {next}" ); } /// is_tombstoned reflects the post-evict state. #[test] fn is_tombstoned_returns_true_after_evict() { crate::crdt_state::init_for_test(); ensure_content_store(); let id = "9991"; write_item_with_content( id, "1_backlog", "---\nname: Soon To Vanish\n---\n", ItemMeta::named("Soon To Vanish"), ); assert!(!crate::crdt_state::is_tombstoned(id)); crate::crdt_state::evict_item(id).expect("evict should succeed"); assert!(crate::crdt_state::is_tombstoned(id)); } /// Regression test for bug 537: `delete_item` must issue a real SQL DELETE /// rather than upserting stage = "deleted". A "deleted" shadow row that /// survives a restart would be picked up by `sync_crdt_stages_from_db` and /// re-inserted into the CRDT with stage "deleted" — resurrecting a /// tombstoned story. #[tokio::test] async fn delete_item_removes_row_not_sets_deleted_stage() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("pipeline.db"); let options = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = sqlx::SqlitePool::connect_with(options).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); let now = chrono::Utc::now().to_rfc3339(); // Insert a live row. sqlx::query( "INSERT INTO pipeline_items \ (id, name, stage, agent, retry_count, depends_on, content, created_at, updated_at) \ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?8)", ) .bind("42_story_to_delete") .bind("Delete Me") .bind("2_current") .bind(Option::::None) .bind(Option::::None) .bind(Option::::None) .bind("---\nname: Delete Me\n---\n") .bind(&now) .execute(&pool) .await .unwrap(); // Simulate what the background task does when it receives a "deleted" // sentinel message — it must DELETE the row, not upsert it. sqlx::query("DELETE FROM pipeline_items WHERE id = ?1") .bind("42_story_to_delete") .execute(&pool) .await .unwrap(); // The row must be gone — not present with stage = "deleted". let row: Option<(String,)> = sqlx::query_as("SELECT stage FROM pipeline_items WHERE id = ?1") .bind("42_story_to_delete") .fetch_optional(&pool) .await .unwrap(); assert!( row.is_none(), "delete_item must remove the row; found stage = {:?}", row.map(|r| r.0) ); } /// Story 864: `write_item_with_content` no longer parses YAML front-matter /// from `content`. The CRDT metadata reflects ONLY what the caller passes /// via `ItemMeta`. This test writes a body without any front-matter at /// all, sets metadata explicitly, and asserts the CRDT picks up the typed /// values, not anything derived from `content`. #[test] fn write_item_typed_meta_takes_precedence_over_content() { crate::crdt_state::init_for_test(); ensure_content_store(); let story_id = "9864_story_typed_meta"; // Body has NO YAML header — just plain markdown. let content = "# Just a heading\n\nNo front matter here.\n"; let meta = ItemMeta { name: Some("Typed Name".into()), agent: Some("coder-1".into()), depends_on: Some(vec![100, 200]), }; write_item_with_content(story_id, "2_current", content, meta); let view = crate::crdt_state::read_item(story_id).expect("story exists in CRDT"); assert_eq!(view.stage().dir_name(), "coding"); assert_eq!(view.name(), "Typed Name"); assert_eq!(view.agent(), Some(crate::config::AgentName::Coder1)); assert_eq!(view.retry_count(), 0); assert_eq!(view.depends_on(), &[100, 200]); // Content is stored verbatim (no parsing, no rewrite). assert_eq!( read_content(ContentKey::Story(story_id)).as_deref(), Some(content) ); } /// Regression: root cause of the 2026-05-14 21:07 production outage. /// /// A headless agent on a feature branch (whose binary includes a new /// sqlx migration) must NEVER apply that migration to the production /// pipeline.db. Verify that opening an agent-local DB and running /// migrations on it leaves the production DB's `_sqlx_migrations` table /// unchanged. /// /// The enforcement mechanism is in `init_subsystems(is_agent=true)`, which /// redirects to a temp path. This test validates the SQLite isolation /// property: migrations applied to one file are confined to that file. #[tokio::test] async fn agent_db_isolation_does_not_affect_production_db() { let tmp = tempfile::tempdir().unwrap(); let prod_db_path = tmp.path().join("production.db"); let agent_db_path = tmp.path().join("agent_temp.db"); // Set up the production DB — apply the current compiled-in migrations. let prod_opts = sqlx::sqlite::SqliteConnectOptions::new() .filename(&prod_db_path) .create_if_missing(true); let prod_pool = sqlx::SqlitePool::connect_with(prod_opts).await.unwrap(); sqlx::migrate!("./migrations") .run(&prod_pool) .await .unwrap(); // Record the migration versions present in the production DB. let before: Vec<(i64,)> = sqlx::query_as("SELECT version FROM _sqlx_migrations ORDER BY version") .fetch_all(&prod_pool) .await .unwrap(); // Simulate the agent opening its own isolated DB and running migrations. let agent_opts = sqlx::sqlite::SqliteConnectOptions::new() .filename(&agent_db_path) .create_if_missing(true); let agent_pool = sqlx::SqlitePool::connect_with(agent_opts).await.unwrap(); sqlx::migrate!("./migrations") .run(&agent_pool) .await .unwrap(); // Production DB must be completely unaffected by the agent's migration run. let after: Vec<(i64,)> = sqlx::query_as("SELECT version FROM _sqlx_migrations ORDER BY version") .fetch_all(&prod_pool) .await .unwrap(); assert_eq!( before, after, "agent opening its own DB must not alter the production DB migration table" ); } /// Verify that `check_schema_drift` returns an empty list when all /// migrations in the database are recognised by this binary. #[tokio::test] async fn check_schema_drift_empty_when_all_known() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("drift_test.db"); let opts = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); let drift = super::shadow_write::check_schema_drift(&pool).await; assert!( drift.is_empty(), "no drift expected when DB matches the compiled-in migration set" ); } /// Verify that `check_schema_drift` identifies a manually-inserted /// migration row that is not part of the compiled-in set. #[tokio::test] async fn check_schema_drift_detects_unknown_migration() { let tmp = tempfile::tempdir().unwrap(); let db_path = tmp.path().join("drift_future.db"); let opts = sqlx::sqlite::SqliteConnectOptions::new() .filename(&db_path) .create_if_missing(true); let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap(); sqlx::migrate!("./migrations").run(&pool).await.unwrap(); // Inject a fake "future" migration that no binary compiled today would know. let fake_checksum: Vec = vec![0u8; 20]; sqlx::query( "INSERT INTO _sqlx_migrations \ (version, description, installed_on, success, checksum, execution_time) \ VALUES (99999999999999, 'future_migration', '2099-01-01T00:00:00Z', 1, ?1, 0)", ) .bind(&fake_checksum) .execute(&pool) .await .unwrap(); let drift = super::shadow_write::check_schema_drift(&pool).await; assert_eq!(drift.len(), 1, "exactly one unknown migration expected"); assert_eq!(drift[0].version, 99999999999999_i64); assert_eq!(drift[0].description, "future_migration"); } /// Story 864: passing `ItemMeta::default()` against a content blob that /// LOOKS like front-matter must NOT silently extract metadata into the /// CRDT. The whole point of removing the implicit YAML round-trip is /// that metadata only flows in through the typed `ItemMeta` arg. #[test] fn write_item_default_meta_ignores_yaml_in_content() { crate::crdt_state::init_for_test(); ensure_content_store(); let story_id = "9864_story_yaml_ignored"; let content = "---\nname: Should Not Appear\nagent: ghost\n---\n# Body\n"; write_item_with_content(story_id, "2_current", content, ItemMeta::default()); // Nameless items are filtered out by read_item (AC 5: nameless = malformed). assert!( crate::crdt_state::read_item(story_id).is_none(), "name must come from typed meta, not parsed YAML — nameless items must not be surfaced" ); } /// Regression (story 894): startup with no YAML migration call leaves /// existing clean content readable and unmodified. Verifies that removing /// `db::yaml_migration::run()` from the startup path does not break reads. #[test] fn startup_reads_clean_content_unchanged() { crate::crdt_state::init_for_test(); ensure_content_store(); let story_id = "9894_story_clean_content"; // Plain body — no YAML block, representing post-migration state. let body = "# Story Heading\n\nSome content.\n"; write_item_with_content( story_id, "2_current", body, ItemMeta { name: Some("Clean".into()), ..ItemMeta::default() }, ); let read_back = read_content(ContentKey::Story(story_id)).expect("content present"); assert_eq!(read_back, body, "plain content must be readable as-is"); assert!( !read_back.trim_start().starts_with("---"), "no YAML header should appear" ); } /// Bug 780: stage transitions must reset retry_count to 0 in the CRDT. /// Carryover from prior-stage retries was tripping the auto-assigner's /// deterministic-merge skip logic. #[test] fn move_item_stage_resets_retry_count_to_zero() { crate::crdt_state::init_for_test(); ensure_content_store(); let story_id = "9870_story_780_retry_reset"; // Seed the story in 2_current with retry_count = 3 (a coder that // burned all its retries). crate::crdt_state::write_item_str( story_id, "2_current", Some("Retry reset test"), None, None, None, ); crate::crdt_state::set_retry_count(story_id, 3); let typed = crate::pipeline_state::read_typed(story_id) .expect("read should succeed") .expect("story exists in CRDT"); assert_eq!(typed.retry_count(), 3); // Promote to 4_merge. retry_count must reset. move_item_stage(story_id, "4_merge", None); let typed_after = crate::pipeline_state::read_typed(story_id) .expect("read should succeed") .expect("story exists in CRDT"); assert_eq!(typed_after.stage.dir_name(), "merge"); assert_eq!( typed_after.retry_count(), 0, "retry_count must reset to 0 on stage transition" ); } }