//! Filesystem watcher for `.story_kit/work/`. //! //! Watches the work pipeline directories for file changes, infers the lifecycle //! stage from the target directory name, auto-commits with a deterministic message, //! and broadcasts a [`WatcherEvent`] to all connected WebSocket clients. //! //! # Debouncing //! Events are buffered for 300 ms after the last activity. All changes within the //! window are batched into a single `git add + commit`. This avoids double-commits //! when `fs::rename` fires both a remove and a create event. //! //! # Race conditions //! If a mutation handler (e.g. `move_story_to_current`) already committed the //! change, `git commit` will return "nothing to commit". The watcher detects this //! via exit-code inspection and silently skips the commit while still broadcasting //! the event so connected clients stay in sync. use crate::slog; use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher, recommended_watcher}; use serde::Serialize; use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::mpsc; use std::time::{Duration, Instant}; use tokio::sync::broadcast; /// A lifecycle event emitted by the filesystem watcher after auto-committing. #[derive(Clone, Debug, Serialize)] pub struct WatcherEvent { /// Pipeline stage directory (e.g. `"2_current"`, `"5_archived"`). pub stage: String, /// Work item ID (filename stem without extension, e.g. `"42_story_my_feature"`). pub item_id: String, /// Semantic action inferred from the stage (e.g. `"start"`, `"accept"`). pub action: String, /// The deterministic git commit message used (or that would have been used). pub commit_msg: String, } /// Map a pipeline directory name to a (action, commit-message-prefix) pair. 
fn stage_metadata(stage: &str, item_id: &str) -> Option<(&'static str, String)> {
    // Each known stage maps to the action name sent to clients and the full
    // commit message for the given work item.
    let (action, commit_msg) = match stage {
        "1_upcoming" => ("create", format!("story-kit: create {item_id}")),
        "2_current" => ("start", format!("story-kit: start {item_id}")),
        "3_qa" => ("qa", format!("story-kit: queue {item_id} for QA")),
        "4_merge" => ("merge", format!("story-kit: queue {item_id} for merge")),
        "5_archived" => ("accept", format!("story-kit: accept {item_id}")),
        _ => return None,
    };
    Some((action, commit_msg))
}

/// Return the pipeline stage name for a path if it is a `.md` file living
/// directly inside one of the known work subdirectories, otherwise `None`.
///
/// Explicitly returns `None` for any path under `.story_kit/worktrees/` so
/// that code changes made by agents in their isolated worktrees are never
/// auto-committed to master by the watcher. Only a `worktrees` component that
/// sits directly below `.story_kit` counts: a project that merely happens to
/// be checked out somewhere beneath an unrelated directory called `worktrees`
/// is still watched.
fn stage_for_path(path: &Path) -> Option<String> {
    // Look for the adjacent component pair `.story_kit` / `worktrees`.
    let inside_worktree = path
        .components()
        .zip(path.components().skip(1))
        .any(|(parent, child)| {
            parent.as_os_str() == ".story_kit" && child.as_os_str() == "worktrees"
        });
    if inside_worktree {
        return None;
    }

    // Only markdown work items are tracked.
    let is_markdown = path.extension().is_some_and(|ext| ext == "md");
    if !is_markdown {
        return None;
    }

    // The stage is the name of the directory that directly contains the file.
    let stage = path.parent()?.file_name()?.to_str()?;
    matches!(
        stage,
        "1_upcoming" | "2_current" | "3_qa" | "4_merge" | "5_archived"
    )
    .then(|| stage.to_string())
}

/// Stage all changes in the work directory and commit with the given message.
///
/// Uses `git add -A .story_kit/work/` to catch both additions and deletions in
/// a single commit. Returns `Ok(true)` if a commit was made, `Ok(false)` if
/// there was nothing to commit, and `Err` for unexpected failures.
fn git_add_work_and_commit(git_root: &Path, message: &str) -> Result { let work_rel = PathBuf::from(".story_kit").join("work"); let add_out = std::process::Command::new("git") .args(["add", "-A"]) .arg(&work_rel) .current_dir(git_root) .output() .map_err(|e| format!("git add: {e}"))?; if !add_out.status.success() { return Err(format!( "git add failed: {}", String::from_utf8_lossy(&add_out.stderr) )); } let commit_out = std::process::Command::new("git") .args(["commit", "-m", message]) .current_dir(git_root) .output() .map_err(|e| format!("git commit: {e}"))?; if commit_out.status.success() { return Ok(true); } let stderr = String::from_utf8_lossy(&commit_out.stderr); let stdout = String::from_utf8_lossy(&commit_out.stdout); if stdout.contains("nothing to commit") || stderr.contains("nothing to commit") { return Ok(false); } Err(format!("git commit failed: {stderr}")) } /// Process a batch of pending (path → stage) entries: commit and broadcast. /// /// Only files that still exist on disk are used to derive the commit message /// (they represent the destination of a move or a new file). Deletions are /// captured by `git add -A .story_kit/work/` automatically. fn flush_pending( pending: &HashMap, git_root: &Path, event_tx: &broadcast::Sender, ) { // Separate into files that exist (additions) vs gone (deletions). let mut additions: Vec<(&PathBuf, &str)> = Vec::new(); for (path, stage) in pending { if path.exists() { additions.push((path, stage.as_str())); } } // Pick the commit message from the first addition (the meaningful side of a move). // If there are only deletions, use a generic message. let (action, item_id, commit_msg) = if let Some((path, stage)) = additions.first() { let item = path.file_stem().and_then(|s| s.to_str()).unwrap_or("unknown"); if let Some((act, msg)) = stage_metadata(stage, item) { (act, item.to_string(), msg) } else { return; } } else { // Only deletions — pick any pending path for the item name. 
let Some((path, _)) = pending.iter().next() else { return; }; let item = path.file_stem().and_then(|s| s.to_str()).unwrap_or("unknown"); ("remove", item.to_string(), format!("story-kit: remove {item}")) }; slog!("[watcher] flush: {commit_msg}"); match git_add_work_and_commit(git_root, &commit_msg) { Ok(committed) => { if committed { slog!("[watcher] committed: {commit_msg}"); } else { slog!("[watcher] skipped (already committed): {commit_msg}"); } let stage = additions.first().map_or("unknown", |(_, s)| s); let evt = WatcherEvent { stage: stage.to_string(), item_id, action: action.to_string(), commit_msg, }; let _ = event_tx.send(evt); } Err(e) => { slog!("[watcher] git error: {e}"); } } } /// Start the filesystem watcher on a dedicated OS thread. /// /// `work_dir` — absolute path to `.story_kit/work/` (watched recursively). /// `git_root` — project root (passed to `git` commands as cwd). /// `event_tx` — broadcast sender; each connected WebSocket client holds a receiver. pub fn start_watcher( work_dir: PathBuf, git_root: PathBuf, event_tx: broadcast::Sender, ) { std::thread::spawn(move || { let (notify_tx, notify_rx) = mpsc::channel::>(); let mut watcher: RecommendedWatcher = match recommended_watcher(move |res| { let _ = notify_tx.send(res); }) { Ok(w) => w, Err(e) => { slog!("[watcher] failed to create watcher: {e}"); return; } }; if let Err(e) = watcher.watch(&work_dir, RecursiveMode::Recursive) { slog!("[watcher] failed to watch {}: {e}", work_dir.display()); return; } slog!("[watcher] watching {}", work_dir.display()); const DEBOUNCE: Duration = Duration::from_millis(300); // Map path → stage for pending (uncommitted) changes. let mut pending: HashMap = HashMap::new(); let mut deadline: Option = None; loop { // How long until the debounce window closes (or wait for next event). 
let timeout = deadline.map_or(Duration::from_secs(60), |d| { d.saturating_duration_since(Instant::now()) }); let flush = match notify_rx.recv_timeout(timeout) { Ok(Ok(event)) => { // Track creates, modifies, AND removes. Removes are needed so // that standalone deletions trigger a flush, and so that moves // (which fire Remove + Create) land in the same debounce window. let is_relevant_kind = matches!( event.kind, EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_) ); if is_relevant_kind { for path in event.paths { if let Some(stage) = stage_for_path(&path) { pending.insert(path, stage); deadline = Some(Instant::now() + DEBOUNCE); } } } false } Ok(Err(e)) => { slog!("[watcher] notify error: {e}"); false } // Debounce window expired — time to flush. Err(mpsc::RecvTimeoutError::Timeout) => true, Err(mpsc::RecvTimeoutError::Disconnected) => { slog!("[watcher] channel disconnected, shutting down"); break; } }; if flush && !pending.is_empty() { flush_pending(&pending, &git_root, &event_tx); pending.clear(); deadline = None; } } }); } #[cfg(test)] mod tests { use super::*; #[test] fn stage_for_path_recognises_pipeline_dirs() { let base = PathBuf::from("/proj/.story_kit/work"); assert_eq!( stage_for_path(&base.join("2_current/42_story_foo.md")), Some("2_current".to_string()) ); assert_eq!( stage_for_path(&base.join("5_archived/10_bug_bar.md")), Some("5_archived".to_string()) ); assert_eq!(stage_for_path(&base.join("other/file.md")), None); assert_eq!( stage_for_path(&base.join("2_current/42_story_foo.txt")), None ); } #[test] fn stage_for_path_ignores_worktree_paths() { let worktrees = PathBuf::from("/proj/.story_kit/worktrees"); // Code changes inside a worktree must be ignored. assert_eq!( stage_for_path(&worktrees.join("42_story_foo/server/src/main.rs")), None, ); // Even if a worktree happens to contain a path component that looks // like a pipeline stage, it must still be ignored. 
assert_eq!( stage_for_path(&worktrees.join("42_story_foo/.story_kit/work/2_current/42_story_foo.md")), None, ); // A path that only contains the word "worktrees" as part of a longer // segment (not an exact component) must NOT be filtered out. assert_eq!( stage_for_path(&PathBuf::from("/proj/.story_kit/work/2_current/not_worktrees_story.md")), Some("2_current".to_string()), ); } #[test] fn stage_metadata_returns_correct_actions() { let (action, msg) = stage_metadata("2_current", "42_story_foo").unwrap(); assert_eq!(action, "start"); assert_eq!(msg, "story-kit: start 42_story_foo"); let (action, msg) = stage_metadata("5_archived", "42_story_foo").unwrap(); assert_eq!(action, "accept"); assert_eq!(msg, "story-kit: accept 42_story_foo"); assert!(stage_metadata("unknown", "id").is_none()); } }