From 63d86f1263ad552e56f317ede93c9f8209343633 Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 15 May 2026 19:01:05 +0000 Subject: [PATCH] huskies: merge 1096 bug Shadow drift: set_agent writes CRDT agent register without updating pipeline_items.agent --- server/src/crdt_state/write/item.rs | 25 +++++++++++-------- server/src/db/ops.rs | 37 +++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 10 deletions(-) diff --git a/server/src/crdt_state/write/item.rs b/server/src/crdt_state/write/item.rs index 609bc08f..14e6be37 100644 --- a/server/src/crdt_state/write/item.rs +++ b/server/src/crdt_state/write/item.rs @@ -183,16 +183,21 @@ pub fn set_agent(story_id: &str, agent: Option) -> boo let Some(state_mutex) = get_crdt() else { return false; }; - let Ok(mut state) = state_mutex.lock() else { - return false; - }; - let Some(&idx) = state.index.get(story_id) else { - return false; - }; - let value = agent.map(|a| a.as_str().to_string()).unwrap_or_default(); - apply_and_persist(&mut state, |s| { - s.crdt.doc.items[idx].agent.set(value.clone()) - }); + { + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + let Some(&idx) = state.index.get(story_id) else { + return false; + }; + let value = agent.map(|a| a.as_str().to_string()).unwrap_or_default(); + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].agent.set(value.clone()) + }); + } + // Sync the updated agent to the SQLite shadow table. Must be called after + // releasing the CRDT mutex so read_item can re-acquire it without deadlock. + crate::db::ops::sync_item_agent(story_id); true } diff --git a/server/src/db/ops.rs b/server/src/db/ops.rs index 112aa92c..05713ee5 100644 --- a/server/src/db/ops.rs +++ b/server/src/db/ops.rs @@ -176,6 +176,43 @@ pub fn move_item_stage( } } +/// Shadow-write the updated agent field for an existing pipeline item. +/// +/// Called by [`crate::crdt_state::set_agent`] after the CRDT register is updated +/// so `pipeline_items.agent` stays in sync. Reads the full current metadata from +/// the CRDT (stage, name, depends_on, retry_count) to avoid overwriting other +/// columns with stale values — only the `agent` column carries the new data. +pub fn sync_item_agent(story_id: &str) { + let Some(db) = PIPELINE_DB.get() else { + return; + }; + let Some(view) = crate::crdt_state::read_item(story_id) else { + return; + }; + let stage = view.stage().dir_name().to_string(); + let name = Some(view.name().to_string()); + let agent = view.agent().map(|a| a.as_str().to_string()); + let depends_on = { + let d = view.depends_on(); + if d.is_empty() { + None + } else { + serde_json::to_string(d).ok() + } + }; + let retry_count = Some(i64::from(view.retry_count())); + let msg = PipelineWriteMsg { + story_id: story_id.to_string(), + stage, + name, + agent, + retry_count, + depends_on, + content: None, + }; + let _ = db.tx.send(msg); +} + /// Delete a story from the shadow table (fire-and-forget). pub fn delete_item(story_id: &str) { delete_content(ContentKey::Story(story_id));