From 5377eeae5bcde1567bcb340c6ae0837eb723bd18 Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 10 Apr 2026 10:12:54 +0000 Subject: [PATCH] huskies: merge 513_story_startup_reconcile_pass_that_detects_drift_between_crdt_pipeline_items_and_filesystem_shadows --- README.md | 42 ++ .../src/chat/transport/matrix/bot/format.rs | 32 +- server/src/chat/transport/matrix/bot/run.rs | 2 +- server/src/config.rs | 18 + server/src/main.rs | 15 + server/src/startup_reconcile.rs | 512 ++++++++++++++++++ server/src/worktree.rs | 12 + 7 files changed, 626 insertions(+), 7 deletions(-) create mode 100644 server/src/startup_reconcile.rs diff --git a/README.md b/README.md index c9609e2a..6d0dbf18 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,48 @@ script/release 0.7.1 This bumps version in `Cargo.toml` and `package.json`, builds macOS arm64 and Linux amd64 binaries, tags the repo, and publishes a Gitea release with changelog and binaries attached. +## Startup reconcile pass + +On startup, after CRDT replay and database initialisation, huskies runs a +**reconcile pass** that compares pipeline state across three sources: + +1. **In-memory CRDT** — the primary source of truth, reconstructed from + `crdt_ops` on startup. +2. **`pipeline_items` table** — a shadow/materialised view written alongside + CRDT updates, used for fast DB queries. +3. **Filesystem shadows** (`.huskies/work/N_stage/*.md`) — legacy rendering + still written by some paths and read by agent worktrees. + +Any disagreement between these sources is **drift**. The reconcile pass logs a +structured line for each drifted item: + +``` +[reconcile] DRIFT story=X crdt_stage=Y db_stage=Z fs_stage=W +``` + +(`MISSING` is used where a source has no record for that story.) + +### Drift types + +| Type | Meaning | +|------|---------| +| `CRDT-only` | Story present in CRDT but absent from `pipeline_items` | +| `DB-only` | Story present in `pipeline_items` but absent from CRDT | +| `FS-only` | Story on the filesystem but absent from both CRDT and DB | +| `stage-mismatch` | Story present in both CRDT and DB but with different stage values | + +Note: a filesystem shadow that lags behind the CRDT/DB stage (both of which +agree) is **not** treated as drift — the FS is a best-effort rendering and is +allowed to lag. + +If any drift is detected, the Matrix/Slack/WhatsApp bot startup announcement +includes a count and a suggestion to check the server logs. + +### Opt-out + +Set `reconcile_on_startup = false` in `.huskies/project.toml` to disable the +pass during the migration window if it produces noise. + ## License GPL-3.0. See [LICENSE](LICENSE). diff --git a/server/src/chat/transport/matrix/bot/format.rs b/server/src/chat/transport/matrix/bot/format.rs index fa2c87e5..6346c47a 100644 --- a/server/src/chat/transport/matrix/bot/format.rs +++ b/server/src/chat/transport/matrix/bot/format.rs @@ -3,9 +3,15 @@ use pulldown_cmark::{Options, Parser, html}; /// Format the startup greeting the bot sends to each room when it comes online. /// /// Uses the bot's configured display name so the message reads naturally -/// (e.g. "Timmy is online."). -pub fn format_startup_announcement(bot_name: &str) -> String { - format!("{bot_name} is online.") +/// (e.g. "Timmy is online."). When `drift_count` is `Some(n)` and `n > 0`, +/// appends a drift warning so operators know to check the server logs. +pub fn format_startup_announcement(bot_name: &str, drift_count: Option) -> String { + match drift_count { + Some(n) if n > 0 => format!( + "{bot_name} is online. \u{26a0}\u{fe0f} {n} item(s) have CRDT/DB drift \u{2014} check server logs." + ), + _ => format!("{bot_name} is online."), + } } /// Convert a Markdown string to an HTML string using pulldown-cmark. @@ -97,13 +103,27 @@ mod tests { #[test] fn startup_announcement_uses_bot_name() { - assert_eq!(format_startup_announcement("Timmy"), "Timmy is online."); + assert_eq!(format_startup_announcement("Timmy", None), "Timmy is online."); } #[test] fn startup_announcement_uses_configured_display_name_not_hardcoded() { - assert_eq!(format_startup_announcement("HAL"), "HAL is online."); - assert_eq!(format_startup_announcement("Assistant"), "Assistant is online."); + assert_eq!(format_startup_announcement("HAL", None), "HAL is online."); + assert_eq!(format_startup_announcement("Assistant", None), "Assistant is online."); + } + + #[test] + fn startup_announcement_with_zero_drift_omits_warning() { + assert_eq!(format_startup_announcement("Timmy", Some(0)), "Timmy is online."); + } + + #[test] + fn startup_announcement_with_drift_includes_count_and_warning() { + let msg = format_startup_announcement("Timmy", Some(3)); + assert!(msg.starts_with("Timmy is online."), "should start with online msg: {msg}"); + assert!(msg.contains('3'), "should include drift count: {msg}"); + assert!(msg.contains("drift"), "should mention drift: {msg}"); + assert!(msg.contains("server logs"), "should mention server logs: {msg}"); } #[test] diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index a64fe41b..6b8c5de7 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -297,7 +297,7 @@ pub async fn run_bot( // bot is online. This runs once per process start — the sync loop handles // reconnects internally so this code is never reached again on a network // blip or sync resumption. - let announce_msg = format_startup_announcement(&announce_bot_name); + let announce_msg = format_startup_announcement(&announce_bot_name, crate::startup_reconcile::drift_count()); let announce_html = markdown_to_html(&announce_msg); slog!("[matrix-bot] Sending startup announcement: {announce_msg}"); for room_id in &announce_room_ids { diff --git a/server/src/config.rs b/server/src/config.rs index 228d61dc..22f12f9d 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -53,6 +53,14 @@ pub struct ProjectConfig { /// so both machines see the same pipeline state in real-time. #[serde(default)] pub rendezvous: Option, + /// Whether to run the startup state-reconcile pass. + /// + /// The pass compares pipeline state across CRDT, `pipeline_items`, and + /// filesystem shadows after CRDT replay, logging any drift it finds. + /// Set to `false` to disable during the migration window if the pass + /// produces too much noise. Default: `true`. + #[serde(default = "default_reconcile_on_startup")] + pub reconcile_on_startup: bool, } /// Configuration for the filesystem watcher's sweep behaviour. @@ -100,6 +108,10 @@ fn default_rate_limit_notifications() -> bool { true } +fn default_reconcile_on_startup() -> bool { + true +} + #[derive(Debug, Clone, Deserialize)] #[allow(dead_code)] pub struct ComponentConfig { @@ -197,6 +209,8 @@ struct LegacyProjectConfig { rate_limit_notifications: bool, #[serde(default)] timezone: Option, + #[serde(default = "default_reconcile_on_startup")] + reconcile_on_startup: bool, } impl Default for ProjectConfig { @@ -227,6 +241,7 @@ impl Default for ProjectConfig { rate_limit_notifications: default_rate_limit_notifications(), timezone: None, rendezvous: None, + reconcile_on_startup: default_reconcile_on_startup(), } } } @@ -303,6 +318,7 @@ impl ProjectConfig { rate_limit_notifications: legacy.rate_limit_notifications, timezone: legacy.timezone, rendezvous: None, + reconcile_on_startup: legacy.reconcile_on_startup, }; validate_agents(&config.agent)?; return Ok(config); @@ -331,6 +347,7 @@ impl ProjectConfig { rate_limit_notifications: legacy.rate_limit_notifications, timezone: legacy.timezone, rendezvous: None, + reconcile_on_startup: legacy.reconcile_on_startup, }; validate_agents(&config.agent)?; Ok(config) @@ -347,6 +364,7 @@ impl ProjectConfig { rate_limit_notifications: legacy.rate_limit_notifications, timezone: legacy.timezone, rendezvous: None, + reconcile_on_startup: legacy.reconcile_on_startup, }) } } diff --git a/server/src/main.rs b/server/src/main.rs index a8d3e1d2..13821afa 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -15,6 +15,7 @@ mod llm; pub mod log_buffer; pub mod rebuild; mod state; +mod startup_reconcile; mod store; mod workflow; pub(crate) mod pipeline_state; @@ -303,6 +304,20 @@ async fn main() -> Result<(), std::io::Error> { if let Err(e) = crdt_state::init(db_path).await { slog!("[crdt] Failed to initialise CRDT state layer: {e}"); } + // Run the startup drift-reconcile pass now that both the CRDT and DB are + // initialised. The pass is cheap (~100 stories < 1 s) and opt-out via + // `reconcile_on_startup = false` in project.toml. + let reconcile_enabled = db_path + .parent() + .and_then(|p| p.parent()) + .and_then(|root| config::ProjectConfig::load(root).ok()) + .map(|cfg| cfg.reconcile_on_startup) + .unwrap_or(true); + if reconcile_enabled + && let Some(project_root) = db_path.parent().and_then(|p| p.parent()) + { + startup_reconcile::reconcile_state(project_root, db_path).await; + } } // Import any existing .huskies/work/ stories into the DB content store. diff --git a/server/src/startup_reconcile.rs b/server/src/startup_reconcile.rs new file mode 100644 index 00000000..1f337f64 --- /dev/null +++ b/server/src/startup_reconcile.rs @@ -0,0 +1,512 @@ +//! Startup reconcile pass: detect drift between CRDT, `pipeline_items`, and filesystem shadows. +//! +//! ## What is drift? +//! +//! Huskies maintains pipeline state in three places that must stay in sync: +//! +//! 1. **In-memory CRDT** (`crdt_state`) — reconstructed from `crdt_ops` on startup; the +//! primary source of truth for pipeline metadata. +//! 2. **`pipeline_items` table** — a shadow/materialised view written alongside CRDT updates; +//! used for fast DB queries without parsing CRDT ops. +//! 3. **Filesystem shadows** (`.huskies/work/N_stage/*.md`) — legacy rendering still written +//! by some paths and read by agent worktrees. +//! +//! "Drift" means these sources disagree about a story's stage or existence: +//! +//! | Drift type | Meaning | +//! |-----------------|-----------------------------------------------------------------| +//! | `CRDT-only` | Story in CRDT but absent from `pipeline_items` | +//! | `DB-only` | Story in `pipeline_items` but absent from CRDT | +//! | `FS-only` | Story on filesystem but absent from both CRDT and `pipeline_items` | +//! | `stage-mismatch`| Story present in CRDT and DB but with different stage values | +//! +//! ## Log format +//! +//! Each drift emits a structured log line: +//! ```text +//! [reconcile] DRIFT story=X crdt_stage=Y db_stage=Z fs_stage=W +//! ``` +//! (`MISSING` is used where a source has no record for that story.) +//! +//! ## Opt-out +//! +//! Set `reconcile_on_startup = false` in `.huskies/project.toml` to disable the +//! pass during the migration window if it produces too much noise. + +use std::collections::{HashMap, HashSet}; +use std::path::Path; +use std::sync::OnceLock; + +use crate::slog; + +// ── Public API ────────────────────────────────────────────────────────────── + +/// Drift count from the last reconcile pass (set once at startup). +/// +/// Returns `None` if the pass has not run yet (e.g. disabled via config or +/// called before startup completes). +static DRIFT_COUNT: OnceLock = OnceLock::new(); + +/// Return the drift count detected during the startup reconcile pass. +pub fn drift_count() -> Option { + DRIFT_COUNT.get().copied() +} + +/// A story whose state is inconsistent across at least two sources. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ItemDrift { + pub story_id: String, + /// Stage according to the in-memory CRDT, or `None` if absent. + pub crdt_stage: Option, + /// Stage according to the `pipeline_items` shadow table, or `None` if absent. + pub db_stage: Option, + /// Stage according to the filesystem shadow, or `None` if absent. + pub fs_stage: Option, +} + +/// Summary returned by [`reconcile_state`]. +pub struct DriftReport { + /// Number of drifted items detected. Read by tests; in production the count + /// is accessed via the [`drift_count()`] free function (stored in `DRIFT_COUNT`). + #[allow(dead_code)] + pub drift_count: usize, +} + +// ── Main entry point ──────────────────────────────────────────────────────── + +/// Run the startup state-reconcile pass. +/// +/// Must be called **after** `crdt_state::init()` so the in-memory CRDT is +/// populated, and **after** `db::init()` so `pipeline_items` is accessible. +/// +/// The function is fast: a full scan with ~100 stories completes in well under +/// 1 second because it reads cached in-memory state (CRDT) and performs a +/// single `SELECT id, stage FROM pipeline_items` query. +/// +/// Returns the number of drifted items and stores the count in a process-wide +/// [`OnceLock`] so other subsystems (e.g. the Matrix bot startup announcement) +/// can read it without re-running the pass. +pub async fn reconcile_state(project_root: &Path, db_path: &Path) -> DriftReport { + let crdt_stages: HashMap = crate::crdt_state::read_all_items() + .unwrap_or_default() + .into_iter() + .map(|v| (v.story_id, v.stage)) + .collect(); + + let db_stages = read_db_stages(db_path).await; + let fs_stages = scan_fs_stages(project_root); + + slog!( + "[reconcile] Scanning {} CRDT / {} DB / {} FS items for drift", + crdt_stages.len(), + db_stages.len(), + fs_stages.len() + ); + + let drifts = detect_drift(&crdt_stages, &db_stages, &fs_stages); + + for d in &drifts { + slog!( + "[reconcile] DRIFT story={} crdt_stage={} db_stage={} fs_stage={}", + d.story_id, + d.crdt_stage.as_deref().unwrap_or("MISSING"), + d.db_stage.as_deref().unwrap_or("MISSING"), + d.fs_stage.as_deref().unwrap_or("MISSING"), + ); + } + + let count = drifts.len(); + if count == 0 { + let total = { + let mut all: HashSet<&String> = HashSet::new(); + all.extend(crdt_stages.keys()); + all.extend(db_stages.keys()); + all.extend(fs_stages.keys()); + all.len() + }; + slog!("[reconcile] No drift detected across {} total items.", total); + } else { + slog!( + "[reconcile] DRIFT SUMMARY: {} item(s) with inconsistent state — check server logs.", + count + ); + } + + let _ = DRIFT_COUNT.set(count); + DriftReport { drift_count: count } +} + +// ── Core comparison logic (pure, fully testable) ──────────────────────────── + +/// Detect drift between three stage maps. +/// +/// A story is drifted when any of: +/// - Present in CRDT but absent from DB (`CRDT-only`) +/// - Present in DB but absent from CRDT (`DB-only`) +/// - Present in both CRDT and DB with different stage values (`stage-mismatch`) +/// - Present in filesystem but absent from both CRDT and DB (`FS-only`) +/// +/// FS stage vs CRDT/DB stage disagreement is noted in the drift record's +/// `fs_stage` field but does **not** independently trigger a drift entry — +/// the filesystem shadow is allowed to lag. +pub(crate) fn detect_drift( + crdt: &HashMap, + db: &HashMap, + fs: &HashMap, +) -> Vec { + let all_ids: HashSet<&String> = crdt.keys().chain(db.keys()).chain(fs.keys()).collect(); + + let mut drifts: Vec = all_ids + .into_iter() + .filter_map(|id| { + let c = crdt.get(id).cloned(); + let d = db.get(id).cloned(); + let f = fs.get(id).cloned(); + + let is_drift = match (&c, &d) { + // Both present but stages differ → stage mismatch + (Some(cs), Some(ds)) if cs != ds => true, + // One present, other absent → single-source + (Some(_), None) | (None, Some(_)) => true, + // Both absent but FS present → FS-only + (None, None) => f.is_some(), + // Both present, same stage → clean + _ => false, + }; + + if is_drift { + Some(ItemDrift { + story_id: id.clone(), + crdt_stage: c, + db_stage: d, + fs_stage: f, + }) + } else { + None + } + }) + .collect(); + + // Sort for deterministic output (useful in tests and logs). + drifts.sort_by(|a, b| a.story_id.cmp(&b.story_id)); + drifts +} + +// ── Data loading helpers ───────────────────────────────────────────────────── + +/// Query `pipeline_items` for all `(id, stage)` rows. +/// +/// Opens a separate read-only connection so this function can run before or +/// after `db::init()` without conflicting with the write pool. +async fn read_db_stages(db_path: &Path) -> HashMap { + use sqlx::sqlite::SqliteConnectOptions; + use sqlx::SqlitePool; + + let options = SqliteConnectOptions::new() + .filename(db_path) + .read_only(true); + + let pool = match SqlitePool::connect_with(options).await { + Ok(p) => p, + Err(e) => { + slog!("[reconcile] Could not open pipeline.db for reconcile scan: {e}"); + return HashMap::new(); + } + }; + + let rows: Vec<(String, String)> = sqlx::query_as("SELECT id, stage FROM pipeline_items") + .fetch_all(&pool) + .await + .unwrap_or_default(); + + rows.into_iter().collect() +} + +/// Walk `.huskies/work/` directories to build a stage map from filesystem shadows. +fn scan_fs_stages(project_root: &Path) -> HashMap { + const STAGE_DIRS: &[&str] = &[ + "1_backlog", + "2_current", + "3_qa", + "4_merge", + "5_done", + "6_archived", + ]; + + let mut map = HashMap::new(); + let work_root = project_root.join(".huskies").join("work"); + + for stage in STAGE_DIRS { + let dir = work_root.join(stage); + if !dir.is_dir() { + continue; + } + let Ok(entries) = std::fs::read_dir(&dir) else { + continue; + }; + for entry in entries.flatten() { + let path = entry.path(); + if path.extension().and_then(|e| e.to_str()) != Some("md") { + continue; + } + if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) { + map.insert(stem.to_string(), stage.to_string()); + } + } + } + + map +} + +// ── Tests ──────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + fn stages(pairs: &[(&str, &str)]) -> HashMap { + pairs + .iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect() + } + + // ── detect_drift unit tests (pure function) ───────────────────────────── + + #[test] + fn detect_drift_clean_state_no_items() { + // All three sources empty → no drift. + let empty = HashMap::new(); + let drifts = detect_drift(&empty, &empty, &empty); + assert!( + drifts.is_empty(), + "expected no drift for empty state, got: {drifts:?}" + ); + } + + #[test] + fn detect_drift_clean_state_matching_stages() { + // Same story, same stage in both CRDT and DB, also present on FS → no drift. + let crdt = stages(&[("42_story_foo", "2_current")]); + let db = stages(&[("42_story_foo", "2_current")]); + let fs = stages(&[("42_story_foo", "2_current")]); + let drifts = detect_drift(&crdt, &db, &fs); + assert!( + drifts.is_empty(), + "expected no drift when all sources agree, got: {drifts:?}" + ); + } + + #[test] + fn detect_drift_crdt_only_story() { + // Story in CRDT but absent from DB and FS → drift (CRDT-only). + let crdt = stages(&[("10_story_crdt_only", "2_current")]); + let db = HashMap::new(); + let fs = HashMap::new(); + let drifts = detect_drift(&crdt, &db, &fs); + assert_eq!(drifts.len(), 1, "expected 1 drift, got: {drifts:?}"); + let d = &drifts[0]; + assert_eq!(d.story_id, "10_story_crdt_only"); + assert_eq!(d.crdt_stage.as_deref(), Some("2_current")); + assert!(d.db_stage.is_none(), "db_stage should be MISSING"); + } + + #[test] + fn detect_drift_db_only_story() { + // Story in DB but absent from CRDT → drift (DB-only). + let crdt = HashMap::new(); + let db = stages(&[("20_story_db_only", "1_backlog")]); + let fs = HashMap::new(); + let drifts = detect_drift(&crdt, &db, &fs); + assert_eq!(drifts.len(), 1, "expected 1 drift, got: {drifts:?}"); + let d = &drifts[0]; + assert_eq!(d.story_id, "20_story_db_only"); + assert!(d.crdt_stage.is_none(), "crdt_stage should be MISSING"); + assert_eq!(d.db_stage.as_deref(), Some("1_backlog")); + } + + #[test] + fn detect_drift_fs_only_story() { + // Story on filesystem but absent from both CRDT and DB → drift (FS-only). + let crdt = HashMap::new(); + let db = HashMap::new(); + let fs = stages(&[("30_story_fs_only", "3_qa")]); + let drifts = detect_drift(&crdt, &db, &fs); + assert_eq!(drifts.len(), 1, "expected 1 drift, got: {drifts:?}"); + let d = &drifts[0]; + assert_eq!(d.story_id, "30_story_fs_only"); + assert!(d.crdt_stage.is_none(), "crdt_stage should be MISSING"); + assert!(d.db_stage.is_none(), "db_stage should be MISSING"); + assert_eq!(d.fs_stage.as_deref(), Some("3_qa")); + } + + #[test] + fn detect_drift_stage_mismatch_crdt_vs_db() { + // Same story in CRDT and DB but at different stages → drift (stage mismatch). + let crdt = stages(&[("40_story_mismatch", "4_merge")]); + let db = stages(&[("40_story_mismatch", "5_done")]); + let fs = stages(&[("40_story_mismatch", "4_merge")]); + let drifts = detect_drift(&crdt, &db, &fs); + assert_eq!(drifts.len(), 1, "expected 1 drift, got: {drifts:?}"); + let d = &drifts[0]; + assert_eq!(d.crdt_stage.as_deref(), Some("4_merge")); + assert_eq!(d.db_stage.as_deref(), Some("5_done")); + // FS stage is recorded even though it's not what triggered the drift. + assert_eq!(d.fs_stage.as_deref(), Some("4_merge")); + } + + #[test] + fn detect_drift_fs_lag_does_not_cause_drift() { + // FS is behind CRDT/DB but CRDT == DB → not a drift (FS is a lagging shadow). + let crdt = stages(&[("50_story_fs_lag", "5_done")]); + let db = stages(&[("50_story_fs_lag", "5_done")]); + let fs = stages(&[("50_story_fs_lag", "4_merge")]); // FS is behind + let drifts = detect_drift(&crdt, &db, &fs); + assert!( + drifts.is_empty(), + "FS lag alone should not trigger drift when CRDT == DB, got: {drifts:?}" + ); + } + + #[test] + fn detect_drift_multiple_stories_mixed() { + // Two clean stories, two drifted → exactly 2 drifts returned. + let crdt = stages(&[ + ("1_story_clean", "2_current"), + ("2_story_crdt_only", "2_current"), + ("3_story_mismatch", "3_qa"), + ("4_story_clean2", "1_backlog"), + ]); + let db = stages(&[ + ("1_story_clean", "2_current"), + ("3_story_mismatch", "4_merge"), // mismatch + ("4_story_clean2", "1_backlog"), + ]); + let fs: HashMap = HashMap::new(); + let drifts = detect_drift(&crdt, &db, &fs); + assert_eq!(drifts.len(), 2, "expected 2 drifts, got: {drifts:?}"); + let ids: Vec<&str> = drifts.iter().map(|d| d.story_id.as_str()).collect(); + assert!(ids.contains(&"2_story_crdt_only"), "missing CRDT-only drift"); + assert!(ids.contains(&"3_story_mismatch"), "missing mismatch drift"); + } + + // ── scan_fs_stages unit test ───────────────────────────────────────────── + + #[test] + fn scan_fs_stages_finds_stories_in_each_stage() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + + let stages_to_create = [ + ("1_backlog", "10_story_a"), + ("2_current", "20_story_b"), + ("5_done", "50_story_c"), + ]; + for (stage, name) in &stages_to_create { + let dir = root.join(".huskies").join("work").join(stage); + std::fs::create_dir_all(&dir).unwrap(); + std::fs::write(dir.join(format!("{name}.md")), "# test\n").unwrap(); + } + + let map = scan_fs_stages(root); + assert_eq!(map.get("10_story_a").map(String::as_str), Some("1_backlog")); + assert_eq!(map.get("20_story_b").map(String::as_str), Some("2_current")); + assert_eq!(map.get("50_story_c").map(String::as_str), Some("5_done")); + } + + #[test] + fn scan_fs_stages_ignores_non_md_files() { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path(); + let dir = root.join(".huskies").join("work").join("1_backlog"); + std::fs::create_dir_all(&dir).unwrap(); + std::fs::write(dir.join("not_a_story.txt"), "ignored").unwrap(); + std::fs::write(dir.join("99_story_real.md"), "# real").unwrap(); + + let map = scan_fs_stages(root); + assert!(!map.contains_key("not_a_story"), "txt file should be ignored"); + assert!(map.contains_key("99_story_real"), "md file should be found"); + } + + // ── reconcile_state integration tests (temp DB + FS) ──────────────────── + + async fn setup_test_db(path: &Path) -> sqlx::SqlitePool { + use sqlx::sqlite::SqliteConnectOptions; + use sqlx::SqlitePool; + let opts = SqliteConnectOptions::new() + .filename(path) + .create_if_missing(true); + let pool = SqlitePool::connect_with(opts).await.unwrap(); + sqlx::query( + "CREATE TABLE IF NOT EXISTS pipeline_items ( + id TEXT PRIMARY KEY, + name TEXT, + stage TEXT NOT NULL, + agent TEXT, + retry_count INTEGER, + blocked INTEGER, + depends_on TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + )", + ) + .execute(&pool) + .await + .unwrap(); + pool + } + + #[tokio::test] + async fn reconcile_state_no_drift_empty_sources() { + // Empty DB + empty FS + uninitialised CRDT → 0 drift. + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("pipeline.db"); + setup_test_db(&db_path).await; + + let report = reconcile_state(tmp.path(), &db_path).await; + assert_eq!( + report.drift_count, 0, + "expected no drift for empty state" + ); + } + + #[tokio::test] + async fn reconcile_state_db_only_story_is_drift() { + // Story in DB but CRDT not initialised (treated as empty) → 1 drift. + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("pipeline.db"); + let pool = setup_test_db(&db_path).await; + + sqlx::query( + "INSERT INTO pipeline_items (id, name, stage, created_at, updated_at) + VALUES ('100_story_db_only', 'DB-only story', '2_current', '2026-01-01', '2026-01-01')", + ) + .execute(&pool) + .await + .unwrap(); + + let report = reconcile_state(tmp.path(), &db_path).await; + assert_eq!( + report.drift_count, 1, + "story in DB but not in CRDT should be counted as drift" + ); + } + + #[tokio::test] + async fn reconcile_state_fs_only_story_is_drift() { + // Story on filesystem, nothing in DB or CRDT → 1 drift (FS-only). + let tmp = tempfile::tempdir().unwrap(); + let db_path = tmp.path().join("pipeline.db"); + setup_test_db(&db_path).await; + + let backlog = tmp.path().join(".huskies").join("work").join("1_backlog"); + std::fs::create_dir_all(&backlog).unwrap(); + std::fs::write(backlog.join("200_story_fs_only.md"), "---\nname: FS-only\n---\n").unwrap(); + + let report = reconcile_state(tmp.path(), &db_path).await; + assert_eq!( + report.drift_count, 1, + "story on FS but absent from CRDT and DB should be counted as drift" + ); + } +} diff --git a/server/src/worktree.rs b/server/src/worktree.rs index e8082d21..072c9c3f 100644 --- a/server/src/worktree.rs +++ b/server/src/worktree.rs @@ -529,6 +529,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + reconcile_on_startup: true, }; // Should complete without panic run_setup_commands(tmp.path(), &config).await; @@ -554,6 +555,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + reconcile_on_startup: true, }; // Should complete without panic run_setup_commands(tmp.path(), &config).await; @@ -579,6 +581,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + reconcile_on_startup: true, }; // Setup command failures are non-fatal — should not panic or propagate run_setup_commands(tmp.path(), &config).await; @@ -604,6 +607,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + reconcile_on_startup: true, }; // Teardown failures are best-effort — should not propagate assert!(run_teardown_commands(tmp.path(), &config).await.is_ok()); @@ -628,6 +632,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + reconcile_on_startup: true, }; let info = create_worktree(&project_root, "42_fresh_test", &config, 3001) .await @@ -659,6 +664,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + reconcile_on_startup: true, }; // First creation let _info1 = create_worktree(&project_root, "43_reuse_test", &config, 3001) @@ -731,6 +737,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + reconcile_on_startup: true, }; let result = remove_worktree_by_story_id(tmp.path(), "99_nonexistent", &config).await; @@ -761,6 +768,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + reconcile_on_startup: true, }; create_worktree(&project_root, "88_remove_by_id", &config, 3001) .await @@ -838,6 +846,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + reconcile_on_startup: true, }; // Even though setup commands fail, create_worktree must succeed // so the agent can start and fix the problem itself. @@ -871,6 +880,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + reconcile_on_startup: true, }; // First creation — no setup commands, should succeed create_worktree(&project_root, "173_reuse_fail", &empty_config, 3001) @@ -894,6 +904,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + reconcile_on_startup: true, }; // Second call — worktree exists, setup commands fail, must still succeed let result = create_worktree(&project_root, "173_reuse_fail", &failing_config, 3002).await; @@ -923,6 +934,7 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, + reconcile_on_startup: true, }; let info = create_worktree(&project_root, "77_remove_async", &config, 3001) .await