huskies: merge 1018

This commit is contained in:
dave
2026-05-14 09:33:50 +00:00
parent 8b2ba1c810
commit 309542cf2c
12 changed files with 360 additions and 7 deletions
@@ -0,0 +1,311 @@
//! TransitionFired subscriber that auto-blocks stories after N consecutive MergeFailure transitions.
//!
//! Listens on the pipeline transition broadcast channel and, for each story,
//! counts how many times it has entered [`Stage::MergeFailure`] consecutively.
//! When the count reaches the configurable threshold (default 3), the story is
//! transitioned to [`Stage::Blocked`] with a reason that names the failure kind.
//!
//! The counter for a story resets whenever a non-`MergeFailure` transition fires
//! for that story (e.g. after a successful merge or a `FixupRequested` demotion
//! back to coding).
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use crate::pipeline_state::{MergeFailureKind, PipelineEvent, Stage, StoryId};
use crate::slog;
use crate::slog_warn;
/// Spawn a background task that blocks stories after N consecutive `MergeFailure` transitions.
///
/// Subscribes to the pipeline transition broadcast channel and tracks a per-story
/// consecutive-failure counter. When a story's count reaches the threshold configured
/// in `project.toml` (`merge_failure_block_threshold`, default 3), the story is
/// transitioned to `Stage::Blocked` with a reason that names the failure kind.
///
/// The counter resets when the story leaves `MergeFailure` (e.g. on `FixupRequested`,
/// `ReQueuedForQa`, or a successful merge via `Unblock → Merge → Done`).
pub(crate) fn spawn_merge_failure_block_subscriber(project_root: PathBuf) {
let mut rx = crate::pipeline_state::subscribe_transitions();
tokio::spawn(async move {
let mut counters: HashMap<StoryId, (u32, MergeFailureKind)> = HashMap::new();
loop {
match rx.recv().await {
Ok(fired) => {
on_transition(&project_root, &fired, &mut counters);
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
slog_warn!(
"[merge-block-sub] Subscriber lagged, skipped {n} event(s). \
Some consecutive-failure counts may be understated."
);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
}
/// Handle a single transition event: update counters and emit Block if threshold is reached.
///
/// * Entering [`Stage::MergeFailure`] increments the story's consecutive-failure
///   counter (and refreshes the stored kind to the most recent failure).
/// * Any other stage clears the counter — a fixup, re-queue, or successful
///   merge restarts the count from zero.
/// * A threshold of 0 disables auto-blocking. The check is performed *before*
///   any counter mutation so that, while disabled, the counter map does not
///   accumulate entries and no stale counts leak into a later re-enable.
/// * On a successful `Block` transition the counter is removed; on failure it
///   is kept so the next `MergeFailure` event retries the block.
fn on_transition(
    project_root: &Path,
    fired: &crate::pipeline_state::TransitionFired,
    counters: &mut HashMap<StoryId, (u32, MergeFailureKind)>,
) {
    let Stage::MergeFailure { kind, .. } = &fired.after else {
        // Story left MergeFailure: reset its consecutive-failure count.
        counters.remove(&fired.story_id);
        return;
    };
    // Re-read the threshold per event so edits to project.toml take effect
    // without a restart. Do it before mutating state: with auto-blocking
    // disabled (threshold 0) the map must stay untouched.
    let threshold = load_threshold(project_root);
    if threshold == 0 {
        return;
    }
    let entry = counters
        .entry(fired.story_id.clone())
        .or_insert_with(|| (0, kind.clone()));
    entry.0 += 1;
    // Keep the stored kind current with the latest failure (the block reason
    // below uses the event's kind directly).
    entry.1 = kind.clone();
    let count = entry.0;
    if count >= threshold {
        let kind_str = failure_kind_label(kind);
        let reason = format!(
            "Auto-blocked after {count} consecutive MergeFailure ({kind_str}) transitions."
        );
        let story_id = fired.story_id.0.as_str();
        slog!(
            "[merge-block-sub] Story '{story_id}' reached {count} consecutive \
             MergeFailure ({kind_str}); blocking."
        );
        if let Err(e) = crate::pipeline_state::apply_transition(
            story_id,
            PipelineEvent::Block { reason },
            None,
        ) {
            slog_warn!("[merge-block-sub] Failed to block '{story_id}': {e}");
        } else {
            // Blocked successfully: drop the counter so a later unblock
            // starts a fresh count.
            counters.remove(&fired.story_id);
        }
    }
}
/// Load the threshold from project config, falling back to the compiled default.
///
/// Reads `merge_failure_block_threshold` from `project.toml` under
/// `project_root`; when the config cannot be loaded, the compiled-in default
/// of 3 is returned.
fn load_threshold(project_root: &Path) -> u32 {
    crate::config::ProjectConfig::load(project_root)
        .map_or(3, |c| c.merge_failure_block_threshold)
}
/// Short human-readable label for a [`MergeFailureKind`] variant.
fn failure_kind_label(kind: &MergeFailureKind) -> &'static str {
match kind {
MergeFailureKind::ConflictDetected(_) => "ConflictDetected",
MergeFailureKind::GatesFailed(_) => "GatesFailed",
MergeFailureKind::EmptyDiff => "EmptyDiff",
MergeFailureKind::NoCommits => "NoCommits",
MergeFailureKind::Other(_) => "Other",
}
}
// ── Tests ──────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_state::{BranchName, PipelineEvent, Stage, StoryId, TransitionFired};
    use std::num::NonZeroU32;

    /// Write a minimal `.huskies/project.toml` under `tmp` so that
    /// `load_threshold` finds a loadable config. No threshold override is
    /// written, so the default of 3 applies in every test below.
    fn setup_project(tmp: &tempfile::TempDir) {
        let sk = tmp.path().join(".huskies");
        std::fs::create_dir_all(&sk).unwrap();
        std::fs::write(sk.join("project.toml"), "[[agent]]\nname = \"coder\"\n").unwrap();
    }

    /// Seed a story into the `4_merge` stage via the CRDT/content store so
    /// that subsequent real transitions (MergeFailure, Block) are legal.
    fn seed_at_merge(story_id: &str) {
        crate::crdt_state::init_for_test();
        crate::db::ensure_content_store();
        crate::db::write_item_with_content(
            story_id,
            "4_merge",
            "---\nname: Test\n---\n",
            crate::db::ItemMeta::named("Test"),
        );
    }

    /// Build a synthetic `TransitionFired` representing Merge → MergeFailure
    /// with the given failure `kind`. Fed straight to `on_transition`, which
    /// only inspects `after` and `story_id`.
    fn make_merge_failure_fired(story_id: &str, kind: MergeFailureKind) -> TransitionFired {
        TransitionFired {
            story_id: StoryId(story_id.to_string()),
            before: Stage::Merge {
                feature_branch: BranchName("feature/test".to_string()),
                commits_ahead: NonZeroU32::new(1).unwrap(),
                claim: None,
            },
            after: Stage::MergeFailure {
                kind: kind.clone(),
                feature_branch: BranchName("feature/test".to_string()),
                commits_ahead: NonZeroU32::new(1).unwrap(),
            },
            event: PipelineEvent::MergeFailed { kind },
            at: chrono::Utc::now(),
        }
    }

    /// Build a synthetic `TransitionFired` representing MergeFailure → Coding
    /// (a `FixupRequested` demotion) — the non-MergeFailure event that must
    /// reset the consecutive-failure counter.
    fn make_coding_fired(story_id: &str) -> TransitionFired {
        TransitionFired {
            story_id: StoryId(story_id.to_string()),
            before: Stage::MergeFailure {
                kind: MergeFailureKind::GatesFailed("error".to_string()),
                feature_branch: BranchName("feature/test".to_string()),
                commits_ahead: NonZeroU32::new(1).unwrap(),
            },
            after: Stage::Coding {
                claim: None,
                plan: Default::default(),
            },
            event: PipelineEvent::FixupRequested,
            at: chrono::Utc::now(),
        }
    }

    /// AC3 (threshold-not-reached): 2 consecutive failures below threshold of 3 must NOT block.
    #[test]
    fn below_threshold_does_not_block() {
        let tmp = tempfile::tempdir().unwrap();
        setup_project(&tmp);
        let story_id = "1018_below";
        seed_at_merge(story_id);
        // Transition to MergeFailure once to establish the stage.
        crate::agents::lifecycle::transition_to_merge_failure(
            story_id,
            MergeFailureKind::GatesFailed("error".to_string()),
        )
        .expect("initial MergeFailure transition");
        let mut counters: HashMap<StoryId, (u32, MergeFailureKind)> = HashMap::new();
        let kind = MergeFailureKind::GatesFailed("error".to_string());
        // Fire 2 MergeFailure events (default threshold is 3).
        for _ in 0..2 {
            let fired = make_merge_failure_fired(story_id, kind.clone());
            on_transition(tmp.path(), &fired, &mut counters);
        }
        // Story must still be in MergeFailure (not Blocked).
        let item = crate::pipeline_state::read_typed(story_id)
            .expect("read")
            .expect("item");
        assert!(
            matches!(item.stage, Stage::MergeFailure { .. }),
            "story must still be in MergeFailure after 2 failures (threshold 3): {:?}",
            item.stage
        );
    }

    /// AC3 (threshold-reached): 3 consecutive failures at threshold of 3 must block.
    #[test]
    fn at_threshold_blocks_with_failure_kind_in_reason() {
        let tmp = tempfile::tempdir().unwrap();
        setup_project(&tmp);
        let story_id = "1018_at_threshold";
        seed_at_merge(story_id);
        // Put the story into MergeFailure so the Block transition emitted by
        // on_transition is legal in the state machine.
        crate::agents::lifecycle::transition_to_merge_failure(
            story_id,
            MergeFailureKind::GatesFailed("fmt error".to_string()),
        )
        .expect("initial MergeFailure transition");
        let mut counters: HashMap<StoryId, (u32, MergeFailureKind)> = HashMap::new();
        let kind = MergeFailureKind::GatesFailed("fmt error".to_string());
        // Fire 3 MergeFailure events — the 3rd must trigger the block.
        for _ in 0..3 {
            let fired = make_merge_failure_fired(story_id, kind.clone());
            on_transition(tmp.path(), &fired, &mut counters);
        }
        let item = crate::pipeline_state::read_typed(story_id)
            .expect("read")
            .expect("item");
        assert!(
            matches!(item.stage, Stage::Blocked { .. }),
            "story must be Blocked after 3 consecutive MergeFailures: {:?}",
            item.stage
        );
        // The block reason must name the failure kind.
        if let Stage::Blocked { reason } = &item.stage {
            assert!(
                reason.contains("GatesFailed"),
                "block reason must name the failure kind: {reason}"
            );
        }
    }

    /// AC3 (reset): counter clears after a non-MergeFailure transition.
    ///
    /// 2 failures → FixupRequested reset → 2 more failures: still below threshold, no block.
    #[test]
    fn counter_resets_on_non_merge_failure_transition() {
        let tmp = tempfile::tempdir().unwrap();
        setup_project(&tmp);
        let story_id = "1018_reset";
        seed_at_merge(story_id);
        crate::agents::lifecycle::transition_to_merge_failure(
            story_id,
            MergeFailureKind::ConflictDetected(None),
        )
        .expect("initial MergeFailure transition");
        let mut counters: HashMap<StoryId, (u32, MergeFailureKind)> = HashMap::new();
        let kind = MergeFailureKind::ConflictDetected(None);
        // Fire 2 MergeFailure events.
        for _ in 0..2 {
            let fired = make_merge_failure_fired(story_id, kind.clone());
            on_transition(tmp.path(), &fired, &mut counters);
        }
        assert_eq!(
            counters.get(&StoryId(story_id.to_string())).map(|e| e.0),
            Some(2),
            "counter must be 2 after 2 failures"
        );
        // Simulate FixupRequested (non-MergeFailure transition).
        let reset_fired = make_coding_fired(story_id);
        on_transition(tmp.path(), &reset_fired, &mut counters);
        assert!(
            !counters.contains_key(&StoryId(story_id.to_string())),
            "counter must be cleared after non-MergeFailure transition"
        );
        // Re-seed to MergeFailure so we can apply the block transition.
        crate::agents::lifecycle::transition_to_merge_failure(
            story_id,
            MergeFailureKind::ConflictDetected(None),
        )
        .expect("re-enter MergeFailure after reset");
        // Fire 2 more MergeFailure events — still below threshold.
        for _ in 0..2 {
            let fired = make_merge_failure_fired(story_id, kind.clone());
            on_transition(tmp.path(), &fired, &mut counters);
        }
        let item = crate::pipeline_state::read_typed(story_id)
            .expect("read")
            .expect("item");
        assert!(
            matches!(item.stage, Stage::MergeFailure { .. }),
            "story must still be in MergeFailure after reset + 2 new failures: {:?}",
            item.stage
        );
    }
}
@@ -4,6 +4,8 @@
mod auto_assign;
mod backlog;
mod merge;
/// TransitionFired subscriber that auto-blocks stories after N consecutive MergeFailure transitions.
pub(crate) mod merge_failure_block_subscriber;
/// TransitionFired subscriber that auto-spawns mergemaster on ConflictDetected merge failures.
pub(crate) mod merge_failure_subscriber;
mod pipeline;
@@ -15,5 +17,7 @@ pub(crate) mod watchdog;
// so that pool::lifecycle and pool::pipeline continue to access them unchanged.
pub(super) use scan::{find_free_agent_for_stage, is_agent_free};
/// Re-export for `startup::tick_loop`.
pub(crate) use merge_failure_block_subscriber::spawn_merge_failure_block_subscriber;
/// Re-export for `startup::tick_loop`.
pub(crate) use merge_failure_subscriber::spawn_merge_failure_subscriber;
+14
View File
@@ -139,6 +139,12 @@ pub struct ProjectConfig {
/// unaffected. Default: `true`.
#[serde(default = "default_status_push_enabled")]
pub status_push_enabled: bool,
/// Number of consecutive `MergeFailure` transitions before a story is
/// automatically blocked. The auto-block subscriber resets the counter
/// whenever the story leaves `MergeFailure` (e.g. after a successful
/// merge or `FixupRequested`). Default: 3. Set to 0 to disable.
#[serde(default = "default_merge_failure_block_threshold")]
pub merge_failure_block_threshold: u32,
}
/// Configuration for the filesystem watcher's sweep behaviour.
@@ -210,6 +216,10 @@ fn default_status_push_enabled() -> bool {
true
}
fn default_merge_failure_block_threshold() -> u32 {
3
}
fn default_max_mesh_peers() -> usize {
3
}
@@ -368,6 +378,7 @@ impl Default for ProjectConfig {
gateway_url: None,
gateway_project: None,
status_push_enabled: default_status_push_enabled(),
merge_failure_block_threshold: default_merge_failure_block_threshold(),
}
}
}
@@ -457,6 +468,7 @@ impl ProjectConfig {
gateway_url: None,
gateway_project: None,
status_push_enabled: default_status_push_enabled(),
merge_failure_block_threshold: default_merge_failure_block_threshold(),
};
validate_agents(&config.agent)?;
return Ok(config);
@@ -497,6 +509,7 @@ impl ProjectConfig {
gateway_url: None,
gateway_project: None,
status_push_enabled: default_status_push_enabled(),
merge_failure_block_threshold: default_merge_failure_block_threshold(),
};
validate_agents(&config.agent)?;
Ok(config)
@@ -525,6 +538,7 @@ impl ProjectConfig {
gateway_url: None,
gateway_project: None,
status_push_enabled: default_status_push_enabled(),
merge_failure_block_threshold: default_merge_failure_block_threshold(),
})
}
}
+1 -1
View File
@@ -507,7 +507,7 @@ fn project_stage_for_view(
}),
"qa" => Some(Stage::Qa),
"blocked" => Some(Stage::Blocked {
reason: String::new(),
reason: resume_to.unwrap_or("").to_string(),
}),
"merge" => Some(Stage::Merge {
feature_branch: BranchName(format!("feature/story-{story_id}")),
+18 -4
View File
@@ -119,8 +119,16 @@ async fn subscribe_receives_stage_transition_events() {
None,
);
let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on insert");
assert_eq!(evt.story_id, "906_story_subscribe");
// Drain any stale events from concurrent tests until we see ours.
// CRDT_EVENT_TX is global; parallel tests emit to the same channel.
let evt: CrdtEvent = loop {
let e = rx
.try_recv()
.expect("expected CrdtEvent for 906_story_subscribe");
if e.story_id == "906_story_subscribe" {
break e;
}
};
assert!(evt.from_stage.is_none());
assert!(matches!(
evt.to_stage,
@@ -138,8 +146,14 @@ async fn subscribe_receives_stage_transition_events() {
None,
);
let evt: CrdtEvent = rx.try_recv().expect("expected CrdtEvent on stage change");
assert_eq!(evt.story_id, "906_story_subscribe");
let evt: CrdtEvent = loop {
let e = rx
.try_recv()
.expect("expected CrdtEvent for 906_story_subscribe stage change");
if e.story_id == "906_story_subscribe" {
break e;
}
};
assert!(matches!(
evt.from_stage,
Some(crate::pipeline_state::Stage::Backlog)
+1 -1
View File
@@ -75,7 +75,7 @@ pub fn apply_transition(
super::Stage::Superseded { superseded_by, .. } => {
crate::crdt_state::set_resume_to_raw(story_id, &superseded_by.0);
}
super::Stage::Rejected { reason, .. } => {
super::Stage::Rejected { reason, .. } | super::Stage::Blocked { reason } => {
crate::crdt_state::set_resume_to_raw(story_id, reason);
}
_ => {}
+3 -1
View File
@@ -203,7 +203,9 @@ pub fn transition(state: Stage, event: PipelineEvent) -> Result<Stage, Transitio
(Backlog, Block { reason })
| (Coding { .. }, Block { reason })
| (Qa, Block { reason })
| (Merge { .. }, Block { reason }) => Ok(Blocked { reason }),
| (Merge { .. }, Block { reason })
| (MergeFailure { .. }, Block { reason })
| (MergeFailureFinal { .. }, Block { reason }) => Ok(Blocked { reason }),
// Story 945: ReviewHold no longer auto-archives. It transitions the
// story to `Stage::ReviewHold { resume_to, reason }`, preserving the
+4
View File
@@ -68,6 +68,10 @@ pub(crate) fn spawn_event_bridges(
root.clone(),
);
// Consecutive-failure auto-block subscriber: blocks stories after N
// consecutive MergeFailure transitions (story 1018).
crate::agents::pool::auto_assign::spawn_merge_failure_block_subscriber(root.clone());
// Content-store GC subscriber: purges all ContentKey::* entries for a
// story when it reaches a terminal stage, preventing zombie entries from
// accumulating in the process heap (story 996).
+1
View File
@@ -214,6 +214,7 @@ mod tests {
gateway_url: None,
gateway_project: None,
status_push_enabled: true,
merge_failure_block_threshold: 3,
}
}
+1
View File
@@ -220,6 +220,7 @@ mod tests {
gateway_url: None,
gateway_project: None,
status_push_enabled: true,
merge_failure_block_threshold: 3,
}
}
+1
View File
@@ -92,6 +92,7 @@ mod tests {
gateway_url: None,
gateway_project: None,
status_push_enabled: true,
merge_failure_block_threshold: 3,
}
}
+1
View File
@@ -135,6 +135,7 @@ mod tests {
gateway_url: None,
gateway_project: None,
status_push_enabled: true,
merge_failure_block_threshold: 3,
}
}