diff --git a/server/src/crdt_state/write/item.rs b/server/src/crdt_state/write/item.rs index 483202fa..609bc08f 100644 --- a/server/src/crdt_state/write/item.rs +++ b/server/src/crdt_state/write/item.rs @@ -564,6 +564,24 @@ pub fn set_retry_count(story_id: &str, count: i64) { _ => return, }; write_item(story_id, &new_stage, None, None, None, None); + if let Some(db) = crate::db::shadow_write::PIPELINE_DB.get() { + let stage = stage_dir_name(&new_stage).to_string(); + let name = Some(item.name().to_string()); + let agent = item.agent().map(|a| a.to_string()); + let depends_on = (!item.depends_on().is_empty()) + .then(|| serde_json::to_string(item.depends_on()).ok()) + .flatten(); + let msg = crate::db::shadow_write::PipelineWriteMsg { + story_id: story_id.to_string(), + stage, + name, + agent, + retry_count: Some(count.max(0)), + depends_on, + content: None, + }; + let _ = db.tx.send(msg); + } } /// Increment `retries` by 1 and return the new value. @@ -613,5 +631,23 @@ pub fn bump_retry_count(story_id: &str) -> i64 { _ => return 0, }; write_item(story_id, &new_stage, None, None, None, None); + if let Some(db) = crate::db::shadow_write::PIPELINE_DB.get() { + let stage = stage_dir_name(&new_stage).to_string(); + let name = Some(item.name().to_string()); + let agent = item.agent().map(|a| a.to_string()); + let depends_on = (!item.depends_on().is_empty()) + .then(|| serde_json::to_string(item.depends_on()).ok()) + .flatten(); + let msg = crate::db::shadow_write::PipelineWriteMsg { + story_id: story_id.to_string(), + stage, + name, + agent, + retry_count: Some(new_retries as i64), + depends_on, + content: None, + }; + let _ = db.tx.send(msg); + } new_retries as i64 } diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 5b89c843..23ab1547 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -592,6 +592,42 @@ mod tests { ); } + /// `shadow_write::init` spawns its background task on the calling runtime, + /// which under `#[tokio::test]` is per-test 
and dies when the test ends. + /// Park the init on a leaked multi-thread runtime so the bg task lives for + /// the whole test process; mirrors `db::ops::tests::ensure_shadow_db`. + #[cfg(test)] + static SHADOW_RT: std::sync::OnceLock = std::sync::OnceLock::new(); + + #[cfg(test)] + async fn ensure_shadow_db() { + static INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new(); + if INIT.get().is_some() { + return; + } + let rt = SHADOW_RT.get_or_init(|| { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .expect("shadow rt") + }); + rt.spawn(async { + static INNER: std::sync::OnceLock<()> = std::sync::OnceLock::new(); + if INNER.get().is_some() { + return; + } + let tmp = tempfile::tempdir().expect("tmp"); + let db_path = tmp.path().join("pipeline.db"); + std::mem::forget(tmp); + shadow_write::init(&db_path).await.expect("shadow init"); + let _ = INNER.set(()); + }) + .await + .expect("shadow init task"); + let _ = INIT.set(()); + } + /// Regression for story 1095: `set_name` must propagate the new name to the /// SQLite shadow table via `sync_item_name`. Before the fix, the CRDT /// register was updated but `pipeline_items.name` stayed stale. @@ -599,10 +635,7 @@ mod tests { async fn set_name_updates_shadow_name_column() { crate::crdt_state::init_for_test(); ensure_content_store(); - - let tmp = tempfile::tempdir().unwrap(); - let db_path = tmp.path().join("pipeline.db"); - shadow_write::init(&db_path).await.expect("db init"); + ensure_shadow_db().await; let story_id = "9095_story_set_name_shadow"; write_item_with_content( @@ -612,17 +645,29 @@ mod tests { ItemMeta::named("Original Name"), ); + // Wait for the initial insert to land. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + // Rename via the CRDT setter — now also triggers sync_item_name. crate::crdt_state::set_name(story_id, Some("Updated Name")); // Wait for the background write task to flush. 
tokio::time::sleep(std::time::Duration::from_millis(100)).await; - let pool = shadow_write::get_shared_pool().expect("pool must be initialised"); + // Open a fresh pool on this test's runtime — sqlx pools are not safe + // to share across runtimes, so we can't reuse `get_shared_pool()` + // (which was created on the leaked shadow-write runtime). + let path = shadow_write::SHADOW_DB_PATH + .get() + .expect("SHADOW_DB_PATH set by init"); + let opts = sqlx::sqlite::SqliteConnectOptions::new() + .filename(path) + .create_if_missing(false); + let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap(); let row: (Option,) = sqlx::query_as("SELECT name FROM pipeline_items WHERE id = ?1") .bind(story_id) - .fetch_one(pool) + .fetch_one(&pool) .await .unwrap(); @@ -633,6 +678,58 @@ mod tests { ); } + /// Bug 1098: `bump_retry_count` must mirror the new value to the SQLite + /// shadow table, not only to the CRDT register. + /// + /// Before the fix, calling `bump_retry_count` updated the CRDT but left + /// `pipeline_items.retry_count` stale. + #[tokio::test] + async fn bump_retry_count_updates_shadow_table() { + crate::crdt_state::init_for_test(); + ensure_content_store(); + ensure_shadow_db().await; + + let story_id = "9899_story_retry_shadow_1098"; + + // Insert the story into both CRDT and the shadow table. + write_item_with_content( + story_id, + "2_current", + "# Retry shadow test\n", + ItemMeta::named("Retry Shadow Test"), + ); + + // Let the background write task process the initial insert. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // Three bumps → retry_count must reach 3 in SQLite. + crate::crdt_state::bump_retry_count(story_id); + crate::crdt_state::bump_retry_count(story_id); + crate::crdt_state::bump_retry_count(story_id); + + // Let the background write task process all three updates. 
+ tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let path = shadow_write::SHADOW_DB_PATH + .get() + .expect("SHADOW_DB_PATH set by init"); + let opts = sqlx::sqlite::SqliteConnectOptions::new() + .filename(path) + .create_if_missing(false); + let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap(); + let (count,): (i64,) = + sqlx::query_as("SELECT retry_count FROM pipeline_items WHERE id = ?1") + .bind(story_id) + .fetch_one(&pool) + .await + .unwrap(); + + assert_eq!( + count, 3, + "retry_count must be 3 after three bump_retry_count calls" + ); + } + /// Story 1087, AC2: the split-stage migration projects every supported /// wire-form `stage` string into the canonical `(pipeline, status)` pair. /// The fixture covers each Stage variant (and the legacy numeric-prefix diff --git a/server/src/db/ops.rs b/server/src/db/ops.rs index 94834630..112aa92c 100644 --- a/server/src/db/ops.rs +++ b/server/src/db/ops.rs @@ -359,6 +359,41 @@ mod tests { use super::*; use crate::db::shadow_write; + /// `shadow_write::init` spawns its background task on the calling runtime. + /// Under `#[tokio::test]` that runtime is per-test and drops when the test + /// ends, killing the task. This OnceLock holds a multi-thread runtime that + /// persists for the lifetime of the test binary so the write loop stays alive + /// across all tests that share `PIPELINE_DB`. 
+ static SHADOW_RT: std::sync::OnceLock = std::sync::OnceLock::new(); + + async fn ensure_shadow_db() { + static INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new(); + if INIT.get().is_some() { + return; + } + let rt = SHADOW_RT.get_or_init(|| { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .expect("shadow rt") + }); + rt.spawn(async { + static INNER: std::sync::OnceLock<()> = std::sync::OnceLock::new(); + if INNER.get().is_some() { + return; + } + let tmp = tempfile::tempdir().expect("tmp"); + let db_path = tmp.path().join("pipeline.db"); + std::mem::forget(tmp); + shadow_write::init(&db_path).await.expect("shadow init"); + let _ = INNER.set(()); + }) + .await + .expect("shadow init task"); + let _ = INIT.set(()); + } + /// Regression test for story 1097: `set_depends_on` must sync the shadow /// table. Before the fix, the CRDT register was updated but the /// `pipeline_items.depends_on` column was never written. @@ -366,10 +401,7 @@ mod tests { async fn set_depends_on_syncs_shadow_table() { crate::crdt_state::init_for_test(); ensure_content_store(); - - let tmp = tempfile::tempdir().unwrap(); - let db_path = tmp.path().join("pipeline.db"); - shadow_write::init(&db_path).await.unwrap(); + ensure_shadow_db().await; let story_id = "1097_story_depends_on_shadow_drift"; diff --git a/server/src/db/shadow_write.rs b/server/src/db/shadow_write.rs index 30fc19b5..424ccf0a 100644 --- a/server/src/db/shadow_write.rs +++ b/server/src/db/shadow_write.rs @@ -41,23 +41,30 @@ pub fn get_shared_pool() -> Option<&'static SqlitePool> { } /// A pending shadow write for one pipeline item. 
-pub(super) struct PipelineWriteMsg { - pub(super) story_id: String, - pub(super) stage: String, - pub(super) name: Option, - pub(super) agent: Option, - pub(super) retry_count: Option, - pub(super) depends_on: Option, - pub(super) content: Option, +pub(crate) struct PipelineWriteMsg { + pub(crate) story_id: String, + pub(crate) stage: String, + pub(crate) name: Option, + pub(crate) agent: Option, + pub(crate) retry_count: Option, + pub(crate) depends_on: Option, + pub(crate) content: Option, } /// Handle to the background shadow-write task. pub struct PipelineDb { - pub(super) tx: mpsc::UnboundedSender, + pub(crate) tx: mpsc::UnboundedSender, } /// Process-global handle to the background shadow-write task, set once during `init`. -pub(super) static PIPELINE_DB: OnceLock = OnceLock::new(); +pub(crate) static PIPELINE_DB: OnceLock = OnceLock::new(); + +/// Path handed to [`init`], recorded once by the first call that gets past the `PIPELINE_DB` guard. +/// +/// Tests that need to open their own pool (because sqlx pools are not safe to +/// share across Tokio runtimes) read this to locate the file. NOTE(review): it is +/// written before the rest of `init` runs, so it can be set even if that call fails. +pub(crate) static SHADOW_DB_PATH: OnceLock = OnceLock::new(); /// Initialise the pipeline database. /// @@ -68,6 +75,10 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> { if PIPELINE_DB.get().is_some() { return Ok(()); } + // Record the path before doing any real work so tests can find the file. + // `set` keeps only the first value. NOTE(review): nothing visible here ties that + // value to the caller that later wins the `PIPELINE_DB` set — confirm under races. + let _ = SHADOW_DB_PATH.set(db_path.to_path_buf()); // Story 1087: before running the migration that splits `stage` into // (`pipeline`, `status`), take a timestamped side-car copy of the live DB