From 8531bac6cddf7d1a955df87db08856f5ab27a177 Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 15 May 2026 12:34:21 +0000 Subject: [PATCH] huskies: merge 1097 bug Shadow drift: set_depends_on writes CRDT depends_on register without updating pipeline_items.depends_on --- server/src/crdt_state/write/item.rs | 35 +++++++----- server/src/db/ops.rs | 88 +++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 15 deletions(-) diff --git a/server/src/crdt_state/write/item.rs b/server/src/crdt_state/write/item.rs index 07d1edc2..483202fa 100644 --- a/server/src/crdt_state/write/item.rs +++ b/server/src/crdt_state/write/item.rs @@ -21,21 +21,26 @@ use crate::pipeline_state::{AgentClaim, Stage, stage_dir_name}; /// /// Returns `true` if the item was found and the op was applied, `false` otherwise. pub fn set_depends_on(story_id: &str, deps: &[u32]) -> bool { - let Some(state_mutex) = get_crdt() else { - return false; - }; - let Ok(mut state) = state_mutex.lock() else { - return false; - }; - let Some(&idx) = state.index.get(story_id) else { - return false; - }; - let value = if deps.is_empty() { - String::new() - } else { - serde_json::to_string(deps).unwrap_or_default() - }; - apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].depends_on.set(value)); + { + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + let Some(&idx) = state.index.get(story_id) else { + return false; + }; + let value = if deps.is_empty() { + String::new() + } else { + serde_json::to_string(deps).unwrap_or_default() + }; + apply_and_persist(&mut state, |s| s.crdt.doc.items[idx].depends_on.set(value)); + } + // Drop the CRDT lock before calling sync: read_item acquires the same + // mutex and would deadlock if the lock were still held here. + crate::db::ops::sync_item_depends_on(story_id); true } diff --git a/server/src/db/ops.rs b/server/src/db/ops.rs index 23d717b1..94834630 100644 --- a/server/src/db/ops.rs +++ b/server/src/db/ops.rs @@ -268,6 +268,41 @@ pub fn sync_item_name(story_id: &str) { let _ = db.tx.send(msg); } +/// Sync the `depends_on` field of a pipeline item from the CRDT to the shadow table. +/// +/// Called after [`crate::crdt_state::set_depends_on`] updates the CRDT register so +/// that the SQLite shadow table stays in lock-step. Reads the full current view from +/// the CRDT (stage, name, agent, retry_count, depends_on) and sends a +/// [`PipelineWriteMsg`] over [`PIPELINE_DB`]`.tx`. Pattern mirrors +/// [`move_item_stage`] lines 157-176. No-op when the CRDT is uninitialised or the +/// story_id is not found. +pub fn sync_item_depends_on(story_id: &str) { + let Some(db) = PIPELINE_DB.get() else { + return; + }; + let Some(view) = crate::crdt_state::read_item(story_id) else { + return; + }; + let depends_on = { + let d = view.depends_on(); + if d.is_empty() { + None + } else { + serde_json::to_string(d).ok() + } + }; + let msg = PipelineWriteMsg { + story_id: story_id.to_string(), + stage: view.stage().dir_name().to_string(), + name: Some(view.name().to_string()), + agent: view.agent().map(|a| a.to_string()), + retry_count: Some(view.retry_count() as i64), + depends_on, + content: None, + }; + let _ = db.tx.send(msg); +} + /// Get the next available item number by scanning the CRDT state, the /// in-memory content store, AND the tombstone set for the highest existing /// number. @@ -318,3 +353,56 @@ pub fn next_item_number() -> u32 { max_num + 1 } + +#[cfg(test)] +mod tests { + use super::*; + use crate::db::shadow_write; + + /// Regression test for story 1097: `set_depends_on` must sync the shadow + /// table. Before the fix, the CRDT register was updated but the + /// `pipeline_items.depends_on` column was never written. + #[tokio::test] + async fn set_depends_on_syncs_shadow_table() { + crate::crdt_state::init_for_test(); + ensure_content_store(); + + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("pipeline.db"); + shadow_write::init(&db_path).await.unwrap(); + + let story_id = "1097_story_depends_on_shadow_drift"; + + // Insert the story so it exists in both the CRDT and the shadow table. + write_item_with_content( + story_id, + "backlog", + "---\nname: Depends On Shadow Drift\n---\n", + ItemMeta::named("Depends On Shadow Drift"), + ); + + // Let the initial shadow write land. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + // This is the write under test: it must update the shadow table. + let ok = crate::crdt_state::set_depends_on(story_id, &[1, 2]); + assert!(ok, "set_depends_on must return true for an existing item"); + + // Let the shadow write land. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let pool = shadow_write::get_shared_pool().expect("pool must be initialised"); + let row: (Option,) = + sqlx::query_as("SELECT depends_on FROM pipeline_items WHERE id = ?1") + .bind(story_id) + .fetch_one(pool) + .await + .expect("row must exist in shadow table"); + + assert_eq!( + row.0.as_deref(), + Some("[1,2]"), + "pipeline_items.depends_on must reflect the set_depends_on call" + ); + } +}