huskies: merge 1098 bug Shadow drift: set_retry_count / bump_retry_count write CRDT register without updating pipeline_items.retry_count

This commit is contained in:
dave
2026-05-15 18:19:56 +00:00
parent 0ae6dfd565
commit 1adc734801
4 changed files with 196 additions and 20 deletions
+36
View File
@@ -564,6 +564,24 @@ pub fn set_retry_count(story_id: &str, count: i64) {
_ => return,
};
write_item(story_id, &new_stage, None, None, None, None);
if let Some(db) = crate::db::shadow_write::PIPELINE_DB.get() {
let stage = stage_dir_name(&new_stage).to_string();
let name = Some(item.name().to_string());
let agent = item.agent().map(|a| a.to_string());
let depends_on = (!item.depends_on().is_empty())
.then(|| serde_json::to_string(item.depends_on()).ok())
.flatten();
let msg = crate::db::shadow_write::PipelineWriteMsg {
story_id: story_id.to_string(),
stage,
name,
agent,
retry_count: Some(count.max(0)),
depends_on,
content: None,
};
let _ = db.tx.send(msg);
}
}
/// Increment `retries` by 1 and return the new value.
@@ -613,5 +631,23 @@ pub fn bump_retry_count(story_id: &str) -> i64 {
_ => return 0,
};
write_item(story_id, &new_stage, None, None, None, None);
if let Some(db) = crate::db::shadow_write::PIPELINE_DB.get() {
let stage = stage_dir_name(&new_stage).to_string();
let name = Some(item.name().to_string());
let agent = item.agent().map(|a| a.to_string());
let depends_on = (!item.depends_on().is_empty())
.then(|| serde_json::to_string(item.depends_on()).ok())
.flatten();
let msg = crate::db::shadow_write::PipelineWriteMsg {
story_id: story_id.to_string(),
stage,
name,
agent,
retry_count: Some(new_retries as i64),
depends_on,
content: None,
};
let _ = db.tx.send(msg);
}
new_retries as i64
}
+103 -6
View File
@@ -592,6 +592,42 @@ mod tests {
);
}
/// `shadow_write::init` spawns its background task on the calling runtime,
/// which under `#[tokio::test]` is per-test and dies when the test ends.
/// Park the init on a leaked multi-thread runtime so the bg task lives for
/// the whole test process; mirrors `db::ops::tests::ensure_shadow_db`.
#[cfg(test)]
static SHADOW_RT: std::sync::OnceLock<tokio::runtime::Runtime> = std::sync::OnceLock::new();
#[cfg(test)]
async fn ensure_shadow_db() {
static INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new();
if INIT.get().is_some() {
return;
}
let rt = SHADOW_RT.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.expect("shadow rt")
});
rt.spawn(async {
static INNER: std::sync::OnceLock<()> = std::sync::OnceLock::new();
if INNER.get().is_some() {
return;
}
let tmp = tempfile::tempdir().expect("tmp");
let db_path = tmp.path().join("pipeline.db");
std::mem::forget(tmp);
shadow_write::init(&db_path).await.expect("shadow init");
let _ = INNER.set(());
})
.await
.expect("shadow init task");
let _ = INIT.set(());
}
/// Regression for story 1095: `set_name` must propagate the new name to the
/// SQLite shadow table via `sync_item_name`. Before the fix, the CRDT
/// register was updated but `pipeline_items.name` stayed stale.
@@ -599,10 +635,7 @@ mod tests {
async fn set_name_updates_shadow_name_column() {
crate::crdt_state::init_for_test();
ensure_content_store();
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
shadow_write::init(&db_path).await.expect("db init");
ensure_shadow_db().await;
let story_id = "9095_story_set_name_shadow";
write_item_with_content(
@@ -612,17 +645,29 @@ mod tests {
ItemMeta::named("Original Name"),
);
// Wait for the initial insert to land.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// Rename via the CRDT setter — now also triggers sync_item_name.
crate::crdt_state::set_name(story_id, Some("Updated Name"));
// Wait for the background write task to flush.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let pool = shadow_write::get_shared_pool().expect("pool must be initialised");
// Open a fresh pool on this test's runtime — sqlx pools are not safe
// to share across runtimes, so we can't reuse `get_shared_pool()`
// (which was created on the leaked shadow-write runtime).
let path = shadow_write::SHADOW_DB_PATH
.get()
.expect("SHADOW_DB_PATH set by init");
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(path)
.create_if_missing(false);
let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
let row: (Option<String>,) =
sqlx::query_as("SELECT name FROM pipeline_items WHERE id = ?1")
.bind(story_id)
.fetch_one(pool)
.fetch_one(&pool)
.await
.unwrap();
@@ -633,6 +678,58 @@ mod tests {
);
}
/// Bug 1098: `bump_retry_count` must mirror the new value to the SQLite
/// shadow table, not only to the CRDT register.
///
/// Before the fix, calling `bump_retry_count` updated the CRDT but left
/// `pipeline_items.retry_count` stale.
#[tokio::test]
async fn bump_retry_count_updates_shadow_table() {
crate::crdt_state::init_for_test();
ensure_content_store();
ensure_shadow_db().await;
let story_id = "9899_story_retry_shadow_1098";
// Insert the story into both CRDT and the shadow table.
write_item_with_content(
story_id,
"2_current",
"# Retry shadow test\n",
ItemMeta::named("Retry Shadow Test"),
);
// Let the background write task process the initial insert.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
// Three bumps → retry_count must reach 3 in SQLite.
crate::crdt_state::bump_retry_count(story_id);
crate::crdt_state::bump_retry_count(story_id);
crate::crdt_state::bump_retry_count(story_id);
// Let the background write task process all three updates.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let path = shadow_write::SHADOW_DB_PATH
.get()
.expect("SHADOW_DB_PATH set by init");
let opts = sqlx::sqlite::SqliteConnectOptions::new()
.filename(path)
.create_if_missing(false);
let pool = sqlx::SqlitePool::connect_with(opts).await.unwrap();
let (count,): (i64,) =
sqlx::query_as("SELECT retry_count FROM pipeline_items WHERE id = ?1")
.bind(story_id)
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(
count, 3,
"retry_count must be 3 after three bump_retry_count calls"
);
}
/// Story 1087, AC2: the split-stage migration projects every supported
/// wire-form `stage` string into the canonical `(pipeline, status)` pair.
/// The fixture covers each Stage variant (and the legacy numeric-prefix
+36 -4
View File
@@ -359,6 +359,41 @@ mod tests {
use super::*;
use crate::db::shadow_write;
/// `shadow_write::init` spawns its background task on the calling runtime.
/// Under `#[tokio::test]` that runtime is per-test and drops when the test
/// ends, killing the task. This OnceLock holds a multi-thread runtime that
/// persists for the lifetime of the test binary so the write loop stays alive
/// across all tests that share `PIPELINE_DB`.
static SHADOW_RT: std::sync::OnceLock<tokio::runtime::Runtime> = std::sync::OnceLock::new();
async fn ensure_shadow_db() {
static INIT: std::sync::OnceLock<()> = std::sync::OnceLock::new();
if INIT.get().is_some() {
return;
}
let rt = SHADOW_RT.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.expect("shadow rt")
});
rt.spawn(async {
static INNER: std::sync::OnceLock<()> = std::sync::OnceLock::new();
if INNER.get().is_some() {
return;
}
let tmp = tempfile::tempdir().expect("tmp");
let db_path = tmp.path().join("pipeline.db");
std::mem::forget(tmp);
shadow_write::init(&db_path).await.expect("shadow init");
let _ = INNER.set(());
})
.await
.expect("shadow init task");
let _ = INIT.set(());
}
/// Regression test for story 1097: `set_depends_on` must sync the shadow
/// table. Before the fix, the CRDT register was updated but the
/// `pipeline_items.depends_on` column was never written.
@@ -366,10 +401,7 @@ mod tests {
async fn set_depends_on_syncs_shadow_table() {
crate::crdt_state::init_for_test();
ensure_content_store();
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
shadow_write::init(&db_path).await.unwrap();
ensure_shadow_db().await;
let story_id = "1097_story_depends_on_shadow_drift";
+21 -10
View File
@@ -41,23 +41,30 @@ pub fn get_shared_pool() -> Option<&'static SqlitePool> {
}
/// A pending shadow write for one pipeline item.
pub(super) struct PipelineWriteMsg {
pub(super) story_id: String,
pub(super) stage: String,
pub(super) name: Option<String>,
pub(super) agent: Option<String>,
pub(super) retry_count: Option<i64>,
pub(super) depends_on: Option<String>,
pub(super) content: Option<String>,
pub(crate) struct PipelineWriteMsg {
pub(crate) story_id: String,
pub(crate) stage: String,
pub(crate) name: Option<String>,
pub(crate) agent: Option<String>,
pub(crate) retry_count: Option<i64>,
pub(crate) depends_on: Option<String>,
pub(crate) content: Option<String>,
}
/// Handle to the background shadow-write task.
pub struct PipelineDb {
pub(super) tx: mpsc::UnboundedSender<PipelineWriteMsg>,
pub(crate) tx: mpsc::UnboundedSender<PipelineWriteMsg>,
}
/// Process-global handle to the background shadow-write task, set once during `init`.
pub(super) static PIPELINE_DB: OnceLock<PipelineDb> = OnceLock::new();
pub(crate) static PIPELINE_DB: OnceLock<PipelineDb> = OnceLock::new();
/// Path of the SQLite file opened by [`init`], set once by the first successful caller.
///
/// Tests that need to open their own pool (because sqlx pools are not safe to
/// share across Tokio runtimes) read this to find the right file regardless of
/// which test won the `PIPELINE_DB` init race.
pub(crate) static SHADOW_DB_PATH: OnceLock<std::path::PathBuf> = OnceLock::new();
/// Initialise the pipeline database.
///
@@ -68,6 +75,10 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
if PIPELINE_DB.get().is_some() {
return Ok(());
}
// Record the path before doing any real work so tests can always find the
// correct file even if two callers race — the OnceLock ensures only one
// path wins, and whichever wins will also win the PIPELINE_DB set below.
let _ = SHADOW_DB_PATH.set(db_path.to_path_buf());
// Story 1087: before running the migration that splits `stage` into
// (`pipeline`, `status`), take a timestamped side-car copy of the live DB