From 8a7e1aa0368757b7d8bbacb8e5f35ebae0331244 Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 29 Apr 2026 16:05:54 +0000 Subject: [PATCH] huskies: merge 873 --- server/src/agents/pool/auto_assign/merge.rs | 1 + server/src/crdt_state/mod.rs | 2 +- server/src/crdt_state/read.rs | 6 ++ server/src/crdt_state/types.rs | 7 ++ server/src/crdt_state/write.rs | 83 +++++++++++++++++++++ server/src/pipeline_state/projection.rs | 6 ++ 6 files changed, 104 insertions(+), 1 deletion(-) diff --git a/server/src/agents/pool/auto_assign/merge.rs b/server/src/agents/pool/auto_assign/merge.rs index 94e9f2f3..98fadb98 100644 --- a/server/src/agents/pool/auto_assign/merge.rs +++ b/server/src/agents/pool/auto_assign/merge.rs @@ -85,6 +85,7 @@ impl AgentPool { crate::db::write_content(story_id, &updated); crate::db::write_item_with_content(story_id, "4_merge", &updated); } + crate::crdt_state::set_mergemaster_attempted(story_id, true); if let Err(e) = self .start_agent(project_root, story_id, Some(&agent_name), None, None) .await diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index ce3aeea1..f34a65e9 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -53,7 +53,7 @@ pub use types::{ }; pub use write::{ bump_retry_count, migrate_names_from_slugs, migrate_story_ids_to_numeric, name_from_story_id, - set_agent, set_depends_on, set_qa_mode, set_retry_count, write_item, + set_agent, set_depends_on, set_mergemaster_attempted, set_qa_mode, set_retry_count, write_item, }; #[cfg(test)] diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs index 15d34244..cab05869 100644 --- a/server/src/crdt_state/read.rs +++ b/server/src/crdt_state/read.rs @@ -320,6 +320,11 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option None, }; + let mergemaster_attempted = match item.mergemaster_attempted.view() { + JsonValue::Bool(b) => Some(b), + _ => None, + }; + Some(PipelineItemView { story_id, stage, @@ -332,6 +337,7 @@ pub(super) fn extract_item_view(item: &PipelineItemCrdt) -> Option, + /// Set to `true` when the auto-assigner has already spawned a mergemaster + /// session for a content-conflict failure. Prevents repeated auto-spawns + /// across restarts. Written as `false` (not removed) when cleared. + pub mergemaster_attempted: LwwRegisterCrdt, } /// CRDT node that holds a single peer's presence entry. @@ -128,6 +132,9 @@ pub struct PipelineItemView { /// QA mode override from the CRDT register: `"server"`, `"agent"`, or `"human"`. /// `None` means the register is unset (use project default). pub qa_mode: Option, + /// Whether the auto-assigner has already spawned a mergemaster session for + /// this item. `None` means the register has never been set (treat as false). + pub mergemaster_attempted: Option, } /// A snapshot of a single node presence entry derived from the CRDT document. diff --git a/server/src/crdt_state/write.rs b/server/src/crdt_state/write.rs index e7025f3e..aa9df715 100644 --- a/server/src/crdt_state/write.rs +++ b/server/src/crdt_state/write.rs @@ -265,6 +265,31 @@ pub fn set_qa_mode(story_id: &str, mode: Option) -> bool { true } +/// Set the `mergemaster_attempted` CRDT flag for a pipeline item. +/// +/// Passing `true` records that a mergemaster session has been spawned for this +/// item, preventing repeated auto-spawns across restarts. +/// Passing `false` explicitly writes `false` (does not remove the register) so +/// the cleared state is distinguishable from an unset register and survives +/// CRDT replay correctly. +/// +/// Returns `true` if the item was found and the op was applied, `false` otherwise. +pub fn set_mergemaster_attempted(story_id: &str, value: bool) -> bool { + let Some(state_mutex) = get_crdt() else { + return false; + }; + let Ok(mut state) = state_mutex.lock() else { + return false; + }; + let Some(&idx) = state.index.get(story_id) else { + return false; + }; + apply_and_persist(&mut state, |s| { + s.crdt.doc.items[idx].mergemaster_attempted.set(value) + }); + true +} + /// Write a pipeline item state through CRDT operations. /// /// If the item exists, updates its registers. If not, inserts a new item @@ -368,6 +393,7 @@ pub fn write_item( "claimed_at": claimed_at.unwrap_or(0.0), "merged_at": merged_at.unwrap_or(0.0), "qa_mode": "", + "mergemaster_attempted": false, }) .into(); @@ -394,6 +420,7 @@ pub fn write_item( item.claimed_at.advance_seq(floor); item.merged_at.advance_seq(floor); item.qa_mode.advance_seq(floor); + item.mergemaster_attempted.advance_seq(floor); } // Broadcast a CrdtEvent for the new item. @@ -921,6 +948,62 @@ mod tests { ); } + // ── set_mergemaster_attempted regression tests ─────────────────────────── + + #[test] + fn set_mergemaster_attempted_true_then_false_flips_register() { + init_for_test(); + + write_item( + "873_story_mergemaster_flip", + "4_merge", + None, + None, + None, + None, + None, + None, + None, + None, + ); + + // Set true — register must read back as true. + let ok = set_mergemaster_attempted("873_story_mergemaster_flip", true); + assert!( + ok, + "set_mergemaster_attempted should return true for known item" + ); + let view = read_item("873_story_mergemaster_flip").unwrap(); + assert_eq!( + view.mergemaster_attempted, + Some(true), + "CRDT register should hold true after setting true" + ); + + // Set false — register must flip back to false (not unset). + let ok = set_mergemaster_attempted("873_story_mergemaster_flip", false); + assert!( + ok, + "set_mergemaster_attempted(false) should return true for known item" + ); + let view = read_item("873_story_mergemaster_flip").unwrap(); + assert_eq!( + view.mergemaster_attempted, + Some(false), + "CRDT register should hold false after explicit clear" + ); + } + + #[test] + fn set_mergemaster_attempted_returns_false_for_unknown_story() { + init_for_test(); + let ok = set_mergemaster_attempted("nonexistent_story_mm", true); + assert!( + !ok, + "set_mergemaster_attempted should return false for unknown story_id" + ); + } + #[test] fn set_qa_mode_returns_false_for_unknown_story() { init_for_test(); diff --git a/server/src/pipeline_state/projection.rs b/server/src/pipeline_state/projection.rs index 938262fe..33a810cc 100644 --- a/server/src/pipeline_state/projection.rs +++ b/server/src/pipeline_state/projection.rs @@ -206,6 +206,7 @@ mod tests { claimed_at: None, merged_at: None, qa_mode: None, + mergemaster_attempted: None, }; let item = PipelineItem::try_from(&view).unwrap(); assert_eq!(item.story_id, StoryId("42_story_test".to_string())); @@ -229,6 +230,7 @@ mod tests { claimed_at: None, merged_at: None, qa_mode: None, + mergemaster_attempted: None, }; let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Coding)); @@ -249,6 +251,7 @@ mod tests { claimed_at: None, merged_at: None, qa_mode: None, + mergemaster_attempted: None, }; let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!(item.stage, Stage::Merge { .. })); @@ -276,6 +279,7 @@ mod tests { claimed_at: None, merged_at: None, qa_mode: None, + mergemaster_attempted: None, }; let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!( @@ -301,6 +305,7 @@ mod tests { claimed_at: None, merged_at: None, qa_mode: None, + mergemaster_attempted: None, }; let item = PipelineItem::try_from(&view).unwrap(); assert!(matches!( @@ -326,6 +331,7 @@ mod tests { claimed_at: None, merged_at: None, qa_mode: None, + mergemaster_attempted: None, }; let result = PipelineItem::try_from(&view); assert!(matches!(