From cd9021fedf5c89a42c025d836e6941869eade670 Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 13 May 2026 21:37:07 +0000 Subject: [PATCH] huskies: merge 1006 --- server/src/agents/pool/mod.rs | 2 + server/src/agents/pool/start/spawn.rs | 75 +++-- server/src/agents/pool/worktree_lifecycle.rs | 302 +++++++++++++++++++ server/src/startup/tick_loop.rs | 26 +- server/src/worktree/mod.rs | 1 + 5 files changed, 361 insertions(+), 45 deletions(-) create mode 100644 server/src/agents/pool/worktree_lifecycle.rs diff --git a/server/src/agents/pool/mod.rs b/server/src/agents/pool/mod.rs index c897c302..4f326509 100644 --- a/server/src/agents/pool/mod.rs +++ b/server/src/agents/pool/mod.rs @@ -8,6 +8,8 @@ mod stop; mod types; mod wait; mod worktree; +/// Lifecycle subscribers that create and remove worktrees on pipeline transitions. +pub(crate) mod worktree_lifecycle; #[cfg(test)] mod test_helpers; diff --git a/server/src/agents/pool/start/spawn.rs b/server/src/agents/pool/start/spawn.rs index 57ccc2e3..e81d7690 100644 --- a/server/src/agents/pool/start/spawn.rs +++ b/server/src/agents/pool/start/spawn.rs @@ -163,35 +163,56 @@ pub(super) async fn run_agent_spawn( let watcher_tx_clone = watcher_tx; let _ = inactivity_timeout_secs; // currently unused inside the closure body - // Step 1: create the worktree (slow — git checkout, pnpm install, etc.) - let wt_info = match crate::worktree::create_worktree( - &project_root_clone, - &sid, - &config_clone, - port_for_task, - ) - .await - { - Ok(wt) => wt, - Err(e) => { - let error_msg = format!("Failed to create worktree: {e}"); - slog_error!("[agents] {error_msg}"); - let event = AgentEvent::Error { - story_id: sid.clone(), - agent_name: aname.clone(), - message: error_msg, - }; - if let Ok(mut log) = log_clone.lock() { - log.push(event.clone()); + // Step 1: wait for the worktree created by the worktree lifecycle subscriber. + // The Coding transition fires before this task is spawned; the subscriber + // creates the worktree asynchronously. Poll until it exists or the deadline. + // Tests use a short deadline (1 s) so the failure path exercises quickly; + // production uses 120 s to allow slow git operations to complete. + #[cfg(not(test))] + let worktree_wait_secs: u64 = 120; + #[cfg(test)] + let worktree_wait_secs: u64 = 1; + let wt_info = { + let wt_path = crate::worktree::worktree_path(&project_root_clone, &sid); + let branch = format!("feature/story-{sid}"); + let base_branch = config_clone + .base_branch + .clone() + .unwrap_or_else(|| crate::worktree::detect_base_branch(&project_root_clone)); + let deadline = + tokio::time::Instant::now() + std::time::Duration::from_secs(worktree_wait_secs); + loop { + if wt_path.exists() { + break crate::worktree::WorktreeInfo { + path: wt_path, + branch, + base_branch, + }; } - let _ = tx_clone.send(event); - if let Ok(mut agents) = agents_ref.lock() - && let Some(agent) = agents.get_mut(&key_clone) - { - agent.status = AgentStatus::Failed; + if tokio::time::Instant::now() >= deadline { + let error_msg = format!( + "Worktree for story '{sid}' did not appear within {worktree_wait_secs} s; \ + the lifecycle subscriber may have failed." + ); + slog_error!("[agents] {error_msg}"); + let event = AgentEvent::Error { + story_id: sid.clone(), + agent_name: aname.clone(), + message: error_msg, + }; + if let Ok(mut log) = log_clone.lock() { + log.push(event.clone()); + } + let _ = tx_clone.send(event); + if let Ok(mut agents) = agents_ref.lock() + && let Some(agent) = agents.get_mut(&key_clone) + { + agent.status = AgentStatus::Failed; + } + AgentPool::notify_agent_state_changed(&watcher_tx_clone); + return; } - AgentPool::notify_agent_state_changed(&watcher_tx_clone); - return; + tokio::time::sleep(std::time::Duration::from_millis(500)).await; } }; diff --git a/server/src/agents/pool/worktree_lifecycle.rs b/server/src/agents/pool/worktree_lifecycle.rs new file mode 100644 index 00000000..fa4236ae --- /dev/null +++ b/server/src/agents/pool/worktree_lifecycle.rs @@ -0,0 +1,302 @@ +//! TransitionFired subscribers for worktree and feature-branch lifecycle. +//! +//! `spawn_worktree_create_subscriber` creates worktrees when stories enter +//! `Stage::Coding`. `spawn_worktree_cleanup_subscriber` removes worktrees +//! when stories reach terminal stages (Done, Archived, Abandoned, Superseded). + +use std::path::{Path, PathBuf}; + +use crate::pipeline_state::Stage; +use crate::slog; +use crate::slog_warn; + +/// Spawn a background task that creates a git worktree when a story enters `Stage::Coding`. +/// +/// Subscribes to the pipeline transition broadcast channel. On each +/// `Stage::Coding` transition, creates the worktree and feature branch for the +/// story at `project_root` using `port` for the `.mcp.json` server URL. +/// The create is idempotent — if the worktree already exists it is reused. +pub(crate) fn spawn_worktree_create_subscriber(project_root: PathBuf, port: u16) { + let mut rx = crate::pipeline_state::subscribe_transitions(); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(fired) => { + if matches!(fired.after, Stage::Coding) { + on_coding_transition(&project_root, port, &fired.story_id.0).await; + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + slog_warn!( + "[worktree-create-sub] Subscriber lagged, skipped {n} event(s). \ + Some worktrees may need manual creation." + ); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); +} + +/// Spawn a background task that removes git worktrees when stories reach terminal stages. +/// +/// Subscribes to the pipeline transition broadcast channel. On transitions into +/// `Stage::Done`, `Stage::Archived`, `Stage::Abandoned`, or `Stage::Superseded`, +/// removes the worktree and feature branch for the story. +/// Non-fatal if the worktree does not exist. +pub(crate) fn spawn_worktree_cleanup_subscriber(project_root: PathBuf) { + let mut rx = crate::pipeline_state::subscribe_transitions(); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(fired) => { + if matches!( + fired.after, + Stage::Done { .. } + | Stage::Archived { .. } + | Stage::Abandoned { .. } + | Stage::Superseded { .. } + ) { + on_terminal_transition(&project_root, &fired.story_id.0).await; + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + slog_warn!( + "[worktree-cleanup-sub] Subscriber lagged, skipped {n} event(s). \ + Some worktrees may need manual removal." + ); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); +} + +/// Create the worktree and feature branch for `story_id` when it enters `Stage::Coding`. +pub(crate) async fn on_coding_transition(project_root: &Path, port: u16, story_id: &str) { + let config = match crate::config::ProjectConfig::load(project_root) { + Ok(c) => c, + Err(e) => { + slog_warn!("[worktree-create-sub] Failed to load config for '{story_id}': {e}"); + return; + } + }; + slog!("[worktree-create-sub] Story '{story_id}' entered Coding; ensuring worktree exists."); + match crate::worktree::create_worktree(project_root, story_id, &config, port).await { + Ok(info) => { + slog!( + "[worktree-create-sub] Worktree ready for '{story_id}' at {}", + info.path.display() + ); + if let Err(e) = crate::worktree::install_pre_commit_hook(&info.path) { + slog_warn!( + "[worktree-create-sub] Pre-commit hook install failed for '{story_id}': {e}" + ); + } + } + Err(e) => { + slog_warn!("[worktree-create-sub] Failed to create worktree for '{story_id}': {e}"); + } + } +} + +/// Remove the worktree and feature branch for `story_id` after it reaches a terminal stage. +pub(crate) async fn on_terminal_transition(project_root: &Path, story_id: &str) { + let config = match crate::config::ProjectConfig::load(project_root) { + Ok(c) => c, + Err(e) => { + slog_warn!("[worktree-cleanup-sub] Failed to load config for '{story_id}': {e}"); + return; + } + }; + slog!("[worktree-cleanup-sub] Story '{story_id}' reached terminal stage; removing worktree."); + match crate::worktree::remove_worktree_by_story_id(project_root, story_id, &config).await { + Ok(()) => slog!("[worktree-cleanup-sub] Worktree removed for '{story_id}'."), + Err(e) => { + // Non-fatal — worktree may not exist (story never had one, or already removed). + slog!("[worktree-cleanup-sub] Worktree removal for '{story_id}': {e}"); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + use std::process::Command; + use tempfile::TempDir; + + fn init_git_repo(dir: &Path) { + Command::new("git") + .args(["init"]) + .current_dir(dir) + .output() + .expect("git init"); + Command::new("git") + .args(["commit", "--allow-empty", "-m", "init"]) + .current_dir(dir) + .output() + .expect("git commit"); + } + + fn setup_project(tmp: &TempDir) -> PathBuf { + let root = tmp.path().join("project"); + fs::create_dir_all(root.join(".huskies")).unwrap(); + std::fs::write(root.join(".huskies").join("project.toml"), "").unwrap(); + init_git_repo(&root); + root + } + + /// AC1: on_coding_transition creates the worktree and feature branch. + #[tokio::test] + async fn coding_transition_creates_worktree() { + let tmp = TempDir::new().unwrap(); + let root = setup_project(&tmp); + let story_id = "1006_test_create"; + + on_coding_transition(&root, 3001, story_id).await; + + let wt_path = crate::worktree::worktree_path(&root, story_id); + assert!( + wt_path.exists(), + "worktree directory must exist after Coding transition" + ); + assert!( + wt_path.join(".mcp.json").exists(), + ".mcp.json must be written in the worktree" + ); + // Verify the feature branch was created. + let branch_output = Command::new("git") + .args(["branch", "--list", "feature/story-1006_test_create"]) + .current_dir(&root) + .output() + .expect("git branch --list"); + assert!( + !String::from_utf8_lossy(&branch_output.stdout) + .trim() + .is_empty(), + "feature branch must exist after Coding transition" + ); + } + + /// AC1: calling on_coding_transition twice is idempotent. + #[tokio::test] + async fn coding_transition_is_idempotent() { + let tmp = TempDir::new().unwrap(); + let root = setup_project(&tmp); + let story_id = "1006_test_idempotent"; + + on_coding_transition(&root, 3001, story_id).await; + on_coding_transition(&root, 3001, story_id).await; + + let wt_path = crate::worktree::worktree_path(&root, story_id); + assert!( + wt_path.exists(), + "worktree must still exist after second Coding transition" + ); + } + + /// AC2: on_terminal_transition removes the worktree after it was created. + #[tokio::test] + async fn terminal_transition_removes_worktree() { + let tmp = TempDir::new().unwrap(); + let root = setup_project(&tmp); + let story_id = "1006_test_remove"; + + on_coding_transition(&root, 3001, story_id).await; + let wt_path = crate::worktree::worktree_path(&root, story_id); + assert!(wt_path.exists(), "worktree must exist before cleanup"); + + on_terminal_transition(&root, story_id).await; + assert!( + !wt_path.exists(), + "worktree must be removed after terminal transition" + ); + } + + /// AC2: on_terminal_transition is a no-op (non-fatal) when no worktree exists. + #[tokio::test] + async fn terminal_transition_noop_when_no_worktree() { + let tmp = TempDir::new().unwrap(); + let root = setup_project(&tmp); + + // Should not panic or error — just log and return. + on_terminal_transition(&root, "1006_test_no_wt").await; + } + + /// AC1+AC4: spawn_worktree_create_subscriber reacts to a real Coding transition. + #[tokio::test] + async fn create_subscriber_reacts_to_coding_transition() { + crate::crdt_state::init_for_test(); + crate::db::ensure_content_store(); + + let tmp = TempDir::new().unwrap(); + let root = setup_project(&tmp); + let story_id = "1006_test_sub_create"; + + crate::db::write_item_with_content( + story_id, + "1_backlog", + "---\nname: Test\n---\n", + crate::db::ItemMeta::named("Test"), + ); + + spawn_worktree_create_subscriber(root.clone(), 3001); + + // Trigger the Coding transition. + crate::agents::lifecycle::move_story_to_current(story_id) + .expect("move to current must succeed"); + + // Give the subscriber task time to run and create the worktree. + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + let wt_path = crate::worktree::worktree_path(&root, story_id); + assert!( + wt_path.exists(), + "worktree must exist after Coding transition via subscriber" + ); + } + + /// AC2+AC4: spawn_worktree_cleanup_subscriber reacts to terminal transitions. + #[tokio::test] + async fn cleanup_subscriber_reacts_to_terminal_transition() { + crate::crdt_state::init_for_test(); + crate::db::ensure_content_store(); + + let tmp = TempDir::new().unwrap(); + let root = setup_project(&tmp); + let story_id = "1006_test_sub_cleanup"; + + crate::db::write_item_with_content( + story_id, + "2_current", + "---\nname: Test\n---\n", + crate::db::ItemMeta::named("Test"), + ); + + // Create the worktree manually so the cleanup subscriber has something to remove. + let config = crate::config::ProjectConfig::load(&root).unwrap(); + crate::worktree::create_worktree(&root, story_id, &config, 3001) + .await + .expect("create worktree must succeed"); + + let wt_path = crate::worktree::worktree_path(&root, story_id); + assert!( + wt_path.exists(), + "worktree must exist before subscriber test" + ); + + spawn_worktree_cleanup_subscriber(root.clone()); + + // Trigger a terminal transition (Abandoned). + crate::agents::lifecycle::abandon_story(story_id).expect("abandon must succeed"); + + // Give the subscriber time to process and remove the worktree. + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + assert!( + !wt_path.exists(), + "worktree must be removed after terminal transition via subscriber" + ); + } +} diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index f3b18647..96f1ae28 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -24,27 +24,9 @@ pub(crate) fn spawn_event_bridges( // filesystem watcher or from a CRDT sync peer. { let crdt_watcher_tx = watcher_tx.clone(); - let crdt_prune_root = project_root.clone(); if let Some(mut crdt_rx) = crate::crdt_state::subscribe() { tokio::spawn(async move { while let Ok(evt) = crdt_rx.recv().await { - if matches!( - evt.to_stage, - crate::pipeline_state::Stage::Archived { .. } - | crate::pipeline_state::Stage::Abandoned { .. } - | crate::pipeline_state::Stage::Superseded { .. } - | crate::pipeline_state::Stage::Rejected { .. } - ) && let Some(root) = crdt_prune_root.as_ref().cloned() - { - let story_id = evt.story_id.clone(); - tokio::spawn(async move { - let config = - crate::config::ProjectConfig::load(&root).unwrap_or_default(); - crate::worktree::remove_worktree_by_story_id(&root, &story_id, &config) - .await - .ok(); - }); - } let (action, commit_msg) = io::watcher::stage_metadata(&evt.to_stage, &evt.story_id); let watcher_evt = io::watcher::WatcherEvent::WorkItem { @@ -66,6 +48,14 @@ pub(crate) fn spawn_event_bridges( // ensures that MergeFailure and other non-"active" stages are covered // without any per-stage special-casing. if let Some(root) = project_root { + // Worktree lifecycle subscribers: create worktrees on Stage::Coding + // and remove them on terminal stages (Done, Archived, Abandoned, Superseded). + crate::agents::pool::worktree_lifecycle::spawn_worktree_create_subscriber( + root.clone(), + agents.port(), + ); + crate::agents::pool::worktree_lifecycle::spawn_worktree_cleanup_subscriber(root.clone()); + // Mergemaster auto-spawn subscriber: reacts to TransitionFired events for // Stage::MergeFailure { kind: ConflictDetected } and spawns mergemaster // directly from the typed event, eliminating the predicate-mismatch diff --git a/server/src/worktree/mod.rs b/server/src/worktree/mod.rs index 9035617f..d7fa2d8c 100644 --- a/server/src/worktree/mod.rs +++ b/server/src/worktree/mod.rs @@ -10,6 +10,7 @@ mod sweep; pub use cleanup::{format_report, run_cleanup}; pub use create::create_worktree; pub use create::install_pre_commit_hook; +pub(crate) use git::detect_base_branch; pub use git::migrate_slug_paths; pub use remove::remove_worktree_by_story_id; pub use sweep::sweep_orphaned_worktrees;