diff --git a/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs b/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs
new file mode 100644
index 00000000..19bf4130
--- /dev/null
+++ b/server/src/agents/pool/auto_assign/merge_failure_block_subscriber.rs
@@ -0,0 +1,311 @@
+//! TransitionFired subscriber that auto-blocks stories after N consecutive MergeFailure transitions.
+//!
+//! Listens on the pipeline transition broadcast channel and, for each story,
+//! counts how many times it has entered [`Stage::MergeFailure`] consecutively.
+//! When the count reaches the configurable threshold (default 3), the story is
+//! transitioned to [`Stage::Blocked`] with a reason that names the failure kind.
+//!
+//! The counter for a story resets whenever a non-`MergeFailure` transition fires
+//! for that story (e.g. after a successful merge or a `FixupRequested` demotion
+//! back to coding).
+
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+
+use crate::pipeline_state::{MergeFailureKind, PipelineEvent, Stage, StoryId};
+use crate::slog;
+use crate::slog_warn;
+
+/// Spawn a background task that blocks stories after N consecutive `MergeFailure` transitions.
+///
+/// Subscribes to the pipeline transition broadcast channel and tracks a per-story
+/// consecutive-failure counter. When a story's count reaches the threshold configured
+/// in `project.toml` (`merge_failure_block_threshold`, default 3), the story is
+/// transitioned to `Stage::Blocked` with a reason that names the failure kind.
+///
+/// The counter resets when the story leaves `MergeFailure` (e.g. on `FixupRequested`,
+/// `ReQueuedForQa`, or a successful merge via `Unblock → Merge → Done`).
+pub(crate) fn spawn_merge_failure_block_subscriber(project_root: PathBuf) {
+    let mut rx = crate::pipeline_state::subscribe_transitions();
+    tokio::spawn(async move {
+        let mut counters: HashMap<StoryId, (u32, MergeFailureKind)> = HashMap::new();
+        loop {
+            match rx.recv().await {
+                Ok(fired) => {
+                    on_transition(&project_root, &fired, &mut counters);
+                }
+                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
+                    slog_warn!(
+                        "[merge-block-sub] Subscriber lagged, skipped {n} event(s). \
+                         Some consecutive-failure counts may be understated."
+                    );
+                }
+                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
+            }
+        }
+    });
+}
+
+/// Handle one transition: bump or reset the story's counter and emit `Block` at the threshold.
+fn on_transition(
+    project_root: &Path,
+    fired: &crate::pipeline_state::TransitionFired,
+    counters: &mut HashMap<StoryId, (u32, MergeFailureKind)>,
+) {
+    match &fired.after {
+        Stage::MergeFailure { kind, .. } => {
+            let entry = counters
+                .entry(fired.story_id.clone())
+                .or_insert_with(|| (0, kind.clone()));
+            entry.0 += 1;
+            entry.1 = kind.clone(); // remember the most recent failure kind
+
+            let count = entry.0;
+            let threshold = load_threshold(project_root); // re-read on every failure event
+
+            if threshold == 0 {
+                return; // threshold 0 disables auto-blocking; the count is still kept
+            }
+
+            if count >= threshold {
+                let kind_str = failure_kind_label(kind);
+                let reason = format!(
+                    "Auto-blocked after {count} consecutive MergeFailure ({kind_str}) transitions."
+                );
+                let story_id = fired.story_id.0.as_str();
+                slog!(
+                    "[merge-block-sub] Story '{story_id}' reached {count} consecutive \
+                     MergeFailure ({kind_str}); blocking."
+                );
+                if let Err(e) = crate::pipeline_state::apply_transition(
+                    story_id,
+                    PipelineEvent::Block { reason },
+                    None,
+                ) {
+                    slog_warn!("[merge-block-sub] Failed to block '{story_id}': {e}");
+                } else {
+                    counters.remove(&fired.story_id); // blocked: start a fresh streak
+                }
+            }
+        }
+        _ => {
+            counters.remove(&fired.story_id); // any other stage ends the streak
+        }
+    }
+}
+
+/// Load the threshold from project config, falling back to the compiled default.
+fn load_threshold(project_root: &Path) -> u32 {
+    crate::config::ProjectConfig::load(project_root)
+        .map(|c| c.merge_failure_block_threshold)
+        .unwrap_or(3) // same value as the serde default for this field in `config/mod.rs`
+}
+
+/// Short human-readable label for a [`MergeFailureKind`] variant.
+fn failure_kind_label(kind: &MergeFailureKind) -> &'static str {
+    match kind {
+        MergeFailureKind::ConflictDetected(_) => "ConflictDetected",
+        MergeFailureKind::GatesFailed(_) => "GatesFailed",
+        MergeFailureKind::EmptyDiff => "EmptyDiff",
+        MergeFailureKind::NoCommits => "NoCommits",
+        MergeFailureKind::Other(_) => "Other",
+    }
+}
+
+// ── Tests ──────────────────────────────────────────────────────────────────
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::pipeline_state::{BranchName, PipelineEvent, Stage, StoryId, TransitionFired};
+    use std::num::NonZeroU32;
+
+    fn setup_project(tmp: &tempfile::TempDir) {
+        let sk = tmp.path().join(".huskies");
+        std::fs::create_dir_all(&sk).unwrap();
+        std::fs::write(sk.join("project.toml"), "[[agent]]\nname = \"coder\"\n").unwrap();
+    }
+
+    fn seed_at_merge(story_id: &str) {
+        crate::crdt_state::init_for_test();
+        crate::db::ensure_content_store();
+        crate::db::write_item_with_content(
+            story_id,
+            "4_merge",
+            "---\nname: Test\n---\n",
+            crate::db::ItemMeta::named("Test"),
+        );
+    }
+
+    fn make_merge_failure_fired(story_id: &str, kind: MergeFailureKind) -> TransitionFired {
+        TransitionFired {
+            story_id: StoryId(story_id.to_string()),
+            before: Stage::Merge {
+                feature_branch: BranchName("feature/test".to_string()),
+                commits_ahead: NonZeroU32::new(1).unwrap(),
+                claim: None,
+            },
+            after: Stage::MergeFailure {
+                kind: kind.clone(),
+                feature_branch: BranchName("feature/test".to_string()),
+                commits_ahead: NonZeroU32::new(1).unwrap(),
+            },
+            event: PipelineEvent::MergeFailed { kind },
+            at: chrono::Utc::now(),
+        }
+    }
+
+    fn make_coding_fired(story_id: &str) -> TransitionFired {
+        TransitionFired {
+            story_id: StoryId(story_id.to_string()),
+            before: Stage::MergeFailure {
+                kind: MergeFailureKind::GatesFailed("error".to_string()),
+                feature_branch: BranchName("feature/test".to_string()),
+                commits_ahead: NonZeroU32::new(1).unwrap(),
+            },
+            after: Stage::Coding {
+                claim: None,
+                plan: Default::default(),
+            },
+            event: PipelineEvent::FixupRequested,
+            at: chrono::Utc::now(),
+        }
+    }
+
+    /// AC3 (threshold-not-reached): 2 consecutive failures below threshold of 3 must NOT block.
+    #[test]
+    fn below_threshold_does_not_block() {
+        let tmp = tempfile::tempdir().unwrap();
+        setup_project(&tmp);
+        let story_id = "1018_below";
+        seed_at_merge(story_id);
+
+        // Transition to MergeFailure once to establish the stage.
+        crate::agents::lifecycle::transition_to_merge_failure(
+            story_id,
+            MergeFailureKind::GatesFailed("error".to_string()),
+        )
+        .expect("initial MergeFailure transition");
+
+        let mut counters: HashMap<StoryId, (u32, MergeFailureKind)> = HashMap::new();
+        let kind = MergeFailureKind::GatesFailed("error".to_string());
+
+        // Fire 2 MergeFailure events (default threshold is 3).
+        for _ in 0..2 {
+            let fired = make_merge_failure_fired(story_id, kind.clone());
+            on_transition(tmp.path(), &fired, &mut counters);
+        }
+
+        // Story must still be in MergeFailure (not Blocked).
+        let item = crate::pipeline_state::read_typed(story_id)
+            .expect("read")
+            .expect("item");
+        assert!(
+            matches!(item.stage, Stage::MergeFailure { .. }),
+            "story must still be in MergeFailure after 2 failures (threshold 3): {:?}",
+            item.stage
+        );
+    }
+
+    /// AC3 (threshold-reached): 3 consecutive failures at threshold of 3 must block.
+    #[test]
+    fn at_threshold_blocks_with_failure_kind_in_reason() {
+        let tmp = tempfile::tempdir().unwrap();
+        setup_project(&tmp);
+        let story_id = "1018_at_threshold";
+        seed_at_merge(story_id);
+
+        crate::agents::lifecycle::transition_to_merge_failure(
+            story_id,
+            MergeFailureKind::GatesFailed("fmt error".to_string()),
+        )
+        .expect("initial MergeFailure transition");
+
+        let mut counters: HashMap<StoryId, (u32, MergeFailureKind)> = HashMap::new();
+        let kind = MergeFailureKind::GatesFailed("fmt error".to_string());
+
+        // Fire 3 MergeFailure events — the 3rd must trigger the block.
+        for _ in 0..3 {
+            let fired = make_merge_failure_fired(story_id, kind.clone());
+            on_transition(tmp.path(), &fired, &mut counters);
+        }
+
+        let item = crate::pipeline_state::read_typed(story_id)
+            .expect("read")
+            .expect("item");
+        assert!(
+            matches!(item.stage, Stage::Blocked { .. }),
+            "story must be Blocked after 3 consecutive MergeFailures: {:?}",
+            item.stage
+        );
+
+        // The block reason must name the failure kind.
+        if let Stage::Blocked { reason } = &item.stage {
+            assert!(
+                reason.contains("GatesFailed"),
+                "block reason must name the failure kind: {reason}"
+            );
+        }
+    }
+
+    /// AC3 (reset): counter clears after a non-MergeFailure transition.
+    ///
+    /// 2 failures → FixupRequested reset → 2 more failures: still below threshold, no block.
+    #[test]
+    fn counter_resets_on_non_merge_failure_transition() {
+        let tmp = tempfile::tempdir().unwrap();
+        setup_project(&tmp);
+        let story_id = "1018_reset";
+        seed_at_merge(story_id);
+
+        crate::agents::lifecycle::transition_to_merge_failure(
+            story_id,
+            MergeFailureKind::ConflictDetected(None),
+        )
+        .expect("initial MergeFailure transition");
+
+        let mut counters: HashMap<StoryId, (u32, MergeFailureKind)> = HashMap::new();
+        let kind = MergeFailureKind::ConflictDetected(None);
+
+        // Fire 2 MergeFailure events.
+        for _ in 0..2 {
+            let fired = make_merge_failure_fired(story_id, kind.clone());
+            on_transition(tmp.path(), &fired, &mut counters);
+        }
+        assert_eq!(
+            counters.get(&StoryId(story_id.to_string())).map(|e| e.0),
+            Some(2),
+            "counter must be 2 after 2 failures"
+        );
+
+        // Simulate FixupRequested (non-MergeFailure transition).
+        let reset_fired = make_coding_fired(story_id);
+        on_transition(tmp.path(), &reset_fired, &mut counters);
+        assert!(
+            !counters.contains_key(&StoryId(story_id.to_string())),
+            "counter must be cleared after non-MergeFailure transition"
+        );
+
+        // Re-seed to MergeFailure so we can apply the block transition.
+        crate::agents::lifecycle::transition_to_merge_failure(
+            story_id,
+            MergeFailureKind::ConflictDetected(None),
+        )
+        .expect("re-enter MergeFailure after reset");
+
+        // Fire 2 more MergeFailure events — still below threshold.
+        for _ in 0..2 {
+            let fired = make_merge_failure_fired(story_id, kind.clone());
+            on_transition(tmp.path(), &fired, &mut counters);
+        }
+
+        let item = crate::pipeline_state::read_typed(story_id)
+            .expect("read")
+            .expect("item");
+        assert!(
+            matches!(item.stage, Stage::MergeFailure { .. }),
+            "story must still be in MergeFailure after reset + 2 new failures: {:?}",
+            item.stage
+        );
+    }
+}
diff --git a/server/src/agents/pool/auto_assign/mod.rs b/server/src/agents/pool/auto_assign/mod.rs index 19215ad2..943206c6 100644 --- a/server/src/agents/pool/auto_assign/mod.rs +++ b/server/src/agents/pool/auto_assign/mod.rs @@ -4,6 +4,8 @@ mod auto_assign; mod backlog; mod merge; +/// TransitionFired subscriber that auto-blocks stories after N consecutive MergeFailure transitions. +pub(crate) mod merge_failure_block_subscriber; /// TransitionFired subscriber that auto-spawns mergemaster on ConflictDetected merge failures. pub(crate) mod merge_failure_subscriber; mod pipeline; @@ -15,5 +17,7 @@ pub(crate) mod watchdog; // so that pool::lifecycle and pool::pipeline continue to access them unchanged. 
pub(super) use scan::{find_free_agent_for_stage, is_agent_free}; +/// Re-export for `startup::tick_loop`. +pub(crate) use merge_failure_block_subscriber::spawn_merge_failure_block_subscriber; /// Re-export for `startup::tick_loop`. pub(crate) use merge_failure_subscriber::spawn_merge_failure_subscriber; diff --git a/server/src/config/mod.rs b/server/src/config/mod.rs index 397b8866..901edbeb 100644 --- a/server/src/config/mod.rs +++ b/server/src/config/mod.rs @@ -139,6 +139,12 @@ pub struct ProjectConfig { /// unaffected. Default: `true`. #[serde(default = "default_status_push_enabled")] pub status_push_enabled: bool, + /// Number of consecutive `MergeFailure` transitions before a story is + /// automatically blocked. The auto-block subscriber resets the counter + /// whenever the story leaves `MergeFailure` (e.g. after a successful + /// merge or `FixupRequested`). Default: 3. Set to 0 to disable. + #[serde(default = "default_merge_failure_block_threshold")] + pub merge_failure_block_threshold: u32, } /// Configuration for the filesystem watcher's sweep behaviour. 
@@ -210,6 +216,10 @@ fn default_status_push_enabled() -> bool { true } +fn default_merge_failure_block_threshold() -> u32 { + 3 +} + fn default_max_mesh_peers() -> usize { 3 } @@ -368,6 +378,7 @@ impl Default for ProjectConfig { gateway_url: None, gateway_project: None, status_push_enabled: default_status_push_enabled(), + merge_failure_block_threshold: default_merge_failure_block_threshold(), } } } @@ -457,6 +468,7 @@ impl ProjectConfig { gateway_url: None, gateway_project: None, status_push_enabled: default_status_push_enabled(), + merge_failure_block_threshold: default_merge_failure_block_threshold(), }; validate_agents(&config.agent)?; return Ok(config); @@ -497,6 +509,7 @@ impl ProjectConfig { gateway_url: None, gateway_project: None, status_push_enabled: default_status_push_enabled(), + merge_failure_block_threshold: default_merge_failure_block_threshold(), }; validate_agents(&config.agent)?; Ok(config) @@ -525,6 +538,7 @@ impl ProjectConfig { gateway_url: None, gateway_project: None, status_push_enabled: default_status_push_enabled(), + merge_failure_block_threshold: default_merge_failure_block_threshold(), }) } } diff --git a/server/src/crdt_state/read.rs b/server/src/crdt_state/read.rs index 83252563..d8848cd0 100644 --- a/server/src/crdt_state/read.rs +++ b/server/src/crdt_state/read.rs @@ -507,7 +507,7 @@ fn project_stage_for_view( }), "qa" => Some(Stage::Qa), "blocked" => Some(Stage::Blocked { - reason: String::new(), + reason: resume_to.unwrap_or("").to_string(), }), "merge" => Some(Stage::Merge { feature_branch: BranchName(format!("feature/story-{story_id}")), diff --git a/server/src/crdt_state/state/tests.rs b/server/src/crdt_state/state/tests.rs index 4ef73937..e195354a 100644 --- a/server/src/crdt_state/state/tests.rs +++ b/server/src/crdt_state/state/tests.rs @@ -119,8 +119,16 @@ async fn subscribe_receives_stage_transition_events() { None, ); - let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on insert"); - assert_eq!(evt.story_id, 
"906_story_subscribe"); + // Drain any stale events from concurrent tests until we see ours. + // CRDT_EVENT_TX is global; parallel tests emit to the same channel. + let evt: CrdtEvent = loop { + let e = rx + .try_recv() + .expect("expected CrdtEvent for 906_story_subscribe"); + if e.story_id == "906_story_subscribe" { + break e; + } + }; assert!(evt.from_stage.is_none()); assert!(matches!( evt.to_stage, @@ -138,8 +146,14 @@ async fn subscribe_receives_stage_transition_events() { None, ); - let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on stage change"); - assert_eq!(evt.story_id, "906_story_subscribe"); + let evt: CrdtEvent = loop { + let e = rx + .try_recv() + .expect("expected CrdtEvent for 906_story_subscribe stage change"); + if e.story_id == "906_story_subscribe" { + break e; + } + }; assert!(matches!( evt.from_stage, Some(crate::pipeline_state::Stage::Backlog) diff --git a/server/src/pipeline_state/apply.rs b/server/src/pipeline_state/apply.rs index 0d31a7ff..e278bc16 100644 --- a/server/src/pipeline_state/apply.rs +++ b/server/src/pipeline_state/apply.rs @@ -75,7 +75,7 @@ pub fn apply_transition( super::Stage::Superseded { superseded_by, .. } => { crate::crdt_state::set_resume_to_raw(story_id, &superseded_by.0); } - super::Stage::Rejected { reason, .. } => { + super::Stage::Rejected { reason, .. } | super::Stage::Blocked { reason } => { crate::crdt_state::set_resume_to_raw(story_id, reason); } _ => {} diff --git a/server/src/pipeline_state/transition.rs b/server/src/pipeline_state/transition.rs index 9f4b299b..12035258 100644 --- a/server/src/pipeline_state/transition.rs +++ b/server/src/pipeline_state/transition.rs @@ -203,7 +203,9 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result Ok(Blocked { reason }), + | (Merge { .. }, Block { reason }) + | (MergeFailure { .. }, Block { reason }) + | (MergeFailureFinal { .. }, Block { reason }) => Ok(Blocked { reason }), // Story 945: ReviewHold no longer auto-archives. 
It transitions the // story to `Stage::ReviewHold { resume_to, reason }`, preserving the diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index c128d36d..1b78e59e 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -68,6 +68,10 @@ pub(crate) fn spawn_event_bridges( root.clone(), ); + // Consecutive-failure auto-block subscriber: blocks stories after N + // consecutive MergeFailure transitions (story 1018). + crate::agents::pool::auto_assign::spawn_merge_failure_block_subscriber(root.clone()); + // Content-store GC subscriber: purges all ContentKey::* entries for a // story when it reaches a terminal stage, preventing zombie entries from // accumulating in the process heap (story 996). diff --git a/server/src/worktree/cleanup.rs b/server/src/worktree/cleanup.rs index 4c22c7b0..e34612eb 100644 --- a/server/src/worktree/cleanup.rs +++ b/server/src/worktree/cleanup.rs @@ -214,6 +214,7 @@ mod tests { gateway_url: None, gateway_project: None, status_push_enabled: true, + merge_failure_block_threshold: 3, } } diff --git a/server/src/worktree/create.rs b/server/src/worktree/create.rs index 7047b7cb..0fcc61c1 100644 --- a/server/src/worktree/create.rs +++ b/server/src/worktree/create.rs @@ -220,6 +220,7 @@ mod tests { gateway_url: None, gateway_project: None, status_push_enabled: true, + merge_failure_block_threshold: 3, } } diff --git a/server/src/worktree/remove.rs b/server/src/worktree/remove.rs index cec5a723..894368b2 100644 --- a/server/src/worktree/remove.rs +++ b/server/src/worktree/remove.rs @@ -92,6 +92,7 @@ mod tests { gateway_url: None, gateway_project: None, status_push_enabled: true, + merge_failure_block_threshold: 3, } } diff --git a/server/src/worktree/sweep.rs b/server/src/worktree/sweep.rs index 58dd71c1..7ea2d9ba 100644 --- a/server/src/worktree/sweep.rs +++ b/server/src/worktree/sweep.rs @@ -135,6 +135,7 @@ mod tests { gateway_url: None, gateway_project: None, status_push_enabled: true, + 
merge_failure_block_threshold: 3, } }