//! Filesystem watcher for `.story_kit/work/` and `.story_kit/project.toml`. //! //! Watches the work pipeline directories for file changes, infers the lifecycle //! stage from the target directory name, auto-commits with a deterministic message, //! and broadcasts a [`WatcherEvent`] to all connected WebSocket clients. //! //! Also watches `.story_kit/project.toml` for modifications and broadcasts //! [`WatcherEvent::ConfigChanged`] so the frontend can reload the agent roster //! without a server restart. //! //! # Debouncing //! Events are buffered for 300 ms after the last activity. All changes within the //! window are batched into a single `git add + commit`. This avoids double-commits //! when `fs::rename` fires both a remove and a create event. //! //! # Race conditions //! If a mutation handler (e.g. `move_story_to_current`) already committed the //! change, `git commit` will return "nothing to commit". The watcher detects this //! via exit-code inspection and silently skips the commit while still broadcasting //! the event so connected clients stay in sync. use crate::slog; use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher, recommended_watcher}; use serde::Serialize; use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::mpsc; use std::time::{Duration, Instant}; use tokio::sync::broadcast; /// A lifecycle event emitted by the filesystem watcher. #[derive(Clone, Debug, Serialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum WatcherEvent { /// A work-pipeline file was created, modified, or deleted. WorkItem { /// Pipeline stage directory (e.g. `"2_current"`, `"5_archived"`). stage: String, /// Work item ID (filename stem without extension, e.g. `"42_story_my_feature"`). item_id: String, /// Semantic action inferred from the stage (e.g. `"start"`, `"accept"`). action: String, /// The deterministic git commit message used (or that would have been used). commit_msg: String, }, /// `.story_kit/project.toml` was modified at the project root (not inside a worktree). ConfigChanged, } /// Return `true` if `path` is the root-level `.story_kit/project.toml`, i.e. /// `{git_root}/.story_kit/project.toml`. /// /// Returns `false` for paths inside worktree directories (paths containing /// a `worktrees` component). pub fn is_config_file(path: &Path, git_root: &Path) -> bool { // Reject any path that passes through the worktrees directory. if path.components().any(|c| c.as_os_str() == "worktrees") { return false; } let expected = git_root.join(".story_kit").join("project.toml"); path == expected } /// Map a pipeline directory name to a (action, commit-message-prefix) pair. fn stage_metadata(stage: &str, item_id: &str) -> Option<(&'static str, String)> { let (action, prefix) = match stage { "1_upcoming" => ("create", format!("story-kit: create {item_id}")), "2_current" => ("start", format!("story-kit: start {item_id}")), "3_qa" => ("qa", format!("story-kit: queue {item_id} for QA")), "4_merge" => ("merge", format!("story-kit: queue {item_id} for merge")), "5_archived" => ("accept", format!("story-kit: accept {item_id}")), _ => return None, }; Some((action, prefix)) } /// Return the pipeline stage name for a path if it is a `.md` file living /// directly inside one of the known work subdirectories, otherwise `None`. /// /// Explicitly returns `None` for any path under `.story_kit/worktrees/` so /// that code changes made by agents in their isolated worktrees are never /// auto-committed to master by the watcher. fn stage_for_path(path: &Path) -> Option { // Reject any path that passes through the worktrees directory. if path .components() .any(|c| c.as_os_str() == "worktrees") { return None; } if path.extension().is_none_or(|e| e != "md") { return None; } let stage = path .parent() .and_then(|p| p.file_name()) .and_then(|n| n.to_str())?; matches!(stage, "1_upcoming" | "2_current" | "3_qa" | "4_merge" | "5_archived") .then(|| stage.to_string()) } /// Stage all changes in the work directory and commit with the given message. /// /// Uses `git add -A .story_kit/work/` to catch both additions and deletions in /// a single commit. Returns `Ok(true)` if a commit was made, `Ok(false)` if /// there was nothing to commit, and `Err` for unexpected failures. fn git_add_work_and_commit(git_root: &Path, message: &str) -> Result { let work_rel = PathBuf::from(".story_kit").join("work"); let add_out = std::process::Command::new("git") .args(["add", "-A"]) .arg(&work_rel) .current_dir(git_root) .output() .map_err(|e| format!("git add: {e}"))?; if !add_out.status.success() { return Err(format!( "git add failed: {}", String::from_utf8_lossy(&add_out.stderr) )); } let commit_out = std::process::Command::new("git") .args(["commit", "-m", message]) .current_dir(git_root) .output() .map_err(|e| format!("git commit: {e}"))?; if commit_out.status.success() { return Ok(true); } let stderr = String::from_utf8_lossy(&commit_out.stderr); let stdout = String::from_utf8_lossy(&commit_out.stdout); if stdout.contains("nothing to commit") || stderr.contains("nothing to commit") { return Ok(false); } Err(format!("git commit failed: {stderr}")) } /// Process a batch of pending (path → stage) entries: commit and broadcast. /// /// Only files that still exist on disk are used to derive the commit message /// (they represent the destination of a move or a new file). Deletions are /// captured by `git add -A .story_kit/work/` automatically. fn flush_pending( pending: &HashMap, git_root: &Path, event_tx: &broadcast::Sender, ) { // Separate into files that exist (additions) vs gone (deletions). let mut additions: Vec<(&PathBuf, &str)> = Vec::new(); for (path, stage) in pending { if path.exists() { additions.push((path, stage.as_str())); } } // Pick the commit message from the first addition (the meaningful side of a move). // If there are only deletions, use a generic message. let (action, item_id, commit_msg) = if let Some((path, stage)) = additions.first() { let item = path.file_stem().and_then(|s| s.to_str()).unwrap_or("unknown"); if let Some((act, msg)) = stage_metadata(stage, item) { (act, item.to_string(), msg) } else { return; } } else { // Only deletions — pick any pending path for the item name. let Some((path, _)) = pending.iter().next() else { return; }; let item = path.file_stem().and_then(|s| s.to_str()).unwrap_or("unknown"); ("remove", item.to_string(), format!("story-kit: remove {item}")) }; slog!("[watcher] flush: {commit_msg}"); match git_add_work_and_commit(git_root, &commit_msg) { Ok(committed) => { if committed { slog!("[watcher] committed: {commit_msg}"); } else { slog!("[watcher] skipped (already committed): {commit_msg}"); } let stage = additions.first().map_or("unknown", |(_, s)| s); let evt = WatcherEvent::WorkItem { stage: stage.to_string(), item_id, action: action.to_string(), commit_msg, }; let _ = event_tx.send(evt); } Err(e) => { slog!("[watcher] git error: {e}"); } } } /// Start the filesystem watcher on a dedicated OS thread. /// /// `work_dir` — absolute path to `.story_kit/work/` (watched recursively). /// `git_root` — project root (passed to `git` commands as cwd, and used to /// derive the config file path `.story_kit/project.toml`). /// `event_tx` — broadcast sender; each connected WebSocket client holds a receiver. pub fn start_watcher( work_dir: PathBuf, git_root: PathBuf, event_tx: broadcast::Sender, ) { std::thread::spawn(move || { let (notify_tx, notify_rx) = mpsc::channel::>(); let mut watcher: RecommendedWatcher = match recommended_watcher(move |res| { let _ = notify_tx.send(res); }) { Ok(w) => w, Err(e) => { slog!("[watcher] failed to create watcher: {e}"); return; } }; if let Err(e) = watcher.watch(&work_dir, RecursiveMode::Recursive) { slog!("[watcher] failed to watch {}: {e}", work_dir.display()); return; } // Also watch .story_kit/project.toml for hot-reload of agent config. let config_file = git_root.join(".story_kit").join("project.toml"); if config_file.exists() && let Err(e) = watcher.watch(&config_file, RecursiveMode::NonRecursive) { slog!("[watcher] failed to watch config file {}: {e}", config_file.display()); } slog!("[watcher] watching {}", work_dir.display()); const DEBOUNCE: Duration = Duration::from_millis(300); // Map path → stage for pending (uncommitted) work-item changes. let mut pending: HashMap = HashMap::new(); // Whether a config file change is pending in the current debounce window. let mut config_changed_pending = false; let mut deadline: Option = None; loop { // How long until the debounce window closes (or wait for next event). let timeout = deadline.map_or(Duration::from_secs(60), |d| { d.saturating_duration_since(Instant::now()) }); let flush = match notify_rx.recv_timeout(timeout) { Ok(Ok(event)) => { // Track creates, modifies, AND removes. Removes are needed so // that standalone deletions trigger a flush, and so that moves // (which fire Remove + Create) land in the same debounce window. let is_relevant_kind = matches!( event.kind, EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) ); if is_relevant_kind { for path in event.paths { if is_config_file(&path, &git_root) { slog!("[watcher] config change detected: {}", path.display()); config_changed_pending = true; deadline = Some(Instant::now() + DEBOUNCE); } else if let Some(stage) = stage_for_path(&path) { pending.insert(path, stage); deadline = Some(Instant::now() + DEBOUNCE); } } } false } Ok(Err(e)) => { slog!("[watcher] notify error: {e}"); false } // Debounce window expired — time to flush. Err(mpsc::RecvTimeoutError::Timeout) => true, Err(mpsc::RecvTimeoutError::Disconnected) => { slog!("[watcher] channel disconnected, shutting down"); break; } }; if flush { if !pending.is_empty() { flush_pending(&pending, &git_root, &event_tx); pending.clear(); } if config_changed_pending { slog!("[watcher] broadcasting agent_config_changed"); let _ = event_tx.send(WatcherEvent::ConfigChanged); config_changed_pending = false; } deadline = None; } } }); } #[cfg(test)] mod tests { use super::*; use std::collections::HashMap; use std::fs; use tempfile::TempDir; /// Initialise a minimal git repo so commit operations work. fn init_git_repo(dir: &std::path::Path) { use std::process::Command; Command::new("git") .args(["init"]) .current_dir(dir) .output() .expect("git init"); Command::new("git") .args(["config", "user.email", "test@example.com"]) .current_dir(dir) .output() .expect("git config email"); Command::new("git") .args(["config", "user.name", "Test"]) .current_dir(dir) .output() .expect("git config name"); Command::new("git") .args(["commit", "--allow-empty", "-m", "init"]) .current_dir(dir) .output() .expect("git initial commit"); } /// Create the `.story_kit/work/{stage}/` dir tree inside `root`. fn make_stage_dir(root: &std::path::Path, stage: &str) -> PathBuf { let dir = root.join(".story_kit").join("work").join(stage); fs::create_dir_all(&dir).expect("create stage dir"); dir } // ── git_add_work_and_commit ─────────────────────────────────────────────── #[test] fn git_commit_returns_true_when_file_added() { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); let stage_dir = make_stage_dir(tmp.path(), "2_current"); fs::write( stage_dir.join("42_story_foo.md"), "---\nname: test\n---\n", ) .unwrap(); let result = git_add_work_and_commit(tmp.path(), "story-kit: start 42_story_foo"); assert_eq!(result, Ok(true), "should return Ok(true) when a commit was made"); } #[test] fn git_commit_returns_false_when_nothing_to_commit() { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); let stage_dir = make_stage_dir(tmp.path(), "2_current"); fs::write( stage_dir.join("42_story_foo.md"), "---\nname: test\n---\n", ) .unwrap(); // First commit — should succeed. git_add_work_and_commit(tmp.path(), "story-kit: start 42_story_foo").unwrap(); // Second call with no changes — should return Ok(false). let result = git_add_work_and_commit(tmp.path(), "story-kit: start 42_story_foo"); assert_eq!( result, Ok(false), "should return Ok(false) when nothing to commit" ); } // ── flush_pending ───────────────────────────────────────────────────────── #[test] fn flush_pending_commits_and_broadcasts_work_item_for_addition() { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); let stage_dir = make_stage_dir(tmp.path(), "2_current"); let story_path = stage_dir.join("42_story_foo.md"); fs::write(&story_path, "---\nname: test\n---\n").unwrap(); let (tx, mut rx) = tokio::sync::broadcast::channel(16); let mut pending = HashMap::new(); pending.insert(story_path, "2_current".to_string()); flush_pending(&pending, tmp.path(), &tx); let evt = rx.try_recv().expect("expected a broadcast event"); match evt { WatcherEvent::WorkItem { stage, item_id, action, commit_msg, } => { assert_eq!(stage, "2_current"); assert_eq!(item_id, "42_story_foo"); assert_eq!(action, "start"); assert_eq!(commit_msg, "story-kit: start 42_story_foo"); } other => panic!("unexpected event: {other:?}"), } } #[test] fn flush_pending_broadcasts_for_all_pipeline_stages() { let stages = [ ("1_upcoming", "create", "story-kit: create 10_story_x"), ("3_qa", "qa", "story-kit: queue 10_story_x for QA"), ("4_merge", "merge", "story-kit: queue 10_story_x for merge"), ("5_archived", "accept", "story-kit: accept 10_story_x"), ]; for (stage, expected_action, expected_msg) in stages { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); let stage_dir = make_stage_dir(tmp.path(), stage); let story_path = stage_dir.join("10_story_x.md"); fs::write(&story_path, "---\nname: test\n---\n").unwrap(); let (tx, mut rx) = tokio::sync::broadcast::channel(16); let mut pending = HashMap::new(); pending.insert(story_path, stage.to_string()); flush_pending(&pending, tmp.path(), &tx); let evt = rx.try_recv().expect("expected broadcast for stage {stage}"); match evt { WatcherEvent::WorkItem { action, commit_msg, .. } => { assert_eq!(action, expected_action, "stage {stage}"); assert_eq!(commit_msg, expected_msg, "stage {stage}"); } other => panic!("unexpected event for stage {stage}: {other:?}"), } } } #[test] fn flush_pending_deletion_only_broadcasts_remove_event() { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); // Create the work dir tree but NOT the file (simulates a deletion). make_stage_dir(tmp.path(), "2_current"); let deleted_path = tmp .path() .join(".story_kit") .join("work") .join("2_current") .join("42_story_foo.md"); let (tx, mut rx) = tokio::sync::broadcast::channel(16); let mut pending = HashMap::new(); pending.insert(deleted_path, "2_current".to_string()); flush_pending(&pending, tmp.path(), &tx); // Even when nothing was committed (file never existed), an event is broadcast. let evt = rx.try_recv().expect("expected a broadcast event for deletion"); match evt { WatcherEvent::WorkItem { action, item_id, .. } => { assert_eq!(action, "remove"); assert_eq!(item_id, "42_story_foo"); } other => panic!("unexpected event: {other:?}"), } } #[test] fn flush_pending_skips_unknown_stage_for_addition() { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); // File sits in an unrecognised directory. let unknown_dir = tmp.path().join(".story_kit").join("work").join("9_unknown"); fs::create_dir_all(&unknown_dir).unwrap(); let path = unknown_dir.join("42_story_foo.md"); fs::write(&path, "---\nname: test\n---\n").unwrap(); let (tx, mut rx) = tokio::sync::broadcast::channel(16); let mut pending = HashMap::new(); pending.insert(path, "9_unknown".to_string()); flush_pending(&pending, tmp.path(), &tx); // No event should be broadcast because stage_metadata returns None for unknown stages. assert!( rx.try_recv().is_err(), "no event should be broadcast for unknown stage" ); } #[test] fn flush_pending_empty_pending_does_nothing() { let tmp = TempDir::new().unwrap(); init_git_repo(tmp.path()); make_stage_dir(tmp.path(), "2_current"); let (tx, mut rx) = tokio::sync::broadcast::channel(16); let pending: HashMap = HashMap::new(); // Should not panic and should not broadcast anything. flush_pending(&pending, tmp.path(), &tx); assert!(rx.try_recv().is_err(), "no event for empty pending map"); } // ── stage_for_path (additional edge cases) ──────────────────────────────── #[test] fn stage_for_path_recognises_pipeline_dirs() { let base = PathBuf::from("/proj/.story_kit/work"); assert_eq!( stage_for_path(&base.join("2_current/42_story_foo.md")), Some("2_current".to_string()) ); assert_eq!( stage_for_path(&base.join("5_archived/10_bug_bar.md")), Some("5_archived".to_string()) ); assert_eq!(stage_for_path(&base.join("other/file.md")), None); assert_eq!( stage_for_path(&base.join("2_current/42_story_foo.txt")), None ); } #[test] fn stage_for_path_ignores_worktree_paths() { let worktrees = PathBuf::from("/proj/.story_kit/worktrees"); // Code changes inside a worktree must be ignored. assert_eq!( stage_for_path(&worktrees.join("42_story_foo/server/src/main.rs")), None, ); // Even if a worktree happens to contain a path component that looks // like a pipeline stage, it must still be ignored. assert_eq!( stage_for_path(&worktrees.join("42_story_foo/.story_kit/work/2_current/42_story_foo.md")), None, ); // A path that only contains the word "worktrees" as part of a longer // segment (not an exact component) must NOT be filtered out. assert_eq!( stage_for_path(&PathBuf::from("/proj/.story_kit/work/2_current/not_worktrees_story.md")), Some("2_current".to_string()), ); } #[test] fn stage_metadata_returns_correct_actions() { let (action, msg) = stage_metadata("2_current", "42_story_foo").unwrap(); assert_eq!(action, "start"); assert_eq!(msg, "story-kit: start 42_story_foo"); let (action, msg) = stage_metadata("5_archived", "42_story_foo").unwrap(); assert_eq!(action, "accept"); assert_eq!(msg, "story-kit: accept 42_story_foo"); assert!(stage_metadata("unknown", "id").is_none()); } #[test] fn is_config_file_identifies_root_project_toml() { let git_root = PathBuf::from("/proj"); let config = git_root.join(".story_kit").join("project.toml"); assert!(is_config_file(&config, &git_root)); } #[test] fn is_config_file_rejects_worktree_copies() { let git_root = PathBuf::from("/proj"); // project.toml inside a worktree must NOT be treated as the root config. let worktree_config = PathBuf::from( "/proj/.story_kit/worktrees/42_story_foo/.story_kit/project.toml", ); assert!(!is_config_file(&worktree_config, &git_root)); } #[test] fn is_config_file_rejects_other_files() { let git_root = PathBuf::from("/proj"); // Random files must not match. assert!(!is_config_file( &PathBuf::from("/proj/.story_kit/work/2_current/42_story_foo.md"), &git_root )); assert!(!is_config_file( &PathBuf::from("/proj/.story_kit/README.md"), &git_root )); } #[test] fn is_config_file_rejects_wrong_root() { let git_root = PathBuf::from("/proj"); let other_root_config = PathBuf::from("/other/.story_kit/project.toml"); assert!(!is_config_file(&other_root_config, &git_root)); } }