diff --git a/server/src/agents/pool/auto_assign/auto_assign.rs b/server/src/agents/pool/auto_assign/auto_assign.rs index 63bdeaa1..5d2181de 100644 --- a/server/src/agents/pool/auto_assign/auto_assign.rs +++ b/server/src/agents/pool/auto_assign/auto_assign.rs @@ -509,252 +509,6 @@ mod tests { ); } - // ── Story 827: auto-spawn mergemaster on content conflict ───────────────── - - /// A story in 4_merge with a content-conflict merge_failure and no - /// mergemaster_attempted flag must trigger an auto-spawn of mergemaster. - #[tokio::test] - async fn auto_assign_spawns_mergemaster_for_content_conflict() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".huskies"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write( - sk.join("project.toml"), - "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", - ) - .unwrap(); - crate::crdt_state::init_for_test(); - crate::db::ensure_content_store(); - crate::db::write_item_with_content( - "9860_story_conflict", - "4_merge_failure", - "CONFLICT (content): server/src/lib.rs", - crate::db::ItemMeta::named("Conflict"), - ); - // After master c228ae16, has_content_conflict_failure reads from - // {story_id}:gate_output (not the story description), so seed it there. - crate::db::write_content( - crate::db::ContentKey::GateOutput("9860_story_conflict"), - "CONFLICT (content): server/src/lib.rs", - ); - - let pool = AgentPool::new_test(3001); - pool.auto_assign_available_work(tmp.path()).await; - - let agents = pool.agents.lock().unwrap(); - let mergemaster_spawned = agents.iter().any(|(key, a)| { - key.contains("9860_story_conflict") - && a.agent_name == "mergemaster" - && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) - }); - assert!( - mergemaster_spawned, - "mergemaster should be spawned for a content-conflict story" - ); - } - - /// A story with merge_failure containing only "nothing to commit" must NOT - /// auto-spawn mergemaster. 
- #[tokio::test] - async fn auto_assign_does_not_spawn_mergemaster_for_non_conflict_failure() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".huskies"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write( - sk.join("project.toml"), - "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", - ) - .unwrap(); - crate::crdt_state::init_for_test(); - crate::db::ensure_content_store(); - crate::db::write_item_with_content( - "9861_story_nothing", - "4_merge_failure", - "nothing to commit, working tree clean", - crate::db::ItemMeta::named("Nothing"), - ); - - let pool = AgentPool::new_test(3001); - pool.auto_assign_available_work(tmp.path()).await; - - let agents = pool.agents.lock().unwrap(); - let mergemaster_spawned = agents.iter().any(|(key, a)| { - key.contains("9861_story_nothing") - && a.agent_name == "mergemaster" - && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) - }); - assert!( - !mergemaster_spawned, - "mergemaster must not be spawned for non-conflict failures" - ); - } - - /// A story in 4_merge with blocked: true must NOT auto-spawn mergemaster - /// even when it has an unresolved content-conflict merge_failure and - /// mergemaster_attempted is still false. - #[tokio::test] - async fn auto_assign_does_not_spawn_mergemaster_for_blocked_story() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".huskies"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write( - sk.join("project.toml"), - "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", - ) - .unwrap(); - crate::db::ensure_content_store(); - // Story 945: "blocked AND in 4_merge" is no longer representable as - // separate states. A blocked story lives in `Stage::Blocked` (which - // maps to wire-form "blocked"), so auto-assign won't see it in 4_merge. 
- crate::db::write_item_with_content( - "9863_story_blocked_conflict", - "blocked", - "CONFLICT (content): foo.rs", - crate::db::ItemMeta { - name: Some("Blocked conflict".to_string()), - ..Default::default() - }, - ); - - let pool = AgentPool::new_test(3001); - pool.auto_assign_available_work(tmp.path()).await; - - let agents = pool.agents.lock().unwrap(); - let mergemaster_spawned = agents.iter().any(|(key, a)| { - key.contains("9863_story_blocked_conflict") - && a.agent_name == "mergemaster" - && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) - }); - assert!( - !mergemaster_spawned, - "mergemaster must not be spawned for a blocked story" - ); - } - - /// A story with mergemaster_attempted: true must NOT auto-spawn again, even - /// if the merge_failure still contains a content conflict. - #[tokio::test] - async fn auto_assign_does_not_respawn_mergemaster_when_already_attempted() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".huskies"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write( - sk.join("project.toml"), - "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", - ) - .unwrap(); - crate::db::ensure_content_store(); - // Story 945: "mergemaster attempted" is now `Stage::MergeFailureFinal`. 
- crate::db::write_item_with_content( - "9862_story_attempted", - "merge_failure_final", - "CONFLICT (content): foo.rs", - crate::db::ItemMeta::named("Already tried"), - ); - - let pool = AgentPool::new_test(3001); - pool.auto_assign_available_work(tmp.path()).await; - - let agents = pool.agents.lock().unwrap(); - let mergemaster_spawned = agents.iter().any(|(key, a)| { - key.contains("9862_story_attempted") - && a.agent_name == "mergemaster" - && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) - }); - assert!( - !mergemaster_spawned, - "mergemaster must not re-spawn when mergemaster_attempted is true" - ); - } - - // ── Story 920: transient vs genuine mergemaster termination ────────────── - - /// AC4 (transient): a mergemaster that was killed transiently (no - /// report_merge_failure, spawn count below cap) must be re-spawned by the - /// next auto-assign pass — `mergemaster_attempted` stays false. - #[tokio::test] - async fn transient_mergemaster_exit_allows_respawn() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".huskies"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write( - sk.join("project.toml"), - "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", - ) - .unwrap(); - crate::crdt_state::init_for_test(); - crate::db::ensure_content_store(); - crate::db::write_item_with_content( - "920_story_transient", - "4_merge_failure", - "CONFLICT (content): foo.rs", - crate::db::ItemMeta::named("Transient"), - ); - // After master c228ae16, has_content_conflict_failure reads from - // {story_id}:gate_output (not the story description), so seed it there. - crate::db::write_content( - crate::db::ContentKey::GateOutput("920_story_transient"), - "CONFLICT (content): foo.rs", - ); - // Simulate two previous transient exits (below cap of 3) recorded in DB. 
- crate::db::write_content( - crate::db::ContentKey::MergeMasterSpawnCount("920_story_transient"), - "2", - ); - - // mergemaster_attempted must still be false (transient exits don't set it). - let pool = AgentPool::new_test(3001); - pool.auto_assign_available_work(tmp.path()).await; - - let agents = pool.agents.lock().unwrap(); - let respawned = agents.iter().any(|(key, a)| { - key.contains("920_story_transient") - && a.agent_name == "mergemaster" - && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) - }); - assert!( - respawned, - "mergemaster must re-spawn after transient terminations while below cap" - ); - } - - /// AC4 (genuine): after report_merge_failure, mergemaster_attempted is set - /// to true and auto-assign must not trigger another re-spawn. - #[tokio::test] - async fn genuine_mergemaster_exit_no_respawn() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".huskies"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write( - sk.join("project.toml"), - "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", - ) - .unwrap(); - crate::crdt_state::init_for_test(); - crate::db::ensure_content_store(); - // Story 945: the genuine give-up state is now `Stage::MergeFailureFinal`. - crate::db::write_item_with_content( - "920_story_genuine", - "merge_failure_final", - "CONFLICT (content): bar.rs", - crate::db::ItemMeta::named("Genuine"), - ); - - let pool = AgentPool::new_test(3001); - pool.auto_assign_available_work(tmp.path()).await; - - let agents = pool.agents.lock().unwrap(); - let spawned = agents.iter().any(|(key, a)| { - key.contains("920_story_genuine") - && a.agent_name == "mergemaster" - && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) - }); - assert!( - !spawned, - "mergemaster must not re-spawn after genuine give-up (mergemaster_attempted=true)" - ); - } - /// Two concurrent auto_assign_available_work calls must not assign the same /// agent to two stories simultaneously. 
After both complete, at most one /// Pending/Running entry must exist per agent name. @@ -814,159 +568,4 @@ mod tests { found {active_coder_count} active entries" ); } - - // ── Story 958: MergeFailure transition fires auto-assign via watcher bridge ─ - - /// Regression: before story 958, the auto-assign subscriber filtered events - /// with `is_active()`, which returned false for `MergeFailure`. This meant - /// a CRDT `MergeFailure` transition never triggered auto-assign, and - /// mergemaster was never auto-spawned on content conflicts. - /// - /// After story 958, the subscriber fires on EVERY WorkItem event. This - /// test verifies the end-to-end path: a WorkItem event with stage - /// `merge_failure` arriving on the watcher channel causes - /// `auto_assign_available_work` to run, which then auto-spawns mergemaster. - #[tokio::test] - async fn merge_failure_watcher_event_triggers_mergemaster_spawn() { - use std::sync::Arc; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path().to_path_buf(); - let sk = root.join(".huskies"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write( - sk.join("project.toml"), - "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", - ) - .unwrap(); - // The spawn path calls `git worktree add` — the tempdir must be a real - // git repo with at least one commit or it fails with "not a git repo". 
- for args in [ - &["init"][..], - &["config", "user.email", "test@test.com"], - &["config", "user.name", "Test"], - &["commit", "--allow-empty", "-m", "init"], - ] { - std::process::Command::new("git") - .args(args) - .current_dir(&root) - .output() - .unwrap(); - } - - crate::crdt_state::init_for_test(); - crate::db::ensure_content_store(); - crate::db::write_item_with_content( - "958_regression_conflict", - "4_merge_failure", - "CONFLICT (content): server/src/lib.rs", - crate::db::ItemMeta::named("Regression"), - ); - crate::db::write_content( - crate::db::ContentKey::GateOutput("958_regression_conflict"), - "CONFLICT (content): server/src/lib.rs", - ); - - let (watcher_tx, _) = broadcast::channel::(16); - let pool = Arc::new(AgentPool::new(3102, watcher_tx.clone())); - - crate::startup::tick_loop::spawn_event_bridges( - watcher_tx.clone(), - Some(root.clone()), - Arc::clone(&pool), - ); - - // Simulate the CRDT bridge forwarding a merge_failure stage transition. - let _ = watcher_tx.send(crate::io::watcher::WatcherEvent::WorkItem { - stage: "merge_failure".to_string(), - item_id: "958_regression_conflict".to_string(), - action: "update".to_string(), - commit_msg: "huskies: update 958_regression_conflict".to_string(), - from_stage: Some("merge".to_string()), - }); - - // Allow the subscriber task to run auto_assign_available_work. 
- tokio::task::yield_now().await; - tokio::time::sleep(std::time::Duration::from_millis(200)).await; - - let agents = pool.agents.lock().unwrap(); - let mergemaster_spawned = agents.iter().any(|(key, a)| { - key.contains("958_regression_conflict") - && a.agent_name == "mergemaster" - && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) - }); - assert!( - mergemaster_spawned, - "mergemaster must be auto-spawned when a merge_failure event fires \ - through the watcher bridge (story 958 regression)" - ); - } - - /// AC5 (story 982): a merge failure with content conflicts — seeded via the - /// typed `transition_to_merge_failure(ConflictDetected)` path without any - /// direct content-store or MergeJob writes in the test — produces - /// `Stage::MergeFailure { kind: ConflictDetected(_), .. }` and - /// auto-spawn-mergemaster fires within one `auto_assign_available_work` call. - #[tokio::test] - async fn auto_spawn_mergemaster_for_conflict_detected_kind_without_content_store_writes() { - let tmp = tempfile::tempdir().unwrap(); - let sk = tmp.path().join(".huskies"); - std::fs::create_dir_all(&sk).unwrap(); - std::fs::write( - sk.join("project.toml"), - "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", - ) - .unwrap(); - crate::crdt_state::init_for_test(); - crate::db::ensure_content_store(); - - let story_id = "982_ac5_conflict_auto_spawn"; - // Seed at Merge stage so the transition is valid. - crate::db::write_item_with_content( - story_id, - "4_merge", - "---\nname: AC5 auto-spawn test\n---\n", - crate::db::ItemMeta::named("AC5 auto-spawn test"), - ); - // Transition to MergeFailure(ConflictDetected) via lifecycle — no direct - // content-store writes in this test body. 
- crate::agents::lifecycle::transition_to_merge_failure( - story_id, - crate::pipeline_state::MergeFailureKind::ConflictDetected(Some( - "CONFLICT (content): server/src/lib.rs".to_string(), - )), - ) - .expect("transition to MergeFailure(ConflictDetected) should succeed"); - - // Verify the stage kind before triggering auto-assign. - let item = crate::pipeline_state::read_typed(story_id) - .unwrap() - .unwrap(); - assert!( - matches!( - item.stage, - crate::pipeline_state::Stage::MergeFailure { - kind: crate::pipeline_state::MergeFailureKind::ConflictDetected(_), - .. - } - ), - "stage must be MergeFailure(ConflictDetected) before auto-assign: {:?}", - item.stage - ); - - // One auto-assign cycle should spawn mergemaster. - let pool = AgentPool::new_test(3001); - pool.auto_assign_available_work(tmp.path()).await; - - let agents = pool.agents.lock().unwrap(); - let mergemaster_spawned = agents.iter().any(|(key, a)| { - key.contains(story_id) - && a.agent_name == "mergemaster" - && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) - }); - assert!( - mergemaster_spawned, - "mergemaster must be auto-spawned for ConflictDetected kind in one auto-assign cycle" - ); - } } diff --git a/server/src/agents/pool/auto_assign/merge.rs b/server/src/agents/pool/auto_assign/merge.rs index c0f61753..6091b199 100644 --- a/server/src/agents/pool/auto_assign/merge.rs +++ b/server/src/agents/pool/auto_assign/merge.rs @@ -1,4 +1,4 @@ -//! Merge stage dispatch: trigger server-side merges and auto-spawn mergemaster for content conflicts. +//! Merge stage dispatch: trigger server-side squash-merges for stories in `Stage::Merge`. 
use std::num::NonZeroU32; use std::path::Path; @@ -12,20 +12,19 @@ use crate::worktree; use super::super::super::PipelineStage; use super::super::AgentPool; -use super::scan::{find_free_agent_for_stage, is_story_assigned_for_stage, scan_stage_items}; +use super::scan::{is_story_assigned_for_stage, scan_stage_items}; use super::story_checks::{ - has_content_conflict_failure, has_mergemaster_attempted, has_review_hold, - has_unmet_dependencies, is_story_blocked, is_story_frozen, + has_review_hold, has_unmet_dependencies, is_story_blocked, is_story_frozen, }; impl AgentPool { - /// Process stories in `4_merge/`: trigger server-side squash-merges and auto-spawn - /// a mergemaster agent when a content-conflict failure is detected. + /// Process stories in `4_merge/`: trigger server-side squash-merges. /// - /// Stories with a recorded merge failure may be eligible for automatic mergemaster - /// dispatch when the failure is a content conflict — otherwise they need human - /// intervention. Each eligible story without an active merge job triggers - /// `trigger_server_side_merge`. + /// Each eligible story without an active merge job triggers + /// `trigger_server_side_merge`. Mergemaster auto-spawn for + /// `Stage::MergeFailure` stories is handled by the + /// [`merge_failure_subscriber`][super::merge_failure_subscriber] — this + /// function no longer scans the `merge_failure` stage. pub(super) async fn assign_merge_stage(&self, project_root: &Path, config: &ProjectConfig) { // ── 4_merge: deterministic server-side merge (no LLM agent) ────────── // @@ -113,59 +112,5 @@ impl AgentPool { slog!("[auto-assign] Triggering server-side merge for '{story_id}' in 4_merge/"); self.trigger_server_side_merge(project_root, story_id); } - - // ── 4_merge_failure: auto-spawn mergemaster on content conflict ─────── - // - // Stories transition to 4_merge_failure when the server-side merge fails. 
- // Content conflicts get one automatic mergemaster attempt; other failures - // require human intervention. - let merge_failure_stage = Stage::MergeFailure { - kind: crate::pipeline_state::MergeFailureKind::Other(String::new()), - feature_branch: BranchName(String::new()), - commits_ahead: NonZeroU32::new(1).expect("1 is non-zero"), - }; - let merge_failure_items = scan_stage_items(&merge_failure_stage); - for story_id in &merge_failure_items { - if has_content_conflict_failure(story_id) && !has_mergemaster_attempted(story_id) { - let mergemaster_agent = { - let agents = match self.agents.lock() { - Ok(a) => a, - Err(e) => { - slog_error!( - "[auto-assign] Failed to lock agents for mergemaster check: {e}" - ); - continue; - } - }; - if is_story_assigned_for_stage( - config, - &agents, - story_id, - &PipelineStage::Mergemaster, - ) { - None - } else { - find_free_agent_for_stage(config, &agents, &PipelineStage::Mergemaster) - .map(str::to_string) - } - }; - - if let Some(agent_name) = mergemaster_agent { - slog!( - "[auto-assign] Content conflict on '{story_id}'; \ - auto-spawning mergemaster '{agent_name}'." - ); - if let Err(e) = self - .start_agent(project_root, story_id, Some(&agent_name), None, None) - .await - { - slog!( - "[auto-assign] Failed to start mergemaster '{agent_name}' \ - for '{story_id}': {e}" - ); - } - } - } - } } } diff --git a/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs b/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs new file mode 100644 index 00000000..d57bafd9 --- /dev/null +++ b/server/src/agents/pool/auto_assign/merge_failure_subscriber.rs @@ -0,0 +1,368 @@ +//! TransitionFired subscriber that auto-spawns mergemaster on ConflictDetected merge failures. +//! +//! Listens on the pipeline transition broadcast channel and schedules a +//! mergemaster agent whenever a story enters +//! `Stage::MergeFailure { kind: ConflictDetected(_), .. }`. +//! 
Other [`MergeFailureKind`] variants require human intervention and are +//! intentionally ignored here. + +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use crate::pipeline_state::{MergeFailureKind, Stage}; +use crate::slog; +use crate::slog_warn; + +use super::super::super::PipelineStage; +use super::super::AgentPool; +use super::scan::{find_free_agent_for_stage, is_story_assigned_for_stage}; + +/// Spawn a background task that auto-spawns mergemaster agents on +/// `Stage::MergeFailure { kind: ConflictDetected(_) }` transitions. +/// +/// The task subscribes to the pipeline transition broadcast channel and calls +/// [`AgentPool::start_agent`] with the first free mergemaster agent whenever a +/// story transitions into a recoverable conflict state. All other +/// [`MergeFailureKind`] variants are silently skipped — they need a human. +pub(crate) fn spawn_merge_failure_subscriber(pool: Arc, project_root: PathBuf) { + let mut rx = crate::pipeline_state::subscribe_transitions(); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(fired) => { + on_merge_failure_transition(&pool, &project_root, &fired).await; + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + slog_warn!( + "[merge-failure-sub] Subscriber lagged, skipped {n} event(s). \ + ConflictDetected stories may need manual mergemaster spawn." + ); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); +} + +async fn on_merge_failure_transition( + pool: &AgentPool, + project_root: &Path, + fired: &crate::pipeline_state::TransitionFired, +) { + let Stage::MergeFailure { ref kind, .. 
} = fired.after else { + return; + }; + + let story_id = &fired.story_id.0; + + match kind { + MergeFailureKind::ConflictDetected(_) => { + let config = match crate::config::ProjectConfig::load(project_root) { + Ok(c) => c, + Err(e) => { + slog_warn!("[merge-failure-sub] Failed to load config for '{story_id}': {e}"); + return; + } + }; + + let agent_name = { + let agents = match pool.agents.lock() { + Ok(a) => a, + Err(e) => { + slog_warn!( + "[merge-failure-sub] Failed to lock agent pool for '{story_id}': {e}" + ); + return; + } + }; + if is_story_assigned_for_stage( + &config, + &agents, + story_id, + &PipelineStage::Mergemaster, + ) { + return; // mergemaster already running for this story + } + find_free_agent_for_stage(&config, &agents, &PipelineStage::Mergemaster) + .map(str::to_string) + }; + + if let Some(agent) = agent_name { + slog!( + "[merge-failure-sub] ConflictDetected on '{story_id}'; \ + auto-spawning mergemaster '{agent}'." + ); + if let Err(e) = pool + .start_agent(project_root, story_id, Some(&agent), None, None) + .await + { + slog!("[merge-failure-sub] Failed to spawn '{agent}' for '{story_id}': {e}"); + } + } else { + slog!( + "[merge-failure-sub] ConflictDetected on '{story_id}'; \ + no free mergemaster agent available." + ); + } + } + // GatesFailed, EmptyDiff, NoCommits, Other — all require human intervention. 
+ MergeFailureKind::GatesFailed(_) + | MergeFailureKind::EmptyDiff + | MergeFailureKind::NoCommits + | MergeFailureKind::Other(_) => {} + } +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use crate::agents::{AgentPool, AgentStatus}; + use crate::io::watcher::WatcherEvent; + use std::sync::Arc; + use tokio::sync::broadcast; + + fn setup_project(tmp: &tempfile::TempDir) { + let sk = tmp.path().join(".huskies"); + std::fs::create_dir_all(&sk).unwrap(); + std::fs::write( + sk.join("project.toml"), + "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", + ) + .unwrap(); + } + + fn seed_at_merge(story_id: &str) { + crate::crdt_state::init_for_test(); + crate::db::ensure_content_store(); + crate::db::write_item_with_content( + story_id, + "4_merge", + "---\nname: Test\n---\n", + crate::db::ItemMeta::named("Test"), + ); + } + + fn make_pool(port: u16) -> Arc { + let (tx, _) = broadcast::channel::(4); + Arc::new(AgentPool::new(port, tx)) + } + + fn make_fired( + story_id: &str, + kind: MergeFailureKind, + ) -> crate::pipeline_state::TransitionFired { + use crate::pipeline_state::{BranchName, PipelineEvent, StoryId, TransitionFired}; + use std::num::NonZeroU32; + TransitionFired { + story_id: StoryId(story_id.to_string()), + before: crate::pipeline_state::Stage::Merge { + feature_branch: BranchName("feature/test".to_string()), + commits_ahead: NonZeroU32::new(1).unwrap(), + }, + after: crate::pipeline_state::Stage::MergeFailure { + kind: kind.clone(), + feature_branch: BranchName("feature/test".to_string()), + commits_ahead: NonZeroU32::new(1).unwrap(), + }, + event: PipelineEvent::MergeFailed { kind }, + at: chrono::Utc::now(), + } + } + + // ── AC4: each MergeFailureKind variant ────────────────────────────────── + + /// ConflictDetected → on_merge_failure_transition must spawn mergemaster. 
+ /// + /// Calls the handler directly (not via the broadcast subscriber) to avoid + /// cross-test channel contamination from the global TRANSITION_TX. + #[tokio::test] + async fn conflict_detected_spawns_mergemaster_via_subscriber() { + let tmp = tempfile::tempdir().unwrap(); + setup_project(&tmp); + let story_id = "998_sub_conflict"; + seed_at_merge(story_id); + + let pool = make_pool(3998); + let fired = make_fired( + story_id, + MergeFailureKind::ConflictDetected(Some("CONFLICT (content): src/lib.rs".to_string())), + ); + on_merge_failure_transition(&pool, tmp.path(), &fired).await; + + let agents = pool.agents.lock().unwrap(); + assert!( + agents.iter().any(|(key, a)| { + key.contains(story_id) + && a.agent_name == "mergemaster" + && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) + }), + "mergemaster must be spawned for ConflictDetected" + ); + } + + /// GatesFailed → subscriber must NOT spawn mergemaster (human intervention needed). + #[tokio::test] + async fn gates_failed_does_not_spawn_mergemaster() { + let tmp = tempfile::tempdir().unwrap(); + setup_project(&tmp); + let story_id = "998_sub_gates"; + seed_at_merge(story_id); + + let pool = make_pool(3997); + spawn_merge_failure_subscriber(Arc::clone(&pool), tmp.path().to_path_buf()); + + crate::agents::lifecycle::transition_to_merge_failure( + story_id, + MergeFailureKind::GatesFailed("error[E0308]: mismatched types".to_string()), + ) + .expect("transition must succeed"); + + // Give the subscriber time to run (it should do nothing). + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let agents = pool.agents.lock().unwrap(); + let spawned = agents.iter().any(|(key, a)| { + key.contains(story_id) + && a.agent_name == "mergemaster" + && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) + }); + assert!(!spawned, "mergemaster must NOT be spawned for GatesFailed"); + } + + /// EmptyDiff → subscriber must NOT spawn mergemaster. 
+ #[tokio::test] + async fn empty_diff_does_not_spawn_mergemaster() { + let tmp = tempfile::tempdir().unwrap(); + setup_project(&tmp); + let story_id = "998_sub_emptydiff"; + seed_at_merge(story_id); + + let pool = make_pool(3996); + spawn_merge_failure_subscriber(Arc::clone(&pool), tmp.path().to_path_buf()); + + crate::agents::lifecycle::transition_to_merge_failure( + story_id, + MergeFailureKind::EmptyDiff, + ) + .expect("transition must succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let agents = pool.agents.lock().unwrap(); + let spawned = agents.iter().any(|(key, a)| { + key.contains(story_id) + && a.agent_name == "mergemaster" + && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) + }); + assert!(!spawned, "mergemaster must NOT be spawned for EmptyDiff"); + } + + /// NoCommits → subscriber must NOT spawn mergemaster. + #[tokio::test] + async fn no_commits_does_not_spawn_mergemaster() { + let tmp = tempfile::tempdir().unwrap(); + setup_project(&tmp); + let story_id = "998_sub_nocommits"; + seed_at_merge(story_id); + + let pool = make_pool(3995); + spawn_merge_failure_subscriber(Arc::clone(&pool), tmp.path().to_path_buf()); + + crate::agents::lifecycle::transition_to_merge_failure( + story_id, + MergeFailureKind::NoCommits, + ) + .expect("transition must succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let agents = pool.agents.lock().unwrap(); + let spawned = agents.iter().any(|(key, a)| { + key.contains(story_id) + && a.agent_name == "mergemaster" + && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) + }); + assert!(!spawned, "mergemaster must NOT be spawned for NoCommits"); + } + + /// Other(_) → subscriber must NOT spawn mergemaster. 
+ #[tokio::test] + async fn other_does_not_spawn_mergemaster() { + let tmp = tempfile::tempdir().unwrap(); + setup_project(&tmp); + let story_id = "998_sub_other"; + seed_at_merge(story_id); + + let pool = make_pool(3994); + spawn_merge_failure_subscriber(Arc::clone(&pool), tmp.path().to_path_buf()); + + crate::agents::lifecycle::transition_to_merge_failure( + story_id, + MergeFailureKind::Other("unknown error".to_string()), + ) + .expect("transition must succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let agents = pool.agents.lock().unwrap(); + let spawned = agents.iter().any(|(key, a)| { + key.contains(story_id) + && a.agent_name == "mergemaster" + && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) + }); + assert!(!spawned, "mergemaster must NOT be spawned for Other"); + } + + /// ConflictDetected self-loop — handler must NOT spawn a second mergemaster + /// when one is already Pending/Running for the story. + /// + /// Calls the handler twice directly (no broadcast subscriber) so there is no + /// timing window: the first call sets the agent to Pending synchronously, + /// and the second call sees that Pending entry and returns early. + #[tokio::test] + async fn conflict_detected_self_loop_does_not_double_spawn() { + let tmp = tempfile::tempdir().unwrap(); + setup_project(&tmp); + let story_id = "998_sub_selfloop"; + seed_at_merge(story_id); + + let pool = make_pool(3993); + let fired = make_fired( + story_id, + MergeFailureKind::ConflictDetected(Some("CONFLICT".to_string())), + ); + + // First call — spawns mergemaster (agent enters Pending). 
+ on_merge_failure_transition(&pool, tmp.path(), &fired).await; + { + let agents = pool.agents.lock().unwrap(); + assert!( + agents.iter().any(|(key, a)| { + key.contains(story_id) + && a.agent_name == "mergemaster" + && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) + }), + "mergemaster must be Pending after first ConflictDetected" + ); + } + + // Second call (self-loop) — agent is still Pending; guard must prevent double-spawn. + on_merge_failure_transition(&pool, tmp.path(), &fired).await; + + let agents = pool.agents.lock().unwrap(); + let active_count = agents + .iter() + .filter(|(key, a)| { + key.contains(story_id) + && a.agent_name == "mergemaster" + && matches!(a.status, AgentStatus::Pending | AgentStatus::Running) + }) + .count(); + assert_eq!( + active_count, 1, + "mergemaster must not be double-spawned on ConflictDetected self-loop" + ); + } +} diff --git a/server/src/agents/pool/auto_assign/mod.rs b/server/src/agents/pool/auto_assign/mod.rs index b2f95c31..a96a0e42 100644 --- a/server/src/agents/pool/auto_assign/mod.rs +++ b/server/src/agents/pool/auto_assign/mod.rs @@ -4,6 +4,8 @@ mod auto_assign; mod backlog; mod merge; +/// TransitionFired subscriber that auto-spawns mergemaster on ConflictDetected merge failures. +pub(crate) mod merge_failure_subscriber; mod pipeline; mod reconcile; mod scan; @@ -13,3 +15,6 @@ pub(crate) mod watchdog; // Re-export items that were pub(super) in the original monolithic auto_assign.rs // so that pool::lifecycle and pool::pipeline continue to access them unchanged. pub(super) use scan::{find_free_agent_for_stage, is_agent_free}; + +/// Re-export for `startup::tick_loop`. 
+pub(crate) use merge_failure_subscriber::spawn_merge_failure_subscriber; diff --git a/server/src/agents/pool/auto_assign/story_checks.rs b/server/src/agents/pool/auto_assign/story_checks.rs index ed57afd8..49066e98 100644 --- a/server/src/agents/pool/auto_assign/story_checks.rs +++ b/server/src/agents/pool/auto_assign/story_checks.rs @@ -50,46 +50,6 @@ pub(super) fn is_story_blocked(story_id: &str) -> bool { .unwrap_or(false) } -/// Return `true` if the story's merge failure is a git content-conflict -/// (`Stage::MergeFailure { kind: ConflictDetected(_), .. }`). -/// -/// Used by the auto-assigner to decide whether to spawn mergemaster automatically. -/// The typed kind is carried by the CRDT projection layer (which reads -/// `ContentKey::GateOutput` on projection to reconstruct the kind on restart), -/// so no direct content-store access is needed here (story 982). -pub(super) fn has_content_conflict_failure(story_id: &str) -> bool { - crate::pipeline_state::read_typed(story_id) - .ok() - .flatten() - .map(|item| { - matches!( - item.stage, - crate::pipeline_state::Stage::MergeFailure { - kind: crate::pipeline_state::MergeFailureKind::ConflictDetected(_), - .. - } - ) - }) - .unwrap_or(false) -} - -/// Return `true` if the story is in `Stage::MergeFailureFinal`. -/// -/// Story 945: `Stage::MergeFailureFinal` is the single source of truth — -/// the legacy `mergemaster_attempted: bool` CRDT register has been deleted. -/// Used to prevent the auto-assigner from repeatedly spawning mergemaster for -/// the same story after a failed mergemaster session. -pub(super) fn has_mergemaster_attempted(story_id: &str) -> bool { - crate::crdt_state::read_item(story_id) - .map(|view| { - matches!( - view.stage(), - crate::pipeline_state::Stage::MergeFailureFinal { .. } - ) - }) - .unwrap_or(false) -} - /// Return `true` if the story has any `depends_on` entries that are not yet in /// `5_done` or `6_archived`. Reads dependency state from the CRDT (story 929). 
pub(super) fn has_unmet_dependencies(story_id: &str) -> bool { @@ -345,81 +305,4 @@ mod tests { let archived_deps = check_archived_dependencies("503_story_waiting"); assert!(archived_deps.is_empty()); } - - // ── Story 982: typed MergeFailureKind — has_content_conflict_failure ────── - - /// AC2 (story 982): `has_content_conflict_failure` returns `true` when the - /// story is in `Stage::MergeFailure { kind: ConflictDetected(_), .. }`. - /// The test seeds the stage via `transition_to_merge_failure` (no direct - /// content-store or MergeJob writes in the test body). - #[test] - fn has_content_conflict_failure_true_for_conflict_detected_kind() { - crate::crdt_state::init_for_test(); - crate::db::ensure_content_store(); - let story_id = "982_ac2_conflict_detected"; - // Seed at Merge stage so the transition is valid. - crate::db::write_item_with_content( - story_id, - "4_merge", - "---\nname: AC2 conflict test\n---\n", - crate::db::ItemMeta::named("AC2 conflict test"), - ); - // Transition via the lifecycle helper — internally writes ContentKey::GateOutput - // so the CRDT projection can reconstruct the kind; no content-store writes here. - crate::agents::lifecycle::transition_to_merge_failure( - story_id, - crate::pipeline_state::MergeFailureKind::ConflictDetected(Some( - "CONFLICT (content): server/src/lib.rs".to_string(), - )), - ) - .expect("transition should succeed"); - - // The typed match now drives the predicate — no substring scan. - assert!( - has_content_conflict_failure(story_id), - "has_content_conflict_failure must be true for ConflictDetected kind" - ); - // Verify the projected stage carries the typed kind. - let item = crate::pipeline_state::read_typed(story_id) - .unwrap() - .unwrap(); - assert!( - matches!( - item.stage, - crate::pipeline_state::Stage::MergeFailure { - kind: crate::pipeline_state::MergeFailureKind::ConflictDetected(_), - .. 
- } - ), - "stage must be MergeFailure(ConflictDetected): {:?}", - item.stage - ); - } - - /// AC2 (story 982): `has_content_conflict_failure` returns `false` when the - /// kind is `GatesFailed` — no mergemaster spawn for gate-only failures. - #[test] - fn has_content_conflict_failure_false_for_gates_failed_kind() { - crate::crdt_state::init_for_test(); - crate::db::ensure_content_store(); - let story_id = "982_ac2_gates_failed"; - crate::db::write_item_with_content( - story_id, - "4_merge", - "---\nname: AC2 gates test\n---\n", - crate::db::ItemMeta::named("AC2 gates test"), - ); - crate::agents::lifecycle::transition_to_merge_failure( - story_id, - crate::pipeline_state::MergeFailureKind::GatesFailed( - "error[clippy::unused_variable]".to_string(), - ), - ) - .expect("transition should succeed"); - - assert!( - !has_content_conflict_failure(story_id), - "has_content_conflict_failure must be false for GatesFailed kind" - ); - } } diff --git a/server/src/pipeline_state/apply.rs b/server/src/pipeline_state/apply.rs index c39cb7da..b5f25519 100644 --- a/server/src/pipeline_state/apply.rs +++ b/server/src/pipeline_state/apply.rs @@ -99,6 +99,7 @@ pub fn apply_transition( stage_label(&fired.after), ); + super::events::try_broadcast(&fired); Ok(fired) } diff --git a/server/src/pipeline_state/events.rs b/server/src/pipeline_state/events.rs index a13cf096..4d2fec34 100644 --- a/server/src/pipeline_state/events.rs +++ b/server/src/pipeline_state/events.rs @@ -1,9 +1,40 @@ //! Event bus for pipeline state transitions. 
-#![allow(unused_imports, dead_code)] use chrono::{DateTime, Utc}; +use std::sync::OnceLock; +use tokio::sync::broadcast; -use super::{BranchName, PipelineEvent, Stage, StoryId}; +use super::{PipelineEvent, Stage, StoryId}; + +// ── Static transition broadcast channel ───────────────────────────────────── + +static TRANSITION_TX: OnceLock<broadcast::Sender<TransitionFired>> = OnceLock::new(); + +fn get_or_init_tx() -> &'static broadcast::Sender<TransitionFired> { + TRANSITION_TX.get_or_init(|| { + let (tx, _) = broadcast::channel(256); + tx + }) +} + +/// Subscribe to all pipeline stage transitions. +/// +/// Every call to [`apply_transition`][super::apply_transition] broadcasts the +/// resulting [`TransitionFired`] on this channel. Returns a new receiver that +/// only sees events sent after the moment of subscription. A receiver that +/// falls behind loses the oldest events — callers should handle +/// [`broadcast::error::RecvError::Lagged`]. +pub fn subscribe_transitions() -> broadcast::Receiver<TransitionFired> { + get_or_init_tx().subscribe() +} + +/// Broadcast `fired` to all active transition subscribers. +/// +/// Called from [`apply_transition`][super::apply_transition] after writing the +/// new stage to the CRDT. No-ops safely when there are no subscribers. +pub(super) fn try_broadcast(fired: &TransitionFired) { + let _ = get_or_init_tx().send(fired.clone()); +} /// Fired when a pipeline stage transition completes.
#[derive(Debug, Clone)] @@ -55,9 +86,9 @@ impl Default for EventBus { #[cfg(test)] mod tests { + use super::super::BranchName; use super::*; use std::num::NonZeroU32; - use std::sync::{Arc, Mutex}; fn nz(n: u32) -> NonZeroU32 { NonZeroU32::new(n).unwrap() diff --git a/server/src/pipeline_state/mod.rs b/server/src/pipeline_state/mod.rs index 65636a56..ad94cdb4 100644 --- a/server/src/pipeline_state/mod.rs +++ b/server/src/pipeline_state/mod.rs @@ -50,7 +50,7 @@ pub use transition::{ }; #[allow(unused_imports)] -pub use events::{EventBus, TransitionFired, TransitionSubscriber}; +pub use events::{EventBus, TransitionFired, TransitionSubscriber, subscribe_transitions}; #[allow(unused_imports)] pub use projection::ProjectionError; diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index 3410fc28..f3b18647 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -66,6 +66,15 @@ pub(crate) fn spawn_event_bridges( // ensures that MergeFailure and other non-"active" stages are covered // without any per-stage special-casing. if let Some(root) = project_root { + // Mergemaster auto-spawn subscriber: reacts to TransitionFired events for + // Stage::MergeFailure { kind: ConflictDetected } and spawns mergemaster + // directly from the typed event, eliminating the predicate-mismatch + // failure mode of the previous scan-loop approach (story 998). + crate::agents::pool::auto_assign::spawn_merge_failure_subscriber( + Arc::clone(&agents), + root.clone(), + ); + let watcher_auto_rx = watcher_tx.subscribe(); let watcher_auto_agents = Arc::clone(&agents); tokio::spawn(async move {