//! Filesystem watcher for `.huskies/work/` and `.huskies/project.toml`. //! //! Watches the work pipeline directories for file changes, infers the lifecycle //! stage from the target directory name, auto-commits with a deterministic message, //! and broadcasts a [`WatcherEvent`] to all connected WebSocket clients. //! //! Also watches `.huskies/project.toml` for modifications and broadcasts //! [`WatcherEvent::ConfigChanged`] so the frontend can reload the agent roster //! without a server restart. //! //! # Debouncing //! Events are buffered for 300 ms after the last activity. All changes within the //! window are batched into a single `git add + commit`. This avoids double-commits //! when `fs::rename` fires both a remove and a create event. //! //! # Race conditions //! If a mutation handler (e.g. `move_story_to_current`) already committed the //! change, `git commit` will return "nothing to commit". The watcher detects this //! via exit-code inspection and silently skips the commit while still broadcasting //! the event so connected clients stay in sync. use crate::config::{ProjectConfig, WatcherConfig}; use crate::slog; use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher, recommended_watcher}; use serde::Serialize; use std::path::{Path, PathBuf}; use std::sync::mpsc; use std::time::{Duration, Instant}; use tokio::sync::broadcast; /// A lifecycle event emitted by the filesystem watcher. #[derive(Clone, Debug, Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum WatcherEvent { /// A work-pipeline file was created, modified, or deleted. WorkItem { /// Pipeline stage directory (e.g. `"2_current"`, `"5_archived"`). stage: String, /// Work item ID (filename stem without extension, e.g. `"42_story_my_feature"`). item_id: String, /// Semantic action inferred from the stage (e.g. `"start"`, `"accept"`). action: String, /// The deterministic git commit message used (or that would have been used). 
commit_msg: String, /// The pipeline stage the item moved FROM, populated for move operations. /// `None` for creations, deletions, or synthetic events. from_stage: Option, }, /// `.huskies/project.toml` was modified at the project root (not inside a worktree). ConfigChanged, /// An agent's state changed (started, stopped, completed, etc.). /// Triggers a pipeline state refresh so the frontend can update agent /// assignments without waiting for a filesystem event. AgentStateChanged, /// A story encountered a failure (e.g. merge failure). /// Triggers an error notification to configured Matrix rooms. MergeFailure { /// Work item ID (e.g. `"42_story_my_feature"`). story_id: String, /// Human-readable description of the failure. reason: String, }, /// An agent hit an API rate limit. /// Triggers a warning notification to configured chat rooms. RateLimitWarning { /// Work item ID the agent is working on. story_id: String, /// Name of the agent that hit the rate limit. agent_name: String, }, /// A story has been blocked (e.g. retry limit exceeded, empty diff). /// Triggers a warning notification to configured chat rooms. StoryBlocked { /// Work item ID (e.g. `"42_story_my_feature"`). story_id: String, /// Human-readable reason the story was blocked. reason: String, }, /// An agent hit a hard API rate limit and will be blocked until `reset_at`. /// Triggers auto-scheduling of a timer and a notification with the resume time. RateLimitHardBlock { /// Work item ID the agent is working on. story_id: String, /// Name of the agent that hit the hard rate limit. agent_name: String, /// UTC instant at which the rate limit resets. reset_at: chrono::DateTime, }, } /// Return `true` if `path` is the root-level `.huskies/project.toml` or /// `.huskies/agents.toml`, i.e. `{git_root}/.huskies/{project,agents}.toml`. /// /// Returns `false` for paths inside worktree directories (paths containing /// a `worktrees` component). 
pub fn is_config_file(path: &Path, git_root: &Path) -> bool {
    // Reject any path that passes through the worktrees directory.
    if path.components().any(|c| c.as_os_str() == "worktrees") {
        return false;
    }
    let huskies = git_root.join(".huskies");
    path == huskies.join("project.toml") || path == huskies.join("agents.toml")
}

/// Map a pipeline directory name to a (action, commit-message-prefix) pair.
///
/// Used by the CRDT-to-watcher bridge (in `main.rs`) to derive the action and
/// commit message for `WatcherEvent::WorkItem` events.
pub fn stage_metadata(stage: &str, item_id: &str) -> Option<(&'static str, String)> {
    let (action, prefix) = match stage {
        "1_backlog" => ("create", format!("huskies: create {item_id}")),
        "2_current" => ("start", format!("huskies: start {item_id}")),
        "3_qa" => ("qa", format!("huskies: queue {item_id} for QA")),
        "4_merge" => ("merge", format!("huskies: queue {item_id} for merge")),
        "5_done" => ("done", format!("huskies: done {item_id}")),
        "6_archived" => ("accept", format!("huskies: accept {item_id}")),
        _ => return None,
    };
    Some((action, prefix))
}

/// Return the pipeline stage name for a path if it is a `.md` file living
/// directly inside one of the known work subdirectories, otherwise `None`.
///
/// Explicitly returns `None` for any path under `.huskies/worktrees/` so
/// that code changes made by agents in their isolated worktrees are never
/// auto-committed to master by the watcher.
///
/// Retained for tests; no longer called in production (CRDT drives events).
#[cfg(test)]
// FIX: the stripped return type is restored — the function builds a `String`
// from the stage directory name, so it returns `Option<String>`.
fn stage_for_path(path: &Path) -> Option<String> {
    // Reject any path that passes through the worktrees directory.
    if path.components().any(|c| c.as_os_str() == "worktrees") {
        return None;
    }
    if path.extension().is_none_or(|e| e != "md") {
        return None;
    }
    let stage = path
        .parent()
        .and_then(|p| p.file_name())
        .and_then(|n| n.to_str())?;
    matches!(
        stage,
        "1_backlog" | "2_current" | "3_qa" | "4_merge" | "5_done" | "6_archived"
    )
    .then(|| stage.to_string())
}

/// Stage all changes in the work directory and commit with the given message.
///
/// Uses `git add -A .huskies/work/` to catch both additions and deletions in
/// a single commit. Returns `Ok(true)` if a commit was made, `Ok(false)` if
/// there was nothing to commit, and `Err` for unexpected failures.
///
/// Retained for tests; no longer called in production (CRDT drives events).
#[cfg(test)]
// FIX: restored `Result<bool, String>` — the body returns `Ok(true)`/`Ok(false)`
// and `Err(String)`, and the tests compare against `Ok(true)`/`Ok(false)`.
fn git_add_work_and_commit(git_root: &Path, message: &str) -> Result<bool, String> {
    let work_rel = PathBuf::from(".huskies").join("work");
    let add_out = std::process::Command::new("git")
        .args(["add", "-A"])
        .arg(&work_rel)
        .current_dir(git_root)
        .output()
        .map_err(|e| format!("git add: {e}"))?;
    if !add_out.status.success() {
        return Err(format!(
            "git add failed: {}",
            String::from_utf8_lossy(&add_out.stderr)
        ));
    }
    let commit_out = std::process::Command::new("git")
        .args(["commit", "-m", message])
        .current_dir(git_root)
        .output()
        .map_err(|e| format!("git commit: {e}"))?;
    if commit_out.status.success() {
        return Ok(true);
    }
    // "nothing to commit" is an expected race (a mutation handler may have
    // already committed) — report it as Ok(false), not an error.
    let stderr = String::from_utf8_lossy(&commit_out.stderr);
    let stdout = String::from_utf8_lossy(&commit_out.stdout);
    if stdout.contains("nothing to commit") || stderr.contains("nothing to commit") {
        return Ok(false);
    }
    Err(format!("git commit failed: {stderr}"))
}

/// Stages that represent meaningful git checkpoints (creation and archival).
/// Intermediate stages (current, qa, merge, done) are transient pipeline state
/// that don't need to be committed — they're only relevant while the server is
/// running and are broadcast to WebSocket clients for real-time UI updates.
/// /// Retained for tests; no longer called in production (CRDT drives events). #[cfg(test)] const COMMIT_WORTHY_STAGES: &[&str] = &["1_backlog", "5_done", "6_archived"]; /// Return `true` if changes in `stage` should be committed to git. /// /// Retained for tests; no longer called in production (CRDT drives events). #[cfg(test)] fn should_commit_stage(stage: &str) -> bool { COMMIT_WORTHY_STAGES.contains(&stage) } /// Process a batch of pending (path → stage) entries: commit and broadcast. /// /// Only files that still exist on disk are used to derive the commit message /// (they represent the destination of a move or a new file). Deletions are /// captured by `git add -A .huskies/work/` automatically. /// /// Only terminal stages (`1_backlog` and `6_archived`) trigger git commits. /// All stages broadcast a [`WatcherEvent`] so the frontend stays in sync. /// /// Retained for tests; no longer called in production (CRDT drives events). #[cfg(test)] fn flush_pending( pending: &std::collections::HashMap, git_root: &Path, event_tx: &broadcast::Sender, ) { use crate::io::story_metadata::clear_front_matter_field; // Separate into files that exist (additions) vs gone (deletions). let mut additions: Vec<(&PathBuf, &str)> = Vec::new(); for (path, stage) in pending { if path.exists() { additions.push((path, stage.as_str())); } } // Pick the commit message from the first addition (the meaningful side of a move). // If there are only deletions, use a generic message. let (action, item_id, commit_msg) = if let Some((path, stage)) = additions.first() { let item = path .file_stem() .and_then(|s| s.to_str()) .unwrap_or("unknown"); if let Some((act, msg)) = stage_metadata(stage, item) { (act, item.to_string(), msg) } else { return; } } else { // Only deletions — pick any pending path for the item name. 
let Some((path, _)) = pending.iter().next() else { return; }; let item = path .file_stem() .and_then(|s| s.to_str()) .unwrap_or("unknown"); ( "remove", item.to_string(), format!("huskies: remove {item}"), ) }; // Strip stale merge_failure front matter from any story that has left 4_merge/. for (path, stage) in &additions { if *stage != "4_merge" && let Err(e) = clear_front_matter_field(path, "merge_failure") { slog!( "[watcher] Warning: could not clear merge_failure from {}: {e}", path.display() ); } } // Only commit for terminal stages; intermediate moves are broadcast-only. let dest_stage = additions.first().map_or("unknown", |(_, s)| *s); let should_commit = should_commit_stage(dest_stage); if should_commit { slog!("[watcher] flush: {commit_msg}"); match git_add_work_and_commit(git_root, &commit_msg) { Ok(committed) => { if committed { slog!("[watcher] committed: {commit_msg}"); } else { slog!("[watcher] skipped (already committed): {commit_msg}"); } } Err(e) => { slog!("[watcher] git error: {e}"); return; } } } else { slog!("[watcher] flush (broadcast-only): {commit_msg}"); } // For move operations, find the source stage from deleted entries with matching item_id. let from_stage: Option = if !additions.is_empty() { pending .iter() .filter(|(path, _)| !path.exists()) .find(|(path, _)| { path.file_stem() .and_then(|s| s.to_str()) == Some(item_id.as_str()) }) .map(|(_, stage)| stage.clone()) } else { None }; // Always broadcast the event so connected WebSocket clients stay in sync. let evt = WatcherEvent::WorkItem { stage: dest_stage.to_string(), item_id, action: action.to_string(), commit_msg, from_stage, }; let _ = event_tx.send(evt); } /// Sweep items in `5_done` whose `merged_at` timestamp exceeds the retention /// duration to `6_archived` via CRDT state transitions. Also prunes worktrees /// for items already in `6_archived`. /// /// All state is read from CRDT — no filesystem access. 
fn sweep_done_to_archived(_work_dir: &Path, git_root: &Path, done_retention: Duration) { use crate::pipeline_state::{Stage, read_all_typed}; for item in read_all_typed() { match &item.stage { Stage::Done { merged_at, .. } => { let age = chrono::Utc::now() .signed_duration_since(*merged_at) .to_std() .unwrap_or_default(); if age >= done_retention { let story_id = &item.story_id.0; crate::db::move_item_stage(story_id, "6_archived", None); slog!("[watcher] sweep: promoted {story_id} → 6_archived/"); if let Err(e) = crate::worktree::prune_worktree_sync(git_root, story_id) { slog!("[watcher] sweep: worktree prune failed for {story_id}: {e}"); } } } Stage::Archived { .. } => { // Prune stale worktrees for archived items. let story_id = &item.story_id.0; if let Err(e) = crate::worktree::prune_worktree_sync(git_root, story_id) { slog!("[watcher] sweep: worktree prune failed for {story_id}: {e}"); } } _ => {} } } } /// Start the filesystem watcher on a dedicated OS thread. /// /// Watches `.huskies/project.toml` and `.huskies/agents.toml` for config /// hot-reload, and periodically sweeps `5_done/` → `6_archived/`. /// /// Work-item pipeline events (stage transitions) are no longer driven by /// filesystem events — they originate from CRDT state changes via /// [`crate::crdt_state::subscribe`]. /// /// `work_dir` — absolute path to `.huskies/work/` (used for sweep only). /// `git_root` — project root (passed to `git` commands and config loading). /// `event_tx` — broadcast sender for `ConfigChanged` events. /// `watcher_config` — initial sweep configuration loaded from `project.toml`. 
pub fn start_watcher( work_dir: PathBuf, git_root: PathBuf, event_tx: broadcast::Sender, watcher_config: WatcherConfig, ) { std::thread::spawn(move || { let (notify_tx, notify_rx) = mpsc::channel::>(); let mut watcher: RecommendedWatcher = match recommended_watcher(move |res| { let _ = notify_tx.send(res); }) { Ok(w) => w, Err(e) => { slog!("[watcher] failed to create watcher: {e}"); return; } }; // Watch config files for hot-reload. Work-item directories are NOT // watched — CRDT state transitions drive pipeline events now. let huskies = git_root.join(".huskies"); for config_file in [huskies.join("project.toml"), huskies.join("agents.toml")] { if config_file.exists() && let Err(e) = watcher.watch(&config_file, RecursiveMode::NonRecursive) { slog!( "[watcher] failed to watch config file {}: {e}", config_file.display() ); } } slog!("[watcher] watching config files and running sweep timer"); const DEBOUNCE: Duration = Duration::from_millis(300); // Mutable sweep config — hot-reloaded when project.toml changes. let mut sweep_interval = Duration::from_secs(watcher_config.sweep_interval_secs); let mut done_retention = Duration::from_secs(watcher_config.done_retention_secs); slog!( "[watcher] sweep_interval={}s done_retention={}s", watcher_config.sweep_interval_secs, watcher_config.done_retention_secs ); // Whether a config file change is pending in the current debounce window. let mut config_changed_pending = false; let mut deadline: Option = None; // Track when we last swept 5_done/ → 6_archived/. // Initialise to "now minus interval" so the first sweep runs on startup. let mut last_sweep = Instant::now() .checked_sub(sweep_interval) .unwrap_or_else(Instant::now); loop { // How long until the debounce window closes (or wait for next event). 
let timeout = deadline.map_or(Duration::from_secs(60), |d| { d.saturating_duration_since(Instant::now()) }); let flush = match notify_rx.recv_timeout(timeout) { Ok(Ok(event)) => { let is_relevant_kind = matches!( event.kind, EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) ); if is_relevant_kind { for path in event.paths { if is_config_file(&path, &git_root) { slog!("[watcher] config change detected: {}", path.display()); config_changed_pending = true; deadline = Some(Instant::now() + DEBOUNCE); } // Work-item file changes are intentionally ignored. // CRDT state transitions handle pipeline events. } } false } Ok(Err(e)) => { slog!("[watcher] notify error: {e}"); false } // Debounce window expired — time to flush. Err(mpsc::RecvTimeoutError::Timeout) => true, Err(mpsc::RecvTimeoutError::Disconnected) => { slog!("[watcher] channel disconnected, shutting down"); break; } }; if flush { if config_changed_pending { slog!("[watcher] broadcasting agent_config_changed"); let _ = event_tx.send(WatcherEvent::ConfigChanged); // Hot-reload sweep config from project.toml. match ProjectConfig::load(&git_root) { Ok(cfg) => { let new_sweep = Duration::from_secs(cfg.watcher.sweep_interval_secs); let new_retention = Duration::from_secs(cfg.watcher.done_retention_secs); if new_sweep != sweep_interval || new_retention != done_retention { slog!( "[watcher] hot-reload: sweep_interval={}s done_retention={}s", cfg.watcher.sweep_interval_secs, cfg.watcher.done_retention_secs ); sweep_interval = new_sweep; done_retention = new_retention; } } Err(e) => { slog!("[watcher] hot-reload: failed to parse config: {e}"); } } config_changed_pending = false; } deadline = None; // Periodically promote old items from 5_done/ to 6_archived/. 
let now = Instant::now(); if now.duration_since(last_sweep) >= sweep_interval { last_sweep = now; sweep_done_to_archived(&work_dir, &git_root, done_retention); } } } }); } #[cfg(test)] mod tests { use super::*; use std::collections::HashMap; use std::fs; use tempfile::TempDir; /// Initialise a minimal git repo so commit operations work. fn init_git_repo(dir: &std::path::Path) { use std::process::Command; Command::new("git") .args(["init"]) .current_dir(dir) .output() .expect("git init"); Command::new("git") .args(["config", "user.email", "test@example.com"]) .current_dir(dir) .output() .expect("git config email"); Command::new("git") .args(["config", "user.name", "Test"]) .current_dir(dir) .output() .expect("git config name"); Command::new("git") .args(["commit", "--allow-empty", "-m", "init"]) .current_dir(dir) .output() .expect("git initial commit"); } /// Create the `.huskies/work/{stage}/` dir tree inside `root`. fn make_stage_dir(root: &std::path::Path, stage: &str) -> PathBuf { let dir = root.join(".huskies").join("work").join(stage); fs::create_dir_all(&dir).expect("create stage dir"); dir } // ── git_add_work_and_commit ─────────────────────────────────────────────── #[test] fn git_commit_returns_true_when_file_added() { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); let stage_dir = make_stage_dir(tmp.path(), "2_current"); fs::write(stage_dir.join("42_story_foo.md"), "---\nname: test\n---\n").unwrap(); let result = git_add_work_and_commit(tmp.path(), "huskies: start 42_story_foo"); assert_eq!( result, Ok(true), "should return Ok(true) when a commit was made" ); } #[test] fn git_commit_returns_false_when_nothing_to_commit() { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); let stage_dir = make_stage_dir(tmp.path(), "2_current"); fs::write(stage_dir.join("42_story_foo.md"), "---\nname: test\n---\n").unwrap(); // First commit — should succeed. 
        git_add_work_and_commit(tmp.path(), "huskies: start 42_story_foo").unwrap();
        // Second call with no changes — should return Ok(false).
        let result = git_add_work_and_commit(tmp.path(), "huskies: start 42_story_foo");
        assert_eq!(
            result,
            Ok(false),
            "should return Ok(false) when nothing to commit"
        );
    }

    // ── flush_pending ─────────────────────────────────────────────────────────

    #[test]
    fn flush_pending_commits_and_broadcasts_for_terminal_stage() {
        let tmp = TempDir::new().unwrap();
        init_git_repo(tmp.path());
        let stage_dir = make_stage_dir(tmp.path(), "1_backlog");
        let story_path = stage_dir.join("42_story_foo.md");
        fs::write(&story_path, "---\nname: test\n---\n").unwrap();
        let (tx, mut rx) = tokio::sync::broadcast::channel(16);
        let mut pending = HashMap::new();
        pending.insert(story_path, "1_backlog".to_string());
        flush_pending(&pending, tmp.path(), &tx);
        let evt = rx.try_recv().expect("expected a broadcast event");
        match evt {
            WatcherEvent::WorkItem {
                stage,
                item_id,
                action,
                commit_msg,
                ..
            } => {
                assert_eq!(stage, "1_backlog");
                assert_eq!(item_id, "42_story_foo");
                assert_eq!(action, "create");
                assert_eq!(commit_msg, "huskies: create 42_story_foo");
            }
            other => panic!("unexpected event: {other:?}"),
        }
        // Verify the file was actually committed.
        let log = std::process::Command::new("git")
            .args(["log", "--oneline", "-1"])
            .current_dir(tmp.path())
            .output()
            .expect("git log");
        let log_msg = String::from_utf8_lossy(&log.stdout);
        assert!(
            log_msg.contains("huskies: create 42_story_foo"),
            "terminal stage should produce a git commit"
        );
    }

    #[test]
    fn flush_pending_broadcasts_without_commit_for_intermediate_stage() {
        let tmp = TempDir::new().unwrap();
        init_git_repo(tmp.path());
        let stage_dir = make_stage_dir(tmp.path(), "2_current");
        let story_path = stage_dir.join("42_story_foo.md");
        fs::write(&story_path, "---\nname: test\n---\n").unwrap();
        let (tx, mut rx) = tokio::sync::broadcast::channel(16);
        let mut pending = HashMap::new();
        pending.insert(story_path, "2_current".to_string());
        flush_pending(&pending, tmp.path(), &tx);
        // Event should still be broadcast for frontend sync.
        let evt = rx.try_recv().expect("expected a broadcast event");
        match evt {
            WatcherEvent::WorkItem {
                stage,
                item_id,
                action,
                commit_msg,
                ..
            } => {
                assert_eq!(stage, "2_current");
                assert_eq!(item_id, "42_story_foo");
                assert_eq!(action, "start");
                assert_eq!(commit_msg, "huskies: start 42_story_foo");
            }
            other => panic!("unexpected event: {other:?}"),
        }
        // Verify NO git commit was made (only the initial empty commit should exist).
        let log = std::process::Command::new("git")
            .args(["log", "--oneline"])
            .current_dir(tmp.path())
            .output()
            .expect("git log");
        let log_msg = String::from_utf8_lossy(&log.stdout);
        assert!(
            !log_msg.contains("huskies:"),
            "intermediate stage should NOT produce a git commit"
        );
    }

    #[test]
    fn flush_pending_broadcasts_for_all_pipeline_stages() {
        // (stage dir, expected action, expected commit message) triples.
        let stages = [
            ("1_backlog", "create", "huskies: create 10_story_x"),
            ("3_qa", "qa", "huskies: queue 10_story_x for QA"),
            ("4_merge", "merge", "huskies: queue 10_story_x for merge"),
            ("5_done", "done", "huskies: done 10_story_x"),
            ("6_archived", "accept", "huskies: accept 10_story_x"),
        ];
        for (stage, expected_action, expected_msg) in stages {
            let tmp = TempDir::new().unwrap();
            init_git_repo(tmp.path());
            let stage_dir = make_stage_dir(tmp.path(), stage);
            let story_path = stage_dir.join("10_story_x.md");
            fs::write(&story_path, "---\nname: test\n---\n").unwrap();
            let (tx, mut rx) = tokio::sync::broadcast::channel(16);
            let mut pending = HashMap::new();
            pending.insert(story_path, stage.to_string());
            flush_pending(&pending, tmp.path(), &tx);
            // All stages should broadcast events regardless of commit behavior.
            // NOTE(review): `expect` takes a plain &str — the "{stage}" below is a
            // literal in the panic message, not interpolated; consider
            // `unwrap_or_else(|_| panic!("... {stage}"))` if interpolation is wanted.
            let evt = rx.try_recv().expect("expected broadcast for stage {stage}");
            match evt {
                WatcherEvent::WorkItem {
                    action, commit_msg, ..
                } => {
                    assert_eq!(action, expected_action, "stage {stage}");
                    assert_eq!(commit_msg, expected_msg, "stage {stage}");
                }
                other => panic!("unexpected event for stage {stage}: {other:?}"),
            }
        }
    }

    #[test]
    fn flush_pending_deletion_only_broadcasts_remove_event() {
        let tmp = TempDir::new().unwrap();
        init_git_repo(tmp.path());
        // Create the work dir tree but NOT the file (simulates a deletion).
make_stage_dir(tmp.path(), "2_current"); let deleted_path = tmp .path() .join(".huskies") .join("work") .join("2_current") .join("42_story_foo.md"); let (tx, mut rx) = tokio::sync::broadcast::channel(16); let mut pending = HashMap::new(); pending.insert(deleted_path, "2_current".to_string()); flush_pending(&pending, tmp.path(), &tx); // Even when nothing was committed (file never existed), an event is broadcast. let evt = rx .try_recv() .expect("expected a broadcast event for deletion"); match evt { WatcherEvent::WorkItem { action, item_id, .. } => { assert_eq!(action, "remove"); assert_eq!(item_id, "42_story_foo"); } other => panic!("unexpected event: {other:?}"), } } #[test] fn flush_pending_skips_unknown_stage_for_addition() { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); // File sits in an unrecognised directory. let unknown_dir = tmp.path().join(".huskies").join("work").join("9_unknown"); fs::create_dir_all(&unknown_dir).unwrap(); let path = unknown_dir.join("42_story_foo.md"); fs::write(&path, "---\nname: test\n---\n").unwrap(); let (tx, mut rx) = tokio::sync::broadcast::channel(16); let mut pending = HashMap::new(); pending.insert(path, "9_unknown".to_string()); flush_pending(&pending, tmp.path(), &tx); // No event should be broadcast because stage_metadata returns None for unknown stages. assert!( rx.try_recv().is_err(), "no event should be broadcast for unknown stage" ); } #[test] fn flush_pending_empty_pending_does_nothing() { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); make_stage_dir(tmp.path(), "2_current"); let (tx, mut rx) = tokio::sync::broadcast::channel(16); let pending: HashMap = HashMap::new(); // Should not panic and should not broadcast anything. 
flush_pending(&pending, tmp.path(), &tx); assert!(rx.try_recv().is_err(), "no event for empty pending map"); } // ── flush_pending clears merge_failure ───────────────────────────────────── #[test] fn flush_pending_clears_merge_failure_when_leaving_merge_stage() { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); let stage_dir = make_stage_dir(tmp.path(), "2_current"); let story_path = stage_dir.join("50_story_retry.md"); fs::write( &story_path, "---\nname: Retry Story\nmerge_failure: \"conflicts detected\"\n---\n# Story\n", ) .unwrap(); let (tx, _rx) = tokio::sync::broadcast::channel(16); let mut pending = HashMap::new(); pending.insert(story_path.clone(), "2_current".to_string()); flush_pending(&pending, tmp.path(), &tx); let contents = fs::read_to_string(&story_path).unwrap(); assert!( !contents.contains("merge_failure"), "merge_failure should be stripped when story lands in 2_current" ); assert!(contents.contains("name: Retry Story")); } #[test] fn flush_pending_clears_merge_failure_when_moving_to_backlog() { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); let stage_dir = make_stage_dir(tmp.path(), "1_backlog"); let story_path = stage_dir.join("51_story_reset.md"); fs::write( &story_path, "---\nname: Reset Story\nmerge_failure: \"gate failed\"\n---\n# Story\n", ) .unwrap(); let (tx, _rx) = tokio::sync::broadcast::channel(16); let mut pending = HashMap::new(); pending.insert(story_path.clone(), "1_backlog".to_string()); flush_pending(&pending, tmp.path(), &tx); let contents = fs::read_to_string(&story_path).unwrap(); assert!( !contents.contains("merge_failure"), "merge_failure should be stripped when story lands in 1_backlog" ); } #[test] fn flush_pending_clears_merge_failure_when_moving_to_done() { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); let stage_dir = make_stage_dir(tmp.path(), "5_done"); let story_path = stage_dir.join("52_story_done.md"); fs::write( &story_path, "---\nname: Done Story\nmerge_failure: \"stale 
error\"\n---\n# Story\n", ) .unwrap(); let (tx, _rx) = tokio::sync::broadcast::channel(16); let mut pending = HashMap::new(); pending.insert(story_path.clone(), "5_done".to_string()); flush_pending(&pending, tmp.path(), &tx); let contents = fs::read_to_string(&story_path).unwrap(); assert!( !contents.contains("merge_failure"), "merge_failure should be stripped when story lands in 5_done" ); } #[test] fn flush_pending_preserves_merge_failure_when_in_merge_stage() { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); let stage_dir = make_stage_dir(tmp.path(), "4_merge"); let story_path = stage_dir.join("53_story_merging.md"); fs::write( &story_path, "---\nname: Merging Story\nmerge_failure: \"conflicts\"\n---\n# Story\n", ) .unwrap(); let (tx, _rx) = tokio::sync::broadcast::channel(16); let mut pending = HashMap::new(); pending.insert(story_path.clone(), "4_merge".to_string()); flush_pending(&pending, tmp.path(), &tx); let contents = fs::read_to_string(&story_path).unwrap(); assert!( contents.contains("merge_failure"), "merge_failure should be preserved when story is in 4_merge" ); } #[test] fn flush_pending_no_op_when_no_merge_failure() { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); let stage_dir = make_stage_dir(tmp.path(), "2_current"); let story_path = stage_dir.join("54_story_clean.md"); let original = "---\nname: Clean Story\n---\n# Story\n"; fs::write(&story_path, original).unwrap(); let (tx, _rx) = tokio::sync::broadcast::channel(16); let mut pending = HashMap::new(); pending.insert(story_path.clone(), "2_current".to_string()); flush_pending(&pending, tmp.path(), &tx); let contents = fs::read_to_string(&story_path).unwrap(); assert_eq!( contents, original, "file without merge_failure should be unchanged" ); } // ── flush_pending from_stage ───────────────────────────────────────────── /// AC3: when a pending map contains both a deletion (source stage) and a /// creation (dest stage) for the same item_id, the broadcast event should 
    /// have `from_stage` set to the source stage key.
    #[test]
    fn flush_pending_sets_from_stage_for_move_operations() {
        let tmp = TempDir::new().unwrap();
        init_git_repo(tmp.path());
        // Destination exists (file moved here).
        let merge_dir = make_stage_dir(tmp.path(), "4_merge");
        let merge_path = merge_dir.join("42_story_foo.md");
        fs::write(&merge_path, "---\nname: test\n---\n").unwrap();
        // Source path does NOT exist (file was moved away).
        make_stage_dir(tmp.path(), "3_qa");
        let qa_path = tmp
            .path()
            .join(".huskies")
            .join("work")
            .join("3_qa")
            .join("42_story_foo.md");
        let (tx, mut rx) = tokio::sync::broadcast::channel(16);
        let mut pending = HashMap::new();
        pending.insert(merge_path, "4_merge".to_string()); // addition
        pending.insert(qa_path, "3_qa".to_string()); // deletion
        flush_pending(&pending, tmp.path(), &tx);
        let evt = rx.try_recv().expect("expected event");
        match evt {
            WatcherEvent::WorkItem {
                stage, from_stage, ..
            } => {
                assert_eq!(stage, "4_merge");
                assert_eq!(from_stage, Some("3_qa".to_string()));
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    /// AC3: when a pending map has only an addition (creation, not a move),
    /// `from_stage` should be `None`.
    #[test]
    fn flush_pending_sets_from_stage_to_none_for_creations() {
        let tmp = TempDir::new().unwrap();
        init_git_repo(tmp.path());
        let stage_dir = make_stage_dir(tmp.path(), "2_current");
        let story_path = stage_dir.join("55_story_new.md");
        fs::write(&story_path, "---\nname: New Story\n---\n").unwrap();
        let (tx, mut rx) = tokio::sync::broadcast::channel(16);
        let mut pending = HashMap::new();
        pending.insert(story_path, "2_current".to_string());
        flush_pending(&pending, tmp.path(), &tx);
        let evt = rx.try_recv().expect("expected event");
        match evt {
            WatcherEvent::WorkItem { from_stage, .. } => {
                assert_eq!(from_stage, None, "creation should have no from_stage");
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    // ── stage_for_path (additional edge cases) ────────────────────────────────

    #[test]
    fn stage_for_path_recognises_pipeline_dirs() {
        let base = PathBuf::from("/proj/.huskies/work");
        assert_eq!(
            stage_for_path(&base.join("2_current/42_story_foo.md")),
            Some("2_current".to_string())
        );
        assert_eq!(
            stage_for_path(&base.join("5_done/10_bug_bar.md")),
            Some("5_done".to_string())
        );
        assert_eq!(
            stage_for_path(&base.join("6_archived/10_bug_bar.md")),
            Some("6_archived".to_string())
        );
        assert_eq!(stage_for_path(&base.join("other/file.md")), None);
        assert_eq!(
            stage_for_path(&base.join("2_current/42_story_foo.txt")),
            None
        );
    }

    #[test]
    fn stage_for_path_ignores_worktree_paths() {
        let worktrees = PathBuf::from("/proj/.huskies/worktrees");
        // Code changes inside a worktree must be ignored.
        assert_eq!(
            stage_for_path(&worktrees.join("42_story_foo/server/src/main.rs")),
            None,
        );
        // Even if a worktree happens to contain a path component that looks
        // like a pipeline stage, it must still be ignored.
        assert_eq!(
            stage_for_path(&worktrees.join("42_story_foo/.huskies/work/2_current/42_story_foo.md")),
            None,
        );
        // A path that only contains the word "worktrees" as part of a longer
        // segment (not an exact component) must NOT be filtered out.
        assert_eq!(
            stage_for_path(&PathBuf::from(
                "/proj/.huskies/work/2_current/not_worktrees_story.md"
            )),
            Some("2_current".to_string()),
        );
    }

    #[test]
    fn should_commit_stage_only_for_terminal_stages() {
        // Terminal stages — should commit.
        assert!(should_commit_stage("1_backlog"));
        assert!(should_commit_stage("5_done"));
        assert!(should_commit_stage("6_archived"));
        // Intermediate stages — broadcast-only, no commit.
        assert!(!should_commit_stage("2_current"));
        assert!(!should_commit_stage("3_qa"));
        assert!(!should_commit_stage("4_merge"));
        // Unknown — no commit.
        assert!(!should_commit_stage("unknown"));
    }

    #[test]
    fn stage_metadata_returns_correct_actions() {
        let (action, msg) = stage_metadata("2_current", "42_story_foo").unwrap();
        assert_eq!(action, "start");
        assert_eq!(msg, "huskies: start 42_story_foo");
        let (action, msg) = stage_metadata("5_done", "42_story_foo").unwrap();
        assert_eq!(action, "done");
        assert_eq!(msg, "huskies: done 42_story_foo");
        let (action, msg) = stage_metadata("6_archived", "42_story_foo").unwrap();
        assert_eq!(action, "accept");
        assert_eq!(msg, "huskies: accept 42_story_foo");
        assert!(stage_metadata("unknown", "id").is_none());
    }

    #[test]
    fn is_config_file_identifies_root_project_toml() {
        let git_root = PathBuf::from("/proj");
        let config = git_root.join(".huskies").join("project.toml");
        assert!(is_config_file(&config, &git_root));
    }

    #[test]
    fn is_config_file_identifies_root_agents_toml() {
        let git_root = PathBuf::from("/proj");
        let agents = git_root.join(".huskies").join("agents.toml");
        assert!(is_config_file(&agents, &git_root));
    }

    #[test]
    fn is_config_file_rejects_worktree_copies() {
        let git_root = PathBuf::from("/proj");
        // project.toml inside a worktree must NOT be treated as the root config.
        let worktree_config =
            PathBuf::from("/proj/.huskies/worktrees/42_story_foo/.huskies/project.toml");
        assert!(!is_config_file(&worktree_config, &git_root));
    }

    #[test]
    fn is_config_file_rejects_other_files() {
        let git_root = PathBuf::from("/proj");
        // Random files must not match.
        assert!(!is_config_file(
            &PathBuf::from("/proj/.huskies/work/2_current/42_story_foo.md"),
            &git_root
        ));
        assert!(!is_config_file(
            &PathBuf::from("/proj/.huskies/README.md"),
            &git_root
        ));
    }

    #[test]
    fn is_config_file_rejects_wrong_root() {
        let git_root = PathBuf::from("/proj");
        let other_root_config = PathBuf::from("/other/.huskies/project.toml");
        assert!(!is_config_file(&other_root_config, &git_root));
    }

    // ── sweep_done_to_archived (CRDT-based) ─────────────────────────────────
    //
    // The sweep function now reads from `read_all_typed()` and checks
    // `Stage::Done { merged_at, .. }`. Items created via
    // `write_item_with_content("5_done")` project `merged_at = Utc::now()`,
    // so we test with Duration::ZERO to sweep immediately and with a long
    // retention to verify items are kept.
    //
    // NOTE(review): these tests appear to share a global content store
    // (`crate::db::ensure_content_store()`), relying on unique story IDs
    // (9880-9883) to avoid cross-test interference — confirm the store is
    // process-global and IDs never collide across the suite.

    #[test]
    fn sweep_moves_old_items_to_archived() {
        let tmp = TempDir::new().unwrap();
        crate::db::ensure_content_store();
        crate::db::write_item_with_content(
            "9880_story_sweep_old",
            "5_done",
            "---\nname: old\n---\n",
        );
        // With ZERO retention, any Done item should be swept.
        sweep_done_to_archived(
            &tmp.path().join(".huskies/work"),
            tmp.path(),
            Duration::ZERO,
        );
        // Verify the item was moved to 6_archived in the CRDT.
        let items = crate::pipeline_state::read_all_typed();
        let item = items.iter().find(|i| i.story_id.0 == "9880_story_sweep_old");
        assert!(
            item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })),
            "item should be archived after sweep"
        );
    }

    #[test]
    fn sweep_keeps_recent_items_in_done() {
        let tmp = TempDir::new().unwrap();
        crate::db::ensure_content_store();
        crate::db::write_item_with_content(
            "9881_story_sweep_new",
            "5_done",
            "---\nname: new\n---\n",
        );
        // With a very long retention, the item (merged_at ≈ now) should stay.
        sweep_done_to_archived(
            &tmp.path().join(".huskies/work"),
            tmp.path(),
            Duration::from_secs(999_999),
        );
        let items = crate::pipeline_state::read_all_typed();
        let item = items.iter().find(|i| i.story_id.0 == "9881_story_sweep_new");
        assert!(
            item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Done { .. })),
            "item should remain in Done with long retention"
        );
    }

    #[test]
    fn sweep_respects_custom_retention() {
        let tmp = TempDir::new().unwrap();
        crate::db::ensure_content_store();
        crate::db::write_item_with_content(
            "9882_story_sweep_custom",
            "5_done",
            "---\nname: custom\n---\n",
        );
        // With ZERO retention, sweep should promote.
        sweep_done_to_archived(
            &tmp.path().join(".huskies/work"),
            tmp.path(),
            Duration::ZERO,
        );
        let items = crate::pipeline_state::read_all_typed();
        let item = items.iter().find(|i| i.story_id.0 == "9882_story_sweep_custom");
        assert!(
            item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })),
            "item should be archived with zero retention"
        );
    }

    // ── sweep worktree pruning ───────────────────────────────────────────────

    /// Helper: create a real git worktree at `wt_path` on a new branch.
    fn create_git_worktree(git_root: &std::path::Path, wt_path: &std::path::Path, branch: &str) {
        use std::process::Command;
        let _ = Command::new("git")
            .args(["branch", branch])
            .current_dir(git_root)
            .output();
        Command::new("git")
            .args(["worktree", "add", &wt_path.to_string_lossy(), branch])
            .current_dir(git_root)
            .output()
            .expect("git worktree add");
    }

    #[test]
    fn sweep_prunes_worktree_when_story_promoted_to_archived() {
        let tmp = TempDir::new().unwrap();
        let git_root = tmp.path().to_path_buf();
        init_git_repo(&git_root);
        crate::db::ensure_content_store();
        let story_id = "9883_story_prune_on_promote";
        crate::db::write_item_with_content(
            story_id,
            "5_done",
            "---\nname: test\n---\n",
        );
        // Create a real git worktree for this story.
let wt_path = crate::worktree::worktree_path(&git_root, story_id); fs::create_dir_all(wt_path.parent().unwrap()).unwrap(); create_git_worktree(&git_root, &wt_path, &format!("feature/story-{story_id}")); assert!(wt_path.exists(), "worktree must exist before sweep"); sweep_done_to_archived( &git_root.join(".huskies/work"), &git_root, Duration::ZERO, ); // Story must be archived in CRDT. let items = crate::pipeline_state::read_all_typed(); let item = items.iter().find(|i| i.story_id.0 == story_id); assert!( item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })), "story should be archived" ); // Worktree must be removed. assert!( !wt_path.exists(), "worktree should be removed after archiving" ); } #[test] fn sweep_prunes_worktrees_for_already_archived_stories() { let tmp = TempDir::new().unwrap(); let git_root = tmp.path().to_path_buf(); init_git_repo(&git_root); crate::db::ensure_content_store(); let story_id = "9884_story_stale_worktree"; crate::db::write_item_with_content( story_id, "6_archived", "---\nname: stale\n---\n", ); // Create a real git worktree that was never cleaned up. let wt_path = crate::worktree::worktree_path(&git_root, story_id); fs::create_dir_all(wt_path.parent().unwrap()).unwrap(); create_git_worktree(&git_root, &wt_path, &format!("feature/story-{story_id}")); assert!(wt_path.exists(), "stale worktree must exist before sweep"); sweep_done_to_archived( &git_root.join(".huskies/work"), &git_root, Duration::from_secs(999_999), ); // Stale worktree should be pruned. 
assert!( !wt_path.exists(), "stale worktree should be pruned by sweep" ); } #[test] fn sweep_archives_story_even_when_worktree_removal_fails() { let tmp = TempDir::new().unwrap(); let git_root = tmp.path().to_path_buf(); init_git_repo(&git_root); crate::db::ensure_content_store(); let story_id = "9885_story_fake_worktree"; crate::db::write_item_with_content( story_id, "5_done", "---\nname: test\n---\n", ); // Create a plain directory at the expected worktree path — not a real // git worktree, so `git worktree remove` will fail. let wt_path = crate::worktree::worktree_path(&git_root, story_id); fs::create_dir_all(&wt_path).unwrap(); sweep_done_to_archived( &git_root.join(".huskies/work"), &git_root, Duration::ZERO, ); // Story must be archived in CRDT despite worktree removal failure. let items = crate::pipeline_state::read_all_typed(); let item = items.iter().find(|i| i.story_id.0 == story_id); assert!( item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })), "story should be archived even when worktree removal fails" ); } }