huskies: merge 686_refactor_decompose_server_src_io_watcher_rs_1202_lines
Manual merge resolution: feature branch deleted watcher.rs and split
into watcher/{mod,events,sweep,tests}.rs, while master modified the
old watcher.rs (738's FS-shadow stripping). The auto-resolver kept
both, producing an ambiguous-module compile error.
Resolution: drop watcher.rs (feature's delete wins). The new
watcher/mod.rs absorbs the FS-shadow code semantically — gates pass
(cargo check, clippy --all-targets -D warnings, fmt --check, 29/29
io::watcher tests).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,473 +0,0 @@
|
||||
//! Filesystem watcher for `.huskies/project.toml` and `.huskies/agents.toml`.
|
||||
//!
|
||||
//! Watches config files for hot-reload and broadcasts [`WatcherEvent::ConfigChanged`]
|
||||
//! so the frontend can reload the agent roster without a server restart.
|
||||
//!
|
||||
//! Work-item pipeline events are driven by CRDT state transitions, not filesystem
|
||||
//! events. The watcher also periodically sweeps `5_done/` → `6_archived/`.
|
||||
|
||||
use crate::slog;
|
||||
use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher, recommended_watcher};
|
||||
use serde::Serialize;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::mpsc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
/// A lifecycle event emitted by the filesystem watcher.
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum WatcherEvent {
|
||||
/// A work-pipeline file was created, modified, or deleted.
|
||||
WorkItem {
|
||||
/// Pipeline stage directory (e.g. `"2_current"`, `"5_archived"`).
|
||||
stage: String,
|
||||
/// Work item ID (filename stem without extension, e.g. `"42_story_my_feature"`).
|
||||
item_id: String,
|
||||
/// Semantic action inferred from the stage (e.g. `"start"`, `"accept"`).
|
||||
action: String,
|
||||
/// The deterministic git commit message used (or that would have been used).
|
||||
commit_msg: String,
|
||||
/// The pipeline stage the item moved FROM, populated for move operations.
|
||||
/// `None` for creations, deletions, or synthetic events.
|
||||
from_stage: Option<String>,
|
||||
},
|
||||
/// `.huskies/project.toml` was modified at the project root (not inside a worktree).
|
||||
ConfigChanged,
|
||||
/// An agent's state changed (started, stopped, completed, etc.).
|
||||
/// Triggers a pipeline state refresh so the frontend can update agent
|
||||
/// assignments without waiting for a filesystem event.
|
||||
AgentStateChanged,
|
||||
/// A story encountered a failure (e.g. merge failure).
|
||||
/// Triggers an error notification to configured Matrix rooms.
|
||||
MergeFailure {
|
||||
/// Work item ID (e.g. `"42_story_my_feature"`).
|
||||
story_id: String,
|
||||
/// Human-readable description of the failure.
|
||||
reason: String,
|
||||
},
|
||||
/// An agent hit an API rate limit.
|
||||
/// Triggers a warning notification to configured chat rooms.
|
||||
RateLimitWarning {
|
||||
/// Work item ID the agent is working on.
|
||||
story_id: String,
|
||||
/// Name of the agent that hit the rate limit.
|
||||
agent_name: String,
|
||||
},
|
||||
/// A story has been blocked (e.g. retry limit exceeded, empty diff).
|
||||
/// Triggers a warning notification to configured chat rooms.
|
||||
StoryBlocked {
|
||||
/// Work item ID (e.g. `"42_story_my_feature"`).
|
||||
story_id: String,
|
||||
/// Human-readable reason the story was blocked.
|
||||
reason: String,
|
||||
},
|
||||
/// An agent hit a hard API rate limit and will be blocked until `reset_at`.
|
||||
/// Triggers auto-scheduling of a timer and a notification with the resume time.
|
||||
RateLimitHardBlock {
|
||||
/// Work item ID the agent is working on.
|
||||
story_id: String,
|
||||
/// Name of the agent that hit the hard rate limit.
|
||||
agent_name: String,
|
||||
/// UTC instant at which the rate limit resets.
|
||||
reset_at: chrono::DateTime<chrono::Utc>,
|
||||
},
|
||||
/// An OAuth account pool swap succeeded: a different account is now active.
|
||||
/// Triggers a notification to chat transports naming the new account.
|
||||
OAuthAccountSwapped {
|
||||
/// Email address of the newly activated account.
|
||||
new_email: String,
|
||||
},
|
||||
/// All OAuth accounts in the pool are rate-limited — no swap was possible.
|
||||
/// Triggers a notification to chat transports with the earliest reset time.
|
||||
OAuthAccountsExhausted {
|
||||
/// Human-readable message describing when the earliest reset occurs.
|
||||
earliest_reset_msg: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// Return `true` if `path` is the root-level `.huskies/project.toml` or
/// `.huskies/agents.toml`, i.e. `{git_root}/.huskies/{project,agents}.toml`.
///
/// Returns `false` for paths inside worktree directories (paths containing
/// a `worktrees` component).
pub fn is_config_file(path: &Path, git_root: &Path) -> bool {
    // Per-agent worktrees carry their own `.huskies/` copies; a config file
    // under any `worktrees` component must never count as the root config.
    let inside_worktree = path.components().any(|c| c.as_os_str() == "worktrees");
    if inside_worktree {
        return false;
    }

    let huskies_dir = git_root.join(".huskies");
    ["project.toml", "agents.toml"]
        .iter()
        .any(|name| path == huskies_dir.join(name))
}
|
||||
|
||||
/// Map a pipeline directory name to a (action, commit-message-prefix) pair.
|
||||
///
|
||||
/// Used by the CRDT-to-watcher bridge (in `main.rs`) to derive the action and
|
||||
/// commit message for `WatcherEvent::WorkItem` events.
|
||||
pub fn stage_metadata(stage: &str, item_id: &str) -> Option<(&'static str, String)> {
|
||||
use crate::pipeline_state::Stage;
|
||||
let (action, msg) = match Stage::from_dir(stage)? {
|
||||
Stage::Backlog => ("create", format!("huskies: create {item_id}")),
|
||||
Stage::Coding => ("start", format!("huskies: start {item_id}")),
|
||||
Stage::Qa => ("qa", format!("huskies: queue {item_id} for QA")),
|
||||
Stage::Merge { .. } => ("merge", format!("huskies: queue {item_id} for merge")),
|
||||
Stage::Done { .. } => ("done", format!("huskies: done {item_id}")),
|
||||
Stage::Archived { .. } => ("accept", format!("huskies: accept {item_id}")),
|
||||
};
|
||||
Some((action, msg))
|
||||
}
|
||||
|
||||
/// Sweep items in `5_done` whose `merged_at` timestamp exceeds the retention
|
||||
/// duration to `6_archived` via CRDT state transitions.
|
||||
///
|
||||
/// All state is read from and written to CRDT — no filesystem access.
|
||||
/// Worktree pruning is handled separately by the CRDT event subscriber.
|
||||
pub(crate) fn sweep_done_to_archived(done_retention: Duration) {
|
||||
use crate::pipeline_state::{PipelineEvent, Stage, read_all_typed, stage_dir_name, transition};
|
||||
|
||||
for item in read_all_typed() {
|
||||
if let Stage::Done { merged_at, .. } = &item.stage {
|
||||
let age = chrono::Utc::now()
|
||||
.signed_duration_since(*merged_at)
|
||||
.to_std()
|
||||
.unwrap_or_default();
|
||||
if age >= done_retention {
|
||||
let story_id = item.story_id.0.clone();
|
||||
match transition(item.stage.clone(), PipelineEvent::Accepted) {
|
||||
Ok(new_stage) => {
|
||||
crate::crdt_state::write_item(
|
||||
&story_id,
|
||||
stage_dir_name(&new_stage),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
Some(false),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
slog!("[watcher] sweep: promoted {story_id} → 6_archived/");
|
||||
}
|
||||
Err(e) => {
|
||||
slog!("[watcher] sweep: transition error for {story_id}: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Start the filesystem watcher on a dedicated OS thread.
|
||||
///
|
||||
/// Watches `.huskies/project.toml` and `.huskies/agents.toml` for config
|
||||
/// hot-reload, and periodically sweeps `5_done/` → `6_archived/` via CRDT.
|
||||
///
|
||||
/// Work-item pipeline events (stage transitions) are no longer driven by
|
||||
/// filesystem events — they originate from CRDT state changes via
|
||||
/// [`crate::crdt_state::subscribe`].
|
||||
///
|
||||
/// `git_root` — project root (passed to `git` commands and config loading).
|
||||
/// `event_tx` — broadcast sender for `ConfigChanged` events.
|
||||
/// `watcher_config` — initial sweep configuration loaded from `project.toml`.
|
||||
pub fn start_watcher(git_root: PathBuf, event_tx: broadcast::Sender<WatcherEvent>) {
|
||||
std::thread::spawn(move || {
|
||||
let (notify_tx, notify_rx) = mpsc::channel::<notify::Result<notify::Event>>();
|
||||
|
||||
let mut watcher: RecommendedWatcher = match recommended_watcher(move |res| {
|
||||
let _ = notify_tx.send(res);
|
||||
}) {
|
||||
Ok(w) => w,
|
||||
Err(e) => {
|
||||
slog!("[watcher] failed to create watcher: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Watch config files for hot-reload. Work-item directories are NOT
|
||||
// watched — CRDT state transitions drive pipeline events now.
|
||||
let huskies = git_root.join(".huskies");
|
||||
for config_file in [huskies.join("project.toml"), huskies.join("agents.toml")] {
|
||||
if config_file.exists()
|
||||
&& let Err(e) = watcher.watch(&config_file, RecursiveMode::NonRecursive)
|
||||
{
|
||||
slog!(
|
||||
"[watcher] failed to watch config file {}: {e}",
|
||||
config_file.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
slog!("[watcher] watching config files for hot-reload");
|
||||
|
||||
const DEBOUNCE: Duration = Duration::from_millis(300);
|
||||
|
||||
// Whether a config file change is pending in the current debounce window.
|
||||
let mut config_changed_pending = false;
|
||||
let mut deadline: Option<Instant> = None;
|
||||
|
||||
loop {
|
||||
// How long until the debounce window closes (or wait for next event).
|
||||
let timeout = deadline.map_or(Duration::from_secs(60), |d| {
|
||||
d.saturating_duration_since(Instant::now())
|
||||
});
|
||||
|
||||
let flush = match notify_rx.recv_timeout(timeout) {
|
||||
Ok(Ok(event)) => {
|
||||
let is_relevant_kind = matches!(
|
||||
event.kind,
|
||||
EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
|
||||
);
|
||||
|
||||
if is_relevant_kind {
|
||||
for path in event.paths {
|
||||
if is_config_file(&path, &git_root) {
|
||||
slog!("[watcher] config change detected: {}", path.display());
|
||||
config_changed_pending = true;
|
||||
deadline = Some(Instant::now() + DEBOUNCE);
|
||||
}
|
||||
// Work-item file changes are intentionally ignored.
|
||||
// CRDT state transitions handle pipeline events.
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
slog!("[watcher] notify error: {e}");
|
||||
false
|
||||
}
|
||||
// Debounce window expired — time to flush.
|
||||
Err(mpsc::RecvTimeoutError::Timeout) => true,
|
||||
Err(mpsc::RecvTimeoutError::Disconnected) => {
|
||||
slog!("[watcher] channel disconnected, shutting down");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if flush {
|
||||
if config_changed_pending {
|
||||
slog!("[watcher] broadcasting agent_config_changed");
|
||||
let _ = event_tx.send(WatcherEvent::ConfigChanged);
|
||||
|
||||
config_changed_pending = false;
|
||||
}
|
||||
deadline = None;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    // Each stage directory must map to its fixed (action, commit message) pair,
    // and unknown directories must yield None.
    #[test]
    fn stage_metadata_returns_correct_actions() {
        let (action, msg) = stage_metadata("2_current", "42_story_foo").unwrap();
        assert_eq!(action, "start");
        assert_eq!(msg, "huskies: start 42_story_foo");

        let (action, msg) = stage_metadata("5_done", "42_story_foo").unwrap();
        assert_eq!(action, "done");
        assert_eq!(msg, "huskies: done 42_story_foo");

        let (action, msg) = stage_metadata("6_archived", "42_story_foo").unwrap();
        assert_eq!(action, "accept");
        assert_eq!(msg, "huskies: accept 42_story_foo");

        assert!(stage_metadata("unknown", "id").is_none());
    }

    #[test]
    fn is_config_file_identifies_root_project_toml() {
        let git_root = PathBuf::from("/proj");
        let config = git_root.join(".huskies").join("project.toml");
        assert!(is_config_file(&config, &git_root));
    }

    #[test]
    fn is_config_file_identifies_root_agents_toml() {
        let git_root = PathBuf::from("/proj");
        let agents = git_root.join(".huskies").join("agents.toml");
        assert!(is_config_file(&agents, &git_root));
    }

    #[test]
    fn is_config_file_rejects_worktree_copies() {
        let git_root = PathBuf::from("/proj");
        // project.toml inside a worktree must NOT be treated as the root config.
        let worktree_config =
            PathBuf::from("/proj/.huskies/worktrees/42_story_foo/.huskies/project.toml");
        assert!(!is_config_file(&worktree_config, &git_root));
    }

    #[test]
    fn is_config_file_rejects_other_files() {
        let git_root = PathBuf::from("/proj");
        // Random files must not match.
        assert!(!is_config_file(
            &PathBuf::from("/proj/.huskies/work/2_current/42_story_foo.md"),
            &git_root
        ));
        assert!(!is_config_file(
            &PathBuf::from("/proj/.huskies/README.md"),
            &git_root
        ));
    }

    #[test]
    fn is_config_file_rejects_wrong_root() {
        let git_root = PathBuf::from("/proj");
        // A config path under a different project root must not match.
        let other_root_config = PathBuf::from("/other/.huskies/project.toml");
        assert!(!is_config_file(&other_root_config, &git_root));
    }

    // ── sweep_done_to_archived (CRDT-based) ─────────────────────────────────
    //
    // The sweep function reads from `read_all_typed()` and checks
    // `Stage::Done { merged_at, .. }`. Items created via
    // `write_item_with_content("5_done")` project `merged_at = Utc::now()`,
    // so we test with Duration::ZERO to sweep immediately and with a long
    // retention to verify items are kept. No filesystem access is involved.

    #[test]
    fn sweep_moves_old_items_to_archived() {
        crate::db::ensure_content_store();
        crate::db::write_item_with_content(
            "9880_story_sweep_old",
            "5_done",
            "---\nname: old\n---\n",
        );

        // With ZERO retention, any Done item should be swept.
        sweep_done_to_archived(Duration::ZERO);

        // Verify the item was moved to 6_archived in the CRDT.
        let items = crate::pipeline_state::read_all_typed();
        let item = items
            .iter()
            .find(|i| i.story_id.0 == "9880_story_sweep_old");
        assert!(
            item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })),
            "item should be archived after sweep"
        );
    }

    #[test]
    fn sweep_keeps_recent_items_in_done() {
        crate::db::ensure_content_store();
        crate::db::write_item_with_content(
            "9881_story_sweep_new",
            "5_done",
            "---\nname: new\n---\n",
        );

        // With a very long retention, the item (merged_at ≈ now) should stay.
        sweep_done_to_archived(Duration::from_secs(999_999));

        let items = crate::pipeline_state::read_all_typed();
        let item = items
            .iter()
            .find(|i| i.story_id.0 == "9881_story_sweep_new");
        assert!(
            item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Done { .. })),
            "item should remain in Done with long retention"
        );
    }

    #[test]
    fn sweep_respects_custom_retention() {
        crate::db::ensure_content_store();
        crate::db::write_item_with_content(
            "9882_story_sweep_custom",
            "5_done",
            "---\nname: custom\n---\n",
        );

        // With ZERO retention, sweep should promote.
        sweep_done_to_archived(Duration::ZERO);

        let items = crate::pipeline_state::read_all_typed();
        let item = items
            .iter()
            .find(|i| i.story_id.0 == "9882_story_sweep_custom");
        assert!(
            item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })),
            "item should be archived with zero retention"
        );
    }

    /// Prove that the sweep reads `merged_at` from the CRDT (not `Utc::now()`).
    ///
    /// This test sets `merged_at` to 10 seconds in the past and uses a 5-second
    /// retention. If the sweep were still using `Utc::now()` as the start time
    /// (the original bug), the elapsed time would be ~0 and the item would NOT
    /// be swept. With the fix, the item is swept because 10s > 5s retention.
    #[test]
    fn sweep_uses_crdt_merged_at_not_utc_now() {
        crate::db::ensure_content_store();

        // merged_at is stored as a UNIX-epoch seconds value (f64).
        let ten_seconds_ago =
            (chrono::Utc::now() - chrono::Duration::seconds(10)).timestamp() as f64;

        // Write item in 5_done with an explicit past merged_at timestamp.
        crate::crdt_state::write_item(
            "9883_story_sweep_merged_at",
            "5_done",
            Some("merged_at test"),
            None,
            None,
            None,
            None,
            None,
            None,
            Some(ten_seconds_ago),
        );

        // 5-second retention: item is 10s old → should be swept.
        sweep_done_to_archived(Duration::from_secs(5));

        let items = crate::pipeline_state::read_all_typed();
        let item = items
            .iter()
            .find(|i| i.story_id.0 == "9883_story_sweep_merged_at");
        assert!(
            item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })),
            "item with merged_at 10s ago should be archived with 5s retention"
        );
    }

    /// Prove that an item with merged_at NEWER than done_retention is NOT swept.
    #[test]
    fn sweep_keeps_item_newer_than_retention() {
        crate::db::ensure_content_store();

        let one_second_ago = (chrono::Utc::now() - chrono::Duration::seconds(1)).timestamp() as f64;

        crate::crdt_state::write_item(
            "9884_story_sweep_recent",
            "5_done",
            Some("recent merged_at test"),
            None,
            None,
            None,
            None,
            None,
            None,
            Some(one_second_ago),
        );

        // 1-hour retention: item is only 1s old → should NOT be swept.
        sweep_done_to_archived(Duration::from_secs(3600));

        let items = crate::pipeline_state::read_all_typed();
        let item = items
            .iter()
            .find(|i| i.story_id.0 == "9884_story_sweep_recent");
        assert!(
            item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Done { .. })),
            "item with merged_at 1s ago should stay in Done with 1-hour retention"
        );
    }
}
|
||||
@@ -0,0 +1,75 @@
|
||||
//! Watcher event types emitted to WebSocket clients and internal subscribers.
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
/// A lifecycle event emitted by the filesystem watcher.
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum WatcherEvent {
|
||||
/// A work-pipeline file was created, modified, or deleted.
|
||||
WorkItem {
|
||||
/// Pipeline stage directory (e.g. `"2_current"`, `"5_archived"`).
|
||||
stage: String,
|
||||
/// Work item ID (filename stem without extension, e.g. `"42_story_my_feature"`).
|
||||
item_id: String,
|
||||
/// Semantic action inferred from the stage (e.g. `"start"`, `"accept"`).
|
||||
action: String,
|
||||
/// The deterministic git commit message used (or that would have been used).
|
||||
commit_msg: String,
|
||||
/// The pipeline stage the item moved FROM, populated for move operations.
|
||||
/// `None` for creations, deletions, or synthetic events.
|
||||
from_stage: Option<String>,
|
||||
},
|
||||
/// `.huskies/project.toml` was modified at the project root (not inside a worktree).
|
||||
ConfigChanged,
|
||||
/// An agent's state changed (started, stopped, completed, etc.).
|
||||
/// Triggers a pipeline state refresh so the frontend can update agent
|
||||
/// assignments without waiting for a filesystem event.
|
||||
AgentStateChanged,
|
||||
/// A story encountered a failure (e.g. merge failure).
|
||||
/// Triggers an error notification to configured Matrix rooms.
|
||||
MergeFailure {
|
||||
/// Work item ID (e.g. `"42_story_my_feature"`).
|
||||
story_id: String,
|
||||
/// Human-readable description of the failure.
|
||||
reason: String,
|
||||
},
|
||||
/// An agent hit an API rate limit.
|
||||
/// Triggers a warning notification to configured chat rooms.
|
||||
RateLimitWarning {
|
||||
/// Work item ID the agent is working on.
|
||||
story_id: String,
|
||||
/// Name of the agent that hit the rate limit.
|
||||
agent_name: String,
|
||||
},
|
||||
/// A story has been blocked (e.g. retry limit exceeded, empty diff).
|
||||
/// Triggers a warning notification to configured chat rooms.
|
||||
StoryBlocked {
|
||||
/// Work item ID (e.g. `"42_story_my_feature"`).
|
||||
story_id: String,
|
||||
/// Human-readable reason the story was blocked.
|
||||
reason: String,
|
||||
},
|
||||
/// An agent hit a hard API rate limit and will be blocked until `reset_at`.
|
||||
/// Triggers auto-scheduling of a timer and a notification with the resume time.
|
||||
RateLimitHardBlock {
|
||||
/// Work item ID the agent is working on.
|
||||
story_id: String,
|
||||
/// Name of the agent that hit the hard rate limit.
|
||||
agent_name: String,
|
||||
/// UTC instant at which the rate limit resets.
|
||||
reset_at: chrono::DateTime<chrono::Utc>,
|
||||
},
|
||||
/// An OAuth account pool swap succeeded: a different account is now active.
|
||||
/// Triggers a notification to chat transports naming the new account.
|
||||
OAuthAccountSwapped {
|
||||
/// Email address of the newly activated account.
|
||||
new_email: String,
|
||||
},
|
||||
/// All OAuth accounts in the pool are rate-limited — no swap was possible.
|
||||
/// Triggers a notification to chat transports with the earliest reset time.
|
||||
OAuthAccountsExhausted {
|
||||
/// Human-readable message describing when the earliest reset occurs.
|
||||
earliest_reset_msg: String,
|
||||
},
|
||||
}
|
||||
@@ -0,0 +1,362 @@
|
||||
//! Filesystem watcher for `.huskies/project.toml` and `.huskies/agents.toml`.
|
||||
//!
|
||||
//! Watches config files for changes and broadcasts a [`WatcherEvent`] to all
|
||||
//! connected WebSocket clients so the frontend can reload the agent roster
|
||||
//! without a server restart.
|
||||
//!
|
||||
//! Work-item pipeline events (stage transitions) are driven by CRDT state
|
||||
//! changes via [`crate::crdt_state::subscribe`], not by filesystem events.
|
||||
//!
|
||||
//! # Debouncing
|
||||
//! Config-file events are buffered for 300 ms after the last activity to avoid
|
||||
//! duplicate broadcasts when an editor writes multiple events in quick succession.
|
||||
//!
|
||||
//! # Submodules
|
||||
//! - [`events`] — [`WatcherEvent`] enum definition.
|
||||
//! - [`sweep`] — periodic sweep of `5_done` → `6_archived`.
|
||||
|
||||
mod events;
|
||||
mod sweep;
|
||||
|
||||
pub use events::WatcherEvent;
|
||||
pub(crate) use sweep::sweep_done_to_archived;
|
||||
|
||||
use crate::slog;
|
||||
use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher, recommended_watcher};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::mpsc;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
/// Return `true` if `path` is the root-level `.huskies/project.toml` or
/// `.huskies/agents.toml`, i.e. `{git_root}/.huskies/{project,agents}.toml`.
///
/// Returns `false` for paths inside worktree directories (paths containing
/// a `worktrees` component).
pub fn is_config_file(path: &Path, git_root: &Path) -> bool {
    // Agent worktrees contain their own `.huskies/` copies; anything routed
    // through a `worktrees` directory is never the root config.
    if path.components().any(|c| c.as_os_str() == "worktrees") {
        return false;
    }

    let huskies_dir = git_root.join(".huskies");
    let is_project = path == huskies_dir.join("project.toml");
    let is_agents = path == huskies_dir.join("agents.toml");
    is_project || is_agents
}
|
||||
|
||||
/// Map a pipeline directory name to a (action, commit-message-prefix) pair.
|
||||
///
|
||||
/// Used by the CRDT-to-watcher bridge (in `main.rs`) to derive the action and
|
||||
/// commit message for `WatcherEvent::WorkItem` events.
|
||||
pub fn stage_metadata(stage: &str, item_id: &str) -> Option<(&'static str, String)> {
|
||||
use crate::pipeline_state::Stage;
|
||||
let (action, msg) = match Stage::from_dir(stage)? {
|
||||
Stage::Backlog => ("create", format!("huskies: create {item_id}")),
|
||||
Stage::Coding => ("start", format!("huskies: start {item_id}")),
|
||||
Stage::Qa => ("qa", format!("huskies: queue {item_id} for QA")),
|
||||
Stage::Merge { .. } => ("merge", format!("huskies: queue {item_id} for merge")),
|
||||
Stage::Done { .. } => ("done", format!("huskies: done {item_id}")),
|
||||
Stage::Archived { .. } => ("accept", format!("huskies: accept {item_id}")),
|
||||
};
|
||||
Some((action, msg))
|
||||
}
|
||||
|
||||
/// Start the filesystem watcher on a dedicated OS thread.
|
||||
///
|
||||
/// Watches `.huskies/project.toml` and `.huskies/agents.toml` for config
|
||||
/// hot-reload, and periodically sweeps `5_done/` → `6_archived/` via CRDT.
|
||||
///
|
||||
/// Work-item pipeline events (stage transitions) are no longer driven by
|
||||
/// filesystem events — they originate from CRDT state changes via
|
||||
/// [`crate::crdt_state::subscribe`].
|
||||
///
|
||||
/// `git_root` — project root (passed to `git` commands and config loading).
|
||||
/// `event_tx` — broadcast sender for `ConfigChanged` events.
|
||||
pub fn start_watcher(git_root: PathBuf, event_tx: broadcast::Sender<WatcherEvent>) {
|
||||
std::thread::spawn(move || {
|
||||
let (notify_tx, notify_rx) = mpsc::channel::<notify::Result<notify::Event>>();
|
||||
|
||||
let mut watcher: RecommendedWatcher = match recommended_watcher(move |res| {
|
||||
let _ = notify_tx.send(res);
|
||||
}) {
|
||||
Ok(w) => w,
|
||||
Err(e) => {
|
||||
slog!("[watcher] failed to create watcher: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Watch config files for hot-reload. Work-item directories are NOT
|
||||
// watched — CRDT state transitions drive pipeline events now.
|
||||
let huskies = git_root.join(".huskies");
|
||||
for config_file in [huskies.join("project.toml"), huskies.join("agents.toml")] {
|
||||
if config_file.exists()
|
||||
&& let Err(e) = watcher.watch(&config_file, RecursiveMode::NonRecursive)
|
||||
{
|
||||
slog!(
|
||||
"[watcher] failed to watch config file {}: {e}",
|
||||
config_file.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
slog!("[watcher] watching config files for hot-reload");
|
||||
|
||||
const DEBOUNCE: Duration = Duration::from_millis(300);
|
||||
|
||||
// Whether a config file change is pending in the current debounce window.
|
||||
let mut config_changed_pending = false;
|
||||
let mut deadline: Option<Instant> = None;
|
||||
|
||||
loop {
|
||||
// How long until the debounce window closes (or wait for next event).
|
||||
let timeout = deadline.map_or(Duration::from_secs(60), |d| {
|
||||
d.saturating_duration_since(Instant::now())
|
||||
});
|
||||
|
||||
let flush = match notify_rx.recv_timeout(timeout) {
|
||||
Ok(Ok(event)) => {
|
||||
let is_relevant_kind = matches!(
|
||||
event.kind,
|
||||
EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
|
||||
);
|
||||
|
||||
if is_relevant_kind {
|
||||
for path in event.paths {
|
||||
if is_config_file(&path, &git_root) {
|
||||
slog!("[watcher] config change detected: {}", path.display());
|
||||
config_changed_pending = true;
|
||||
deadline = Some(Instant::now() + DEBOUNCE);
|
||||
}
|
||||
// Work-item file changes are intentionally ignored.
|
||||
// CRDT state transitions handle pipeline events.
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
slog!("[watcher] notify error: {e}");
|
||||
false
|
||||
}
|
||||
// Debounce window expired — time to flush.
|
||||
Err(mpsc::RecvTimeoutError::Timeout) => true,
|
||||
Err(mpsc::RecvTimeoutError::Disconnected) => {
|
||||
slog!("[watcher] channel disconnected, shutting down");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if flush {
|
||||
if config_changed_pending {
|
||||
slog!("[watcher] broadcasting agent_config_changed");
|
||||
let _ = event_tx.send(WatcherEvent::ConfigChanged);
|
||||
|
||||
config_changed_pending = false;
|
||||
}
|
||||
deadline = None;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// ── Test-only helpers (legacy; retained for the test suite) ───────────────
|
||||
|
||||
/// Return the pipeline stage name for a path if it is a `.md` file living
/// directly inside one of the known work subdirectories, otherwise `None`.
///
/// Explicitly returns `None` for any path under `.huskies/worktrees/` so
/// that code changes made by agents in their isolated worktrees are never
/// auto-committed to master by the watcher.
///
/// Retained for tests; no longer called in production (CRDT drives events).
#[cfg(test)]
fn stage_for_path(path: &Path) -> Option<String> {
    // Worktree copies must never be attributed to a pipeline stage.
    if path.components().any(|c| c.as_os_str() == "worktrees") {
        return None;
    }

    // Only markdown work items count; extension-less paths are rejected too.
    match path.extension() {
        Some(ext) if ext == "md" => {}
        _ => return None,
    }

    // The stage is the name of the immediate parent directory.
    let stage = path.parent()?.file_name()?.to_str()?;
    let known = matches!(
        stage,
        "1_backlog" | "2_current" | "3_qa" | "4_merge" | "5_done" | "6_archived"
    );
    known.then(|| stage.to_string())
}
|
||||
|
||||
/// Stage all changes in the work directory and commit with the given message.
///
/// Uses `git add -A .huskies/work/` to catch both additions and deletions in
/// a single commit. Returns `Ok(true)` if a commit was made, `Ok(false)` if
/// there was nothing to commit, and `Err` for unexpected failures.
///
/// Retained for tests; no longer called in production (CRDT drives events).
#[cfg(test)]
fn git_add_work_and_commit(git_root: &Path, message: &str) -> Result<bool, String> {
    let work_rel = PathBuf::from(".huskies").join("work");

    // Stage everything under the work directory, deletions included.
    let add_out = std::process::Command::new("git")
        .args(["add", "-A"])
        .arg(&work_rel)
        .current_dir(git_root)
        .output()
        .map_err(|e| format!("git add: {e}"))?;
    if !add_out.status.success() {
        let add_err = String::from_utf8_lossy(&add_out.stderr);
        return Err(format!("git add failed: {add_err}"));
    }

    let commit_out = std::process::Command::new("git")
        .args(["commit", "-m", message])
        .current_dir(git_root)
        .output()
        .map_err(|e| format!("git commit: {e}"))?;

    if commit_out.status.success() {
        return Ok(true);
    }

    // A non-zero exit with "nothing to commit" is the expected no-op case;
    // git versions differ on which stream carries the message, so check both.
    let stderr = String::from_utf8_lossy(&commit_out.stderr);
    let stdout = String::from_utf8_lossy(&commit_out.stdout);
    let nothing_to_do =
        stdout.contains("nothing to commit") || stderr.contains("nothing to commit");
    if nothing_to_do {
        return Ok(false);
    }

    Err(format!("git commit failed: {stderr}"))
}
|
||||
|
||||
/// Stages whose changes represent meaningful git checkpoints: creation
/// (`1_backlog`), completion (`5_done`), and archival (`6_archived`).
/// The remaining stages (current, qa, merge) are transient pipeline state
/// that don't need to be committed.
///
/// Retained for tests; no longer called in production (CRDT drives events).
#[cfg(test)]
const COMMIT_WORTHY_STAGES: &[&str] = &["1_backlog", "5_done", "6_archived"];
|
||||
|
||||
/// Return `true` if changes in `stage` should be committed to git.
///
/// Retained for tests; no longer called in production (CRDT drives events).
#[cfg(test)]
fn should_commit_stage(stage: &str) -> bool {
    // A stage is commit-worthy exactly when it appears in the allow-list.
    COMMIT_WORTHY_STAGES.iter().any(|worthy| *worthy == stage)
}
|
||||
|
||||
/// Process a batch of pending (path → stage) entries: commit and broadcast.
///
/// Only files that still exist on disk are used to derive the commit message
/// (they represent the destination of a move or a new file). Deletions are
/// captured by `git add -A .huskies/work/` automatically.
///
/// Only terminal stages (`1_backlog`, `5_done`, and `6_archived` — see
/// [`COMMIT_WORTHY_STAGES`]) trigger git commits. All stages broadcast a
/// [`WatcherEvent`] so the frontend stays in sync.
///
/// Retained for tests; no longer called in production (CRDT drives events).
#[cfg(test)]
fn flush_pending(
    pending: &std::collections::HashMap<PathBuf, String>,
    git_root: &Path,
    event_tx: &broadcast::Sender<WatcherEvent>,
) {
    use crate::io::story_metadata::clear_front_matter_field;

    // Separate into files that exist (additions) vs gone (deletions).
    let mut additions: Vec<(&PathBuf, &str)> = Vec::new();
    for (path, stage) in pending {
        if path.exists() {
            additions.push((path, stage.as_str()));
        }
    }

    // Pick the commit message from the first addition (the meaningful side of a move).
    // If there are only deletions, use a generic message.
    // NOTE(review): `pending` is a HashMap, so when several additions are
    // batched together the one picked here is iteration-order dependent
    // (effectively arbitrary) — fine for tests, worth knowing about.
    let (action, item_id, commit_msg) = if let Some((path, stage)) = additions.first() {
        let item = path
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("unknown");
        // Unrecognised stages have no metadata: bail out without broadcasting.
        if let Some((act, msg)) = stage_metadata(stage, item) {
            (act, item.to_string(), msg)
        } else {
            return;
        }
    } else {
        // Only deletions — pick any pending path for the item name.
        let Some((path, _)) = pending.iter().next() else {
            return;
        };
        let item = path
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("unknown");
        (
            "remove",
            item.to_string(),
            format!("huskies: remove {item}"),
        )
    };

    // Strip stale merge_failure front matter from any story that has left 4_merge/.
    // A failed strip is logged but non-fatal: the move itself already happened.
    for (path, stage) in &additions {
        if *stage != "4_merge"
            && let Err(e) = clear_front_matter_field(path, "merge_failure")
        {
            slog!(
                "[watcher] Warning: could not clear merge_failure from {}: {e}",
                path.display()
            );
        }
    }

    // Only commit for terminal stages; intermediate moves are broadcast-only.
    let dest_stage = additions.first().map_or("unknown", |(_, s)| *s);
    let should_commit = should_commit_stage(dest_stage);

    if should_commit {
        slog!("[watcher] flush: {commit_msg}");
        match git_add_work_and_commit(git_root, &commit_msg) {
            Ok(committed) => {
                if committed {
                    slog!("[watcher] committed: {commit_msg}");
                } else {
                    // Ok(false): the tree was already clean — not an error.
                    slog!("[watcher] skipped (already committed): {commit_msg}");
                }
            }
            Err(e) => {
                // On a real git failure we deliberately skip the broadcast too.
                slog!("[watcher] git error: {e}");
                return;
            }
        }
    } else {
        slog!("[watcher] flush (broadcast-only): {commit_msg}");
    }

    // For move operations, find the source stage from deleted entries with matching item_id.
    let from_stage: Option<String> = if !additions.is_empty() {
        pending
            .iter()
            .filter(|(path, _)| !path.exists())
            .find(|(path, _)| path.file_stem().and_then(|s| s.to_str()) == Some(item_id.as_str()))
            .map(|(_, stage)| stage.clone())
    } else {
        None
    };

    // Always broadcast the event so connected WebSocket clients stay in sync.
    // A send error just means no subscribers — intentionally ignored.
    let evt = WatcherEvent::WorkItem {
        stage: dest_stage.to_string(),
        item_id,
        action: action.to_string(),
        commit_msg,
        from_stage,
    };
    let _ = event_tx.send(evt);
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
@@ -0,0 +1,48 @@
|
||||
//! Periodic sweep of completed work items from `5_done` to `6_archived`.
|
||||
//!
|
||||
//! Items that have been in `5_done` for longer than the configured retention
|
||||
//! period are automatically promoted to `6_archived` via CRDT state transitions.
|
||||
|
||||
use crate::slog;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Sweep items in `5_done` whose `merged_at` timestamp exceeds the retention
|
||||
/// duration to `6_archived` via CRDT state transitions.
|
||||
///
|
||||
/// All state is read from and written to CRDT — no filesystem access.
|
||||
/// Worktree pruning is handled separately by the CRDT event subscriber.
|
||||
pub(crate) fn sweep_done_to_archived(done_retention: Duration) {
|
||||
use crate::pipeline_state::{PipelineEvent, Stage, read_all_typed, stage_dir_name, transition};
|
||||
|
||||
for item in read_all_typed() {
|
||||
if let Stage::Done { merged_at, .. } = &item.stage {
|
||||
let age = chrono::Utc::now()
|
||||
.signed_duration_since(*merged_at)
|
||||
.to_std()
|
||||
.unwrap_or_default();
|
||||
if age >= done_retention {
|
||||
let story_id = item.story_id.0.clone();
|
||||
match transition(item.stage.clone(), PipelineEvent::Accepted) {
|
||||
Ok(new_stage) => {
|
||||
crate::crdt_state::write_item(
|
||||
&story_id,
|
||||
stage_dir_name(&new_stage),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
Some(false),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
slog!("[watcher] sweep: promoted {story_id} → 6_archived/");
|
||||
}
|
||||
Err(e) => {
|
||||
slog!("[watcher] sweep: transition error for {story_id}: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,730 @@
|
||||
use super::*;
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use tempfile::TempDir;
|
||||
|
||||
/// Initialise a minimal git repo so commit operations work.
fn init_git_repo(dir: &std::path::Path) {
    use std::process::Command;
    // One closure drives every git step; each step panics with its own
    // label so a broken git install is easy to spot in test output.
    let git = |args: &[&str], label: &str| {
        Command::new("git")
            .args(args)
            .current_dir(dir)
            .output()
            .expect(label);
    };
    git(&["init"], "git init");
    git(
        &["config", "user.email", "test@example.com"],
        "git config email",
    );
    git(&["config", "user.name", "Test"], "git config name");
    git(
        &["commit", "--allow-empty", "-m", "init"],
        "git initial commit",
    );
}
|
||||
|
||||
/// Create the `.huskies/work/{stage}/` dir tree inside `root`.
fn make_stage_dir(root: &std::path::Path, stage: &str) -> PathBuf {
    let mut stage_dir = root.to_path_buf();
    stage_dir.push(".huskies");
    stage_dir.push("work");
    stage_dir.push(stage);
    fs::create_dir_all(&stage_dir).expect("create stage dir");
    stage_dir
}
|
||||
|
||||
// ── git_add_work_and_commit ───────────────────────────────────────────────

/// Staging a fresh file must create a commit → `Ok(true)`.
#[test]
fn git_commit_returns_true_when_file_added() {
    let tmp = TempDir::new().unwrap();
    init_git_repo(tmp.path());
    let stage_dir = make_stage_dir(tmp.path(), "2_current");
    fs::write(stage_dir.join("42_story_foo.md"), "---\nname: test\n---\n").unwrap();

    let result = git_add_work_and_commit(tmp.path(), "huskies: start 42_story_foo");
    assert_eq!(
        result,
        Ok(true),
        "should return Ok(true) when a commit was made"
    );
}

/// A second run with a clean tree must report `Ok(false)`, not an error.
#[test]
fn git_commit_returns_false_when_nothing_to_commit() {
    let tmp = TempDir::new().unwrap();
    init_git_repo(tmp.path());
    let stage_dir = make_stage_dir(tmp.path(), "2_current");
    fs::write(stage_dir.join("42_story_foo.md"), "---\nname: test\n---\n").unwrap();

    // First commit — should succeed.
    git_add_work_and_commit(tmp.path(), "huskies: start 42_story_foo").unwrap();

    // Second call with no changes — should return Ok(false).
    let result = git_add_work_and_commit(tmp.path(), "huskies: start 42_story_foo");
    assert_eq!(
        result,
        Ok(false),
        "should return Ok(false) when nothing to commit"
    );
}
|
||||
|
||||
// ── flush_pending ─────────────────────────────────────────────────────────

/// Terminal stage (`1_backlog`): both a broadcast AND a git commit happen.
#[test]
fn flush_pending_commits_and_broadcasts_for_terminal_stage() {
    let tmp = TempDir::new().unwrap();
    init_git_repo(tmp.path());
    let stage_dir = make_stage_dir(tmp.path(), "1_backlog");
    let story_path = stage_dir.join("42_story_foo.md");
    fs::write(&story_path, "---\nname: test\n---\n").unwrap();

    let (tx, mut rx) = tokio::sync::broadcast::channel(16);
    let mut pending = HashMap::new();
    pending.insert(story_path, "1_backlog".to_string());

    flush_pending(&pending, tmp.path(), &tx);

    let evt = rx.try_recv().expect("expected a broadcast event");
    match evt {
        WatcherEvent::WorkItem {
            stage,
            item_id,
            action,
            commit_msg,
            ..
        } => {
            assert_eq!(stage, "1_backlog");
            assert_eq!(item_id, "42_story_foo");
            assert_eq!(action, "create");
            assert_eq!(commit_msg, "huskies: create 42_story_foo");
        }
        other => panic!("unexpected event: {other:?}"),
    }

    // Verify the file was actually committed.
    let log = std::process::Command::new("git")
        .args(["log", "--oneline", "-1"])
        .current_dir(tmp.path())
        .output()
        .expect("git log");
    let log_msg = String::from_utf8_lossy(&log.stdout);
    assert!(
        log_msg.contains("huskies: create 42_story_foo"),
        "terminal stage should produce a git commit"
    );
}

/// Intermediate stage (`2_current`): broadcast happens, git commit does not.
#[test]
fn flush_pending_broadcasts_without_commit_for_intermediate_stage() {
    let tmp = TempDir::new().unwrap();
    init_git_repo(tmp.path());
    let stage_dir = make_stage_dir(tmp.path(), "2_current");
    let story_path = stage_dir.join("42_story_foo.md");
    fs::write(&story_path, "---\nname: test\n---\n").unwrap();

    let (tx, mut rx) = tokio::sync::broadcast::channel(16);
    let mut pending = HashMap::new();
    pending.insert(story_path, "2_current".to_string());

    flush_pending(&pending, tmp.path(), &tx);

    // Event should still be broadcast for frontend sync.
    let evt = rx.try_recv().expect("expected a broadcast event");
    match evt {
        WatcherEvent::WorkItem {
            stage,
            item_id,
            action,
            commit_msg,
            ..
        } => {
            assert_eq!(stage, "2_current");
            assert_eq!(item_id, "42_story_foo");
            assert_eq!(action, "start");
            assert_eq!(commit_msg, "huskies: start 42_story_foo");
        }
        other => panic!("unexpected event: {other:?}"),
    }

    // Verify NO git commit was made (only the initial empty commit should exist).
    let log = std::process::Command::new("git")
        .args(["log", "--oneline"])
        .current_dir(tmp.path())
        .output()
        .expect("git log");
    let log_msg = String::from_utf8_lossy(&log.stdout);
    assert!(
        !log_msg.contains("huskies:"),
        "intermediate stage should NOT produce a git commit"
    );
}

/// Every known stage broadcasts an event with the right action/message,
/// regardless of whether it is commit-worthy.
#[test]
fn flush_pending_broadcasts_for_all_pipeline_stages() {
    let stages = [
        ("1_backlog", "create", "huskies: create 10_story_x"),
        ("3_qa", "qa", "huskies: queue 10_story_x for QA"),
        ("4_merge", "merge", "huskies: queue 10_story_x for merge"),
        ("5_done", "done", "huskies: done 10_story_x"),
        ("6_archived", "accept", "huskies: accept 10_story_x"),
    ];

    for (stage, expected_action, expected_msg) in stages {
        let tmp = TempDir::new().unwrap();
        init_git_repo(tmp.path());
        let stage_dir = make_stage_dir(tmp.path(), stage);
        let story_path = stage_dir.join("10_story_x.md");
        fs::write(&story_path, "---\nname: test\n---\n").unwrap();

        let (tx, mut rx) = tokio::sync::broadcast::channel(16);
        let mut pending = HashMap::new();
        pending.insert(story_path, stage.to_string());

        flush_pending(&pending, tmp.path(), &tx);

        // All stages should broadcast events regardless of commit behavior.
        // Fix: `expect()` takes a plain &str, so the old
        // `.expect("expected broadcast for stage {stage}")` printed the
        // literal "{stage}" — panic! actually interpolates.
        let evt = rx
            .try_recv()
            .unwrap_or_else(|_| panic!("expected broadcast for stage {stage}"));
        match evt {
            WatcherEvent::WorkItem {
                action, commit_msg, ..
            } => {
                assert_eq!(action, expected_action, "stage {stage}");
                assert_eq!(commit_msg, expected_msg, "stage {stage}");
            }
            other => panic!("unexpected event for stage {stage}: {other:?}"),
        }
    }
}

/// Deletion-only batches broadcast a "remove" event (no commit message work).
#[test]
fn flush_pending_deletion_only_broadcasts_remove_event() {
    let tmp = TempDir::new().unwrap();
    init_git_repo(tmp.path());
    // Create the work dir tree but NOT the file (simulates a deletion).
    make_stage_dir(tmp.path(), "2_current");
    let deleted_path = tmp
        .path()
        .join(".huskies")
        .join("work")
        .join("2_current")
        .join("42_story_foo.md");

    let (tx, mut rx) = tokio::sync::broadcast::channel(16);
    let mut pending = HashMap::new();
    pending.insert(deleted_path, "2_current".to_string());

    flush_pending(&pending, tmp.path(), &tx);

    // Even when nothing was committed (file never existed), an event is broadcast.
    let evt = rx
        .try_recv()
        .expect("expected a broadcast event for deletion");
    match evt {
        WatcherEvent::WorkItem {
            action, item_id, ..
        } => {
            assert_eq!(action, "remove");
            assert_eq!(item_id, "42_story_foo");
        }
        other => panic!("unexpected event: {other:?}"),
    }
}

/// Additions in an unrecognised stage directory are silently dropped.
#[test]
fn flush_pending_skips_unknown_stage_for_addition() {
    let tmp = TempDir::new().unwrap();
    init_git_repo(tmp.path());
    // File sits in an unrecognised directory.
    let unknown_dir = tmp.path().join(".huskies").join("work").join("9_unknown");
    fs::create_dir_all(&unknown_dir).unwrap();
    let path = unknown_dir.join("42_story_foo.md");
    fs::write(&path, "---\nname: test\n---\n").unwrap();

    let (tx, mut rx) = tokio::sync::broadcast::channel(16);
    let mut pending = HashMap::new();
    pending.insert(path, "9_unknown".to_string());

    flush_pending(&pending, tmp.path(), &tx);

    // No event should be broadcast because stage_metadata returns None for unknown stages.
    assert!(
        rx.try_recv().is_err(),
        "no event should be broadcast for unknown stage"
    );
}

/// An empty batch is a no-op: no panic, no commit, no broadcast.
#[test]
fn flush_pending_empty_pending_does_nothing() {
    let tmp = TempDir::new().unwrap();
    init_git_repo(tmp.path());
    make_stage_dir(tmp.path(), "2_current");

    let (tx, mut rx) = tokio::sync::broadcast::channel(16);
    let pending: HashMap<PathBuf, String> = HashMap::new();

    // Should not panic and should not broadcast anything.
    flush_pending(&pending, tmp.path(), &tx);
    assert!(rx.try_recv().is_err(), "no event for empty pending map");
}
|
||||
|
||||
// ── flush_pending clears merge_failure ─────────────────────────────────────

/// Landing in `2_current` (≠ 4_merge) must strip stale `merge_failure`
/// front matter while keeping the rest of the document intact.
#[test]
fn flush_pending_clears_merge_failure_when_leaving_merge_stage() {
    let tmp = TempDir::new().unwrap();
    init_git_repo(tmp.path());
    let stage_dir = make_stage_dir(tmp.path(), "2_current");
    let story_path = stage_dir.join("50_story_retry.md");
    fs::write(
        &story_path,
        "---\nname: Retry Story\nmerge_failure: \"conflicts detected\"\n---\n# Story\n",
    )
    .unwrap();

    let (tx, _rx) = tokio::sync::broadcast::channel(16);
    let mut pending = HashMap::new();
    pending.insert(story_path.clone(), "2_current".to_string());

    flush_pending(&pending, tmp.path(), &tx);

    let contents = fs::read_to_string(&story_path).unwrap();
    assert!(
        !contents.contains("merge_failure"),
        "merge_failure should be stripped when story lands in 2_current"
    );
    assert!(contents.contains("name: Retry Story"));
}

/// Same stripping applies when a story is reset to `1_backlog`.
#[test]
fn flush_pending_clears_merge_failure_when_moving_to_backlog() {
    let tmp = TempDir::new().unwrap();
    init_git_repo(tmp.path());
    let stage_dir = make_stage_dir(tmp.path(), "1_backlog");
    let story_path = stage_dir.join("51_story_reset.md");
    fs::write(
        &story_path,
        "---\nname: Reset Story\nmerge_failure: \"gate failed\"\n---\n# Story\n",
    )
    .unwrap();

    let (tx, _rx) = tokio::sync::broadcast::channel(16);
    let mut pending = HashMap::new();
    pending.insert(story_path.clone(), "1_backlog".to_string());

    flush_pending(&pending, tmp.path(), &tx);

    let contents = fs::read_to_string(&story_path).unwrap();
    assert!(
        !contents.contains("merge_failure"),
        "merge_failure should be stripped when story lands in 1_backlog"
    );
}

/// Same stripping applies when a story completes into `5_done`.
#[test]
fn flush_pending_clears_merge_failure_when_moving_to_done() {
    let tmp = TempDir::new().unwrap();
    init_git_repo(tmp.path());
    let stage_dir = make_stage_dir(tmp.path(), "5_done");
    let story_path = stage_dir.join("52_story_done.md");
    fs::write(
        &story_path,
        "---\nname: Done Story\nmerge_failure: \"stale error\"\n---\n# Story\n",
    )
    .unwrap();

    let (tx, _rx) = tokio::sync::broadcast::channel(16);
    let mut pending = HashMap::new();
    pending.insert(story_path.clone(), "5_done".to_string());

    flush_pending(&pending, tmp.path(), &tx);

    let contents = fs::read_to_string(&story_path).unwrap();
    assert!(
        !contents.contains("merge_failure"),
        "merge_failure should be stripped when story lands in 5_done"
    );
}

/// While a story sits in `4_merge`, its failure record must be kept —
/// that is where the error is still actionable.
#[test]
fn flush_pending_preserves_merge_failure_when_in_merge_stage() {
    let tmp = TempDir::new().unwrap();
    init_git_repo(tmp.path());
    let stage_dir = make_stage_dir(tmp.path(), "4_merge");
    let story_path = stage_dir.join("53_story_merging.md");
    fs::write(
        &story_path,
        "---\nname: Merging Story\nmerge_failure: \"conflicts\"\n---\n# Story\n",
    )
    .unwrap();

    let (tx, _rx) = tokio::sync::broadcast::channel(16);
    let mut pending = HashMap::new();
    pending.insert(story_path.clone(), "4_merge".to_string());

    flush_pending(&pending, tmp.path(), &tx);

    let contents = fs::read_to_string(&story_path).unwrap();
    assert!(
        contents.contains("merge_failure"),
        "merge_failure should be preserved when story is in 4_merge"
    );
}

/// Stripping must be a byte-for-byte no-op on files with no merge_failure.
#[test]
fn flush_pending_no_op_when_no_merge_failure() {
    let tmp = TempDir::new().unwrap();
    init_git_repo(tmp.path());
    let stage_dir = make_stage_dir(tmp.path(), "2_current");
    let story_path = stage_dir.join("54_story_clean.md");
    let original = "---\nname: Clean Story\n---\n# Story\n";
    fs::write(&story_path, original).unwrap();

    let (tx, _rx) = tokio::sync::broadcast::channel(16);
    let mut pending = HashMap::new();
    pending.insert(story_path.clone(), "2_current".to_string());

    flush_pending(&pending, tmp.path(), &tx);

    let contents = fs::read_to_string(&story_path).unwrap();
    assert_eq!(
        contents, original,
        "file without merge_failure should be unchanged"
    );
}
|
||||
|
||||
// ── flush_pending from_stage ─────────────────────────────────────────────

/// AC3: when a pending map contains both a deletion (source stage) and a
/// creation (dest stage) for the same item_id, the broadcast event should
/// have `from_stage` set to the source stage key.
#[test]
fn flush_pending_sets_from_stage_for_move_operations() {
    let tmp = TempDir::new().unwrap();
    init_git_repo(tmp.path());

    // Destination exists (file moved here).
    let merge_dir = make_stage_dir(tmp.path(), "4_merge");
    let merge_path = merge_dir.join("42_story_foo.md");
    fs::write(&merge_path, "---\nname: test\n---\n").unwrap();

    // Source path does NOT exist (file was moved away).
    make_stage_dir(tmp.path(), "3_qa");
    let qa_path = tmp
        .path()
        .join(".huskies")
        .join("work")
        .join("3_qa")
        .join("42_story_foo.md");

    let (tx, mut rx) = tokio::sync::broadcast::channel(16);
    let mut pending = HashMap::new();
    pending.insert(merge_path, "4_merge".to_string()); // addition
    pending.insert(qa_path, "3_qa".to_string()); // deletion

    flush_pending(&pending, tmp.path(), &tx);

    let evt = rx.try_recv().expect("expected event");
    match evt {
        WatcherEvent::WorkItem {
            stage, from_stage, ..
        } => {
            assert_eq!(stage, "4_merge");
            assert_eq!(from_stage, Some("3_qa".to_string()));
        }
        other => panic!("unexpected event: {other:?}"),
    }
}

/// AC3: when a pending map has only an addition (creation, not a move),
/// `from_stage` should be `None`.
#[test]
fn flush_pending_sets_from_stage_to_none_for_creations() {
    let tmp = TempDir::new().unwrap();
    init_git_repo(tmp.path());

    let stage_dir = make_stage_dir(tmp.path(), "2_current");
    let story_path = stage_dir.join("55_story_new.md");
    fs::write(&story_path, "---\nname: New Story\n---\n").unwrap();

    let (tx, mut rx) = tokio::sync::broadcast::channel(16);
    let mut pending = HashMap::new();
    pending.insert(story_path, "2_current".to_string());

    flush_pending(&pending, tmp.path(), &tx);

    let evt = rx.try_recv().expect("expected event");
    match evt {
        WatcherEvent::WorkItem { from_stage, .. } => {
            assert_eq!(from_stage, None, "creation should have no from_stage");
        }
        other => panic!("unexpected event: {other:?}"),
    }
}
|
||||
|
||||
// ── stage_for_path (additional edge cases) ────────────────────────────────

/// `stage_for_path` maps `.huskies/work/<stage>/<item>.md` paths to their
/// stage name, and returns None for unknown dirs or non-`.md` files.
#[test]
fn stage_for_path_recognises_pipeline_dirs() {
    let base = PathBuf::from("/proj/.huskies/work");
    assert_eq!(
        stage_for_path(&base.join("2_current/42_story_foo.md")),
        Some("2_current".to_string())
    );
    assert_eq!(
        stage_for_path(&base.join("5_done/10_bug_bar.md")),
        Some("5_done".to_string())
    );
    assert_eq!(
        stage_for_path(&base.join("6_archived/10_bug_bar.md")),
        Some("6_archived".to_string())
    );
    // Unrecognised stage directory → no stage.
    assert_eq!(stage_for_path(&base.join("other/file.md")), None);
    // Wrong extension (.txt, not .md) → no stage.
    assert_eq!(
        stage_for_path(&base.join("2_current/42_story_foo.txt")),
        None
    );
}

/// Anything under `.huskies/worktrees/` is agent working-copy content and
/// must never be treated as pipeline state.
#[test]
fn stage_for_path_ignores_worktree_paths() {
    let worktrees = PathBuf::from("/proj/.huskies/worktrees");

    // Code changes inside a worktree must be ignored.
    assert_eq!(
        stage_for_path(&worktrees.join("42_story_foo/server/src/main.rs")),
        None,
    );

    // Even if a worktree happens to contain a path component that looks
    // like a pipeline stage, it must still be ignored.
    assert_eq!(
        stage_for_path(&worktrees.join("42_story_foo/.huskies/work/2_current/42_story_foo.md")),
        None,
    );

    // A path that only contains the word "worktrees" as part of a longer
    // segment (not an exact component) must NOT be filtered out.
    assert_eq!(
        stage_for_path(&PathBuf::from(
            "/proj/.huskies/work/2_current/not_worktrees_story.md"
        )),
        Some("2_current".to_string()),
    );
}
|
||||
|
||||
/// `should_commit_stage` must match COMMIT_WORTHY_STAGES exactly.
#[test]
fn should_commit_stage_only_for_terminal_stages() {
    // Terminal stages — should commit.
    assert!(should_commit_stage("1_backlog"));
    assert!(should_commit_stage("5_done"));
    assert!(should_commit_stage("6_archived"));
    // Intermediate stages — broadcast-only, no commit.
    assert!(!should_commit_stage("2_current"));
    assert!(!should_commit_stage("3_qa"));
    assert!(!should_commit_stage("4_merge"));
    // Unknown — no commit.
    assert!(!should_commit_stage("unknown"));
}

/// `stage_metadata` maps each known stage to its (action, commit message)
/// pair and returns None for anything else.
#[test]
fn stage_metadata_returns_correct_actions() {
    let (action, msg) = stage_metadata("2_current", "42_story_foo").unwrap();
    assert_eq!(action, "start");
    assert_eq!(msg, "huskies: start 42_story_foo");

    let (action, msg) = stage_metadata("5_done", "42_story_foo").unwrap();
    assert_eq!(action, "done");
    assert_eq!(msg, "huskies: done 42_story_foo");

    let (action, msg) = stage_metadata("6_archived", "42_story_foo").unwrap();
    assert_eq!(action, "accept");
    assert_eq!(msg, "huskies: accept 42_story_foo");

    // Unrecognised stage → no metadata.
    assert!(stage_metadata("unknown", "id").is_none());
}
|
||||
|
||||
/// The root `.huskies/project.toml` is a watched config file.
#[test]
fn is_config_file_identifies_root_project_toml() {
    let git_root = PathBuf::from("/proj");
    let config = git_root.join(".huskies").join("project.toml");
    assert!(is_config_file(&config, &git_root));
}

/// The root `.huskies/agents.toml` is a watched config file.
#[test]
fn is_config_file_identifies_root_agents_toml() {
    let git_root = PathBuf::from("/proj");
    let agents = git_root.join(".huskies").join("agents.toml");
    assert!(is_config_file(&agents, &git_root));
}

/// Config copies inside agent worktrees must not trigger hot-reload.
#[test]
fn is_config_file_rejects_worktree_copies() {
    let git_root = PathBuf::from("/proj");
    // project.toml inside a worktree must NOT be treated as the root config.
    let worktree_config =
        PathBuf::from("/proj/.huskies/worktrees/42_story_foo/.huskies/project.toml");
    assert!(!is_config_file(&worktree_config, &git_root));
}

/// Non-config files under `.huskies/` are ignored.
#[test]
fn is_config_file_rejects_other_files() {
    let git_root = PathBuf::from("/proj");
    // Random files must not match.
    assert!(!is_config_file(
        &PathBuf::from("/proj/.huskies/work/2_current/42_story_foo.md"),
        &git_root
    ));
    assert!(!is_config_file(
        &PathBuf::from("/proj/.huskies/README.md"),
        &git_root
    ));
}

/// A config living under a different git root does not match this watcher.
#[test]
fn is_config_file_rejects_wrong_root() {
    let git_root = PathBuf::from("/proj");
    let other_root_config = PathBuf::from("/other/.huskies/project.toml");
    assert!(!is_config_file(&other_root_config, &git_root));
}
|
||||
|
||||
// ── sweep_done_to_archived (CRDT-based) ─────────────────────────────────
//
// The sweep function reads from `read_all_typed()` and checks
// `Stage::Done { merged_at, .. }`. Items created via
// `write_item_with_content("5_done")` project `merged_at = Utc::now()`,
// so we test with Duration::ZERO to sweep immediately and with a long
// retention to verify items are kept. No filesystem access is involved.
//
// NOTE(review): these tests mutate shared global CRDT state; the unique
// 98xx story ids keep them from colliding with each other.

/// Zero retention: a Done item is promoted to Archived immediately.
#[test]
fn sweep_moves_old_items_to_archived() {
    crate::db::ensure_content_store();
    crate::db::write_item_with_content("9880_story_sweep_old", "5_done", "---\nname: old\n---\n");

    // With ZERO retention, any Done item should be swept.
    sweep_done_to_archived(Duration::ZERO);

    // Verify the item was moved to 6_archived in the CRDT.
    let items = crate::pipeline_state::read_all_typed();
    let item = items
        .iter()
        .find(|i| i.story_id.0 == "9880_story_sweep_old");
    assert!(
        item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })),
        "item should be archived after sweep"
    );
}

/// Long retention: a just-merged item stays in Done.
#[test]
fn sweep_keeps_recent_items_in_done() {
    crate::db::ensure_content_store();
    crate::db::write_item_with_content("9881_story_sweep_new", "5_done", "---\nname: new\n---\n");

    // With a very long retention, the item (merged_at ≈ now) should stay.
    sweep_done_to_archived(Duration::from_secs(999_999));

    let items = crate::pipeline_state::read_all_typed();
    let item = items
        .iter()
        .find(|i| i.story_id.0 == "9881_story_sweep_new");
    assert!(
        item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Done { .. })),
        "item should remain in Done with long retention"
    );
}

/// The retention argument is honoured (not some hard-coded default).
#[test]
fn sweep_respects_custom_retention() {
    crate::db::ensure_content_store();
    crate::db::write_item_with_content(
        "9882_story_sweep_custom",
        "5_done",
        "---\nname: custom\n---\n",
    );

    // With ZERO retention, sweep should promote.
    sweep_done_to_archived(Duration::ZERO);

    let items = crate::pipeline_state::read_all_typed();
    let item = items
        .iter()
        .find(|i| i.story_id.0 == "9882_story_sweep_custom");
    assert!(
        item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })),
        "item should be archived with zero retention"
    );
}

/// Prove that the sweep reads `merged_at` from the CRDT (not `Utc::now()`).
///
/// This test sets `merged_at` to 10 seconds in the past and uses a 5-second
/// retention. If the sweep were still using `Utc::now()` as the start time
/// (the original bug), the elapsed time would be ~0 and the item would NOT
/// be swept. With the fix, the item is swept because 10s > 5s retention.
#[test]
fn sweep_uses_crdt_merged_at_not_utc_now() {
    crate::db::ensure_content_store();

    let ten_seconds_ago = (chrono::Utc::now() - chrono::Duration::seconds(10)).timestamp() as f64;

    // Write item in 5_done with an explicit past merged_at timestamp.
    crate::crdt_state::write_item(
        "9883_story_sweep_merged_at",
        "5_done",
        Some("merged_at test"),
        None,
        None,
        None,
        None,
        None,
        None,
        Some(ten_seconds_ago),
    );

    // 5-second retention: item is 10s old → should be swept.
    sweep_done_to_archived(Duration::from_secs(5));

    let items = crate::pipeline_state::read_all_typed();
    let item = items
        .iter()
        .find(|i| i.story_id.0 == "9883_story_sweep_merged_at");
    assert!(
        item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })),
        "item with merged_at 10s ago should be archived with 5s retention"
    );
}

/// Prove that an item with merged_at NEWER than done_retention is NOT swept.
#[test]
fn sweep_keeps_item_newer_than_retention() {
    crate::db::ensure_content_store();

    let one_second_ago = (chrono::Utc::now() - chrono::Duration::seconds(1)).timestamp() as f64;

    crate::crdt_state::write_item(
        "9884_story_sweep_recent",
        "5_done",
        Some("recent merged_at test"),
        None,
        None,
        None,
        None,
        None,
        None,
        Some(one_second_ago),
    );

    // 1-hour retention: item is only 1s old → should NOT be swept.
    sweep_done_to_archived(Duration::from_secs(3600));

    let items = crate::pipeline_state::read_all_typed();
    let item = items
        .iter()
        .find(|i| i.story_id.0 == "9884_story_sweep_recent");
    assert!(
        item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Done { .. })),
        "item with merged_at 1s ago should stay in Done with 1-hour retention"
    );
}
|
||||
Reference in New Issue
Block a user