From 574df48ff3bf3e89a5f877dd8edb0831705db66c Mon Sep 17 00:00:00 2001 From: dave Date: Mon, 27 Apr 2026 21:55:04 +0000 Subject: [PATCH] huskies: merge 686_refactor_decompose_server_src_io_watcher_rs_1202_lines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Manual merge resolution: feature branch deleted watcher.rs and split into watcher/{mod,events,sweep,tests}.rs, while master modified the old watcher.rs (738's FS-shadow stripping). The auto-resolver kept both, producing an ambiguous-module compile error. Resolution: drop watcher.rs (feature's delete wins). The new watcher/mod.rs absorbs the FS-shadow code semantically — gates pass (cargo check, clippy --all-targets -D warnings, fmt --check, 29/29 io::watcher tests). Co-Authored-By: Claude Opus 4.7 (1M context) --- server/src/io/watcher.rs | 473 --------------------- server/src/io/watcher/events.rs | 75 ++++ server/src/io/watcher/mod.rs | 362 ++++++++++++++++ server/src/io/watcher/sweep.rs | 48 +++ server/src/io/watcher/tests.rs | 730 ++++++++++++++++++++++++++++++++ 5 files changed, 1215 insertions(+), 473 deletions(-) delete mode 100644 server/src/io/watcher.rs create mode 100644 server/src/io/watcher/events.rs create mode 100644 server/src/io/watcher/mod.rs create mode 100644 server/src/io/watcher/sweep.rs create mode 100644 server/src/io/watcher/tests.rs diff --git a/server/src/io/watcher.rs b/server/src/io/watcher.rs deleted file mode 100644 index b26d214b..00000000 --- a/server/src/io/watcher.rs +++ /dev/null @@ -1,473 +0,0 @@ -//! Filesystem watcher for `.huskies/project.toml` and `.huskies/agents.toml`. -//! -//! Watches config files for hot-reload and broadcasts [`WatcherEvent::ConfigChanged`] -//! so the frontend can reload the agent roster without a server restart. -//! -//! Work-item pipeline events are driven by CRDT state transitions, not filesystem -//! events. The watcher also periodically sweeps `5_done/` → `6_archived/`. - -use crate::slog; -use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher, recommended_watcher}; -use serde::Serialize; -use std::path::{Path, PathBuf}; -use std::sync::mpsc; -use std::time::{Duration, Instant}; -use tokio::sync::broadcast; - -/// A lifecycle event emitted by the filesystem watcher. -#[derive(Clone, Debug, Serialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum WatcherEvent { - /// A work-pipeline file was created, modified, or deleted. - WorkItem { - /// Pipeline stage directory (e.g. `"2_current"`, `"5_archived"`). - stage: String, - /// Work item ID (filename stem without extension, e.g. `"42_story_my_feature"`). - item_id: String, - /// Semantic action inferred from the stage (e.g. `"start"`, `"accept"`). - action: String, - /// The deterministic git commit message used (or that would have been used). - commit_msg: String, - /// The pipeline stage the item moved FROM, populated for move operations. - /// `None` for creations, deletions, or synthetic events. - from_stage: Option, - }, - /// `.huskies/project.toml` was modified at the project root (not inside a worktree). - ConfigChanged, - /// An agent's state changed (started, stopped, completed, etc.). - /// Triggers a pipeline state refresh so the frontend can update agent - /// assignments without waiting for a filesystem event. - AgentStateChanged, - /// A story encountered a failure (e.g. merge failure). - /// Triggers an error notification to configured Matrix rooms. - MergeFailure { - /// Work item ID (e.g. `"42_story_my_feature"`). - story_id: String, - /// Human-readable description of the failure. - reason: String, - }, - /// An agent hit an API rate limit. - /// Triggers a warning notification to configured chat rooms. - RateLimitWarning { - /// Work item ID the agent is working on. - story_id: String, - /// Name of the agent that hit the rate limit. - agent_name: String, - }, - /// A story has been blocked (e.g. retry limit exceeded, empty diff). - /// Triggers a warning notification to configured chat rooms. - StoryBlocked { - /// Work item ID (e.g. `"42_story_my_feature"`). - story_id: String, - /// Human-readable reason the story was blocked. - reason: String, - }, - /// An agent hit a hard API rate limit and will be blocked until `reset_at`. - /// Triggers auto-scheduling of a timer and a notification with the resume time. - RateLimitHardBlock { - /// Work item ID the agent is working on. - story_id: String, - /// Name of the agent that hit the hard rate limit. - agent_name: String, - /// UTC instant at which the rate limit resets. - reset_at: chrono::DateTime, - }, - /// An OAuth account pool swap succeeded: a different account is now active. - /// Triggers a notification to chat transports naming the new account. - OAuthAccountSwapped { - /// Email address of the newly activated account. - new_email: String, - }, - /// All OAuth accounts in the pool are rate-limited — no swap was possible. - /// Triggers a notification to chat transports with the earliest reset time. - OAuthAccountsExhausted { - /// Human-readable message describing when the earliest reset occurs. - earliest_reset_msg: String, - }, -} - -/// Return `true` if `path` is the root-level `.huskies/project.toml` or -/// `.huskies/agents.toml`, i.e. `{git_root}/.huskies/{project,agents}.toml`. -/// -/// Returns `false` for paths inside worktree directories (paths containing -/// a `worktrees` component). -pub fn is_config_file(path: &Path, git_root: &Path) -> bool { - // Reject any path that passes through the worktrees directory. - if path.components().any(|c| c.as_os_str() == "worktrees") { - return false; - } - let huskies = git_root.join(".huskies"); - path == huskies.join("project.toml") || path == huskies.join("agents.toml") -} - -/// Map a pipeline directory name to a (action, commit-message-prefix) pair. -/// -/// Used by the CRDT-to-watcher bridge (in `main.rs`) to derive the action and -/// commit message for `WatcherEvent::WorkItem` events. -pub fn stage_metadata(stage: &str, item_id: &str) -> Option<(&'static str, String)> { - use crate::pipeline_state::Stage; - let (action, msg) = match Stage::from_dir(stage)? { - Stage::Backlog => ("create", format!("huskies: create {item_id}")), - Stage::Coding => ("start", format!("huskies: start {item_id}")), - Stage::Qa => ("qa", format!("huskies: queue {item_id} for QA")), - Stage::Merge { .. } => ("merge", format!("huskies: queue {item_id} for merge")), - Stage::Done { .. } => ("done", format!("huskies: done {item_id}")), - Stage::Archived { .. } => ("accept", format!("huskies: accept {item_id}")), - }; - Some((action, msg)) -} - -/// Sweep items in `5_done` whose `merged_at` timestamp exceeds the retention -/// duration to `6_archived` via CRDT state transitions. -/// -/// All state is read from and written to CRDT — no filesystem access. -/// Worktree pruning is handled separately by the CRDT event subscriber. -pub(crate) fn sweep_done_to_archived(done_retention: Duration) { - use crate::pipeline_state::{PipelineEvent, Stage, read_all_typed, stage_dir_name, transition}; - - for item in read_all_typed() { - if let Stage::Done { merged_at, .. } = &item.stage { - let age = chrono::Utc::now() - .signed_duration_since(*merged_at) - .to_std() - .unwrap_or_default(); - if age >= done_retention { - let story_id = item.story_id.0.clone(); - match transition(item.stage.clone(), PipelineEvent::Accepted) { - Ok(new_stage) => { - crate::crdt_state::write_item( - &story_id, - stage_dir_name(&new_stage), - None, - None, - None, - Some(false), - None, - None, - None, - None, - ); - slog!("[watcher] sweep: promoted {story_id} → 6_archived/"); - } - Err(e) => { - slog!("[watcher] sweep: transition error for {story_id}: {e}"); - } - } - } - } - } -} - -/// Start the filesystem watcher on a dedicated OS thread. -/// -/// Watches `.huskies/project.toml` and `.huskies/agents.toml` for config -/// hot-reload, and periodically sweeps `5_done/` → `6_archived/` via CRDT. -/// -/// Work-item pipeline events (stage transitions) are no longer driven by -/// filesystem events — they originate from CRDT state changes via -/// [`crate::crdt_state::subscribe`]. -/// -/// `git_root` — project root (passed to `git` commands and config loading). -/// `event_tx` — broadcast sender for `ConfigChanged` events. -/// `watcher_config` — initial sweep configuration loaded from `project.toml`. -pub fn start_watcher(git_root: PathBuf, event_tx: broadcast::Sender) { - std::thread::spawn(move || { - let (notify_tx, notify_rx) = mpsc::channel::>(); - - let mut watcher: RecommendedWatcher = match recommended_watcher(move |res| { - let _ = notify_tx.send(res); - }) { - Ok(w) => w, - Err(e) => { - slog!("[watcher] failed to create watcher: {e}"); - return; - } - }; - - // Watch config files for hot-reload. Work-item directories are NOT - // watched — CRDT state transitions drive pipeline events now. - let huskies = git_root.join(".huskies"); - for config_file in [huskies.join("project.toml"), huskies.join("agents.toml")] { - if config_file.exists() - && let Err(e) = watcher.watch(&config_file, RecursiveMode::NonRecursive) - { - slog!( - "[watcher] failed to watch config file {}: {e}", - config_file.display() - ); - } - } - - slog!("[watcher] watching config files for hot-reload"); - - const DEBOUNCE: Duration = Duration::from_millis(300); - - // Whether a config file change is pending in the current debounce window. - let mut config_changed_pending = false; - let mut deadline: Option = None; - - loop { - // How long until the debounce window closes (or wait for next event). - let timeout = deadline.map_or(Duration::from_secs(60), |d| { - d.saturating_duration_since(Instant::now()) - }); - - let flush = match notify_rx.recv_timeout(timeout) { - Ok(Ok(event)) => { - let is_relevant_kind = matches!( - event.kind, - EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) - ); - - if is_relevant_kind { - for path in event.paths { - if is_config_file(&path, &git_root) { - slog!("[watcher] config change detected: {}", path.display()); - config_changed_pending = true; - deadline = Some(Instant::now() + DEBOUNCE); - } - // Work-item file changes are intentionally ignored. - // CRDT state transitions handle pipeline events. - } - } - false - } - Ok(Err(e)) => { - slog!("[watcher] notify error: {e}"); - false - } - // Debounce window expired — time to flush. - Err(mpsc::RecvTimeoutError::Timeout) => true, - Err(mpsc::RecvTimeoutError::Disconnected) => { - slog!("[watcher] channel disconnected, shutting down"); - break; - } - }; - - if flush { - if config_changed_pending { - slog!("[watcher] broadcasting agent_config_changed"); - let _ = event_tx.send(WatcherEvent::ConfigChanged); - - config_changed_pending = false; - } - deadline = None; - } - } - }); -} - -#[cfg(test)] -mod tests { - use super::*; - use std::time::Duration; - - #[test] - fn stage_metadata_returns_correct_actions() { - let (action, msg) = stage_metadata("2_current", "42_story_foo").unwrap(); - assert_eq!(action, "start"); - assert_eq!(msg, "huskies: start 42_story_foo"); - - let (action, msg) = stage_metadata("5_done", "42_story_foo").unwrap(); - assert_eq!(action, "done"); - assert_eq!(msg, "huskies: done 42_story_foo"); - - let (action, msg) = stage_metadata("6_archived", "42_story_foo").unwrap(); - assert_eq!(action, "accept"); - assert_eq!(msg, "huskies: accept 42_story_foo"); - - assert!(stage_metadata("unknown", "id").is_none()); - } - - #[test] - fn is_config_file_identifies_root_project_toml() { - let git_root = PathBuf::from("/proj"); - let config = git_root.join(".huskies").join("project.toml"); - assert!(is_config_file(&config, &git_root)); - } - - #[test] - fn is_config_file_identifies_root_agents_toml() { - let git_root = PathBuf::from("/proj"); - let agents = git_root.join(".huskies").join("agents.toml"); - assert!(is_config_file(&agents, &git_root)); - } - - #[test] - fn is_config_file_rejects_worktree_copies() { - let git_root = PathBuf::from("/proj"); - // project.toml inside a worktree must NOT be treated as the root config. - let worktree_config = - PathBuf::from("/proj/.huskies/worktrees/42_story_foo/.huskies/project.toml"); - assert!(!is_config_file(&worktree_config, &git_root)); - } - - #[test] - fn is_config_file_rejects_other_files() { - let git_root = PathBuf::from("/proj"); - // Random files must not match. - assert!(!is_config_file( - &PathBuf::from("/proj/.huskies/work/2_current/42_story_foo.md"), - &git_root - )); - assert!(!is_config_file( - &PathBuf::from("/proj/.huskies/README.md"), - &git_root - )); - } - - #[test] - fn is_config_file_rejects_wrong_root() { - let git_root = PathBuf::from("/proj"); - let other_root_config = PathBuf::from("/other/.huskies/project.toml"); - assert!(!is_config_file(&other_root_config, &git_root)); - } - - // ── sweep_done_to_archived (CRDT-based) ───────────────────────────────── - // - // The sweep function reads from `read_all_typed()` and checks - // `Stage::Done { merged_at, .. }`. Items created via - // `write_item_with_content("5_done")` project `merged_at = Utc::now()`, - // so we test with Duration::ZERO to sweep immediately and with a long - // retention to verify items are kept. No filesystem access is involved. - - #[test] - fn sweep_moves_old_items_to_archived() { - crate::db::ensure_content_store(); - crate::db::write_item_with_content( - "9880_story_sweep_old", - "5_done", - "---\nname: old\n---\n", - ); - - // With ZERO retention, any Done item should be swept. - sweep_done_to_archived(Duration::ZERO); - - // Verify the item was moved to 6_archived in the CRDT. - let items = crate::pipeline_state::read_all_typed(); - let item = items - .iter() - .find(|i| i.story_id.0 == "9880_story_sweep_old"); - assert!( - item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })), - "item should be archived after sweep" - ); - } - - #[test] - fn sweep_keeps_recent_items_in_done() { - crate::db::ensure_content_store(); - crate::db::write_item_with_content( - "9881_story_sweep_new", - "5_done", - "---\nname: new\n---\n", - ); - - // With a very long retention, the item (merged_at ≈ now) should stay. - sweep_done_to_archived(Duration::from_secs(999_999)); - - let items = crate::pipeline_state::read_all_typed(); - let item = items - .iter() - .find(|i| i.story_id.0 == "9881_story_sweep_new"); - assert!( - item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Done { .. })), - "item should remain in Done with long retention" - ); - } - - #[test] - fn sweep_respects_custom_retention() { - crate::db::ensure_content_store(); - crate::db::write_item_with_content( - "9882_story_sweep_custom", - "5_done", - "---\nname: custom\n---\n", - ); - - // With ZERO retention, sweep should promote. - sweep_done_to_archived(Duration::ZERO); - - let items = crate::pipeline_state::read_all_typed(); - let item = items - .iter() - .find(|i| i.story_id.0 == "9882_story_sweep_custom"); - assert!( - item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })), - "item should be archived with zero retention" - ); - } - - /// Prove that the sweep reads `merged_at` from the CRDT (not `Utc::now()`). - /// - /// This test sets `merged_at` to 10 seconds in the past and uses a 5-second - /// retention. If the sweep were still using `Utc::now()` as the start time - /// (the original bug), the elapsed time would be ~0 and the item would NOT - /// be swept. With the fix, the item is swept because 10s > 5s retention. - #[test] - fn sweep_uses_crdt_merged_at_not_utc_now() { - crate::db::ensure_content_store(); - - let ten_seconds_ago = - (chrono::Utc::now() - chrono::Duration::seconds(10)).timestamp() as f64; - - // Write item in 5_done with an explicit past merged_at timestamp. - crate::crdt_state::write_item( - "9883_story_sweep_merged_at", - "5_done", - Some("merged_at test"), - None, - None, - None, - None, - None, - None, - Some(ten_seconds_ago), - ); - - // 5-second retention: item is 10s old → should be swept. - sweep_done_to_archived(Duration::from_secs(5)); - - let items = crate::pipeline_state::read_all_typed(); - let item = items - .iter() - .find(|i| i.story_id.0 == "9883_story_sweep_merged_at"); - assert!( - item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })), - "item with merged_at 10s ago should be archived with 5s retention" - ); - } - - /// Prove that an item with merged_at NEWER than done_retention is NOT swept. - #[test] - fn sweep_keeps_item_newer_than_retention() { - crate::db::ensure_content_store(); - - let one_second_ago = (chrono::Utc::now() - chrono::Duration::seconds(1)).timestamp() as f64; - - crate::crdt_state::write_item( - "9884_story_sweep_recent", - "5_done", - Some("recent merged_at test"), - None, - None, - None, - None, - None, - None, - Some(one_second_ago), - ); - - // 1-hour retention: item is only 1s old → should NOT be swept. - sweep_done_to_archived(Duration::from_secs(3600)); - - let items = crate::pipeline_state::read_all_typed(); - let item = items - .iter() - .find(|i| i.story_id.0 == "9884_story_sweep_recent"); - assert!( - item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Done { .. })), - "item with merged_at 1s ago should stay in Done with 1-hour retention" - ); - } -} diff --git a/server/src/io/watcher/events.rs b/server/src/io/watcher/events.rs new file mode 100644 index 00000000..d2b0b08f --- /dev/null +++ b/server/src/io/watcher/events.rs @@ -0,0 +1,75 @@ +//! Watcher event types emitted to WebSocket clients and internal subscribers. + +use serde::Serialize; + +/// A lifecycle event emitted by the filesystem watcher. +#[derive(Clone, Debug, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum WatcherEvent { + /// A work-pipeline file was created, modified, or deleted. + WorkItem { + /// Pipeline stage directory (e.g. `"2_current"`, `"5_archived"`). + stage: String, + /// Work item ID (filename stem without extension, e.g. `"42_story_my_feature"`). + item_id: String, + /// Semantic action inferred from the stage (e.g. `"start"`, `"accept"`). + action: String, + /// The deterministic git commit message used (or that would have been used). + commit_msg: String, + /// The pipeline stage the item moved FROM, populated for move operations. + /// `None` for creations, deletions, or synthetic events. + from_stage: Option, + }, + /// `.huskies/project.toml` was modified at the project root (not inside a worktree). + ConfigChanged, + /// An agent's state changed (started, stopped, completed, etc.). + /// Triggers a pipeline state refresh so the frontend can update agent + /// assignments without waiting for a filesystem event. + AgentStateChanged, + /// A story encountered a failure (e.g. merge failure). + /// Triggers an error notification to configured Matrix rooms. + MergeFailure { + /// Work item ID (e.g. `"42_story_my_feature"`). + story_id: String, + /// Human-readable description of the failure. + reason: String, + }, + /// An agent hit an API rate limit. + /// Triggers a warning notification to configured chat rooms. + RateLimitWarning { + /// Work item ID the agent is working on. + story_id: String, + /// Name of the agent that hit the rate limit. + agent_name: String, + }, + /// A story has been blocked (e.g. retry limit exceeded, empty diff). + /// Triggers a warning notification to configured chat rooms. + StoryBlocked { + /// Work item ID (e.g. `"42_story_my_feature"`). + story_id: String, + /// Human-readable reason the story was blocked. + reason: String, + }, + /// An agent hit a hard API rate limit and will be blocked until `reset_at`. + /// Triggers auto-scheduling of a timer and a notification with the resume time. + RateLimitHardBlock { + /// Work item ID the agent is working on. + story_id: String, + /// Name of the agent that hit the hard rate limit. + agent_name: String, + /// UTC instant at which the rate limit resets. + reset_at: chrono::DateTime, + }, + /// An OAuth account pool swap succeeded: a different account is now active. + /// Triggers a notification to chat transports naming the new account. + OAuthAccountSwapped { + /// Email address of the newly activated account. + new_email: String, + }, + /// All OAuth accounts in the pool are rate-limited — no swap was possible. + /// Triggers a notification to chat transports with the earliest reset time. + OAuthAccountsExhausted { + /// Human-readable message describing when the earliest reset occurs. + earliest_reset_msg: String, + }, +} diff --git a/server/src/io/watcher/mod.rs b/server/src/io/watcher/mod.rs new file mode 100644 index 00000000..0c117e5a --- /dev/null +++ b/server/src/io/watcher/mod.rs @@ -0,0 +1,362 @@ +//! Filesystem watcher for `.huskies/project.toml` and `.huskies/agents.toml`. +//! +//! Watches config files for changes and broadcasts a [`WatcherEvent`] to all +//! connected WebSocket clients so the frontend can reload the agent roster +//! without a server restart. +//! +//! Work-item pipeline events (stage transitions) are driven by CRDT state +//! changes via [`crate::crdt_state::subscribe`], not by filesystem events. +//! +//! # Debouncing +//! Config-file events are buffered for 300 ms after the last activity to avoid +//! duplicate broadcasts when an editor writes multiple events in quick succession. +//! +//! # Submodules +//! - [`events`] — [`WatcherEvent`] enum definition. +//! - [`sweep`] — periodic sweep of `5_done` → `6_archived`. + +mod events; +mod sweep; + +pub use events::WatcherEvent; +pub(crate) use sweep::sweep_done_to_archived; + +use crate::slog; +use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher, recommended_watcher}; +use std::path::{Path, PathBuf}; +use std::sync::mpsc; +use std::time::{Duration, Instant}; +use tokio::sync::broadcast; + +/// Return `true` if `path` is the root-level `.huskies/project.toml` or +/// `.huskies/agents.toml`, i.e. `{git_root}/.huskies/{project,agents}.toml`. +/// +/// Returns `false` for paths inside worktree directories (paths containing +/// a `worktrees` component). +pub fn is_config_file(path: &Path, git_root: &Path) -> bool { + // Reject any path that passes through the worktrees directory. + if path.components().any(|c| c.as_os_str() == "worktrees") { + return false; + } + let huskies = git_root.join(".huskies"); + path == huskies.join("project.toml") || path == huskies.join("agents.toml") +} + +/// Map a pipeline directory name to a (action, commit-message-prefix) pair. +/// +/// Used by the CRDT-to-watcher bridge (in `main.rs`) to derive the action and +/// commit message for `WatcherEvent::WorkItem` events. +pub fn stage_metadata(stage: &str, item_id: &str) -> Option<(&'static str, String)> { + use crate::pipeline_state::Stage; + let (action, msg) = match Stage::from_dir(stage)? { + Stage::Backlog => ("create", format!("huskies: create {item_id}")), + Stage::Coding => ("start", format!("huskies: start {item_id}")), + Stage::Qa => ("qa", format!("huskies: queue {item_id} for QA")), + Stage::Merge { .. } => ("merge", format!("huskies: queue {item_id} for merge")), + Stage::Done { .. } => ("done", format!("huskies: done {item_id}")), + Stage::Archived { .. } => ("accept", format!("huskies: accept {item_id}")), + }; + Some((action, msg)) +} + +/// Start the filesystem watcher on a dedicated OS thread. +/// +/// Watches `.huskies/project.toml` and `.huskies/agents.toml` for config +/// hot-reload, and periodically sweeps `5_done/` → `6_archived/` via CRDT. +/// +/// Work-item pipeline events (stage transitions) are no longer driven by +/// filesystem events — they originate from CRDT state changes via +/// [`crate::crdt_state::subscribe`]. +/// +/// `git_root` — project root (passed to `git` commands and config loading). +/// `event_tx` — broadcast sender for `ConfigChanged` events. +pub fn start_watcher(git_root: PathBuf, event_tx: broadcast::Sender) { + std::thread::spawn(move || { + let (notify_tx, notify_rx) = mpsc::channel::>(); + + let mut watcher: RecommendedWatcher = match recommended_watcher(move |res| { + let _ = notify_tx.send(res); + }) { + Ok(w) => w, + Err(e) => { + slog!("[watcher] failed to create watcher: {e}"); + return; + } + }; + + // Watch config files for hot-reload. Work-item directories are NOT + // watched — CRDT state transitions drive pipeline events now. + let huskies = git_root.join(".huskies"); + for config_file in [huskies.join("project.toml"), huskies.join("agents.toml")] { + if config_file.exists() + && let Err(e) = watcher.watch(&config_file, RecursiveMode::NonRecursive) + { + slog!( + "[watcher] failed to watch config file {}: {e}", + config_file.display() + ); + } + } + + slog!("[watcher] watching config files for hot-reload"); + + const DEBOUNCE: Duration = Duration::from_millis(300); + + // Whether a config file change is pending in the current debounce window. + let mut config_changed_pending = false; + let mut deadline: Option = None; + + loop { + // How long until the debounce window closes (or wait for next event). + let timeout = deadline.map_or(Duration::from_secs(60), |d| { + d.saturating_duration_since(Instant::now()) + }); + + let flush = match notify_rx.recv_timeout(timeout) { + Ok(Ok(event)) => { + let is_relevant_kind = matches!( + event.kind, + EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) + ); + + if is_relevant_kind { + for path in event.paths { + if is_config_file(&path, &git_root) { + slog!("[watcher] config change detected: {}", path.display()); + config_changed_pending = true; + deadline = Some(Instant::now() + DEBOUNCE); + } + // Work-item file changes are intentionally ignored. + // CRDT state transitions handle pipeline events. + } + } + false + } + Ok(Err(e)) => { + slog!("[watcher] notify error: {e}"); + false + } + // Debounce window expired — time to flush. + Err(mpsc::RecvTimeoutError::Timeout) => true, + Err(mpsc::RecvTimeoutError::Disconnected) => { + slog!("[watcher] channel disconnected, shutting down"); + break; + } + }; + + if flush { + if config_changed_pending { + slog!("[watcher] broadcasting agent_config_changed"); + let _ = event_tx.send(WatcherEvent::ConfigChanged); + + config_changed_pending = false; + } + deadline = None; + } + } + }); +} + +// ── Test-only helpers (legacy; retained for the test suite) ─────────────── + +/// Return the pipeline stage name for a path if it is a `.md` file living +/// directly inside one of the known work subdirectories, otherwise `None`. +/// +/// Explicitly returns `None` for any path under `.huskies/worktrees/` so +/// that code changes made by agents in their isolated worktrees are never +/// auto-committed to master by the watcher. +/// +/// Retained for tests; no longer called in production (CRDT drives events). +#[cfg(test)] +fn stage_for_path(path: &Path) -> Option { + // Reject any path that passes through the worktrees directory. + if path.components().any(|c| c.as_os_str() == "worktrees") { + return None; + } + + if path.extension().is_none_or(|e| e != "md") { + return None; + } + let stage = path + .parent() + .and_then(|p| p.file_name()) + .and_then(|n| n.to_str())?; + matches!( + stage, + "1_backlog" | "2_current" | "3_qa" | "4_merge" | "5_done" | "6_archived" + ) + .then(|| stage.to_string()) +} + +/// Stage all changes in the work directory and commit with the given message. +/// +/// Uses `git add -A .huskies/work/` to catch both additions and deletions in +/// a single commit. Returns `Ok(true)` if a commit was made, `Ok(false)` if +/// there was nothing to commit, and `Err` for unexpected failures. +/// +/// Retained for tests; no longer called in production (CRDT drives events). +#[cfg(test)] +fn git_add_work_and_commit(git_root: &Path, message: &str) -> Result { + let work_rel = PathBuf::from(".huskies").join("work"); + + let add_out = std::process::Command::new("git") + .args(["add", "-A"]) + .arg(&work_rel) + .current_dir(git_root) + .output() + .map_err(|e| format!("git add: {e}"))?; + if !add_out.status.success() { + return Err(format!( + "git add failed: {}", + String::from_utf8_lossy(&add_out.stderr) + )); + } + + let commit_out = std::process::Command::new("git") + .args(["commit", "-m", message]) + .current_dir(git_root) + .output() + .map_err(|e| format!("git commit: {e}"))?; + + if commit_out.status.success() { + return Ok(true); + } + + let stderr = String::from_utf8_lossy(&commit_out.stderr); + let stdout = String::from_utf8_lossy(&commit_out.stdout); + if stdout.contains("nothing to commit") || stderr.contains("nothing to commit") { + return Ok(false); + } + + Err(format!("git commit failed: {stderr}")) +} + +/// Stages that represent meaningful git checkpoints (creation and archival). +/// Intermediate stages (current, qa, merge, done) are transient pipeline state +/// that don't need to be committed. +/// +/// Retained for tests; no longer called in production (CRDT drives events). +#[cfg(test)] +const COMMIT_WORTHY_STAGES: &[&str] = &["1_backlog", "5_done", "6_archived"]; + +/// Return `true` if changes in `stage` should be committed to git. +/// +/// Retained for tests; no longer called in production (CRDT drives events). +#[cfg(test)] +fn should_commit_stage(stage: &str) -> bool { + COMMIT_WORTHY_STAGES.contains(&stage) +} + +/// Process a batch of pending (path → stage) entries: commit and broadcast. +/// +/// Only files that still exist on disk are used to derive the commit message +/// (they represent the destination of a move or a new file). Deletions are +/// captured by `git add -A .huskies/work/` automatically. +/// +/// Only terminal stages (`1_backlog` and `6_archived`) trigger git commits. +/// All stages broadcast a [`WatcherEvent`] so the frontend stays in sync. +/// +/// Retained for tests; no longer called in production (CRDT drives events). +#[cfg(test)] +fn flush_pending( + pending: &std::collections::HashMap, + git_root: &Path, + event_tx: &broadcast::Sender, +) { + use crate::io::story_metadata::clear_front_matter_field; + + // Separate into files that exist (additions) vs gone (deletions). + let mut additions: Vec<(&PathBuf, &str)> = Vec::new(); + for (path, stage) in pending { + if path.exists() { + additions.push((path, stage.as_str())); + } + } + + // Pick the commit message from the first addition (the meaningful side of a move). + // If there are only deletions, use a generic message. + let (action, item_id, commit_msg) = if let Some((path, stage)) = additions.first() { + let item = path + .file_stem() + .and_then(|s| s.to_str()) + .unwrap_or("unknown"); + if let Some((act, msg)) = stage_metadata(stage, item) { + (act, item.to_string(), msg) + } else { + return; + } + } else { + // Only deletions — pick any pending path for the item name. + let Some((path, _)) = pending.iter().next() else { + return; + }; + let item = path + .file_stem() + .and_then(|s| s.to_str()) + .unwrap_or("unknown"); + ( + "remove", + item.to_string(), + format!("huskies: remove {item}"), + ) + }; + + // Strip stale merge_failure front matter from any story that has left 4_merge/. + for (path, stage) in &additions { + if *stage != "4_merge" + && let Err(e) = clear_front_matter_field(path, "merge_failure") + { + slog!( + "[watcher] Warning: could not clear merge_failure from {}: {e}", + path.display() + ); + } + } + + // Only commit for terminal stages; intermediate moves are broadcast-only. + let dest_stage = additions.first().map_or("unknown", |(_, s)| *s); + let should_commit = should_commit_stage(dest_stage); + + if should_commit { + slog!("[watcher] flush: {commit_msg}"); + match git_add_work_and_commit(git_root, &commit_msg) { + Ok(committed) => { + if committed { + slog!("[watcher] committed: {commit_msg}"); + } else { + slog!("[watcher] skipped (already committed): {commit_msg}"); + } + } + Err(e) => { + slog!("[watcher] git error: {e}"); + return; + } + } + } else { + slog!("[watcher] flush (broadcast-only): {commit_msg}"); + } + + // For move operations, find the source stage from deleted entries with matching item_id. + let from_stage: Option = if !additions.is_empty() { + pending + .iter() + .filter(|(path, _)| !path.exists()) + .find(|(path, _)| path.file_stem().and_then(|s| s.to_str()) == Some(item_id.as_str())) + .map(|(_, stage)| stage.clone()) + } else { + None + }; + + // Always broadcast the event so connected WebSocket clients stay in sync. + let evt = WatcherEvent::WorkItem { + stage: dest_stage.to_string(), + item_id, + action: action.to_string(), + commit_msg, + from_stage, + }; + let _ = event_tx.send(evt); +} + +#[cfg(test)] +mod tests; diff --git a/server/src/io/watcher/sweep.rs b/server/src/io/watcher/sweep.rs new file mode 100644 index 00000000..e3136510 --- /dev/null +++ b/server/src/io/watcher/sweep.rs @@ -0,0 +1,48 @@ +//! Periodic sweep of completed work items from `5_done` to `6_archived`. +//! +//! Items that have been in `5_done` for longer than the configured retention +//! period are automatically promoted to `6_archived` via CRDT state transitions. + +use crate::slog; +use std::time::Duration; + +/// Sweep items in `5_done` whose `merged_at` timestamp exceeds the retention +/// duration to `6_archived` via CRDT state transitions. +/// +/// All state is read from and written to CRDT — no filesystem access. +/// Worktree pruning is handled separately by the CRDT event subscriber. +pub(crate) fn sweep_done_to_archived(done_retention: Duration) { + use crate::pipeline_state::{PipelineEvent, Stage, read_all_typed, stage_dir_name, transition}; + + for item in read_all_typed() { + if let Stage::Done { merged_at, .. } = &item.stage { + let age = chrono::Utc::now() + .signed_duration_since(*merged_at) + .to_std() + .unwrap_or_default(); + if age >= done_retention { + let story_id = item.story_id.0.clone(); + match transition(item.stage.clone(), PipelineEvent::Accepted) { + Ok(new_stage) => { + crate::crdt_state::write_item( + &story_id, + stage_dir_name(&new_stage), + None, + None, + None, + Some(false), + None, + None, + None, + None, + ); + slog!("[watcher] sweep: promoted {story_id} → 6_archived/"); + } + Err(e) => { + slog!("[watcher] sweep: transition error for {story_id}: {e}"); + } + } + } + } + } +} diff --git a/server/src/io/watcher/tests.rs b/server/src/io/watcher/tests.rs new file mode 100644 index 00000000..2a6cd553 --- /dev/null +++ b/server/src/io/watcher/tests.rs @@ -0,0 +1,730 @@ +use super::*; +use std::collections::HashMap; +use std::fs; +use tempfile::TempDir; + +/// Initialise a minimal git repo so commit operations work. +fn init_git_repo(dir: &std::path::Path) { + use std::process::Command; + Command::new("git") + .args(["init"]) + .current_dir(dir) + .output() + .expect("git init"); + Command::new("git") + .args(["config", "user.email", "test@example.com"]) + .current_dir(dir) + .output() + .expect("git config email"); + Command::new("git") + .args(["config", "user.name", "Test"]) + .current_dir(dir) + .output() + .expect("git config name"); + Command::new("git") + .args(["commit", "--allow-empty", "-m", "init"]) + .current_dir(dir) + .output() + .expect("git initial commit"); +} + +/// Create the `.huskies/work/{stage}/` dir tree inside `root`. +fn make_stage_dir(root: &std::path::Path, stage: &str) -> PathBuf { + let dir = root.join(".huskies").join("work").join(stage); + fs::create_dir_all(&dir).expect("create stage dir"); + dir +} + +// ── git_add_work_and_commit ─────────────────────────────────────────────── + +#[test] +fn git_commit_returns_true_when_file_added() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + let stage_dir = make_stage_dir(tmp.path(), "2_current"); + fs::write(stage_dir.join("42_story_foo.md"), "---\nname: test\n---\n").unwrap(); + + let result = git_add_work_and_commit(tmp.path(), "huskies: start 42_story_foo"); + assert_eq!( + result, + Ok(true), + "should return Ok(true) when a commit was made" + ); +} + +#[test] +fn git_commit_returns_false_when_nothing_to_commit() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + let stage_dir = make_stage_dir(tmp.path(), "2_current"); + fs::write(stage_dir.join("42_story_foo.md"), "---\nname: test\n---\n").unwrap(); + + // First commit — should succeed. + git_add_work_and_commit(tmp.path(), "huskies: start 42_story_foo").unwrap(); + + // Second call with no changes — should return Ok(false). + let result = git_add_work_and_commit(tmp.path(), "huskies: start 42_story_foo"); + assert_eq!( + result, + Ok(false), + "should return Ok(false) when nothing to commit" + ); +} + +// ── flush_pending ───────────────────────────────────────────────────────── + +#[test] +fn flush_pending_commits_and_broadcasts_for_terminal_stage() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + let stage_dir = make_stage_dir(tmp.path(), "1_backlog"); + let story_path = stage_dir.join("42_story_foo.md"); + fs::write(&story_path, "---\nname: test\n---\n").unwrap(); + + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + let mut pending = HashMap::new(); + pending.insert(story_path, "1_backlog".to_string()); + + flush_pending(&pending, tmp.path(), &tx); + + let evt = rx.try_recv().expect("expected a broadcast event"); + match evt { + WatcherEvent::WorkItem { + stage, + item_id, + action, + commit_msg, + .. + } => { + assert_eq!(stage, "1_backlog"); + assert_eq!(item_id, "42_story_foo"); + assert_eq!(action, "create"); + assert_eq!(commit_msg, "huskies: create 42_story_foo"); + } + other => panic!("unexpected event: {other:?}"), + } + + // Verify the file was actually committed. + let log = std::process::Command::new("git") + .args(["log", "--oneline", "-1"]) + .current_dir(tmp.path()) + .output() + .expect("git log"); + let log_msg = String::from_utf8_lossy(&log.stdout); + assert!( + log_msg.contains("huskies: create 42_story_foo"), + "terminal stage should produce a git commit" + ); +} + +#[test] +fn flush_pending_broadcasts_without_commit_for_intermediate_stage() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + let stage_dir = make_stage_dir(tmp.path(), "2_current"); + let story_path = stage_dir.join("42_story_foo.md"); + fs::write(&story_path, "---\nname: test\n---\n").unwrap(); + + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + let mut pending = HashMap::new(); + pending.insert(story_path, "2_current".to_string()); + + flush_pending(&pending, tmp.path(), &tx); + + // Event should still be broadcast for frontend sync. + let evt = rx.try_recv().expect("expected a broadcast event"); + match evt { + WatcherEvent::WorkItem { + stage, + item_id, + action, + commit_msg, + .. + } => { + assert_eq!(stage, "2_current"); + assert_eq!(item_id, "42_story_foo"); + assert_eq!(action, "start"); + assert_eq!(commit_msg, "huskies: start 42_story_foo"); + } + other => panic!("unexpected event: {other:?}"), + } + + // Verify NO git commit was made (only the initial empty commit should exist). + let log = std::process::Command::new("git") + .args(["log", "--oneline"]) + .current_dir(tmp.path()) + .output() + .expect("git log"); + let log_msg = String::from_utf8_lossy(&log.stdout); + assert!( + !log_msg.contains("huskies:"), + "intermediate stage should NOT produce a git commit" + ); +} + +#[test] +fn flush_pending_broadcasts_for_all_pipeline_stages() { + let stages = [ + ("1_backlog", "create", "huskies: create 10_story_x"), + ("3_qa", "qa", "huskies: queue 10_story_x for QA"), + ("4_merge", "merge", "huskies: queue 10_story_x for merge"), + ("5_done", "done", "huskies: done 10_story_x"), + ("6_archived", "accept", "huskies: accept 10_story_x"), + ]; + + for (stage, expected_action, expected_msg) in stages { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + let stage_dir = make_stage_dir(tmp.path(), stage); + let story_path = stage_dir.join("10_story_x.md"); + fs::write(&story_path, "---\nname: test\n---\n").unwrap(); + + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + let mut pending = HashMap::new(); + pending.insert(story_path, stage.to_string()); + + flush_pending(&pending, tmp.path(), &tx); + + // All stages should broadcast events regardless of commit behavior. + let evt = rx.try_recv().expect("expected broadcast for stage {stage}"); + match evt { + WatcherEvent::WorkItem { + action, commit_msg, .. + } => { + assert_eq!(action, expected_action, "stage {stage}"); + assert_eq!(commit_msg, expected_msg, "stage {stage}"); + } + other => panic!("unexpected event for stage {stage}: {other:?}"), + } + } +} + +#[test] +fn flush_pending_deletion_only_broadcasts_remove_event() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + // Create the work dir tree but NOT the file (simulates a deletion). + make_stage_dir(tmp.path(), "2_current"); + let deleted_path = tmp + .path() + .join(".huskies") + .join("work") + .join("2_current") + .join("42_story_foo.md"); + + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + let mut pending = HashMap::new(); + pending.insert(deleted_path, "2_current".to_string()); + + flush_pending(&pending, tmp.path(), &tx); + + // Even when nothing was committed (file never existed), an event is broadcast. + let evt = rx + .try_recv() + .expect("expected a broadcast event for deletion"); + match evt { + WatcherEvent::WorkItem { + action, item_id, .. + } => { + assert_eq!(action, "remove"); + assert_eq!(item_id, "42_story_foo"); + } + other => panic!("unexpected event: {other:?}"), + } +} + +#[test] +fn flush_pending_skips_unknown_stage_for_addition() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + // File sits in an unrecognised directory. + let unknown_dir = tmp.path().join(".huskies").join("work").join("9_unknown"); + fs::create_dir_all(&unknown_dir).unwrap(); + let path = unknown_dir.join("42_story_foo.md"); + fs::write(&path, "---\nname: test\n---\n").unwrap(); + + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + let mut pending = HashMap::new(); + pending.insert(path, "9_unknown".to_string()); + + flush_pending(&pending, tmp.path(), &tx); + + // No event should be broadcast because stage_metadata returns None for unknown stages. + assert!( + rx.try_recv().is_err(), + "no event should be broadcast for unknown stage" + ); +} + +#[test] +fn flush_pending_empty_pending_does_nothing() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + make_stage_dir(tmp.path(), "2_current"); + + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + let pending: HashMap = HashMap::new(); + + // Should not panic and should not broadcast anything. + flush_pending(&pending, tmp.path(), &tx); + assert!(rx.try_recv().is_err(), "no event for empty pending map"); +} + +// ── flush_pending clears merge_failure ───────────────────────────────────── + +#[test] +fn flush_pending_clears_merge_failure_when_leaving_merge_stage() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + let stage_dir = make_stage_dir(tmp.path(), "2_current"); + let story_path = stage_dir.join("50_story_retry.md"); + fs::write( + &story_path, + "---\nname: Retry Story\nmerge_failure: \"conflicts detected\"\n---\n# Story\n", + ) + .unwrap(); + + let (tx, _rx) = tokio::sync::broadcast::channel(16); + let mut pending = HashMap::new(); + pending.insert(story_path.clone(), "2_current".to_string()); + + flush_pending(&pending, tmp.path(), &tx); + + let contents = fs::read_to_string(&story_path).unwrap(); + assert!( + !contents.contains("merge_failure"), + "merge_failure should be stripped when story lands in 2_current" + ); + assert!(contents.contains("name: Retry Story")); +} + +#[test] +fn flush_pending_clears_merge_failure_when_moving_to_backlog() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + let stage_dir = make_stage_dir(tmp.path(), "1_backlog"); + let story_path = stage_dir.join("51_story_reset.md"); + fs::write( + &story_path, + "---\nname: Reset Story\nmerge_failure: \"gate failed\"\n---\n# Story\n", + ) + .unwrap(); + + let (tx, _rx) = tokio::sync::broadcast::channel(16); + let mut pending = HashMap::new(); + pending.insert(story_path.clone(), "1_backlog".to_string()); + + flush_pending(&pending, tmp.path(), &tx); + + let contents = fs::read_to_string(&story_path).unwrap(); + assert!( + !contents.contains("merge_failure"), + "merge_failure should be stripped when story lands in 1_backlog" + ); +} + +#[test] +fn flush_pending_clears_merge_failure_when_moving_to_done() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + let stage_dir = make_stage_dir(tmp.path(), "5_done"); + let story_path = stage_dir.join("52_story_done.md"); + fs::write( + &story_path, + "---\nname: Done Story\nmerge_failure: \"stale error\"\n---\n# Story\n", + ) + .unwrap(); + + let (tx, _rx) = tokio::sync::broadcast::channel(16); + let mut pending = HashMap::new(); + pending.insert(story_path.clone(), "5_done".to_string()); + + flush_pending(&pending, tmp.path(), &tx); + + let contents = fs::read_to_string(&story_path).unwrap(); + assert!( + !contents.contains("merge_failure"), + "merge_failure should be stripped when story lands in 5_done" + ); +} + +#[test] +fn flush_pending_preserves_merge_failure_when_in_merge_stage() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + let stage_dir = make_stage_dir(tmp.path(), "4_merge"); + let story_path = stage_dir.join("53_story_merging.md"); + fs::write( + &story_path, + "---\nname: Merging Story\nmerge_failure: \"conflicts\"\n---\n# Story\n", + ) + .unwrap(); + + let (tx, _rx) = tokio::sync::broadcast::channel(16); + let mut pending = HashMap::new(); + pending.insert(story_path.clone(), "4_merge".to_string()); + + flush_pending(&pending, tmp.path(), &tx); + + let contents = fs::read_to_string(&story_path).unwrap(); + assert!( + contents.contains("merge_failure"), + "merge_failure should be preserved when story is in 4_merge" + ); +} + +#[test] +fn flush_pending_no_op_when_no_merge_failure() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + let stage_dir = make_stage_dir(tmp.path(), "2_current"); + let story_path = stage_dir.join("54_story_clean.md"); + let original = "---\nname: Clean Story\n---\n# Story\n"; + fs::write(&story_path, original).unwrap(); + + let (tx, _rx) = tokio::sync::broadcast::channel(16); + let mut pending = HashMap::new(); + pending.insert(story_path.clone(), "2_current".to_string()); + + flush_pending(&pending, tmp.path(), &tx); + + let contents = fs::read_to_string(&story_path).unwrap(); + assert_eq!( + contents, original, + "file without merge_failure should be unchanged" + ); +} + +// ── flush_pending from_stage ───────────────────────────────────────────── + +/// AC3: when a pending map contains both a deletion (source stage) and a +/// creation (dest stage) for the same item_id, the broadcast event should +/// have `from_stage` set to the source stage key. +#[test] +fn flush_pending_sets_from_stage_for_move_operations() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + + // Destination exists (file moved here). + let merge_dir = make_stage_dir(tmp.path(), "4_merge"); + let merge_path = merge_dir.join("42_story_foo.md"); + fs::write(&merge_path, "---\nname: test\n---\n").unwrap(); + + // Source path does NOT exist (file was moved away). + make_stage_dir(tmp.path(), "3_qa"); + let qa_path = tmp + .path() + .join(".huskies") + .join("work") + .join("3_qa") + .join("42_story_foo.md"); + + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + let mut pending = HashMap::new(); + pending.insert(merge_path, "4_merge".to_string()); // addition + pending.insert(qa_path, "3_qa".to_string()); // deletion + + flush_pending(&pending, tmp.path(), &tx); + + let evt = rx.try_recv().expect("expected event"); + match evt { + WatcherEvent::WorkItem { + stage, from_stage, .. + } => { + assert_eq!(stage, "4_merge"); + assert_eq!(from_stage, Some("3_qa".to_string())); + } + other => panic!("unexpected event: {other:?}"), + } +} + +/// AC3: when a pending map has only an addition (creation, not a move), +/// `from_stage` should be `None`. +#[test] +fn flush_pending_sets_from_stage_to_none_for_creations() { + let tmp = TempDir::new().unwrap(); + init_git_repo(tmp.path()); + + let stage_dir = make_stage_dir(tmp.path(), "2_current"); + let story_path = stage_dir.join("55_story_new.md"); + fs::write(&story_path, "---\nname: New Story\n---\n").unwrap(); + + let (tx, mut rx) = tokio::sync::broadcast::channel(16); + let mut pending = HashMap::new(); + pending.insert(story_path, "2_current".to_string()); + + flush_pending(&pending, tmp.path(), &tx); + + let evt = rx.try_recv().expect("expected event"); + match evt { + WatcherEvent::WorkItem { from_stage, .. } => { + assert_eq!(from_stage, None, "creation should have no from_stage"); + } + other => panic!("unexpected event: {other:?}"), + } +} + +// ── stage_for_path (additional edge cases) ──────────────────────────────── + +#[test] +fn stage_for_path_recognises_pipeline_dirs() { + let base = PathBuf::from("/proj/.huskies/work"); + assert_eq!( + stage_for_path(&base.join("2_current/42_story_foo.md")), + Some("2_current".to_string()) + ); + assert_eq!( + stage_for_path(&base.join("5_done/10_bug_bar.md")), + Some("5_done".to_string()) + ); + assert_eq!( + stage_for_path(&base.join("6_archived/10_bug_bar.md")), + Some("6_archived".to_string()) + ); + assert_eq!(stage_for_path(&base.join("other/file.md")), None); + assert_eq!( + stage_for_path(&base.join("2_current/42_story_foo.txt")), + None + ); +} + +#[test] +fn stage_for_path_ignores_worktree_paths() { + let worktrees = PathBuf::from("/proj/.huskies/worktrees"); + + // Code changes inside a worktree must be ignored. + assert_eq!( + stage_for_path(&worktrees.join("42_story_foo/server/src/main.rs")), + None, + ); + + // Even if a worktree happens to contain a path component that looks + // like a pipeline stage, it must still be ignored. + assert_eq!( + stage_for_path(&worktrees.join("42_story_foo/.huskies/work/2_current/42_story_foo.md")), + None, + ); + + // A path that only contains the word "worktrees" as part of a longer + // segment (not an exact component) must NOT be filtered out. + assert_eq!( + stage_for_path(&PathBuf::from( + "/proj/.huskies/work/2_current/not_worktrees_story.md" + )), + Some("2_current".to_string()), + ); +} + +#[test] +fn should_commit_stage_only_for_terminal_stages() { + // Terminal stages — should commit. + assert!(should_commit_stage("1_backlog")); + assert!(should_commit_stage("5_done")); + assert!(should_commit_stage("6_archived")); + // Intermediate stages — broadcast-only, no commit. + assert!(!should_commit_stage("2_current")); + assert!(!should_commit_stage("3_qa")); + assert!(!should_commit_stage("4_merge")); + // Unknown — no commit. + assert!(!should_commit_stage("unknown")); +} + +#[test] +fn stage_metadata_returns_correct_actions() { + let (action, msg) = stage_metadata("2_current", "42_story_foo").unwrap(); + assert_eq!(action, "start"); + assert_eq!(msg, "huskies: start 42_story_foo"); + + let (action, msg) = stage_metadata("5_done", "42_story_foo").unwrap(); + assert_eq!(action, "done"); + assert_eq!(msg, "huskies: done 42_story_foo"); + + let (action, msg) = stage_metadata("6_archived", "42_story_foo").unwrap(); + assert_eq!(action, "accept"); + assert_eq!(msg, "huskies: accept 42_story_foo"); + + assert!(stage_metadata("unknown", "id").is_none()); +} + +#[test] +fn is_config_file_identifies_root_project_toml() { + let git_root = PathBuf::from("/proj"); + let config = git_root.join(".huskies").join("project.toml"); + assert!(is_config_file(&config, &git_root)); +} + +#[test] +fn is_config_file_identifies_root_agents_toml() { + let git_root = PathBuf::from("/proj"); + let agents = git_root.join(".huskies").join("agents.toml"); + assert!(is_config_file(&agents, &git_root)); +} + +#[test] +fn is_config_file_rejects_worktree_copies() { + let git_root = PathBuf::from("/proj"); + // project.toml inside a worktree must NOT be treated as the root config. + let worktree_config = + PathBuf::from("/proj/.huskies/worktrees/42_story_foo/.huskies/project.toml"); + assert!(!is_config_file(&worktree_config, &git_root)); +} + +#[test] +fn is_config_file_rejects_other_files() { + let git_root = PathBuf::from("/proj"); + // Random files must not match. + assert!(!is_config_file( + &PathBuf::from("/proj/.huskies/work/2_current/42_story_foo.md"), + &git_root + )); + assert!(!is_config_file( + &PathBuf::from("/proj/.huskies/README.md"), + &git_root + )); +} + +#[test] +fn is_config_file_rejects_wrong_root() { + let git_root = PathBuf::from("/proj"); + let other_root_config = PathBuf::from("/other/.huskies/project.toml"); + assert!(!is_config_file(&other_root_config, &git_root)); +} + +// ── sweep_done_to_archived (CRDT-based) ───────────────────────────────── +// +// The sweep function reads from `read_all_typed()` and checks +// `Stage::Done { merged_at, .. }`. Items created via +// `write_item_with_content("5_done")` project `merged_at = Utc::now()`, +// so we test with Duration::ZERO to sweep immediately and with a long +// retention to verify items are kept. No filesystem access is involved. + +#[test] +fn sweep_moves_old_items_to_archived() { + crate::db::ensure_content_store(); + crate::db::write_item_with_content("9880_story_sweep_old", "5_done", "---\nname: old\n---\n"); + + // With ZERO retention, any Done item should be swept. + sweep_done_to_archived(Duration::ZERO); + + // Verify the item was moved to 6_archived in the CRDT. + let items = crate::pipeline_state::read_all_typed(); + let item = items + .iter() + .find(|i| i.story_id.0 == "9880_story_sweep_old"); + assert!( + item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })), + "item should be archived after sweep" + ); +} + +#[test] +fn sweep_keeps_recent_items_in_done() { + crate::db::ensure_content_store(); + crate::db::write_item_with_content("9881_story_sweep_new", "5_done", "---\nname: new\n---\n"); + + // With a very long retention, the item (merged_at ≈ now) should stay. + sweep_done_to_archived(Duration::from_secs(999_999)); + + let items = crate::pipeline_state::read_all_typed(); + let item = items + .iter() + .find(|i| i.story_id.0 == "9881_story_sweep_new"); + assert!( + item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Done { .. })), + "item should remain in Done with long retention" + ); +} + +#[test] +fn sweep_respects_custom_retention() { + crate::db::ensure_content_store(); + crate::db::write_item_with_content( + "9882_story_sweep_custom", + "5_done", + "---\nname: custom\n---\n", + ); + + // With ZERO retention, sweep should promote. + sweep_done_to_archived(Duration::ZERO); + + let items = crate::pipeline_state::read_all_typed(); + let item = items + .iter() + .find(|i| i.story_id.0 == "9882_story_sweep_custom"); + assert!( + item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })), + "item should be archived with zero retention" + ); +} + +/// Prove that the sweep reads `merged_at` from the CRDT (not `Utc::now()`). +/// +/// This test sets `merged_at` to 10 seconds in the past and uses a 5-second +/// retention. If the sweep were still using `Utc::now()` as the start time +/// (the original bug), the elapsed time would be ~0 and the item would NOT +/// be swept. With the fix, the item is swept because 10s > 5s retention. +#[test] +fn sweep_uses_crdt_merged_at_not_utc_now() { + crate::db::ensure_content_store(); + + let ten_seconds_ago = (chrono::Utc::now() - chrono::Duration::seconds(10)).timestamp() as f64; + + // Write item in 5_done with an explicit past merged_at timestamp. + crate::crdt_state::write_item( + "9883_story_sweep_merged_at", + "5_done", + Some("merged_at test"), + None, + None, + None, + None, + None, + None, + Some(ten_seconds_ago), + ); + + // 5-second retention: item is 10s old → should be swept. + sweep_done_to_archived(Duration::from_secs(5)); + + let items = crate::pipeline_state::read_all_typed(); + let item = items + .iter() + .find(|i| i.story_id.0 == "9883_story_sweep_merged_at"); + assert!( + item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })), + "item with merged_at 10s ago should be archived with 5s retention" + ); +} + +/// Prove that an item with merged_at NEWER than done_retention is NOT swept. +#[test] +fn sweep_keeps_item_newer_than_retention() { + crate::db::ensure_content_store(); + + let one_second_ago = (chrono::Utc::now() - chrono::Duration::seconds(1)).timestamp() as f64; + + crate::crdt_state::write_item( + "9884_story_sweep_recent", + "5_done", + Some("recent merged_at test"), + None, + None, + None, + None, + None, + None, + Some(one_second_ago), + ); + + // 1-hour retention: item is only 1s old → should NOT be swept. + sweep_done_to_archived(Duration::from_secs(3600)); + + let items = crate::pipeline_state::read_all_typed(); + let item = items + .iter() + .find(|i| i.story_id.0 == "9884_story_sweep_recent"); + assert!( + item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Done { .. })), + "item with merged_at 1s ago should stay in Done with 1-hour retention" + ); +}