Spike 61: filesystem watcher and UI simplification
Add notify-based filesystem watcher for .story_kit/work/ that auto-commits changes with deterministic messages and broadcasts events over WebSocket. Push full pipeline state (Upcoming, Current, QA, To Merge) to frontend on connect and after every watcher event. Strip dead UI: remove ReviewPanel, GatePanel, TodoPanel, UpcomingPanel and all associated REST polling. Replace with 4 generic StagePanel components driven by WebSocket. Simplify AgentPanel to roster-only. Delete all 11 workflow HTTP endpoints and 16 request/response types from the server. Clean dead code from workflow module. MCP tools call Rust functions directly and need none of the HTTP layer. Net: ~4,100 lines deleted, ~400 added. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,3 +2,4 @@ pub mod fs;
|
||||
pub mod search;
|
||||
pub mod shell;
|
||||
pub mod story_metadata;
|
||||
pub mod watcher;
|
||||
|
||||
282
server/src/io/watcher.rs
Normal file
282
server/src/io/watcher.rs
Normal file
@@ -0,0 +1,282 @@
|
||||
//! Filesystem watcher for `.story_kit/work/`.
|
||||
//!
|
||||
//! Watches the work pipeline directories for file changes, infers the lifecycle
|
||||
//! stage from the target directory name, auto-commits with a deterministic message,
|
||||
//! and broadcasts a [`WatcherEvent`] to all connected WebSocket clients.
|
||||
//!
|
||||
//! # Debouncing
|
||||
//! Events are buffered for 300 ms after the last activity. All changes within the
|
||||
//! window are batched into a single `git add + commit`. This avoids double-commits
|
||||
//! when `fs::rename` fires both a remove and a create event.
|
||||
//!
|
||||
//! # Race conditions
|
||||
//! If a mutation handler (e.g. `move_story_to_current`) already committed the
|
||||
//! change, `git commit` will return "nothing to commit". The watcher detects this
|
||||
//! via exit-code inspection and silently skips the commit while still broadcasting
|
||||
//! the event so connected clients stay in sync.
|
||||
|
||||
use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher, recommended_watcher};
|
||||
use serde::Serialize;
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::mpsc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
/// A lifecycle event emitted by the filesystem watcher after auto-committing.
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct WatcherEvent {
|
||||
/// Pipeline stage directory (e.g. `"2_current"`, `"5_archived"`).
|
||||
pub stage: String,
|
||||
/// Work item ID (filename stem without extension, e.g. `"42_story_my_feature"`).
|
||||
pub item_id: String,
|
||||
/// Semantic action inferred from the stage (e.g. `"start"`, `"accept"`).
|
||||
pub action: String,
|
||||
/// The deterministic git commit message used (or that would have been used).
|
||||
pub commit_msg: String,
|
||||
}
|
||||
|
||||
/// Map a pipeline directory name to a (action, commit-message-prefix) pair.
|
||||
fn stage_metadata(stage: &str, item_id: &str) -> Option<(&'static str, String)> {
|
||||
let (action, prefix) = match stage {
|
||||
"1_upcoming" => ("create", format!("story-kit: create {item_id}")),
|
||||
"2_current" => ("start", format!("story-kit: start {item_id}")),
|
||||
"3_qa" => ("qa", format!("story-kit: queue {item_id} for QA")),
|
||||
"4_merge" => ("merge", format!("story-kit: queue {item_id} for merge")),
|
||||
"5_archived" => ("accept", format!("story-kit: accept {item_id}")),
|
||||
_ => return None,
|
||||
};
|
||||
Some((action, prefix))
|
||||
}
|
||||
|
||||
/// Return the pipeline stage name for a path if it is a `.md` file living
|
||||
/// directly inside one of the known work subdirectories, otherwise `None`.
|
||||
fn stage_for_path(path: &Path) -> Option<String> {
|
||||
if path.extension().is_none_or(|e| e != "md") {
|
||||
return None;
|
||||
}
|
||||
let stage = path
|
||||
.parent()
|
||||
.and_then(|p| p.file_name())
|
||||
.and_then(|n| n.to_str())?;
|
||||
matches!(stage, "1_upcoming" | "2_current" | "3_qa" | "4_merge" | "5_archived")
|
||||
.then(|| stage.to_string())
|
||||
}
|
||||
|
||||
/// Stage all changes in the work directory and commit with the given message.
|
||||
///
|
||||
/// Uses `git add -A .story_kit/work/` to catch both additions and deletions in
|
||||
/// a single commit. Returns `Ok(true)` if a commit was made, `Ok(false)` if
|
||||
/// there was nothing to commit, and `Err` for unexpected failures.
|
||||
fn git_add_work_and_commit(git_root: &Path, message: &str) -> Result<bool, String> {
|
||||
let work_rel = PathBuf::from(".story_kit").join("work");
|
||||
|
||||
let add_out = std::process::Command::new("git")
|
||||
.args(["add", "-A"])
|
||||
.arg(&work_rel)
|
||||
.current_dir(git_root)
|
||||
.output()
|
||||
.map_err(|e| format!("git add: {e}"))?;
|
||||
if !add_out.status.success() {
|
||||
return Err(format!(
|
||||
"git add failed: {}",
|
||||
String::from_utf8_lossy(&add_out.stderr)
|
||||
));
|
||||
}
|
||||
|
||||
let commit_out = std::process::Command::new("git")
|
||||
.args(["commit", "-m", message])
|
||||
.current_dir(git_root)
|
||||
.output()
|
||||
.map_err(|e| format!("git commit: {e}"))?;
|
||||
|
||||
if commit_out.status.success() {
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let stderr = String::from_utf8_lossy(&commit_out.stderr);
|
||||
let stdout = String::from_utf8_lossy(&commit_out.stdout);
|
||||
if stdout.contains("nothing to commit") || stderr.contains("nothing to commit") {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
Err(format!("git commit failed: {stderr}"))
|
||||
}
|
||||
|
||||
/// Process a batch of pending (path → stage) entries: commit and broadcast.
|
||||
///
|
||||
/// Only files that still exist on disk are used to derive the commit message
|
||||
/// (they represent the destination of a move or a new file). Deletions are
|
||||
/// captured by `git add -A .story_kit/work/` automatically.
|
||||
fn flush_pending(
|
||||
pending: &HashMap<PathBuf, String>,
|
||||
git_root: &Path,
|
||||
event_tx: &broadcast::Sender<WatcherEvent>,
|
||||
) {
|
||||
// Separate into files that exist (additions) vs gone (deletions).
|
||||
let mut additions: Vec<(&PathBuf, &str)> = Vec::new();
|
||||
for (path, stage) in pending {
|
||||
if path.exists() {
|
||||
additions.push((path, stage.as_str()));
|
||||
}
|
||||
}
|
||||
|
||||
// Pick the commit message from the first addition (the meaningful side of a move).
|
||||
// If there are only deletions, use a generic message.
|
||||
let (action, item_id, commit_msg) = if let Some((path, stage)) = additions.first() {
|
||||
let item = path.file_stem().and_then(|s| s.to_str()).unwrap_or("unknown");
|
||||
if let Some((act, msg)) = stage_metadata(stage, item) {
|
||||
(act, item.to_string(), msg)
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// Only deletions — pick any pending path for the item name.
|
||||
let Some((path, _)) = pending.iter().next() else {
|
||||
return;
|
||||
};
|
||||
let item = path.file_stem().and_then(|s| s.to_str()).unwrap_or("unknown");
|
||||
("remove", item.to_string(), format!("story-kit: remove {item}"))
|
||||
};
|
||||
|
||||
eprintln!("[watcher] flush: {commit_msg}");
|
||||
match git_add_work_and_commit(git_root, &commit_msg) {
|
||||
Ok(committed) => {
|
||||
if committed {
|
||||
eprintln!("[watcher] committed: {commit_msg}");
|
||||
} else {
|
||||
eprintln!("[watcher] skipped (already committed): {commit_msg}");
|
||||
}
|
||||
let stage = additions.first().map_or("unknown", |(_, s)| s);
|
||||
let evt = WatcherEvent {
|
||||
stage: stage.to_string(),
|
||||
item_id,
|
||||
action: action.to_string(),
|
||||
commit_msg,
|
||||
};
|
||||
let _ = event_tx.send(evt);
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("[watcher] git error: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Start the filesystem watcher on a dedicated OS thread.
|
||||
///
|
||||
/// `work_dir` — absolute path to `.story_kit/work/` (watched recursively).
|
||||
/// `git_root` — project root (passed to `git` commands as cwd).
|
||||
/// `event_tx` — broadcast sender; each connected WebSocket client holds a receiver.
|
||||
pub fn start_watcher(
|
||||
work_dir: PathBuf,
|
||||
git_root: PathBuf,
|
||||
event_tx: broadcast::Sender<WatcherEvent>,
|
||||
) {
|
||||
std::thread::spawn(move || {
|
||||
let (notify_tx, notify_rx) = mpsc::channel::<notify::Result<notify::Event>>();
|
||||
|
||||
let mut watcher: RecommendedWatcher = match recommended_watcher(move |res| {
|
||||
let _ = notify_tx.send(res);
|
||||
}) {
|
||||
Ok(w) => w,
|
||||
Err(e) => {
|
||||
eprintln!("[watcher] failed to create watcher: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = watcher.watch(&work_dir, RecursiveMode::Recursive) {
|
||||
eprintln!("[watcher] failed to watch {}: {e}", work_dir.display());
|
||||
return;
|
||||
}
|
||||
|
||||
eprintln!("[watcher] watching {}", work_dir.display());
|
||||
|
||||
const DEBOUNCE: Duration = Duration::from_millis(300);
|
||||
|
||||
// Map path → stage for pending (uncommitted) changes.
|
||||
let mut pending: HashMap<PathBuf, String> = HashMap::new();
|
||||
let mut deadline: Option<Instant> = None;
|
||||
|
||||
loop {
|
||||
// How long until the debounce window closes (or wait for next event).
|
||||
let timeout = deadline.map_or(Duration::from_secs(60), |d| {
|
||||
d.saturating_duration_since(Instant::now())
|
||||
});
|
||||
|
||||
let flush = match notify_rx.recv_timeout(timeout) {
|
||||
Ok(Ok(event)) => {
|
||||
// Track creates, modifies, AND removes. Removes are needed so
|
||||
// that standalone deletions trigger a flush, and so that moves
|
||||
// (which fire Remove + Create) land in the same debounce window.
|
||||
let is_relevant_kind = matches!(
|
||||
event.kind,
|
||||
EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
|
||||
);
|
||||
|
||||
if is_relevant_kind {
|
||||
for path in event.paths {
|
||||
if let Some(stage) = stage_for_path(&path) {
|
||||
pending.insert(path, stage);
|
||||
deadline = Some(Instant::now() + DEBOUNCE);
|
||||
}
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
eprintln!("[watcher] notify error: {e}");
|
||||
false
|
||||
}
|
||||
// Debounce window expired — time to flush.
|
||||
Err(mpsc::RecvTimeoutError::Timeout) => true,
|
||||
Err(mpsc::RecvTimeoutError::Disconnected) => {
|
||||
eprintln!("[watcher] channel disconnected, shutting down");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if flush && !pending.is_empty() {
|
||||
flush_pending(&pending, &git_root, &event_tx);
|
||||
pending.clear();
|
||||
deadline = None;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn stage_for_path_recognises_pipeline_dirs() {
|
||||
let base = PathBuf::from("/proj/.story_kit/work");
|
||||
assert_eq!(
|
||||
stage_for_path(&base.join("2_current/42_story_foo.md")),
|
||||
Some("2_current".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
stage_for_path(&base.join("5_archived/10_bug_bar.md")),
|
||||
Some("5_archived".to_string())
|
||||
);
|
||||
assert_eq!(stage_for_path(&base.join("other/file.md")), None);
|
||||
assert_eq!(
|
||||
stage_for_path(&base.join("2_current/42_story_foo.txt")),
|
||||
None
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stage_metadata_returns_correct_actions() {
|
||||
let (action, msg) = stage_metadata("2_current", "42_story_foo").unwrap();
|
||||
assert_eq!(action, "start");
|
||||
assert_eq!(msg, "story-kit: start 42_story_foo");
|
||||
|
||||
let (action, msg) = stage_metadata("5_archived", "42_story_foo").unwrap();
|
||||
assert_eq!(action, "accept");
|
||||
assert_eq!(msg, "story-kit: accept 42_story_foo");
|
||||
|
||||
assert!(stage_metadata("unknown", "id").is_none());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user