From cfccc2e73c06ad5d78796b8881a6c4a38f997e1d Mon Sep 17 00:00:00 2001 From: dave Date: Thu, 14 May 2026 14:48:55 +0000 Subject: [PATCH] huskies: merge 1044 --- server/src/agents/pool/start/spawn.rs | 224 ++++++++++++++++++++++++++ 1 file changed, 224 insertions(+) diff --git a/server/src/agents/pool/start/spawn.rs b/server/src/agents/pool/start/spawn.rs index 07bec664..74c1d0ad 100644 --- a/server/src/agents/pool/start/spawn.rs +++ b/server/src/agents/pool/start/spawn.rs @@ -647,6 +647,53 @@ pub(super) async fn run_agent_spawn( }); AgentPool::notify_agent_state_changed(&watcher_tx_clone); // Do NOT send WorkItem/reassign — story is already Done. + // Drain one queued ConflictDetected story now that this + // mergemaster slot is free (story 1044). + if let Some((candidate_id, candidate_agent)) = + crate::config::ProjectConfig::load(&project_root_clone) + .ok() + .and_then(|cfg| { + agents_ref.lock().ok().as_ref().and_then(|agts| { + pick_queued_conflict_detected(&cfg, agts, &sid) + }) + }) + { + slog!( + "[agents] Mergemaster exit for '{sid}' (success): \ + queued ConflictDetected story '{candidate_id}' found; \ + spawning '{candidate_agent}'." + ); + let agents_for_cd = Arc::clone(&agents_ref); + let watcher_for_cd = watcher_tx_clone.clone(); + let root_for_cd = project_root_clone.clone(); + let port_for_cd = port_for_task; + tokio::spawn(async move { + let pool = AgentPool { + agents: agents_for_cd, + port: port_for_cd, + child_killers: Arc::new(Mutex::new(HashMap::new())), + watcher_tx: watcher_for_cd, + status_broadcaster: Arc::new( + crate::service::status::StatusBroadcaster::new(), + ), + }; + if let Err(e) = pool + .start_agent( + &root_for_cd, + &candidate_id, + Some(&candidate_agent), + None, + None, + ) + .await + { + slog_error!( + "[agents] Failed to spawn '{candidate_agent}' for queued \ + ConflictDetected story '{candidate_id}': {e}" + ); + } + }); + } return; } // Clear any stale Running merge job so the next mergemaster @@ -719,6 +766,54 @@ pub(super) async fn run_agent_spawn( commit_msg: String::new(), from_stage: None, }); + // Drain one queued ConflictDetected story now that this + // mergemaster slot is free (story 1044). + if let Some((candidate_id, candidate_agent)) = + crate::config::ProjectConfig::load(&project_root_clone) + .ok() + .and_then(|cfg| { + agents_ref + .lock() + .ok() + .as_ref() + .and_then(|agts| pick_queued_conflict_detected(&cfg, agts, &sid)) + }) + { + slog!( + "[agents] Mergemaster exit for '{sid}': queued ConflictDetected \ + story '{candidate_id}' found; spawning '{candidate_agent}'." + ); + let agents_for_cd = Arc::clone(&agents_ref); + let watcher_for_cd = watcher_tx_clone.clone(); + let root_for_cd = project_root_clone.clone(); + let port_for_cd = port_for_task; + tokio::spawn(async move { + let pool = AgentPool { + agents: agents_for_cd, + port: port_for_cd, + child_killers: Arc::new(Mutex::new(HashMap::new())), + watcher_tx: watcher_for_cd, + status_broadcaster: Arc::new( + crate::service::status::StatusBroadcaster::new(), + ), + }; + if let Err(e) = pool + .start_agent( + &root_for_cd, + &candidate_id, + Some(&candidate_agent), + None, + None, + ) + .await + { + slog_error!( + "[agents] Failed to spawn '{candidate_agent}' for queued \ + ConflictDetected story '{candidate_id}': {e}" + ); + } + }); + } } else { // Server-owned completion: run acceptance gates automatically // when the agent process exits normally. @@ -755,6 +850,63 @@ pub(super) async fn run_agent_spawn( } } +/// Find the first story in `Stage::MergeFailure { kind: ConflictDetected }` that +/// has no active mergemaster in the pool (excluding `exclude_story_id`), together +/// with a free mergemaster agent name. Returns `None` if no eligible story exists +/// or if all configured mergemaster agents are currently busy. +/// +/// Called from the mergemaster exit handler to drain the queue of waiting +/// ConflictDetected stories one slot at a time (story 1044). +fn pick_queued_conflict_detected( + config: &ProjectConfig, + agents: &HashMap, + exclude_story_id: &str, +) -> Option<(String, String)> { + use crate::pipeline_state::{MergeFailureKind, Stage}; + + // Find a free mergemaster agent first; bail early if the pool is saturated. + let agent_name = config + .agent + .iter() + .find(|ac| { + agent_config_stage(ac) == PipelineStage::Mergemaster + && !agents.values().any(|a| { + a.agent_name == ac.name + && matches!(a.status, AgentStatus::Running | AgentStatus::Pending) + }) + })? + .name + .clone(); + + // Find the first eligible ConflictDetected story with no active mergemaster. + for item in crate::pipeline_state::read_all_typed() { + if item.story_id.0 == exclude_story_id { + continue; + } + let Stage::MergeFailure { + kind: MergeFailureKind::ConflictDetected(_), + .. + } = &item.stage + else { + continue; + }; + let has_mergemaster = agents.iter().any(|(key, agt)| { + let key_sid = key.rsplit_once(':').map(|(s, _)| s).unwrap_or(key.as_str()); + let agt_stage = config + .find_agent(&agt.agent_name) + .map(agent_config_stage) + .unwrap_or_else(|| pipeline_stage(&agt.agent_name)); + key_sid == item.story_id.0 + && agt_stage == PipelineStage::Mergemaster + && matches!(agt.status, AgentStatus::Running | AgentStatus::Pending) + }); + if !has_mergemaster { + return Some((item.story_id.0.clone(), agent_name)); + } + } + None +} + #[cfg(test)] mod tests { use super::*; @@ -865,6 +1017,78 @@ mod tests { ); } + /// AC4 (story 1044): three ConflictDetected stories queued with no active mergemaster; + /// `pick_queued_conflict_detected` must return exactly one story per call. + #[test] + fn mergemaster_exit_picks_up_one_queued_conflict_detected() { + use crate::agents::lifecycle::transition_to_merge_failure; + use crate::pipeline_state::MergeFailureKind; + + crate::crdt_state::init_for_test(); + crate::db::ensure_content_store(); + + let story_ids = ["1044_qcd_alpha", "1044_qcd_beta", "1044_qcd_gamma"]; + // Seed each story in 4_merge then transition to MergeFailure ConflictDetected. + for sid in &story_ids { + crate::db::write_item_with_content( + sid, + "4_merge", + "---\nname: Test\n---\n", + crate::db::ItemMeta::named("Test"), + ); + transition_to_merge_failure( + sid, + MergeFailureKind::ConflictDetected(Some( + "CONFLICT (content): src/lib.rs".to_string(), + )), + ) + .expect("transition to ConflictDetected must succeed"); + } + + let config = crate::config::ProjectConfig::parse( + "[[agent]]\nname = \"mergemaster\"\nstage = \"mergemaster\"\n", + ) + .unwrap(); + + // No active mergemaster in the pool. + let agents: HashMap = HashMap::new(); + let exclude = "9999_finished"; + + let result = pick_queued_conflict_detected(&config, &agents, exclude); + assert!( + result.is_some(), + "must find a queued ConflictDetected story when none have a mergemaster" + ); + let (chosen_id, chosen_agent) = result.unwrap(); + assert!( + story_ids.contains(&chosen_id.as_str()), + "chosen story must be one of the three queued stories; got: {chosen_id}" + ); + assert_eq!( + chosen_agent, "mergemaster", + "chosen agent must be the configured mergemaster" + ); + + // AC3: a second call (simulating the next mergemaster exit) must pick a + // different story. The mergemaster for `chosen_id` has now exited and + // freed its slot — agents map is empty again — so we exclude `chosen_id` + // and expect the scan to return one of the remaining two. + let result2 = pick_queued_conflict_detected(&config, &HashMap::new(), &chosen_id); + assert!( + result2.is_some(), + "second exit must find another queued story" + ); + let (chosen_id2, _) = result2.unwrap(); + assert_ne!( + chosen_id2, chosen_id, + "second pick must be a different story from the first" + ); + assert!( + story_ids.contains(&chosen_id2.as_str()), + "second chosen story must be one of the three; got: {chosen_id2}" + ); + } + /// AC3 (bug 882): simulates the abort-respawn counter mechanism to verify that /// retry_count is never bumped during consecutive aborted+no-session exits and /// that the abort counter reaches the cap (5) before blocking.