diff --git a/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs b/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs
index aaae6be1..fa13c8b8 100644
--- a/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs
+++ b/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs
@@ -11,11 +11,16 @@ use std::collections::HashMap;
 use std::path::{Path, PathBuf};
+use std::sync::Arc;
 
 use crate::pipeline_state::{MergeFailureKind, PipelineEvent, Stage, StoryId};
 use crate::slog;
 use crate::slog_warn;
 
+use super::super::super::PipelineStage;
+use super::super::AgentPool;
+use super::scan::is_story_assigned_for_stage;
+
 /// Spawn a background task that blocks stories after N consecutive `MergeFailure` transitions.
 ///
 /// Subscribes to the pipeline transition broadcast channel and tracks a per-story
@@ -25,14 +30,23 @@ use crate::slog_warn;
 ///
 /// The counter resets when the story leaves `MergeFailure` (e.g. on `FixupRequested`,
 /// `ReQueuedForQa`, or a successful merge via `Unblock → Merge → Done`).
-pub(crate) fn spawn_merge_failure_block_subscriber(project_root: PathBuf) {
+///
+/// Bug 1025: while a mergemaster is actively running on the story, its
+/// iteration loop (squash → fail → fix → retry) generates multiple
+/// MergeFailure transitions. Those are NOT consecutive give-ups — they are
+/// recovery iterations in progress. We skip counter increments while a
+/// mergemaster is in the pool for the story; the counter only increments on
+/// transitions that happen with no recovery agent attached.
+pub(crate) fn spawn_merge_failure_block_subscriber(pool: Arc<AgentPool>, project_root: PathBuf) {
     let mut rx = crate::pipeline_state::subscribe_transitions();
     tokio::spawn(async move {
         let mut counters: HashMap<StoryId, (u32, MergeFailureKind)> = HashMap::new();
         loop {
             match rx.recv().await {
                 Ok(fired) => {
-                    on_transition(&project_root, &fired, &mut counters);
+                    let recovery_running =
+                        is_mergemaster_running(&pool, &project_root, &fired.story_id.0);
+                    on_transition(&project_root, &fired, &mut counters, recovery_running);
                 }
                 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                     slog_warn!(
@@ -46,14 +60,42 @@ pub(crate) fn spawn_merge_failure_block_subscriber(project_root: PathBuf) {
     });
 }
 
+/// Return true if a mergemaster agent is currently in the pool for `story_id`.
+/// Used to suppress counter increments while recovery is actively iterating
+/// (bug 1025).
+fn is_mergemaster_running(pool: &AgentPool, project_root: &Path, story_id: &str) -> bool {
+    let config = match crate::config::ProjectConfig::load(project_root) {
+        Ok(c) => c,
+        Err(_) => return false,
+    };
+    let agents = match pool.agents.lock() {
+        Ok(a) => a,
+        Err(_) => return false,
+    };
+    is_story_assigned_for_stage(&config, &agents, story_id, &PipelineStage::Mergemaster)
+}
+
 /// Handle a single transition event: update counters and emit Block if threshold is reached.
+///
+/// `recovery_running`: when `true`, a mergemaster is currently in the pool for
+/// the story and the failure is part of an in-flight recovery loop. We do NOT
+/// increment the consecutive-failure counter in that case (bug 1025).
 fn on_transition(
     project_root: &Path,
     fired: &crate::pipeline_state::TransitionFired,
     counters: &mut HashMap<StoryId, (u32, MergeFailureKind)>,
+    recovery_running: bool,
 ) {
     match &fired.after {
         Stage::MergeFailure { kind, .. } => {
+            if recovery_running {
+                slog!(
+                    "[merge-block-sub] Story '{}' MergeFailure while mergemaster is running; \
+                     not counting toward block threshold (recovery in progress).",
+                    fired.story_id.0
+                );
+                return;
+            }
             let entry = counters
                 .entry(fired.story_id.clone())
                 .or_insert_with(|| (0, kind.clone()));
@@ -195,7 +237,7 @@ mod tests {
         // Fire 2 MergeFailure events (default threshold is 3).
         for _ in 0..2 {
             let fired = make_merge_failure_fired(story_id, kind.clone());
-            on_transition(tmp.path(), &fired, &mut counters);
+            on_transition(tmp.path(), &fired, &mut counters, false);
         }
 
         // Story must still be in MergeFailure (not Blocked).
@@ -229,7 +271,7 @@ mod tests {
         // Fire 3 MergeFailure events — the 3rd must trigger the block.
        for _ in 0..3 {
             let fired = make_merge_failure_fired(story_id, kind.clone());
-            on_transition(tmp.path(), &fired, &mut counters);
+            on_transition(tmp.path(), &fired, &mut counters, false);
         }
 
         let item = crate::pipeline_state::read_typed(story_id)
@@ -272,7 +314,7 @@ mod tests {
         // Fire 2 MergeFailure events.
         for _ in 0..2 {
             let fired = make_merge_failure_fired(story_id, kind.clone());
-            on_transition(tmp.path(), &fired, &mut counters);
+            on_transition(tmp.path(), &fired, &mut counters, false);
         }
         assert_eq!(
             counters.get(&StoryId(story_id.to_string())).map(|e| e.0),
@@ -282,7 +324,7 @@ mod tests {
             Some(2)
         );
 
         // Simulate FixupRequested (non-MergeFailure transition).
         let reset_fired = make_coding_fired(story_id);
-        on_transition(tmp.path(), &reset_fired, &mut counters);
+        on_transition(tmp.path(), &reset_fired, &mut counters, false);
         assert!(
             !counters.contains_key(&StoryId(story_id.to_string())),
             "counter must be cleared after non-MergeFailure transition"
         );
@@ -298,7 +340,7 @@ mod tests {
         // Fire 2 more MergeFailure events — still below threshold.
         for _ in 0..2 {
             let fired = make_merge_failure_fired(story_id, kind.clone());
-            on_transition(tmp.path(), &fired, &mut counters);
+            on_transition(tmp.path(), &fired, &mut counters, false);
         }
 
         let item = crate::pipeline_state::read_typed(story_id)
@@ -310,4 +352,82 @@ mod tests {
             .expect("read")
             .expect("item");
         assert!(
             matches!(item.stage, Stage::MergeFailure { .. }),
             "story must still be in MergeFailure below the threshold: {:?}",
             item.stage
         );
     }
+
+    /// Bug 1025: while a mergemaster is running, MergeFailure transitions are
+    /// recovery iterations, not consecutive give-ups. 3 failures with
+    /// `recovery_running=true` must NOT block.
+    #[test]
+    fn mergemaster_running_suppresses_block() {
+        let tmp = tempfile::tempdir().unwrap();
+        setup_project(&tmp);
+        let story_id = "1025_recovery_running";
+        seed_at_merge(story_id);
+
+        crate::agents::lifecycle::transition_to_merge_failure(
+            story_id,
+            MergeFailureKind::ConflictDetected(None),
+        )
+        .expect("initial MergeFailure transition");
+
+        let mut counters: HashMap<StoryId, (u32, MergeFailureKind)> = HashMap::new();
+        let kind = MergeFailureKind::ConflictDetected(None);
+
+        // Fire 3 MergeFailure events WHILE a mergemaster is running (gated).
+        for _ in 0..3 {
+            let fired = make_merge_failure_fired(story_id, kind.clone());
+            on_transition(tmp.path(), &fired, &mut counters, true);
+        }
+
+        // Counter must NOT have incremented at all — recovery in progress.
+        assert!(
+            !counters.contains_key(&StoryId(story_id.to_string())),
+            "counter must not increment while mergemaster is running"
+        );
+
+        // And the story must still be in MergeFailure (not Blocked).
+        let item = crate::pipeline_state::read_typed(story_id)
+            .expect("read")
+            .expect("item");
+        assert!(
+            matches!(item.stage, Stage::MergeFailure { .. }),
+            "story must NOT be blocked while mergemaster is running (recovery in progress): {:?}",
+            item.stage
+        );
+    }
+
+    /// Bug 1025 regression guard: the genuinely-stuck case (no mergemaster
+    /// running) still blocks at the threshold, so the original 1018 behaviour
+    /// is preserved.
+    #[test]
+    fn no_mergemaster_still_blocks_at_threshold() {
+        let tmp = tempfile::tempdir().unwrap();
+        setup_project(&tmp);
+        let story_id = "1025_genuine_stuck";
+        seed_at_merge(story_id);
+
+        crate::agents::lifecycle::transition_to_merge_failure(
+            story_id,
+            MergeFailureKind::ConflictDetected(None),
+        )
+        .expect("initial MergeFailure transition");
+
+        let mut counters: HashMap<StoryId, (u32, MergeFailureKind)> = HashMap::new();
+        let kind = MergeFailureKind::ConflictDetected(None);
+
+        // Fire 3 MergeFailure events with NO mergemaster (recovery_running=false).
+        for _ in 0..3 {
+            let fired = make_merge_failure_fired(story_id, kind.clone());
+            on_transition(tmp.path(), &fired, &mut counters, false);
+        }
+
+        // Story must be Blocked (genuine-stuck case unchanged).
+        let item = crate::pipeline_state::read_typed(story_id)
+            .expect("read")
+            .expect("item");
+        assert!(
+            matches!(item.stage, Stage::Blocked { .. }),
+            "story must still block when no mergemaster is running: {:?}",
+            item.stage
+        );
+    }
 }
diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs
index 1b78e59e..25a7a116 100644
--- a/server/src/startup/tick_loop.rs
+++ b/server/src/startup/tick_loop.rs
@@ -69,8 +69,13 @@ pub(crate) fn spawn_event_bridges(
     );
 
     // Consecutive-failure auto-block subscriber: blocks stories after N
-    // consecutive MergeFailure transitions (story 1018).
-    crate::agents::pool::auto_assign::spawn_merge_failure_block_subscriber(root.clone());
+    // consecutive MergeFailure transitions (story 1018). Bug 1025: takes
+    // the agent pool so it can gate the counter on mergemaster presence —
+    // failures during active recovery iteration do not count toward block.
+    crate::agents::pool::auto_assign::spawn_merge_failure_block_subscriber(
+        Arc::clone(&agents),
+        root.clone(),
+    );
 
     // Content-store GC subscriber: purges all ContentKey::* entries for a
    // story when it reaches a terminal stage, preventing zombie entries from