From 4c8fe910a7f9573701b1fdab71ab554dacbaa90a Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 10 Apr 2026 16:54:14 +0000 Subject: [PATCH] huskies: merge 533_story_crdt_based_done_archived_sweep_to_replace_filesystem_based_watcher_sweep --- server/src/io/watcher.rs | 194 ++++----------------------------------- server/src/main.rs | 34 +++++-- 2 files changed, 43 insertions(+), 185 deletions(-) diff --git a/server/src/io/watcher.rs b/server/src/io/watcher.rs index e9690662..2d833c35 100644 --- a/server/src/io/watcher.rs +++ b/server/src/io/watcher.rs @@ -323,37 +323,24 @@ fn flush_pending( } /// Sweep items in `5_done` whose `merged_at` timestamp exceeds the retention -/// duration to `6_archived` via CRDT state transitions. Also prunes worktrees -/// for items already in `6_archived`. +/// duration to `6_archived` via CRDT state transitions. /// -/// All state is read from CRDT — no filesystem access. -fn sweep_done_to_archived(_work_dir: &Path, git_root: &Path, done_retention: Duration) { +/// All state is read from and written to CRDT — no filesystem access. +/// Worktree pruning is handled separately by the CRDT event subscriber. +pub(crate) fn sweep_done_to_archived(done_retention: Duration) { use crate::pipeline_state::{Stage, read_all_typed}; for item in read_all_typed() { - match &item.stage { - Stage::Done { merged_at, .. } => { - let age = chrono::Utc::now() - .signed_duration_since(*merged_at) - .to_std() - .unwrap_or_default(); - if age >= done_retention { - let story_id = &item.story_id.0; - crate::db::move_item_stage(story_id, "6_archived", None); - slog!("[watcher] sweep: promoted {story_id} → 6_archived/"); - if let Err(e) = crate::worktree::prune_worktree_sync(git_root, story_id) { - slog!("[watcher] sweep: worktree prune failed for {story_id}: {e}"); - } - } - } - Stage::Archived { .. } => { - // Prune stale worktrees for archived items. + if let Stage::Done { merged_at, .. } = &item.stage { + let age = chrono::Utc::now() + .signed_duration_since(*merged_at) + .to_std() + .unwrap_or_default(); + if age >= done_retention { let story_id = &item.story_id.0; - if let Err(e) = crate::worktree::prune_worktree_sync(git_root, story_id) { - slog!("[watcher] sweep: worktree prune failed for {story_id}: {e}"); - } + crate::db::move_item_stage(story_id, "6_archived", None); + slog!("[watcher] sweep: promoted {story_id} → 6_archived/"); } - _ => {} } } } @@ -361,18 +348,16 @@ fn sweep_done_to_archived(_work_dir: &Path, git_root: &Path, done_retention: Dur /// Start the filesystem watcher on a dedicated OS thread. /// /// Watches `.huskies/project.toml` and `.huskies/agents.toml` for config -/// hot-reload, and periodically sweeps `5_done/` → `6_archived/`. +/// hot-reload, and periodically sweeps `5_done/` → `6_archived/` via CRDT. /// /// Work-item pipeline events (stage transitions) are no longer driven by /// filesystem events — they originate from CRDT state changes via /// [`crate::crdt_state::subscribe`]. /// -/// `work_dir` — absolute path to `.huskies/work/` (used for sweep only). /// `git_root` — project root (passed to `git` commands and config loading). /// `event_tx` — broadcast sender for `ConfigChanged` events. /// `watcher_config` — initial sweep configuration loaded from `project.toml`. pub fn start_watcher( - work_dir: PathBuf, git_root: PathBuf, event_tx: broadcast::Sender, watcher_config: WatcherConfig, @@ -498,7 +483,7 @@ pub fn start_watcher( let now = Instant::now(); if now.duration_since(last_sweep) >= sweep_interval { last_sweep = now; - sweep_done_to_archived(&work_dir, &git_root, done_retention); + sweep_done_to_archived(done_retention); } } } @@ -1101,16 +1086,14 @@ mod tests { // ── sweep_done_to_archived (CRDT-based) ───────────────────────────────── // - // The sweep function now reads from `read_all_typed()` and checks + // The sweep function reads from `read_all_typed()` and checks // `Stage::Done { merged_at, .. }`. Items created via // `write_item_with_content("5_done")` project `merged_at = Utc::now()`, // so we test with Duration::ZERO to sweep immediately and with a long - // retention to verify items are kept. + // retention to verify items are kept. No filesystem access is involved. #[test] fn sweep_moves_old_items_to_archived() { - let tmp = TempDir::new().unwrap(); - crate::db::ensure_content_store(); crate::db::write_item_with_content( "9880_story_sweep_old", @@ -1119,11 +1102,7 @@ mod tests { ); // With ZERO retention, any Done item should be swept. - sweep_done_to_archived( - &tmp.path().join(".huskies/work"), - tmp.path(), - Duration::ZERO, - ); + sweep_done_to_archived(Duration::ZERO); // Verify the item was moved to 6_archived in the CRDT. let items = crate::pipeline_state::read_all_typed(); @@ -1136,8 +1115,6 @@ mod tests { #[test] fn sweep_keeps_recent_items_in_done() { - let tmp = TempDir::new().unwrap(); - crate::db::ensure_content_store(); crate::db::write_item_with_content( "9881_story_sweep_new", @@ -1146,11 +1123,7 @@ mod tests { ); // With a very long retention, the item (merged_at ≈ now) should stay. - sweep_done_to_archived( - &tmp.path().join(".huskies/work"), - tmp.path(), - Duration::from_secs(999_999), - ); + sweep_done_to_archived(Duration::from_secs(999_999)); let items = crate::pipeline_state::read_all_typed(); let item = items.iter().find(|i| i.story_id.0 == "9881_story_sweep_new"); @@ -1162,8 +1135,6 @@ mod tests { #[test] fn sweep_respects_custom_retention() { - let tmp = TempDir::new().unwrap(); - crate::db::ensure_content_store(); crate::db::write_item_with_content( "9882_story_sweep_custom", @@ -1172,11 +1143,7 @@ mod tests { ); // With ZERO retention, sweep should promote. - sweep_done_to_archived( - &tmp.path().join(".huskies/work"), - tmp.path(), - Duration::ZERO, - ); + sweep_done_to_archived(Duration::ZERO); let items = crate::pipeline_state::read_all_typed(); let item = items.iter().find(|i| i.story_id.0 == "9882_story_sweep_custom"); @@ -1185,127 +1152,4 @@ mod tests { "item should be archived with zero retention" ); } - - // ── sweep worktree pruning ───────────────────────────────────────────── - - /// Helper: create a real git worktree at `wt_path` on a new branch. - fn create_git_worktree(git_root: &std::path::Path, wt_path: &std::path::Path, branch: &str) { - use std::process::Command; - let _ = Command::new("git") - .args(["branch", branch]) - .current_dir(git_root) - .output(); - Command::new("git") - .args(["worktree", "add", &wt_path.to_string_lossy(), branch]) - .current_dir(git_root) - .output() - .expect("git worktree add"); - } - - #[test] - fn sweep_prunes_worktree_when_story_promoted_to_archived() { - let tmp = TempDir::new().unwrap(); - let git_root = tmp.path().to_path_buf(); - init_git_repo(&git_root); - - crate::db::ensure_content_store(); - let story_id = "9883_story_prune_on_promote"; - crate::db::write_item_with_content( - story_id, - "5_done", - "---\nname: test\n---\n", - ); - - // Create a real git worktree for this story. - let wt_path = crate::worktree::worktree_path(&git_root, story_id); - fs::create_dir_all(wt_path.parent().unwrap()).unwrap(); - create_git_worktree(&git_root, &wt_path, &format!("feature/story-{story_id}")); - assert!(wt_path.exists(), "worktree must exist before sweep"); - - sweep_done_to_archived( - &git_root.join(".huskies/work"), - &git_root, - Duration::ZERO, - ); - - // Story must be archived in CRDT. - let items = crate::pipeline_state::read_all_typed(); - let item = items.iter().find(|i| i.story_id.0 == story_id); - assert!( - item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })), - "story should be archived" - ); - // Worktree must be removed. - assert!( - !wt_path.exists(), - "worktree should be removed after archiving" - ); - } - - #[test] - fn sweep_prunes_worktrees_for_already_archived_stories() { - let tmp = TempDir::new().unwrap(); - let git_root = tmp.path().to_path_buf(); - init_git_repo(&git_root); - - crate::db::ensure_content_store(); - let story_id = "9884_story_stale_worktree"; - crate::db::write_item_with_content( - story_id, - "6_archived", - "---\nname: stale\n---\n", - ); - - // Create a real git worktree that was never cleaned up. - let wt_path = crate::worktree::worktree_path(&git_root, story_id); - fs::create_dir_all(wt_path.parent().unwrap()).unwrap(); - create_git_worktree(&git_root, &wt_path, &format!("feature/story-{story_id}")); - assert!(wt_path.exists(), "stale worktree must exist before sweep"); - - sweep_done_to_archived( - &git_root.join(".huskies/work"), - &git_root, - Duration::from_secs(999_999), - ); - - // Stale worktree should be pruned. - assert!( - !wt_path.exists(), - "stale worktree should be pruned by sweep" - ); - } - - #[test] - fn sweep_archives_story_even_when_worktree_removal_fails() { - let tmp = TempDir::new().unwrap(); - let git_root = tmp.path().to_path_buf(); - init_git_repo(&git_root); - - crate::db::ensure_content_store(); - let story_id = "9885_story_fake_worktree"; - crate::db::write_item_with_content( - story_id, - "5_done", - "---\nname: test\n---\n", - ); - - // Create a plain directory at the expected worktree path — not a real - // git worktree, so `git worktree remove` will fail. - let wt_path = crate::worktree::worktree_path(&git_root, story_id); - fs::create_dir_all(&wt_path).unwrap(); - - sweep_done_to_archived( - &git_root.join(".huskies/work"), - &git_root, - Duration::ZERO, - ); - - // Story must be archived in CRDT despite worktree removal failure. - let items = crate::pipeline_state::read_all_typed(); - let item = items.iter().find(|i| i.story_id.0 == story_id); - assert!( - item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })), - "story should be archived even when worktree removal fails" - ); - } } diff --git a/server/src/main.rs b/server/src/main.rs index 1c7e3fdf..bf1b9ddb 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -328,26 +328,40 @@ async fn main() -> Result<(), std::io::Error> { let watchdog_root: Option = app_state.project_root.lock().unwrap().clone(); AgentPool::spawn_watchdog(Arc::clone(&agents), watchdog_root); - // Filesystem watcher: only watches config files (project.toml, agents.toml) and - // handles the sweep of done→archived. Work-item pipeline events are now driven - // by CRDT state transitions via crdt_state::subscribe(). + // Filesystem watcher: watches config files (project.toml, agents.toml) for + // hot-reload and runs the CRDT-based done→archived sweep. Work-item pipeline + // events are driven by CRDT state transitions via crdt_state::subscribe(). if let Some(ref root) = *app_state.project_root.lock().unwrap() { - let work_dir = root.join(".huskies").join("work"); - if work_dir.is_dir() { - let watcher_config = config::ProjectConfig::load(root) - .map(|c| c.watcher) - .unwrap_or_default(); - io::watcher::start_watcher(work_dir, root.clone(), watcher_tx.clone(), watcher_config); - } + let watcher_config = config::ProjectConfig::load(root) + .map(|c| c.watcher) + .unwrap_or_default(); + io::watcher::start_watcher(root.clone(), watcher_tx.clone(), watcher_config); } // Bridge CRDT state-transition events to the watcher broadcast channel. // This replaces the filesystem watcher as the source of WorkItem events. + // Also prunes worktrees when stories transition to 6_archived. { let crdt_watcher_tx = watcher_tx.clone(); + let crdt_prune_root: Option = app_state.project_root.lock().unwrap().clone(); if let Some(mut crdt_rx) = crdt_state::subscribe() { tokio::spawn(async move { while let Ok(evt) = crdt_rx.recv().await { + // Prune the worktree when a story is archived. + if evt.to_stage == "6_archived" + && let Some(root) = crdt_prune_root.as_ref().cloned() + { + let story_id = evt.story_id.clone(); + tokio::task::spawn_blocking(move || { + if let Err(e) = + crate::worktree::prune_worktree_sync(&root, &story_id) + { + crate::slog!( + "[crdt] worktree prune failed for {story_id}: {e}" + ); + } + }); + } let (action, commit_msg) = io::watcher::stage_metadata(&evt.to_stage, &evt.story_id) .unwrap_or(("update", format!("huskies: update {}", evt.story_id)));