From 615e1c7f73eed4e88190c6be4b49f4bb1178c6ff Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 27 Apr 2026 19:51:27 +0000 Subject: [PATCH] huskies: merge 738_refactor_delete_fs_shadow_code_from_lifecycle_rs_and_the_work_directory_watcher --- server/src/agents/lifecycle.rs | 125 ++- .../agents/pool/auto_assign/auto_assign.rs | 3 +- .../src/agents/pool/auto_assign/reconcile.rs | 10 +- .../src/agents/pool/pipeline/advance/mod.rs | 43 +- .../pool/pipeline/advance/tests_regression.rs | 6 +- server/src/agents/pool/pipeline/merge.rs | 3 +- server/src/agents/pool/start/mod.rs | 2 +- server/src/agents/pool/stop.rs | 4 +- server/src/chat/commands/move_story.rs | 2 +- server/src/http/mcp/diagnostics/mod.rs | 6 +- server/src/http/mcp/diagnostics/permission.rs | 6 +- server/src/http/mcp/merge_tools.rs | 2 +- server/src/http/mcp/qa_tools.rs | 8 +- server/src/http/mcp/story_tools/bug.rs | 5 +- server/src/http/mcp/story_tools/story.rs | 4 +- server/src/io/watcher.rs | 753 +----------------- server/src/service/timer/io.rs | 4 +- server/src/service/timer/mod.rs | 2 +- 18 files changed, 105 insertions(+), 883 deletions(-) diff --git a/server/src/agents/lifecycle.rs b/server/src/agents/lifecycle.rs index c133fdac..dffbcfdb 100644 --- a/server/src/agents/lifecycle.rs +++ b/server/src/agents/lifecycle.rs @@ -1,4 +1,7 @@ -//! Story lifecycle helpers — file creation, archival, and stage transitions for pipeline items. +//! Story lifecycle helpers — archival and stage transitions for pipeline items. +//! +//! All pipeline state lives in the CRDT. These functions never consult the +//! filesystem for work-item data — CRDT lookup failures propagate as errors. use std::path::Path; use std::process::Command; @@ -25,7 +28,6 @@ pub(crate) fn item_type_from_id(item_id: &str) -> &'static str { /// `sources` stages, then updates the stage. Optionally clears front-matter /// fields from the stored content. Returns the source stage on success. fn move_item<'a>( - _project_root: &Path, story_id: &str, sources: &'a [&'a str], target_dir: &str, @@ -81,20 +83,6 @@ fn move_item<'a>( return Ok(Some(src_dir)); } - // Item not found in CRDT — check the content store as a migration - // fallback. This handles items that were imported into the DB but - // haven't yet been replicated into the CRDT layer. Unlike the old - // filesystem fallback (removed — see story 517), this path reads - // from the authoritative in-memory DB and cannot cause state drift. - if let Some(mut content) = crate::db::read_content(story_id) { - for field in fields_to_clear { - content = clear_front_matter_field_in_content(&content, field); - } - crate::db::write_item_with_content(story_id, target_dir, &content); - slog!("[lifecycle] Moved '{story_id}' to work/{target_dir}/ (content store fallback)"); - return Ok(Some(sources[0])); - } - if missing_ok { slog!("[lifecycle] Work item '{story_id}' not found; skipping move to work/{target_dir}/"); return Ok(None); @@ -114,17 +102,8 @@ fn move_item<'a>( /// etc.) are left untouched. This prevents coders from accidentally demoting a story /// that has already advanced past the coding stage. /// Idempotent: if already in `2_current/`, returns Ok. If not found, logs and returns Ok. -pub fn move_story_to_current(project_root: &Path, story_id: &str) -> Result<(), String> { - move_item( - project_root, - story_id, - &["1_backlog"], - "2_current", - &[], - true, - &[], - ) - .map(|_| ()) +pub fn move_story_to_current(story_id: &str) -> Result<(), String> { + move_item(story_id, &["1_backlog"], "2_current", &[], true, &[]).map(|_| ()) } /// Check whether a feature branch `feature/story-{story_id}` exists and has @@ -162,9 +141,8 @@ pub fn feature_branch_has_unmerged_changes(project_root: &Path, story_id: &str) /// /// Idempotent if already in `5_done/` or `6_archived/`. Errors if not found in any earlier stage. /// Spikes may transition directly from `3_qa/` to `5_done/`, skipping the merge stage. -pub fn move_story_to_done(project_root: &Path, story_id: &str) -> Result<(), String> { +pub fn move_story_to_done(story_id: &str) -> Result<(), String> { move_item( - project_root, story_id, &["2_current", "3_qa", "4_merge"], "5_done", @@ -178,9 +156,8 @@ pub fn move_story_to_done(project_root: &Path, story_id: &str) -> Result<(), Str /// Move a story/bug from `work/2_current/` or `work/3_qa/` to `work/4_merge/`. /// /// Idempotent if already in `4_merge/`. Errors if not found in `2_current/` or `3_qa/`. -pub fn move_story_to_merge(project_root: &Path, story_id: &str) -> Result<(), String> { +pub fn move_story_to_merge(story_id: &str) -> Result<(), String> { move_item( - project_root, story_id, &["2_current", "3_qa"], "4_merge", @@ -194,9 +171,8 @@ pub fn move_story_to_merge(project_root: &Path, story_id: &str) -> Result<(), St /// Move a story/bug from `work/2_current/` to `work/3_qa/`. /// /// Idempotent if already in `3_qa/`. Errors if not found in `2_current/`. -pub fn move_story_to_qa(project_root: &Path, story_id: &str) -> Result<(), String> { +pub fn move_story_to_qa(story_id: &str) -> Result<(), String> { move_item( - project_root, story_id, &["2_current"], "3_qa", @@ -208,13 +184,8 @@ pub fn move_story_to_qa(project_root: &Path, story_id: &str) -> Result<(), Strin } /// Move a story from `work/3_qa/` back to `work/2_current/`, clearing `review_hold` and writing notes. -pub fn reject_story_from_qa( - project_root: &Path, - story_id: &str, - notes: &str, -) -> Result<(), String> { +pub fn reject_story_from_qa(story_id: &str, notes: &str) -> Result<(), String> { let moved = move_item( - project_root, story_id, &["3_qa"], "2_current", @@ -240,11 +211,7 @@ pub fn reject_story_from_qa( /// Accepts `target_stage` as one of: `backlog`, `current`, `qa`, `merge`, `done`. /// Idempotent: if the item is already in the target stage, returns Ok. /// Returns `(from_stage, to_stage)` on success. -pub fn move_story_to_stage( - project_root: &Path, - story_id: &str, - target_stage: &str, -) -> Result<(String, String), String> { +pub fn move_story_to_stage(story_id: &str, target_stage: &str) -> Result<(String, String), String> { const STAGES: &[(&str, &str)] = &[ ("backlog", "1_backlog"), ("current", "2_current"), @@ -267,16 +234,8 @@ pub fn move_story_to_stage( let all_dirs: Vec<&str> = STAGES.iter().map(|(_, dir)| *dir).collect(); - match move_item( - project_root, - story_id, - &all_dirs, - target_dir, - &[], - false, - &[], - ) - .map_err(|_| format!("Work item '{story_id}' not found in any pipeline stage."))? + match move_item(story_id, &all_dirs, target_dir, &[], false, &[]) + .map_err(|_| format!("Work item '{story_id}' not found in any pipeline stage."))? { Some(src_dir) => { let from_stage = STAGES @@ -293,9 +252,8 @@ pub fn move_story_to_stage( /// Move a bug from `work/2_current/` or `work/1_backlog/` to `work/5_done/`. /// /// Idempotent if already in `5_done/`. Errors if not found in `2_current/` or `1_backlog/`. -pub fn close_bug_to_archive(project_root: &Path, bug_id: &str) -> Result<(), String> { +pub fn close_bug_to_archive(bug_id: &str) -> Result<(), String> { move_item( - project_root, bug_id, &["2_current", "1_backlog"], "5_done", @@ -313,32 +271,57 @@ mod tests { // ── move_story_to_current tests ──────────────────────────────────────────── #[test] - fn move_story_to_current_from_content_store() { - // Seed via the content store (the DB's in-memory representation). - // CRDT is not initialised in unit tests, so move_item uses the - // content-store fallback which re-imports to the target stage. + fn move_story_to_current_from_crdt() { + // Seed via CRDT — the sole source of truth for pipeline state. crate::db::ensure_content_store(); - crate::db::write_content( + crate::db::write_item_with_content( "99950_story_lifecycle", + "1_backlog", "---\nname: Lifecycle Test\n---\n# Story\n", ); - let tmp = tempfile::tempdir().unwrap(); - move_story_to_current(tmp.path(), "99950_story_lifecycle").unwrap(); + move_story_to_current("99950_story_lifecycle").unwrap(); - // Verify the content store now has the item (imported at target stage). - let content = crate::db::read_content("99950_story_lifecycle") - .expect("item should be in content store after move"); - assert!( - content.contains("Lifecycle Test"), - "content should be preserved after move" + // Verify the CRDT now has the item in 2_current. + let item = crate::pipeline_state::read_typed("99950_story_lifecycle") + .expect("CRDT read should succeed") + .expect("item should exist in CRDT after move"); + assert_eq!( + item.stage.dir_name(), + "2_current", + "item should be in 2_current after move" ); } #[test] fn move_story_to_current_noop_when_not_found() { - let tmp = tempfile::tempdir().unwrap(); - assert!(move_story_to_current(tmp.path(), "99_missing").is_ok()); + assert!(move_story_to_current("99_missing").is_ok()); + } + + /// Lifecycle operation runs to completion using only CRDT state; + /// no `.huskies/work//` tree is consulted because no `project_root` + /// is passed — the functions operate purely on the CRDT. + #[test] + fn move_story_uses_only_crdt_no_fs_shadow() { + crate::db::ensure_content_store(); + crate::db::write_item_with_content( + "99951_story_crdt_only", + "2_current", + "---\nname: CRDT Only Test\n---\n# Story\n", + ); + + // No filesystem path is involved — lifecycle functions no longer + // accept a project_root, proving they never touch the filesystem. + move_story_to_done("99951_story_crdt_only").unwrap(); + + let item = crate::pipeline_state::read_typed("99951_story_crdt_only") + .expect("CRDT read should succeed") + .expect("item should exist in CRDT"); + assert_eq!( + item.stage.dir_name(), + "5_done", + "item should be in 5_done after move" + ); } // ── item_type_from_id tests ──────────────────────────────────────────────── diff --git a/server/src/agents/pool/auto_assign/auto_assign.rs b/server/src/agents/pool/auto_assign/auto_assign.rs index 5c8e2e2d..602459f3 100644 --- a/server/src/agents/pool/auto_assign/auto_assign.rs +++ b/server/src/agents/pool/auto_assign/auto_assign.rs @@ -64,8 +64,7 @@ impl AgentPool { } // All deps met — promote from backlog to current. slog!("[auto-assign] Story '{story_id}' deps met; promoting from backlog to current."); - if let Err(e) = crate::agents::lifecycle::move_story_to_current(project_root, story_id) - { + if let Err(e) = crate::agents::lifecycle::move_story_to_current(story_id) { slog!("[auto-assign] Failed to promote '{story_id}' to current: {e}"); } } diff --git a/server/src/agents/pool/auto_assign/reconcile.rs b/server/src/agents/pool/auto_assign/reconcile.rs index f78eb2d2..aab56ba1 100644 --- a/server/src/agents/pool/auto_assign/reconcile.rs +++ b/server/src/agents/pool/auto_assign/reconcile.rs @@ -168,7 +168,7 @@ impl AgentPool { match qa_mode { crate::io::story_metadata::QaMode::Server => { - if let Err(e) = crate::agents::move_story_to_merge(project_root, story_id) { + if let Err(e) = crate::agents::move_story_to_merge(story_id) { eprintln!( "[startup:reconcile] Failed to move '{story_id}' to 4_merge/: {e}" ); @@ -189,7 +189,7 @@ impl AgentPool { } } crate::io::story_metadata::QaMode::Agent => { - if let Err(e) = crate::agents::move_story_to_qa(project_root, story_id) { + if let Err(e) = crate::agents::move_story_to_qa(story_id) { eprintln!( "[startup:reconcile] Failed to move '{story_id}' to 3_qa/: {e}" ); @@ -208,7 +208,7 @@ impl AgentPool { } } crate::io::story_metadata::QaMode::Human => { - if let Err(e) = crate::agents::move_story_to_qa(project_root, story_id) { + if let Err(e) = crate::agents::move_story_to_qa(story_id) { eprintln!( "[startup:reconcile] Failed to move '{story_id}' to 3_qa/: {e}" ); @@ -308,9 +308,7 @@ impl AgentPool { status: "review_hold".to_string(), message: "Passed QA — waiting for human review.".to_string(), }); - } else if let Err(e) = - crate::agents::move_story_to_merge(project_root, story_id) - { + } else if let Err(e) = crate::agents::move_story_to_merge(story_id) { eprintln!( "[startup:reconcile] Failed to move '{story_id}' to 4_merge/: {e}" ); diff --git a/server/src/agents/pool/pipeline/advance/mod.rs b/server/src/agents/pool/pipeline/advance/mod.rs index 90e1be55..0c79f6f6 100644 --- a/server/src/agents/pool/pipeline/advance/mod.rs +++ b/server/src/agents/pool/pipeline/advance/mod.rs @@ -71,10 +71,8 @@ impl AgentPool { "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \ qa: server — moving directly to merge." ); - if let Err(e) = crate::agents::lifecycle::move_story_to_merge( - &project_root, - story_id, - ) { + if let Err(e) = crate::agents::lifecycle::move_story_to_merge(story_id) + { slog_error!( "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}" ); @@ -88,9 +86,7 @@ impl AgentPool { "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \ qa: agent — moving to QA." ); - if let Err(e) = - crate::agents::lifecycle::move_story_to_qa(&project_root, story_id) - { + if let Err(e) = crate::agents::lifecycle::move_story_to_qa(story_id) { slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}"); } else if let Err(e) = self .start_agent(&project_root, story_id, Some("qa"), None, None) @@ -106,9 +102,7 @@ impl AgentPool { "[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \ qa: human — holding for human review." ); - if let Err(e) = - crate::agents::lifecycle::move_story_to_qa(&project_root, story_id) - { + if let Err(e) = crate::agents::lifecycle::move_story_to_qa(story_id) { slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}"); } else { write_review_hold_to_store(story_id); @@ -150,10 +144,9 @@ impl AgentPool { }; match qa_mode { crate::io::story_metadata::QaMode::Server => { - if let Err(e) = crate::agents::lifecycle::move_story_to_merge( - &project_root, - story_id, - ) { + if let Err(e) = + crate::agents::lifecycle::move_story_to_merge(story_id) + { slog_error!( "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}" ); @@ -163,10 +156,8 @@ impl AgentPool { } } crate::io::story_metadata::QaMode::Agent => { - if let Err(e) = crate::agents::lifecycle::move_story_to_qa( - &project_root, - story_id, - ) { + if let Err(e) = crate::agents::lifecycle::move_story_to_qa(story_id) + { slog_error!( "[pipeline] Failed to move '{story_id}' to 3_qa/: {e}" ); @@ -180,10 +171,8 @@ impl AgentPool { } } crate::io::story_metadata::QaMode::Human => { - if let Err(e) = crate::agents::lifecycle::move_story_to_qa( - &project_root, - story_id, - ) { + if let Err(e) = crate::agents::lifecycle::move_story_to_qa(story_id) + { slog_error!( "[pipeline] Failed to move '{story_id}' to 3_qa/: {e}" ); @@ -277,10 +266,8 @@ impl AgentPool { "[pipeline] QA passed gates and coverage for '{story_id}'. \ Moving directly to merge." ); - if let Err(e) = crate::agents::lifecycle::move_story_to_merge( - &project_root, - story_id, - ) { + if let Err(e) = crate::agents::lifecycle::move_story_to_merge(story_id) + { slog_error!( "[pipeline] Failed to move '{story_id}' to 4_merge/: {e}" ); @@ -391,9 +378,7 @@ impl AgentPool { slog!( "[pipeline] Post-merge tests passed for '{story_id}'. Moving to done." ); - if let Err(e) = - crate::agents::lifecycle::move_story_to_done(&project_root, story_id) - { + if let Err(e) = crate::agents::lifecycle::move_story_to_done(story_id) { slog_error!("[pipeline] Failed to move '{story_id}' to done: {e}"); } self.remove_agents_for_story(story_id); diff --git a/server/src/agents/pool/pipeline/advance/tests_regression.rs b/server/src/agents/pool/pipeline/advance/tests_regression.rs index c8114513..084c95ed 100644 --- a/server/src/agents/pool/pipeline/advance/tests_regression.rs +++ b/server/src/agents/pool/pipeline/advance/tests_regression.rs @@ -57,7 +57,11 @@ async fn mergemaster_blocks_and_sends_story_blocked_when_no_commits_ahead() { ) .unwrap(); crate::db::ensure_content_store(); - crate::db::write_content("9919_story_no_commits", "---\nname: Test\n---\n"); + crate::db::write_item_with_content( + "9919_story_no_commits", + "2_current", + "---\nname: Test\n---\n", + ); let pool = AgentPool::new_test(3001); let mut rx = pool.watcher_tx.subscribe(); diff --git a/server/src/agents/pool/pipeline/merge.rs b/server/src/agents/pool/pipeline/merge.rs index dacb4afd..c8959786 100644 --- a/server/src/agents/pool/pipeline/merge.rs +++ b/server/src/agents/pool/pipeline/merge.rs @@ -155,8 +155,7 @@ impl AgentPool { }); } - let story_archived = - crate::agents::lifecycle::move_story_to_done(project_root, story_id).is_ok(); + let story_archived = crate::agents::lifecycle::move_story_to_done(story_id).is_ok(); if story_archived { self.remove_agents_for_story(story_id); } diff --git a/server/src/agents/pool/start/mod.rs b/server/src/agents/pool/start/mod.rs index ee53abc2..17c8d972 100644 --- a/server/src/agents/pool/start/mod.rs +++ b/server/src/agents/pool/start/mod.rs @@ -79,7 +79,7 @@ impl AgentPool { .map(|s| s == PipelineStage::Coder) .unwrap_or(true); if starting_a_coder { - crate::agents::lifecycle::move_story_to_current(project_root, story_id)?; + crate::agents::lifecycle::move_story_to_current(story_id)?; } // Validate that the agent's configured stage matches the story's diff --git a/server/src/agents/pool/stop.rs b/server/src/agents/pool/stop.rs index ddb8d4f8..62a2d105 100644 --- a/server/src/agents/pool/stop.rs +++ b/server/src/agents/pool/stop.rs @@ -143,7 +143,7 @@ mod tests { let story_content = "test"; fs::write(current.join("60_story_cleanup.md"), story_content).unwrap(); crate::db::ensure_content_store(); - crate::db::write_content("60_story_cleanup", story_content); + crate::db::write_item_with_content("60_story_cleanup", "2_current", story_content); let pool = AgentPool::new_test(3001); pool.inject_test_agent("60_story_cleanup", "coder-1", AgentStatus::Completed); @@ -152,7 +152,7 @@ mod tests { assert_eq!(pool.list_agents().unwrap().len(), 3); - move_story_to_done(root, "60_story_cleanup").unwrap(); + move_story_to_done("60_story_cleanup").unwrap(); pool.remove_agents_for_story("60_story_cleanup"); let remaining = pool.list_agents().unwrap(); diff --git a/server/src/chat/commands/move_story.rs b/server/src/chat/commands/move_story.rs index 579986ba..eeedab71 100644 --- a/server/src/chat/commands/move_story.rs +++ b/server/src/chat/commands/move_story.rs @@ -62,7 +62,7 @@ pub(super) fn handle_move(ctx: &CommandContext) -> Option { let display_name = found_name.as_deref().unwrap_or(&story_id); - match move_story_to_stage(ctx.effective_root(), &story_id, &target_stage) { + match move_story_to_stage(&story_id, &target_stage) { Ok((from_stage, to_stage)) => Some(format!( "Moved **{display_name}** from **{from_stage}** to **{to_stage}**." )), diff --git a/server/src/http/mcp/diagnostics/mod.rs b/server/src/http/mcp/diagnostics/mod.rs index 53df068b..2892b77c 100644 --- a/server/src/http/mcp/diagnostics/mod.rs +++ b/server/src/http/mcp/diagnostics/mod.rs @@ -54,7 +54,7 @@ pub(crate) async fn tool_rebuild_and_restart(ctx: &AppContext) -> Result Result { +pub(crate) fn tool_move_story(args: &Value, _ctx: &AppContext) -> Result { let story_id = args .get("story_id") .and_then(|v| v.as_str()) @@ -64,9 +64,7 @@ pub(crate) fn tool_move_story(args: &Value, ctx: &AppContext) -> Result Result Result Result Result Result Result Option<(&'static str, Strin Some((action, msg)) } -/// Return the pipeline stage name for a path if it is a `.md` file living -/// directly inside one of the known work subdirectories, otherwise `None`. -/// -/// Explicitly returns `None` for any path under `.huskies/worktrees/` so -/// that code changes made by agents in their isolated worktrees are never -/// auto-committed to master by the watcher. -/// -/// Retained for tests; no longer called in production (CRDT drives events). -#[cfg(test)] -fn stage_for_path(path: &Path) -> Option { - // Reject any path that passes through the worktrees directory. - if path.components().any(|c| c.as_os_str() == "worktrees") { - return None; - } - - if path.extension().is_none_or(|e| e != "md") { - return None; - } - let stage = path - .parent() - .and_then(|p| p.file_name()) - .and_then(|n| n.to_str())?; - matches!( - stage, - "1_backlog" | "2_current" | "3_qa" | "4_merge" | "5_done" | "6_archived" - ) - .then(|| stage.to_string()) -} - -/// Stage all changes in the work directory and commit with the given message. -/// -/// Uses `git add -A .huskies/work/` to catch both additions and deletions in -/// a single commit. Returns `Ok(true)` if a commit was made, `Ok(false)` if -/// there was nothing to commit, and `Err` for unexpected failures. -/// -/// Retained for tests; no longer called in production (CRDT drives events). -#[cfg(test)] -fn git_add_work_and_commit(git_root: &Path, message: &str) -> Result { - let work_rel = PathBuf::from(".huskies").join("work"); - - let add_out = std::process::Command::new("git") - .args(["add", "-A"]) - .arg(&work_rel) - .current_dir(git_root) - .output() - .map_err(|e| format!("git add: {e}"))?; - if !add_out.status.success() { - return Err(format!( - "git add failed: {}", - String::from_utf8_lossy(&add_out.stderr) - )); - } - - let commit_out = std::process::Command::new("git") - .args(["commit", "-m", message]) - .current_dir(git_root) - .output() - .map_err(|e| format!("git commit: {e}"))?; - - if commit_out.status.success() { - return Ok(true); - } - - let stderr = String::from_utf8_lossy(&commit_out.stderr); - let stdout = String::from_utf8_lossy(&commit_out.stdout); - if stdout.contains("nothing to commit") || stderr.contains("nothing to commit") { - return Ok(false); - } - - Err(format!("git commit failed: {stderr}")) -} - -/// Stages that represent meaningful git checkpoints (creation and archival). -/// Intermediate stages (current, qa, merge, done) are transient pipeline state -/// that don't need to be committed — they're only relevant while the server is -/// running and are broadcast to WebSocket clients for real-time UI updates. -/// -/// Retained for tests; no longer called in production (CRDT drives events). -#[cfg(test)] -const COMMIT_WORTHY_STAGES: &[&str] = &["1_backlog", "5_done", "6_archived"]; - -/// Return `true` if changes in `stage` should be committed to git. -/// -/// Retained for tests; no longer called in production (CRDT drives events). -#[cfg(test)] -fn should_commit_stage(stage: &str) -> bool { - COMMIT_WORTHY_STAGES.contains(&stage) -} - -/// Process a batch of pending (path → stage) entries: commit and broadcast. -/// -/// Only files that still exist on disk are used to derive the commit message -/// (they represent the destination of a move or a new file). Deletions are -/// captured by `git add -A .huskies/work/` automatically. -/// -/// Only terminal stages (`1_backlog` and `6_archived`) trigger git commits. -/// All stages broadcast a [`WatcherEvent`] so the frontend stays in sync. -/// -/// Retained for tests; no longer called in production (CRDT drives events). -#[cfg(test)] -fn flush_pending( - pending: &std::collections::HashMap, - git_root: &Path, - event_tx: &broadcast::Sender, -) { - use crate::io::story_metadata::clear_front_matter_field; - - // Separate into files that exist (additions) vs gone (deletions). - let mut additions: Vec<(&PathBuf, &str)> = Vec::new(); - for (path, stage) in pending { - if path.exists() { - additions.push((path, stage.as_str())); - } - } - - // Pick the commit message from the first addition (the meaningful side of a move). - // If there are only deletions, use a generic message. - let (action, item_id, commit_msg) = if let Some((path, stage)) = additions.first() { - let item = path - .file_stem() - .and_then(|s| s.to_str()) - .unwrap_or("unknown"); - if let Some((act, msg)) = stage_metadata(stage, item) { - (act, item.to_string(), msg) - } else { - return; - } - } else { - // Only deletions — pick any pending path for the item name. - let Some((path, _)) = pending.iter().next() else { - return; - }; - let item = path - .file_stem() - .and_then(|s| s.to_str()) - .unwrap_or("unknown"); - ( - "remove", - item.to_string(), - format!("huskies: remove {item}"), - ) - }; - - // Strip stale merge_failure front matter from any story that has left 4_merge/. - for (path, stage) in &additions { - if *stage != "4_merge" - && let Err(e) = clear_front_matter_field(path, "merge_failure") - { - slog!( - "[watcher] Warning: could not clear merge_failure from {}: {e}", - path.display() - ); - } - } - - // Only commit for terminal stages; intermediate moves are broadcast-only. - let dest_stage = additions.first().map_or("unknown", |(_, s)| *s); - let should_commit = should_commit_stage(dest_stage); - - if should_commit { - slog!("[watcher] flush: {commit_msg}"); - match git_add_work_and_commit(git_root, &commit_msg) { - Ok(committed) => { - if committed { - slog!("[watcher] committed: {commit_msg}"); - } else { - slog!("[watcher] skipped (already committed): {commit_msg}"); - } - } - Err(e) => { - slog!("[watcher] git error: {e}"); - return; - } - } - } else { - slog!("[watcher] flush (broadcast-only): {commit_msg}"); - } - - // For move operations, find the source stage from deleted entries with matching item_id. - let from_stage: Option = if !additions.is_empty() { - pending - .iter() - .filter(|(path, _)| !path.exists()) - .find(|(path, _)| path.file_stem().and_then(|s| s.to_str()) == Some(item_id.as_str())) - .map(|(_, stage)| stage.clone()) - } else { - None - }; - - // Always broadcast the event so connected WebSocket clients stay in sync. - let evt = WatcherEvent::WorkItem { - stage: dest_stage.to_string(), - item_id, - action: action.to_string(), - commit_msg, - from_stage, - }; - let _ = event_tx.send(evt); -} - /// Sweep items in `5_done` whose `merged_at` timestamp exceeds the retention /// duration to `6_archived` via CRDT state transitions. /// @@ -473,535 +260,7 @@ pub fn start_watcher(git_root: PathBuf, event_tx: broadcast::Sender PathBuf { - let dir = root.join(".huskies").join("work").join(stage); - fs::create_dir_all(&dir).expect("create stage dir"); - dir - } - - // ── git_add_work_and_commit ─────────────────────────────────────────────── - - #[test] - fn git_commit_returns_true_when_file_added() { - let tmp = TempDir::new().unwrap(); - init_git_repo(tmp.path()); - let stage_dir = make_stage_dir(tmp.path(), "2_current"); - fs::write(stage_dir.join("42_story_foo.md"), "---\nname: test\n---\n").unwrap(); - - let result = git_add_work_and_commit(tmp.path(), "huskies: start 42_story_foo"); - assert_eq!( - result, - Ok(true), - "should return Ok(true) when a commit was made" - ); - } - - #[test] - fn git_commit_returns_false_when_nothing_to_commit() { - let tmp = TempDir::new().unwrap(); - init_git_repo(tmp.path()); - let stage_dir = make_stage_dir(tmp.path(), "2_current"); - fs::write(stage_dir.join("42_story_foo.md"), "---\nname: test\n---\n").unwrap(); - - // First commit — should succeed. - git_add_work_and_commit(tmp.path(), "huskies: start 42_story_foo").unwrap(); - - // Second call with no changes — should return Ok(false). - let result = git_add_work_and_commit(tmp.path(), "huskies: start 42_story_foo"); - assert_eq!( - result, - Ok(false), - "should return Ok(false) when nothing to commit" - ); - } - - // ── flush_pending ───────────────────────────────────────────────────────── - - #[test] - fn flush_pending_commits_and_broadcasts_for_terminal_stage() { - let tmp = TempDir::new().unwrap(); - init_git_repo(tmp.path()); - let stage_dir = make_stage_dir(tmp.path(), "1_backlog"); - let story_path = stage_dir.join("42_story_foo.md"); - fs::write(&story_path, "---\nname: test\n---\n").unwrap(); - - let (tx, mut rx) = tokio::sync::broadcast::channel(16); - let mut pending = HashMap::new(); - pending.insert(story_path, "1_backlog".to_string()); - - flush_pending(&pending, tmp.path(), &tx); - - let evt = rx.try_recv().expect("expected a broadcast event"); - match evt { - WatcherEvent::WorkItem { - stage, - item_id, - action, - commit_msg, - .. - } => { - assert_eq!(stage, "1_backlog"); - assert_eq!(item_id, "42_story_foo"); - assert_eq!(action, "create"); - assert_eq!(commit_msg, "huskies: create 42_story_foo"); - } - other => panic!("unexpected event: {other:?}"), - } - - // Verify the file was actually committed. - let log = std::process::Command::new("git") - .args(["log", "--oneline", "-1"]) - .current_dir(tmp.path()) - .output() - .expect("git log"); - let log_msg = String::from_utf8_lossy(&log.stdout); - assert!( - log_msg.contains("huskies: create 42_story_foo"), - "terminal stage should produce a git commit" - ); - } - - #[test] - fn flush_pending_broadcasts_without_commit_for_intermediate_stage() { - let tmp = TempDir::new().unwrap(); - init_git_repo(tmp.path()); - let stage_dir = make_stage_dir(tmp.path(), "2_current"); - let story_path = stage_dir.join("42_story_foo.md"); - fs::write(&story_path, "---\nname: test\n---\n").unwrap(); - - let (tx, mut rx) = tokio::sync::broadcast::channel(16); - let mut pending = HashMap::new(); - pending.insert(story_path, "2_current".to_string()); - - flush_pending(&pending, tmp.path(), &tx); - - // Event should still be broadcast for frontend sync. - let evt = rx.try_recv().expect("expected a broadcast event"); - match evt { - WatcherEvent::WorkItem { - stage, - item_id, - action, - commit_msg, - .. - } => { - assert_eq!(stage, "2_current"); - assert_eq!(item_id, "42_story_foo"); - assert_eq!(action, "start"); - assert_eq!(commit_msg, "huskies: start 42_story_foo"); - } - other => panic!("unexpected event: {other:?}"), - } - - // Verify NO git commit was made (only the initial empty commit should exist). - let log = std::process::Command::new("git") - .args(["log", "--oneline"]) - .current_dir(tmp.path()) - .output() - .expect("git log"); - let log_msg = String::from_utf8_lossy(&log.stdout); - assert!( - !log_msg.contains("huskies:"), - "intermediate stage should NOT produce a git commit" - ); - } - - #[test] - fn flush_pending_broadcasts_for_all_pipeline_stages() { - let stages = [ - ("1_backlog", "create", "huskies: create 10_story_x"), - ("3_qa", "qa", "huskies: queue 10_story_x for QA"), - ("4_merge", "merge", "huskies: queue 10_story_x for merge"), - ("5_done", "done", "huskies: done 10_story_x"), - ("6_archived", "accept", "huskies: accept 10_story_x"), - ]; - - for (stage, expected_action, expected_msg) in stages { - let tmp = TempDir::new().unwrap(); - init_git_repo(tmp.path()); - let stage_dir = make_stage_dir(tmp.path(), stage); - let story_path = stage_dir.join("10_story_x.md"); - fs::write(&story_path, "---\nname: test\n---\n").unwrap(); - - let (tx, mut rx) = tokio::sync::broadcast::channel(16); - let mut pending = HashMap::new(); - pending.insert(story_path, stage.to_string()); - - flush_pending(&pending, tmp.path(), &tx); - - // All stages should broadcast events regardless of commit behavior. - let evt = rx.try_recv().expect("expected broadcast for stage {stage}"); - match evt { - WatcherEvent::WorkItem { - action, commit_msg, .. - } => { - assert_eq!(action, expected_action, "stage {stage}"); - assert_eq!(commit_msg, expected_msg, "stage {stage}"); - } - other => panic!("unexpected event for stage {stage}: {other:?}"), - } - } - } - - #[test] - fn flush_pending_deletion_only_broadcasts_remove_event() { - let tmp = TempDir::new().unwrap(); - init_git_repo(tmp.path()); - // Create the work dir tree but NOT the file (simulates a deletion). - make_stage_dir(tmp.path(), "2_current"); - let deleted_path = tmp - .path() - .join(".huskies") - .join("work") - .join("2_current") - .join("42_story_foo.md"); - - let (tx, mut rx) = tokio::sync::broadcast::channel(16); - let mut pending = HashMap::new(); - pending.insert(deleted_path, "2_current".to_string()); - - flush_pending(&pending, tmp.path(), &tx); - - // Even when nothing was committed (file never existed), an event is broadcast. - let evt = rx - .try_recv() - .expect("expected a broadcast event for deletion"); - match evt { - WatcherEvent::WorkItem { - action, item_id, .. - } => { - assert_eq!(action, "remove"); - assert_eq!(item_id, "42_story_foo"); - } - other => panic!("unexpected event: {other:?}"), - } - } - - #[test] - fn flush_pending_skips_unknown_stage_for_addition() { - let tmp = TempDir::new().unwrap(); - init_git_repo(tmp.path()); - // File sits in an unrecognised directory. - let unknown_dir = tmp.path().join(".huskies").join("work").join("9_unknown"); - fs::create_dir_all(&unknown_dir).unwrap(); - let path = unknown_dir.join("42_story_foo.md"); - fs::write(&path, "---\nname: test\n---\n").unwrap(); - - let (tx, mut rx) = tokio::sync::broadcast::channel(16); - let mut pending = HashMap::new(); - pending.insert(path, "9_unknown".to_string()); - - flush_pending(&pending, tmp.path(), &tx); - - // No event should be broadcast because stage_metadata returns None for unknown stages. - assert!( - rx.try_recv().is_err(), - "no event should be broadcast for unknown stage" - ); - } - - #[test] - fn flush_pending_empty_pending_does_nothing() { - let tmp = TempDir::new().unwrap(); - init_git_repo(tmp.path()); - make_stage_dir(tmp.path(), "2_current"); - - let (tx, mut rx) = tokio::sync::broadcast::channel(16); - let pending: HashMap = HashMap::new(); - - // Should not panic and should not broadcast anything. - flush_pending(&pending, tmp.path(), &tx); - assert!(rx.try_recv().is_err(), "no event for empty pending map"); - } - - // ── flush_pending clears merge_failure ───────────────────────────────────── - - #[test] - fn flush_pending_clears_merge_failure_when_leaving_merge_stage() { - let tmp = TempDir::new().unwrap(); - init_git_repo(tmp.path()); - let stage_dir = make_stage_dir(tmp.path(), "2_current"); - let story_path = stage_dir.join("50_story_retry.md"); - fs::write( - &story_path, - "---\nname: Retry Story\nmerge_failure: \"conflicts detected\"\n---\n# Story\n", - ) - .unwrap(); - - let (tx, _rx) = tokio::sync::broadcast::channel(16); - let mut pending = HashMap::new(); - pending.insert(story_path.clone(), "2_current".to_string()); - - flush_pending(&pending, tmp.path(), &tx); - - let contents = fs::read_to_string(&story_path).unwrap(); - assert!( - !contents.contains("merge_failure"), - "merge_failure should be stripped when story lands in 2_current" - ); - assert!(contents.contains("name: Retry Story")); - } - - #[test] - fn flush_pending_clears_merge_failure_when_moving_to_backlog() { - let tmp = TempDir::new().unwrap(); - init_git_repo(tmp.path()); - let stage_dir = make_stage_dir(tmp.path(), "1_backlog"); - let story_path = stage_dir.join("51_story_reset.md"); - fs::write( - &story_path, - "---\nname: Reset Story\nmerge_failure: \"gate failed\"\n---\n# Story\n", - ) - .unwrap(); - - let (tx, _rx) = tokio::sync::broadcast::channel(16); - let mut pending = HashMap::new(); - pending.insert(story_path.clone(), "1_backlog".to_string()); - - flush_pending(&pending, tmp.path(), &tx); - - let contents = fs::read_to_string(&story_path).unwrap(); - assert!( - !contents.contains("merge_failure"), - "merge_failure should be stripped when story lands in 1_backlog" - ); - } - - #[test] - fn flush_pending_clears_merge_failure_when_moving_to_done() { - let tmp = TempDir::new().unwrap(); - init_git_repo(tmp.path()); - let stage_dir = make_stage_dir(tmp.path(), "5_done"); - let story_path = stage_dir.join("52_story_done.md"); - fs::write( - &story_path, - "---\nname: Done Story\nmerge_failure: \"stale error\"\n---\n# Story\n", - ) - .unwrap(); - - let (tx, _rx) = tokio::sync::broadcast::channel(16); - let mut pending = HashMap::new(); - pending.insert(story_path.clone(), "5_done".to_string()); - - flush_pending(&pending, tmp.path(), &tx); - - let contents = fs::read_to_string(&story_path).unwrap(); - assert!( - !contents.contains("merge_failure"), - "merge_failure should be stripped when story lands in 5_done" - ); - } - - #[test] - fn flush_pending_preserves_merge_failure_when_in_merge_stage() { - let tmp = TempDir::new().unwrap(); - init_git_repo(tmp.path()); - let stage_dir = make_stage_dir(tmp.path(), "4_merge"); - let story_path = stage_dir.join("53_story_merging.md"); - fs::write( - &story_path, - "---\nname: Merging Story\nmerge_failure: \"conflicts\"\n---\n# Story\n", - ) - .unwrap(); - - let (tx, _rx) = tokio::sync::broadcast::channel(16); - let mut pending = HashMap::new(); - pending.insert(story_path.clone(), "4_merge".to_string()); - - flush_pending(&pending, tmp.path(), &tx); - - let contents = fs::read_to_string(&story_path).unwrap(); - assert!( - contents.contains("merge_failure"), - "merge_failure should be preserved when story is in 4_merge" - ); - } - - #[test] - fn flush_pending_no_op_when_no_merge_failure() { - let tmp = TempDir::new().unwrap(); - init_git_repo(tmp.path()); - let stage_dir = make_stage_dir(tmp.path(), "2_current"); - let story_path = stage_dir.join("54_story_clean.md"); - let original = "---\nname: Clean Story\n---\n# Story\n"; - fs::write(&story_path, original).unwrap(); - - let (tx, _rx) = tokio::sync::broadcast::channel(16); - let mut pending = HashMap::new(); - pending.insert(story_path.clone(), "2_current".to_string()); - - flush_pending(&pending, tmp.path(), &tx); - - let contents = fs::read_to_string(&story_path).unwrap(); - assert_eq!( - contents, original, - "file without merge_failure should be unchanged" - ); - } - - // ── flush_pending from_stage ───────────────────────────────────────────── - - /// AC3: when a pending map contains both a deletion (source stage) and a - /// creation (dest stage) for the same item_id, the broadcast event should - /// have `from_stage` set to the source stage key. - #[test] - fn flush_pending_sets_from_stage_for_move_operations() { - let tmp = TempDir::new().unwrap(); - init_git_repo(tmp.path()); - - // Destination exists (file moved here). - let merge_dir = make_stage_dir(tmp.path(), "4_merge"); - let merge_path = merge_dir.join("42_story_foo.md"); - fs::write(&merge_path, "---\nname: test\n---\n").unwrap(); - - // Source path does NOT exist (file was moved away). - make_stage_dir(tmp.path(), "3_qa"); - let qa_path = tmp - .path() - .join(".huskies") - .join("work") - .join("3_qa") - .join("42_story_foo.md"); - - let (tx, mut rx) = tokio::sync::broadcast::channel(16); - let mut pending = HashMap::new(); - pending.insert(merge_path, "4_merge".to_string()); // addition - pending.insert(qa_path, "3_qa".to_string()); // deletion - - flush_pending(&pending, tmp.path(), &tx); - - let evt = rx.try_recv().expect("expected event"); - match evt { - WatcherEvent::WorkItem { - stage, from_stage, .. - } => { - assert_eq!(stage, "4_merge"); - assert_eq!(from_stage, Some("3_qa".to_string())); - } - other => panic!("unexpected event: {other:?}"), - } - } - - /// AC3: when a pending map has only an addition (creation, not a move), - /// `from_stage` should be `None`. - #[test] - fn flush_pending_sets_from_stage_to_none_for_creations() { - let tmp = TempDir::new().unwrap(); - init_git_repo(tmp.path()); - - let stage_dir = make_stage_dir(tmp.path(), "2_current"); - let story_path = stage_dir.join("55_story_new.md"); - fs::write(&story_path, "---\nname: New Story\n---\n").unwrap(); - - let (tx, mut rx) = tokio::sync::broadcast::channel(16); - let mut pending = HashMap::new(); - pending.insert(story_path, "2_current".to_string()); - - flush_pending(&pending, tmp.path(), &tx); - - let evt = rx.try_recv().expect("expected event"); - match evt { - WatcherEvent::WorkItem { from_stage, .. } => { - assert_eq!(from_stage, None, "creation should have no from_stage"); - } - other => panic!("unexpected event: {other:?}"), - } - } - - // ── stage_for_path (additional edge cases) ──────────────────────────────── - - #[test] - fn stage_for_path_recognises_pipeline_dirs() { - let base = PathBuf::from("/proj/.huskies/work"); - assert_eq!( - stage_for_path(&base.join("2_current/42_story_foo.md")), - Some("2_current".to_string()) - ); - assert_eq!( - stage_for_path(&base.join("5_done/10_bug_bar.md")), - Some("5_done".to_string()) - ); - assert_eq!( - stage_for_path(&base.join("6_archived/10_bug_bar.md")), - Some("6_archived".to_string()) - ); - assert_eq!(stage_for_path(&base.join("other/file.md")), None); - assert_eq!( - stage_for_path(&base.join("2_current/42_story_foo.txt")), - None - ); - } - - #[test] - fn stage_for_path_ignores_worktree_paths() { - let worktrees = PathBuf::from("/proj/.huskies/worktrees"); - - // Code changes inside a worktree must be ignored. - assert_eq!( - stage_for_path(&worktrees.join("42_story_foo/server/src/main.rs")), - None, - ); - - // Even if a worktree happens to contain a path component that looks - // like a pipeline stage, it must still be ignored. - assert_eq!( - stage_for_path(&worktrees.join("42_story_foo/.huskies/work/2_current/42_story_foo.md")), - None, - ); - - // A path that only contains the word "worktrees" as part of a longer - // segment (not an exact component) must NOT be filtered out. - assert_eq!( - stage_for_path(&PathBuf::from( - "/proj/.huskies/work/2_current/not_worktrees_story.md" - )), - Some("2_current".to_string()), - ); - } - - #[test] - fn should_commit_stage_only_for_terminal_stages() { - // Terminal stages — should commit. - assert!(should_commit_stage("1_backlog")); - assert!(should_commit_stage("5_done")); - assert!(should_commit_stage("6_archived")); - // Intermediate stages — broadcast-only, no commit. - assert!(!should_commit_stage("2_current")); - assert!(!should_commit_stage("3_qa")); - assert!(!should_commit_stage("4_merge")); - // Unknown — no commit. - assert!(!should_commit_stage("unknown")); - } + use std::time::Duration; #[test] fn stage_metadata_returns_correct_actions() { diff --git a/server/src/service/timer/io.rs b/server/src/service/timer/io.rs index b19273df..4ebf2b71 100644 --- a/server/src/service/timer/io.rs +++ b/server/src/service/timer/io.rs @@ -257,9 +257,7 @@ pub async fn tick_once( // Move from backlog to current if needed — the auto-assign // watcher will then start an agent automatically. - if let Err(e) = - crate::agents::lifecycle::move_story_to_current(project_root, &entry.story_id) - { + if let Err(e) = crate::agents::lifecycle::move_story_to_current(&entry.story_id) { crate::slog!( "[timer] Failed to move story {} to current: {e}", entry.story_id diff --git a/server/src/service/timer/mod.rs b/server/src/service/timer/mod.rs index 8f74a669..2afd8108 100644 --- a/server/src/service/timer/mod.rs +++ b/server/src/service/timer/mod.rs @@ -450,7 +450,7 @@ mod tests { // Apply the move-to-current step the tick loop performs. for entry in &due { - crate::agents::lifecycle::move_story_to_current(root, &entry.story_id) + crate::agents::lifecycle::move_story_to_current(&entry.story_id) .expect("move_story_to_current should succeed for backlog story"); }