huskies: merge 533_story_crdt_based_done_archived_sweep_to_replace_filesystem_based_watcher_sweep
This commit is contained in:
+19
-175
@@ -323,37 +323,24 @@ fn flush_pending(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Sweep items in `5_done` whose `merged_at` timestamp exceeds the retention
|
/// Sweep items in `5_done` whose `merged_at` timestamp exceeds the retention
|
||||||
/// duration to `6_archived` via CRDT state transitions. Also prunes worktrees
|
/// duration to `6_archived` via CRDT state transitions.
|
||||||
/// for items already in `6_archived`.
|
|
||||||
///
|
///
|
||||||
/// All state is read from CRDT — no filesystem access.
|
/// All state is read from and written to CRDT — no filesystem access.
|
||||||
fn sweep_done_to_archived(_work_dir: &Path, git_root: &Path, done_retention: Duration) {
|
/// Worktree pruning is handled separately by the CRDT event subscriber.
|
||||||
|
pub(crate) fn sweep_done_to_archived(done_retention: Duration) {
|
||||||
use crate::pipeline_state::{Stage, read_all_typed};
|
use crate::pipeline_state::{Stage, read_all_typed};
|
||||||
|
|
||||||
for item in read_all_typed() {
|
for item in read_all_typed() {
|
||||||
match &item.stage {
|
if let Stage::Done { merged_at, .. } = &item.stage {
|
||||||
Stage::Done { merged_at, .. } => {
|
let age = chrono::Utc::now()
|
||||||
let age = chrono::Utc::now()
|
.signed_duration_since(*merged_at)
|
||||||
.signed_duration_since(*merged_at)
|
.to_std()
|
||||||
.to_std()
|
.unwrap_or_default();
|
||||||
.unwrap_or_default();
|
if age >= done_retention {
|
||||||
if age >= done_retention {
|
|
||||||
let story_id = &item.story_id.0;
|
|
||||||
crate::db::move_item_stage(story_id, "6_archived", None);
|
|
||||||
slog!("[watcher] sweep: promoted {story_id} → 6_archived/");
|
|
||||||
if let Err(e) = crate::worktree::prune_worktree_sync(git_root, story_id) {
|
|
||||||
slog!("[watcher] sweep: worktree prune failed for {story_id}: {e}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Stage::Archived { .. } => {
|
|
||||||
// Prune stale worktrees for archived items.
|
|
||||||
let story_id = &item.story_id.0;
|
let story_id = &item.story_id.0;
|
||||||
if let Err(e) = crate::worktree::prune_worktree_sync(git_root, story_id) {
|
crate::db::move_item_stage(story_id, "6_archived", None);
|
||||||
slog!("[watcher] sweep: worktree prune failed for {story_id}: {e}");
|
slog!("[watcher] sweep: promoted {story_id} → 6_archived/");
|
||||||
}
|
|
||||||
}
|
}
|
||||||
_ => {}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -361,18 +348,16 @@ fn sweep_done_to_archived(_work_dir: &Path, git_root: &Path, done_retention: Dur
|
|||||||
/// Start the filesystem watcher on a dedicated OS thread.
|
/// Start the filesystem watcher on a dedicated OS thread.
|
||||||
///
|
///
|
||||||
/// Watches `.huskies/project.toml` and `.huskies/agents.toml` for config
|
/// Watches `.huskies/project.toml` and `.huskies/agents.toml` for config
|
||||||
/// hot-reload, and periodically sweeps `5_done/` → `6_archived/`.
|
/// hot-reload, and periodically sweeps `5_done/` → `6_archived/` via CRDT.
|
||||||
///
|
///
|
||||||
/// Work-item pipeline events (stage transitions) are no longer driven by
|
/// Work-item pipeline events (stage transitions) are no longer driven by
|
||||||
/// filesystem events — they originate from CRDT state changes via
|
/// filesystem events — they originate from CRDT state changes via
|
||||||
/// [`crate::crdt_state::subscribe`].
|
/// [`crate::crdt_state::subscribe`].
|
||||||
///
|
///
|
||||||
/// `work_dir` — absolute path to `.huskies/work/` (used for sweep only).
|
|
||||||
/// `git_root` — project root (passed to `git` commands and config loading).
|
/// `git_root` — project root (passed to `git` commands and config loading).
|
||||||
/// `event_tx` — broadcast sender for `ConfigChanged` events.
|
/// `event_tx` — broadcast sender for `ConfigChanged` events.
|
||||||
/// `watcher_config` — initial sweep configuration loaded from `project.toml`.
|
/// `watcher_config` — initial sweep configuration loaded from `project.toml`.
|
||||||
pub fn start_watcher(
|
pub fn start_watcher(
|
||||||
work_dir: PathBuf,
|
|
||||||
git_root: PathBuf,
|
git_root: PathBuf,
|
||||||
event_tx: broadcast::Sender<WatcherEvent>,
|
event_tx: broadcast::Sender<WatcherEvent>,
|
||||||
watcher_config: WatcherConfig,
|
watcher_config: WatcherConfig,
|
||||||
@@ -498,7 +483,7 @@ pub fn start_watcher(
|
|||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
if now.duration_since(last_sweep) >= sweep_interval {
|
if now.duration_since(last_sweep) >= sweep_interval {
|
||||||
last_sweep = now;
|
last_sweep = now;
|
||||||
sweep_done_to_archived(&work_dir, &git_root, done_retention);
|
sweep_done_to_archived(done_retention);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1101,16 +1086,14 @@ mod tests {
|
|||||||
|
|
||||||
// ── sweep_done_to_archived (CRDT-based) ─────────────────────────────────
|
// ── sweep_done_to_archived (CRDT-based) ─────────────────────────────────
|
||||||
//
|
//
|
||||||
// The sweep function now reads from `read_all_typed()` and checks
|
// The sweep function reads from `read_all_typed()` and checks
|
||||||
// `Stage::Done { merged_at, .. }`. Items created via
|
// `Stage::Done { merged_at, .. }`. Items created via
|
||||||
// `write_item_with_content("5_done")` project `merged_at = Utc::now()`,
|
// `write_item_with_content("5_done")` project `merged_at = Utc::now()`,
|
||||||
// so we test with Duration::ZERO to sweep immediately and with a long
|
// so we test with Duration::ZERO to sweep immediately and with a long
|
||||||
// retention to verify items are kept.
|
// retention to verify items are kept. No filesystem access is involved.
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn sweep_moves_old_items_to_archived() {
|
fn sweep_moves_old_items_to_archived() {
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
|
|
||||||
crate::db::ensure_content_store();
|
crate::db::ensure_content_store();
|
||||||
crate::db::write_item_with_content(
|
crate::db::write_item_with_content(
|
||||||
"9880_story_sweep_old",
|
"9880_story_sweep_old",
|
||||||
@@ -1119,11 +1102,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// With ZERO retention, any Done item should be swept.
|
// With ZERO retention, any Done item should be swept.
|
||||||
sweep_done_to_archived(
|
sweep_done_to_archived(Duration::ZERO);
|
||||||
&tmp.path().join(".huskies/work"),
|
|
||||||
tmp.path(),
|
|
||||||
Duration::ZERO,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Verify the item was moved to 6_archived in the CRDT.
|
// Verify the item was moved to 6_archived in the CRDT.
|
||||||
let items = crate::pipeline_state::read_all_typed();
|
let items = crate::pipeline_state::read_all_typed();
|
||||||
@@ -1136,8 +1115,6 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn sweep_keeps_recent_items_in_done() {
|
fn sweep_keeps_recent_items_in_done() {
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
|
|
||||||
crate::db::ensure_content_store();
|
crate::db::ensure_content_store();
|
||||||
crate::db::write_item_with_content(
|
crate::db::write_item_with_content(
|
||||||
"9881_story_sweep_new",
|
"9881_story_sweep_new",
|
||||||
@@ -1146,11 +1123,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// With a very long retention, the item (merged_at ≈ now) should stay.
|
// With a very long retention, the item (merged_at ≈ now) should stay.
|
||||||
sweep_done_to_archived(
|
sweep_done_to_archived(Duration::from_secs(999_999));
|
||||||
&tmp.path().join(".huskies/work"),
|
|
||||||
tmp.path(),
|
|
||||||
Duration::from_secs(999_999),
|
|
||||||
);
|
|
||||||
|
|
||||||
let items = crate::pipeline_state::read_all_typed();
|
let items = crate::pipeline_state::read_all_typed();
|
||||||
let item = items.iter().find(|i| i.story_id.0 == "9881_story_sweep_new");
|
let item = items.iter().find(|i| i.story_id.0 == "9881_story_sweep_new");
|
||||||
@@ -1162,8 +1135,6 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn sweep_respects_custom_retention() {
|
fn sweep_respects_custom_retention() {
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
|
|
||||||
crate::db::ensure_content_store();
|
crate::db::ensure_content_store();
|
||||||
crate::db::write_item_with_content(
|
crate::db::write_item_with_content(
|
||||||
"9882_story_sweep_custom",
|
"9882_story_sweep_custom",
|
||||||
@@ -1172,11 +1143,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// With ZERO retention, sweep should promote.
|
// With ZERO retention, sweep should promote.
|
||||||
sweep_done_to_archived(
|
sweep_done_to_archived(Duration::ZERO);
|
||||||
&tmp.path().join(".huskies/work"),
|
|
||||||
tmp.path(),
|
|
||||||
Duration::ZERO,
|
|
||||||
);
|
|
||||||
|
|
||||||
let items = crate::pipeline_state::read_all_typed();
|
let items = crate::pipeline_state::read_all_typed();
|
||||||
let item = items.iter().find(|i| i.story_id.0 == "9882_story_sweep_custom");
|
let item = items.iter().find(|i| i.story_id.0 == "9882_story_sweep_custom");
|
||||||
@@ -1185,127 +1152,4 @@ mod tests {
|
|||||||
"item should be archived with zero retention"
|
"item should be archived with zero retention"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── sweep worktree pruning ─────────────────────────────────────────────
|
|
||||||
|
|
||||||
/// Helper: create a real git worktree at `wt_path` on a new branch.
|
|
||||||
fn create_git_worktree(git_root: &std::path::Path, wt_path: &std::path::Path, branch: &str) {
|
|
||||||
use std::process::Command;
|
|
||||||
let _ = Command::new("git")
|
|
||||||
.args(["branch", branch])
|
|
||||||
.current_dir(git_root)
|
|
||||||
.output();
|
|
||||||
Command::new("git")
|
|
||||||
.args(["worktree", "add", &wt_path.to_string_lossy(), branch])
|
|
||||||
.current_dir(git_root)
|
|
||||||
.output()
|
|
||||||
.expect("git worktree add");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn sweep_prunes_worktree_when_story_promoted_to_archived() {
|
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
let git_root = tmp.path().to_path_buf();
|
|
||||||
init_git_repo(&git_root);
|
|
||||||
|
|
||||||
crate::db::ensure_content_store();
|
|
||||||
let story_id = "9883_story_prune_on_promote";
|
|
||||||
crate::db::write_item_with_content(
|
|
||||||
story_id,
|
|
||||||
"5_done",
|
|
||||||
"---\nname: test\n---\n",
|
|
||||||
);
|
|
||||||
|
|
||||||
// Create a real git worktree for this story.
|
|
||||||
let wt_path = crate::worktree::worktree_path(&git_root, story_id);
|
|
||||||
fs::create_dir_all(wt_path.parent().unwrap()).unwrap();
|
|
||||||
create_git_worktree(&git_root, &wt_path, &format!("feature/story-{story_id}"));
|
|
||||||
assert!(wt_path.exists(), "worktree must exist before sweep");
|
|
||||||
|
|
||||||
sweep_done_to_archived(
|
|
||||||
&git_root.join(".huskies/work"),
|
|
||||||
&git_root,
|
|
||||||
Duration::ZERO,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Story must be archived in CRDT.
|
|
||||||
let items = crate::pipeline_state::read_all_typed();
|
|
||||||
let item = items.iter().find(|i| i.story_id.0 == story_id);
|
|
||||||
assert!(
|
|
||||||
item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })),
|
|
||||||
"story should be archived"
|
|
||||||
);
|
|
||||||
// Worktree must be removed.
|
|
||||||
assert!(
|
|
||||||
!wt_path.exists(),
|
|
||||||
"worktree should be removed after archiving"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn sweep_prunes_worktrees_for_already_archived_stories() {
|
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
let git_root = tmp.path().to_path_buf();
|
|
||||||
init_git_repo(&git_root);
|
|
||||||
|
|
||||||
crate::db::ensure_content_store();
|
|
||||||
let story_id = "9884_story_stale_worktree";
|
|
||||||
crate::db::write_item_with_content(
|
|
||||||
story_id,
|
|
||||||
"6_archived",
|
|
||||||
"---\nname: stale\n---\n",
|
|
||||||
);
|
|
||||||
|
|
||||||
// Create a real git worktree that was never cleaned up.
|
|
||||||
let wt_path = crate::worktree::worktree_path(&git_root, story_id);
|
|
||||||
fs::create_dir_all(wt_path.parent().unwrap()).unwrap();
|
|
||||||
create_git_worktree(&git_root, &wt_path, &format!("feature/story-{story_id}"));
|
|
||||||
assert!(wt_path.exists(), "stale worktree must exist before sweep");
|
|
||||||
|
|
||||||
sweep_done_to_archived(
|
|
||||||
&git_root.join(".huskies/work"),
|
|
||||||
&git_root,
|
|
||||||
Duration::from_secs(999_999),
|
|
||||||
);
|
|
||||||
|
|
||||||
// Stale worktree should be pruned.
|
|
||||||
assert!(
|
|
||||||
!wt_path.exists(),
|
|
||||||
"stale worktree should be pruned by sweep"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn sweep_archives_story_even_when_worktree_removal_fails() {
|
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
let git_root = tmp.path().to_path_buf();
|
|
||||||
init_git_repo(&git_root);
|
|
||||||
|
|
||||||
crate::db::ensure_content_store();
|
|
||||||
let story_id = "9885_story_fake_worktree";
|
|
||||||
crate::db::write_item_with_content(
|
|
||||||
story_id,
|
|
||||||
"5_done",
|
|
||||||
"---\nname: test\n---\n",
|
|
||||||
);
|
|
||||||
|
|
||||||
// Create a plain directory at the expected worktree path — not a real
|
|
||||||
// git worktree, so `git worktree remove` will fail.
|
|
||||||
let wt_path = crate::worktree::worktree_path(&git_root, story_id);
|
|
||||||
fs::create_dir_all(&wt_path).unwrap();
|
|
||||||
|
|
||||||
sweep_done_to_archived(
|
|
||||||
&git_root.join(".huskies/work"),
|
|
||||||
&git_root,
|
|
||||||
Duration::ZERO,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Story must be archived in CRDT despite worktree removal failure.
|
|
||||||
let items = crate::pipeline_state::read_all_typed();
|
|
||||||
let item = items.iter().find(|i| i.story_id.0 == story_id);
|
|
||||||
assert!(
|
|
||||||
item.is_some_and(|i| matches!(i.stage, crate::pipeline_state::Stage::Archived { .. })),
|
|
||||||
"story should be archived even when worktree removal fails"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
+24
-10
@@ -328,26 +328,40 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
let watchdog_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
|
let watchdog_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
|
||||||
AgentPool::spawn_watchdog(Arc::clone(&agents), watchdog_root);
|
AgentPool::spawn_watchdog(Arc::clone(&agents), watchdog_root);
|
||||||
|
|
||||||
// Filesystem watcher: only watches config files (project.toml, agents.toml) and
|
// Filesystem watcher: watches config files (project.toml, agents.toml) for
|
||||||
// handles the sweep of done→archived. Work-item pipeline events are now driven
|
// hot-reload and runs the CRDT-based done→archived sweep. Work-item pipeline
|
||||||
// by CRDT state transitions via crdt_state::subscribe().
|
// events are driven by CRDT state transitions via crdt_state::subscribe().
|
||||||
if let Some(ref root) = *app_state.project_root.lock().unwrap() {
|
if let Some(ref root) = *app_state.project_root.lock().unwrap() {
|
||||||
let work_dir = root.join(".huskies").join("work");
|
let watcher_config = config::ProjectConfig::load(root)
|
||||||
if work_dir.is_dir() {
|
.map(|c| c.watcher)
|
||||||
let watcher_config = config::ProjectConfig::load(root)
|
.unwrap_or_default();
|
||||||
.map(|c| c.watcher)
|
io::watcher::start_watcher(root.clone(), watcher_tx.clone(), watcher_config);
|
||||||
.unwrap_or_default();
|
|
||||||
io::watcher::start_watcher(work_dir, root.clone(), watcher_tx.clone(), watcher_config);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Bridge CRDT state-transition events to the watcher broadcast channel.
|
// Bridge CRDT state-transition events to the watcher broadcast channel.
|
||||||
// This replaces the filesystem watcher as the source of WorkItem events.
|
// This replaces the filesystem watcher as the source of WorkItem events.
|
||||||
|
// Also prunes worktrees when stories transition to 6_archived.
|
||||||
{
|
{
|
||||||
let crdt_watcher_tx = watcher_tx.clone();
|
let crdt_watcher_tx = watcher_tx.clone();
|
||||||
|
let crdt_prune_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
|
||||||
if let Some(mut crdt_rx) = crdt_state::subscribe() {
|
if let Some(mut crdt_rx) = crdt_state::subscribe() {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while let Ok(evt) = crdt_rx.recv().await {
|
while let Ok(evt) = crdt_rx.recv().await {
|
||||||
|
// Prune the worktree when a story is archived.
|
||||||
|
if evt.to_stage == "6_archived"
|
||||||
|
&& let Some(root) = crdt_prune_root.as_ref().cloned()
|
||||||
|
{
|
||||||
|
let story_id = evt.story_id.clone();
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
|
if let Err(e) =
|
||||||
|
crate::worktree::prune_worktree_sync(&root, &story_id)
|
||||||
|
{
|
||||||
|
crate::slog!(
|
||||||
|
"[crdt] worktree prune failed for {story_id}: {e}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
let (action, commit_msg) =
|
let (action, commit_msg) =
|
||||||
io::watcher::stage_metadata(&evt.to_stage, &evt.story_id)
|
io::watcher::stage_metadata(&evt.to_stage, &evt.story_id)
|
||||||
.unwrap_or(("update", format!("huskies: update {}", evt.story_id)));
|
.unwrap_or(("update", format!("huskies: update {}", evt.story_id)));
|
||||||
|
|||||||
Reference in New Issue
Block a user