fix(1025): gate auto-block counter on mergemaster presence

1018's merge_failure_block_subscriber counted every MergeFailure transition
toward the 3-strike block threshold, but mergemaster's recovery loop
(squash → fail → fix → retry) emits multiple MergeFailure transitions while
making real progress. Story 997 was blocked at 10:59:46 while mergemaster
was still resolving conflicts; it would have succeeded a minute later.
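
For context, the 1018 counting logic reduces to roughly this (a simplified
sketch with illustrative names and threshold, not the exact subscriber code):

    use std::collections::HashMap;

    // Pre-1025 behaviour (sketch): every MergeFailure increments the
    // per-story counter, so mergemaster's own squash → fail → fix → retry
    // loop can walk a recovering story straight into Blocked.
    fn on_merge_failure(counters: &mut HashMap<String, u32>, story_id: &str) -> bool {
        let n = counters.entry(story_id.to_string()).or_insert(0);
        *n += 1;
        *n >= 3 // true == block the story, even mid-recovery
    }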

Fix: pass the AgentPool into the subscriber. When a mergemaster agent is in
the pool for the story, MergeFailure transitions are in-flight recovery
iterations and do NOT increment the consecutive-failure counter. The block
now fires only in the genuinely-stuck case: N consecutive failures with no
recovery agent attached.
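
In sketch form, the gate is a single early-out before the increment
(simplified names again; the real check lives in on_transition /
is_mergemaster_running in the diff below):

    use std::collections::HashMap;

    // Post-1025 behaviour (sketch): a failure only counts toward the
    // block threshold when no mergemaster is attached to the story.
    fn on_merge_failure(
        counters: &mut HashMap<String, u32>,
        story_id: &str,
        recovery_running: bool,
    ) -> bool {
        if recovery_running {
            return false; // recovery iteration in progress; never counts
        }
        let n = counters.entry(story_id.to_string()).or_insert(0);
        *n += 1;
        *n >= 3 // blocks only after 3 failures with no recovery agent
    }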

Tests:
- mergemaster_running_suppresses_block: 3 failures with recovery_running=true
  → counter stays empty, story stays in MergeFailure
- no_mergemaster_still_blocks_at_threshold: 3 failures with recovery_running=false
  → blocks (1018 behaviour preserved)

All 2938 tests pass.
Timmy
2026-05-14 12:13:37 +01:00
parent c7a7cb4281
commit 8e996e2bd3
2 changed files with 134 additions and 9 deletions
@@ -11,11 +11,16 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::pipeline_state::{MergeFailureKind, PipelineEvent, Stage, StoryId};
use crate::slog;
use crate::slog_warn;
use super::super::super::PipelineStage;
use super::super::AgentPool;
use super::scan::is_story_assigned_for_stage;

/// Spawn a background task that blocks stories after N consecutive `MergeFailure` transitions.
///
/// Subscribes to the pipeline transition broadcast channel and tracks a per-story
@@ -25,14 +30,23 @@ use crate::slog_warn;
///
/// The counter resets when the story leaves `MergeFailure` (e.g. on `FixupRequested`,
/// `ReQueuedForQa`, or a successful merge via `Unblock → Merge → Done`).
pub(crate) fn spawn_merge_failure_block_subscriber(project_root: PathBuf) {
///
/// Bug 1025: while a mergemaster is actively running on the story, its
/// iteration loop (squash → fail → fix → retry) generates multiple
/// MergeFailure transitions. Those are NOT consecutive give-ups — they are
/// recovery iterations in progress. We skip counter increments while a
/// mergemaster is in the pool for the story; the counter only increments on
/// transitions that happen with no recovery agent attached.
pub(crate) fn spawn_merge_failure_block_subscriber(pool: Arc<AgentPool>, project_root: PathBuf) {
let mut rx = crate::pipeline_state::subscribe_transitions();
tokio::spawn(async move {
let mut counters: HashMap<StoryId, (u32, MergeFailureKind)> = HashMap::new();
loop {
match rx.recv().await {
Ok(fired) => {
on_transition(&project_root, &fired, &mut counters);
let recovery_running =
is_mergemaster_running(&pool, &project_root, &fired.story_id.0);
on_transition(&project_root, &fired, &mut counters, recovery_running);
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
slog_warn!(
@@ -46,14 +60,42 @@ pub(crate) fn spawn_merge_failure_block_subscriber(project_root: PathBuf) {
});
}

/// Return true if a mergemaster agent is currently in the pool for `story_id`.
/// Used to suppress counter increments while recovery is actively iterating
/// (bug 1025).
fn is_mergemaster_running(pool: &AgentPool, project_root: &Path, story_id: &str) -> bool {
let config = match crate::config::ProjectConfig::load(project_root) {
Ok(c) => c,
Err(_) => return false,
};
let agents = match pool.agents.lock() {
Ok(a) => a,
Err(_) => return false,
};
is_story_assigned_for_stage(&config, &agents, story_id, &PipelineStage::Mergemaster)
}

/// Handle a single transition event: update counters and emit Block if threshold is reached.
///
/// `recovery_running`: when `true`, a mergemaster is currently in the pool for
/// the story and the failure is part of an in-flight recovery loop. We do NOT
/// increment the consecutive-failure counter in that case (bug 1025).
fn on_transition(
project_root: &Path,
fired: &crate::pipeline_state::TransitionFired,
counters: &mut HashMap<StoryId, (u32, MergeFailureKind)>,
recovery_running: bool,
) {
match &fired.after {
Stage::MergeFailure { kind, .. } => {
if recovery_running {
slog!(
"[merge-block-sub] Story '{}' MergeFailure while mergemaster is running; \
not counting toward block threshold (recovery in progress).",
fired.story_id.0
);
return;
}
let entry = counters
.entry(fired.story_id.clone())
.or_insert_with(|| (0, kind.clone()));
@@ -195,7 +237,7 @@ mod tests {
// Fire 2 MergeFailure events (default threshold is 3).
for _ in 0..2 {
let fired = make_merge_failure_fired(story_id, kind.clone());
on_transition(tmp.path(), &fired, &mut counters);
on_transition(tmp.path(), &fired, &mut counters, false);
}
// Story must still be in MergeFailure (not Blocked).
@@ -229,7 +271,7 @@ mod tests {
// Fire 3 MergeFailure events — the 3rd must trigger the block.
for _ in 0..3 {
let fired = make_merge_failure_fired(story_id, kind.clone());
on_transition(tmp.path(), &fired, &mut counters);
on_transition(tmp.path(), &fired, &mut counters, false);
}
let item = crate::pipeline_state::read_typed(story_id)
@@ -272,7 +314,7 @@ mod tests {
// Fire 2 MergeFailure events.
for _ in 0..2 {
let fired = make_merge_failure_fired(story_id, kind.clone());
on_transition(tmp.path(), &fired, &mut counters);
on_transition(tmp.path(), &fired, &mut counters, false);
}
assert_eq!(
counters.get(&StoryId(story_id.to_string())).map(|e| e.0),
@@ -282,7 +324,7 @@ mod tests {
// Simulate FixupRequested (non-MergeFailure transition).
let reset_fired = make_coding_fired(story_id);
on_transition(tmp.path(), &reset_fired, &mut counters);
on_transition(tmp.path(), &reset_fired, &mut counters, false);
assert!(
!counters.contains_key(&StoryId(story_id.to_string())),
"counter must be cleared after non-MergeFailure transition"
@@ -298,7 +340,7 @@ mod tests {
// Fire 2 more MergeFailure events — still below threshold.
for _ in 0..2 {
let fired = make_merge_failure_fired(story_id, kind.clone());
on_transition(tmp.path(), &fired, &mut counters);
on_transition(tmp.path(), &fired, &mut counters, false);
}
let item = crate::pipeline_state::read_typed(story_id)
@@ -310,4 +352,82 @@ mod tests {
item.stage
);
}

/// Bug 1025: while a mergemaster is running, MergeFailure transitions are
/// recovery iterations, not consecutive give-ups. 3 failures with
/// `recovery_running=true` must NOT block.
#[test]
fn mergemaster_running_suppresses_block() {
let tmp = tempfile::tempdir().unwrap();
setup_project(&tmp);
let story_id = "1025_recovery_running";
seed_at_merge(story_id);
crate::agents::lifecycle::transition_to_merge_failure(
story_id,
MergeFailureKind::ConflictDetected(None),
)
.expect("initial MergeFailure transition");
let mut counters: HashMap<StoryId, (u32, MergeFailureKind)> = HashMap::new();
let kind = MergeFailureKind::ConflictDetected(None);
// Fire 3 MergeFailure events WHILE a mergemaster is running (gated).
for _ in 0..3 {
let fired = make_merge_failure_fired(story_id, kind.clone());
on_transition(tmp.path(), &fired, &mut counters, true);
}
// Counter must NOT have incremented at all — recovery in progress.
assert!(
!counters.contains_key(&StoryId(story_id.to_string())),
"counter must not increment while mergemaster is running"
);
// And the story must still be in MergeFailure (not Blocked).
let item = crate::pipeline_state::read_typed(story_id)
.expect("read")
.expect("item");
assert!(
matches!(item.stage, Stage::MergeFailure { .. }),
"story must NOT be blocked while mergemaster is running (recovery in progress): {:?}",
item.stage
);
}

/// Bug 1025 regression guard: the genuinely-stuck case (no mergemaster
/// running) still blocks at the threshold, so the original 1018 behaviour
/// is preserved.
#[test]
fn no_mergemaster_still_blocks_at_threshold() {
let tmp = tempfile::tempdir().unwrap();
setup_project(&tmp);
let story_id = "1025_genuine_stuck";
seed_at_merge(story_id);
crate::agents::lifecycle::transition_to_merge_failure(
story_id,
MergeFailureKind::ConflictDetected(None),
)
.expect("initial MergeFailure transition");
let mut counters: HashMap<StoryId, (u32, MergeFailureKind)> = HashMap::new();
let kind = MergeFailureKind::ConflictDetected(None);
// Fire 3 MergeFailure events with NO mergemaster (recovery_running=false).
for _ in 0..3 {
let fired = make_merge_failure_fired(story_id, kind.clone());
on_transition(tmp.path(), &fired, &mut counters, false);
}
// Story must be Blocked (genuine-stuck case unchanged).
let item = crate::pipeline_state::read_typed(story_id)
.expect("read")
.expect("item");
assert!(
matches!(item.stage, Stage::Blocked { .. }),
"story must still block when no mergemaster is running: {:?}",
item.stage
);
}
}