Files
huskies/server/src/io/watcher/mod.rs
T

162 lines
6.5 KiB
Rust
Raw Normal View History

//! Filesystem watcher for `.huskies/project.toml` and `.huskies/agents.toml`.
//!
//! Watches config files for changes and broadcasts a [`WatcherEvent`] to all
//! connected WebSocket clients so the frontend can reload the agent roster
//! without a server restart.
//!
//! Work-item pipeline events (stage transitions) are driven by CRDT state
//! changes via [`crate::crdt_state::subscribe`], not by filesystem events.
//!
//! # Debouncing
//! Config-file events are buffered for 300 ms after the last activity to avoid
//! duplicate broadcasts when an editor writes multiple events in quick succession.
//!
//! # Submodules
//! - [`events`] — [`WatcherEvent`] enum definition.
//! - [`sweep`] — periodic sweep of `5_done` → `6_archived`.
mod events;
mod sweep;
pub use events::WatcherEvent;
pub(crate) use sweep::sweep_done_to_archived;
use crate::slog;
use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher, recommended_watcher};
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
/// Return `true` if `path` is the root-level `.huskies/project.toml` or
/// `.huskies/agents.toml`, i.e. exactly `{git_root}/.huskies/{project,agents}.toml`.
///
/// The decision is made on the portion of `path` *below* `git_root`, so where
/// the project itself lives does not matter: a checkout located under a
/// directory that happens to be called `worktrees` (e.g. `~/worktrees/app`)
/// still has its own config files recognised.
///
/// Returns `false` for:
/// - paths that are not located under `git_root`;
/// - paths inside worktree directories (a `worktrees` component anywhere
///   below `git_root`);
/// - any other file.
pub fn is_config_file(path: &Path, git_root: &Path) -> bool {
    // Anything outside the project root can never be a root-level config file.
    let relative = match path.strip_prefix(git_root) {
        Ok(rest) => rest,
        Err(_) => return false,
    };

    // Reject config copies that live inside a worktree. Only components below
    // the root are inspected; the root's own location is irrelevant.
    if relative
        .components()
        .any(|part| part.as_os_str() == "worktrees")
    {
        return false;
    }

    let config_dir = Path::new(".huskies");
    relative == config_dir.join("project.toml") || relative == config_dir.join("agents.toml")
}
/// Translate a pipeline directory name into an `(action, commit message)` pair
/// for the work item `item_id`.
///
/// `stage` is parsed with [`crate::pipeline_state::Stage::from_dir`]; when that
/// yields nothing, this function returns `None` as well. Otherwise the action
/// is a short static keyword and the message is a `huskies: …` line that
/// mentions `item_id`.
///
/// Used by the CRDT-to-watcher bridge (in `main.rs`) to derive the action and
/// commit message for `WatcherEvent::WorkItem` events.
pub fn stage_metadata(stage: &str, item_id: &str) -> Option<(&'static str, String)> {
    use crate::pipeline_state::Stage;

    // Unknown directory names short-circuit to `None` here.
    let parsed = Stage::from_dir(stage)?;

    Some(match parsed {
        Stage::Backlog => ("create", format!("huskies: create {item_id}")),
        Stage::Coding => ("start", format!("huskies: start {item_id}")),
        Stage::Qa => ("qa", format!("huskies: queue {item_id} for QA")),
        Stage::Merge { .. } => ("merge", format!("huskies: queue {item_id} for merge")),
        Stage::Done { .. } => ("done", format!("huskies: done {item_id}")),
        Stage::Archived { .. } => ("accept", format!("huskies: accept {item_id}")),
    })
}
/// Spawn a dedicated OS thread that watches the project's config files and
/// broadcasts a debounced [`WatcherEvent::ConfigChanged`].
///
/// The thread registers non-recursive watches on
/// `{git_root}/.huskies/project.toml` and `{git_root}/.huskies/agents.toml`
/// (only those that exist when the thread starts). Create / modify / remove
/// events for either file open — or extend — a 300 ms debounce window; once
/// the window elapses with no further activity a single `ConfigChanged` event
/// is sent on `event_tx`.
///
/// Work-item pipeline events (stage transitions) are not produced here — they
/// originate from CRDT state changes via [`crate::crdt_state::subscribe`].
///
/// The function returns immediately; the spawned thread is detached. It exits
/// if the watcher cannot be created or when the internal notify channel is
/// disconnected. Failures are logged via `slog!`, never propagated.
///
/// `git_root` — project root; the watched files live in its `.huskies/` directory.
/// `event_tx` — broadcast sender that receives `ConfigChanged` events. Send
/// errors are deliberately ignored.
pub fn start_watcher(git_root: PathBuf, event_tx: broadcast::Sender<WatcherEvent>) {
    // Quiet period required after the last config event before broadcasting.
    const DEBOUNCE: Duration = Duration::from_millis(300);
    // Upper bound on how long the thread blocks when no debounce window is open.
    const IDLE_WAIT: Duration = Duration::from_secs(60);

    std::thread::spawn(move || {
        let (raw_tx, raw_rx) = mpsc::channel::<notify::Result<notify::Event>>();

        // Forward every notify callback into the channel; a failed send only
        // means the receiving loop below has already gone away.
        let forward = move |res: notify::Result<notify::Event>| {
            let _ = raw_tx.send(res);
        };
        let mut watcher: RecommendedWatcher = match recommended_watcher(forward) {
            Ok(created) => created,
            Err(e) => {
                slog!("[watcher] failed to create watcher: {e}");
                return;
            }
        };

        // Only the two config files are watched. Work-item directories are
        // deliberately left out — CRDT state transitions drive pipeline events.
        let config_dir = git_root.join(".huskies");
        let candidates = [
            config_dir.join("project.toml"),
            config_dir.join("agents.toml"),
        ];
        for config_file in &candidates {
            if config_file.exists()
                && let Err(e) = watcher.watch(config_file, RecursiveMode::NonRecursive)
            {
                slog!(
                    "[watcher] failed to watch config file {}: {e}",
                    config_file.display()
                );
            }
        }
        slog!("[watcher] watching config files for hot-reload");

        // `pending` — a config change was seen in the current debounce window.
        // `flush_at` — when that window closes; `None` while idle.
        let mut pending = false;
        let mut flush_at: Option<Instant> = None;

        loop {
            // Block until the window closes, or for the idle interval.
            let wait = match flush_at {
                Some(at) => at.saturating_duration_since(Instant::now()),
                None => IDLE_WAIT,
            };

            match raw_rx.recv_timeout(wait) {
                Ok(Ok(event)) => {
                    let relevant = matches!(
                        event.kind,
                        EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
                    );
                    if !relevant {
                        continue;
                    }
                    for path in &event.paths {
                        // Anything that is not a root-level config file is
                        // intentionally ignored.
                        if !is_config_file(path, &git_root) {
                            continue;
                        }
                        slog!("[watcher] config change detected: {}", path.display());
                        pending = true;
                        // Each hit restarts the debounce window.
                        flush_at = Some(Instant::now() + DEBOUNCE);
                    }
                }
                Ok(Err(e)) => {
                    slog!("[watcher] notify error: {e}");
                }
                // The wait elapsed: either the debounce window closed or the
                // idle interval passed. Flush whatever is pending.
                Err(mpsc::RecvTimeoutError::Timeout) => {
                    if pending {
                        slog!("[watcher] broadcasting agent_config_changed");
                        let _ = event_tx.send(WatcherEvent::ConfigChanged);
                        pending = false;
                    }
                    flush_at = None;
                }
                Err(mpsc::RecvTimeoutError::Disconnected) => {
                    slog!("[watcher] channel disconnected, shutting down");
                    break;
                }
            }
        }
    });
}
#[cfg(test)]
mod tests;