From 28338a8e8d73e481b0d810f857d8982911fe3938 Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 13 May 2026 11:47:27 +0000 Subject: [PATCH] huskies: merge 958 --- server/src/agent_mode/loop_ops.rs | 16 ++++- server/src/agent_mode/mod.rs | 7 +- server/src/agents/lifecycle.rs | 5 +- .../agents/pool/auto_assign/auto_assign.rs | 72 +++++++++++++++++++ server/src/agents/pool/worktree.rs | 7 +- server/src/pipeline_state/types.rs | 14 +--- server/src/startup/tick_loop.rs | 13 ++-- 7 files changed, 103 insertions(+), 31 deletions(-) diff --git a/server/src/agent_mode/loop_ops.rs b/server/src/agent_mode/loop_ops.rs index edccb414..b7595072 100644 --- a/server/src/agent_mode/loop_ops.rs +++ b/server/src/agent_mode/loop_ops.rs @@ -41,8 +41,13 @@ pub(super) async fn scan_and_claim( }; for item in &items { - // Only claim stories in active stages. - if !item.stage().is_active() { + // Only claim stories in execution stages (Coding, Qa, Merge). + if !matches!( + item.stage(), + crate::pipeline_state::Stage::Coding + | crate::pipeline_state::Stage::Qa + | crate::pipeline_state::Stage::Merge { .. } + ) { continue; } @@ -163,7 +168,12 @@ pub(super) fn reclaim_timed_out_work(_project_root: &Path) { let now = chrono::Utc::now().timestamp() as f64; for item in &items { - if !item.stage().is_active() { + if !matches!( + item.stage(), + crate::pipeline_state::Stage::Coding + | crate::pipeline_state::Stage::Qa + | crate::pipeline_state::Stage::Merge { .. } + ) { continue; } diff --git a/server/src/agent_mode/mod.rs b/server/src/agent_mode/mod.rs index b8a52f06..0b741f97 100644 --- a/server/src/agent_mode/mod.rs +++ b/server/src/agent_mode/mod.rs @@ -123,7 +123,7 @@ pub async fn run( } } - // Subscribe to watcher events to trigger auto-assign on stage transitions. + // Subscribe to watcher events to trigger auto-assign on every stage transition. { let auto_rx = watcher_tx.subscribe(); let auto_agents = Arc::clone(&agents); @@ -131,10 +131,7 @@ pub async fn run( tokio::spawn(async move { let mut rx = auto_rx; while let Ok(event) = rx.recv().await { - if let watcher::WatcherEvent::WorkItem { ref stage, .. } = event - && crate::pipeline_state::Stage::from_dir(stage.as_str()) - .is_some_and(|s| s.is_active()) - { + if let watcher::WatcherEvent::WorkItem { ref stage, .. } = event { slog!("[agent-mode] CRDT transition in {stage}/; triggering auto-assign."); auto_agents.auto_assign_available_work(&auto_root).await; } diff --git a/server/src/agents/lifecycle.rs b/server/src/agents/lifecycle.rs index 1f3ca65f..d3616ce2 100644 --- a/server/src/agents/lifecycle.rs +++ b/server/src/agents/lifecycle.rs @@ -643,10 +643,9 @@ mod tests { "stage should be Stage::Merge after unblock, got: {:?}", item.stage ); - // auto_assign checks is_active() — Merge satisfies it. assert!( - item.stage.is_active(), - "Merge satisfies is_active() so auto_assign can pick it up: {:?}", + matches!(item.stage, Stage::Merge { .. }), + "stage should be Stage::Merge so auto_assign can pick it up: {:?}", item.stage ); } diff --git a/server/src/agents/pool/auto_assign/auto_assign.rs b/server/src/agents/pool/auto_assign/auto_assign.rs index 6b775781..85d01feb 100644 --- a/server/src/agents/pool/auto_assign/auto_assign.rs +++ b/server/src/agents/pool/auto_assign/auto_assign.rs @@ -814,4 +814,76 @@ mod tests { found {active_coder_count} active entries" ); } + + // ── Story 958: MergeFailure transition fires auto-assign via watcher bridge ─ + + /// Regression: before story 958, the auto-assign subscriber filtered events + /// with `is_active()`, which returned false for `MergeFailure`. This meant + /// a CRDT `MergeFailure` transition never triggered auto-assign, and + /// mergemaster was never auto-spawned on content conflicts. + /// + /// After story 958, the subscriber fires on EVERY WorkItem event. This + /// test verifies the end-to-end path: a WorkItem event with stage + /// `merge_failure` arriving on the watcher channel causes + /// `auto_assign_available_work` to run, which then auto-spawns mergemaster. + #[tokio::test] + async fn merge_failure_watcher_event_triggers_mergemaster_spawn() { + use std::sync::Arc; + + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path().to_path_buf(); + let sk = root.join(".huskies"); + std::fs::create_dir_all(&sk).unwrap(); + std::fs::write( + sk.join("project.toml"), + "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", + ) + .unwrap(); + crate::crdt_state::init_for_test(); + crate::db::ensure_content_store(); + crate::db::write_item_with_content( + "958_regression_conflict", + "4_merge_failure", + "CONFLICT (content): server/src/lib.rs", + crate::db::ItemMeta::named("Regression"), + ); + crate::db::write_content( + crate::db::ContentKey::GateOutput("958_regression_conflict"), + "CONFLICT (content): server/src/lib.rs", + ); + + let (watcher_tx, _) = broadcast::channel::(16); + let pool = Arc::new(AgentPool::new(3102, watcher_tx.clone())); + + crate::startup::tick_loop::spawn_event_bridges( + watcher_tx.clone(), + Some(root.clone()), + Arc::clone(&pool), + ); + + // Simulate the CRDT bridge forwarding a merge_failure stage transition. + let _ = watcher_tx.send(crate::io::watcher::WatcherEvent::WorkItem { + stage: "merge_failure".to_string(), + item_id: "958_regression_conflict".to_string(), + action: "update".to_string(), + commit_msg: "huskies: update 958_regression_conflict".to_string(), + from_stage: Some("merge".to_string()), + }); + + // Allow the subscriber task to run auto_assign_available_work. + tokio::task::yield_now().await; + tokio::time::sleep(std::time::Duration::from_millis(200)).await; + + let agents = pool.agents.lock().unwrap(); + let mergemaster_spawned = agents.iter().any(|(key, a)| { + key.contains("958_regression_conflict") + && a.agent_name == "mergemaster" + && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) + }); + assert!( + mergemaster_spawned, + "mergemaster must be auto-spawned when a merge_failure event fires \ + through the watcher bridge (story 958 regression)" + ); + } } diff --git a/server/src/agents/pool/worktree.rs b/server/src/agents/pool/worktree.rs index a730ecf2..5787ac70 100644 --- a/server/src/agents/pool/worktree.rs +++ b/server/src/agents/pool/worktree.rs @@ -28,7 +28,12 @@ pub(super) fn find_active_story_stage( story_id: &str, ) -> Option { if let Ok(Some(item)) = crate::pipeline_state::read_typed(story_id) - && item.stage.is_active() + && matches!( + item.stage, + crate::pipeline_state::Stage::Coding + | crate::pipeline_state::Stage::Qa + | crate::pipeline_state::Stage::Merge { .. } + ) { return Some(item.stage); } diff --git a/server/src/pipeline_state/types.rs b/server/src/pipeline_state/types.rs index 44fd6983..03562f15 100644 --- a/server/src/pipeline_state/types.rs +++ b/server/src/pipeline_state/types.rs @@ -165,16 +165,6 @@ pub enum ArchiveReason { // ── Stage convenience methods ────────────────────────────────────────────── impl Stage { - /// Returns true if this stage is an "active" stage (Coding, Qa, or Merge). - pub fn is_active(&self) -> bool { - matches!(self, Stage::Coding | Stage::Qa | Stage::Merge { .. }) - } - - /// Returns true if this is the Upcoming variant. - pub fn is_upcoming(&self) -> bool { - matches!(self, Stage::Upcoming) - } - /// Returns the filesystem directory name for this stage. pub fn dir_name(&self) -> &'static str { stage_dir_name(self) @@ -223,8 +213,8 @@ impl Stage { /// stage-directory string (from CRDT fields or watcher events) into a /// typed `Stage`. Rich variants (`Done`, `Archived`, `Merge`) are /// synthesised with zero-value fields — callers should use this only for - /// stage *classification* (e.g. `is_active()`, `matches!`), not for - /// accessing the rich metadata fields. + /// stage *classification* (via `matches!`), not for accessing the rich + /// metadata fields. pub fn from_dir(s: &str) -> Option { match s { "upcoming" => Some(Stage::Upcoming), diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index 34099065..6adaaa2f 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -5,7 +5,6 @@ use crate::agents::{AgentPool, ReconciliationEvent}; use crate::config; use crate::gateway_relay; use crate::io; -use crate::pipeline_state; use crate::service; use crate::service::status::StatusBroadcaster; use std::path::PathBuf; @@ -58,18 +57,18 @@ pub(crate) fn spawn_event_bridges( } } - // Auto-assign: trigger `auto_assign_available_work` whenever a work item - // enters an active pipeline stage (2_current/, 3_qa/, 4_merge/). + // Auto-assign: trigger `auto_assign_available_work` on every work-item + // CRDT state-transition event. auto_assign_available_work is idempotent + // and noops where there is nothing to do, so firing on every transition + // ensures that MergeFailure and other non-"active" stages are covered + // without any per-stage special-casing. if let Some(root) = project_root { let watcher_auto_rx = watcher_tx.subscribe(); let watcher_auto_agents = Arc::clone(&agents); tokio::spawn(async move { let mut rx = watcher_auto_rx; while let Ok(event) = rx.recv().await { - if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event - && pipeline_state::Stage::from_dir(stage.as_str()) - .is_some_and(|s| s.is_active()) - { + if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event { crate::slog!( "[auto-assign] CRDT transition detected in {stage}/; \ triggering auto-assign."