diff --git a/server/src/chat/transport/matrix/bot/format.rs b/server/src/chat/transport/matrix/bot/format.rs index 6346c47a..fa2c87e5 100644 --- a/server/src/chat/transport/matrix/bot/format.rs +++ b/server/src/chat/transport/matrix/bot/format.rs @@ -3,15 +3,9 @@ use pulldown_cmark::{Options, Parser, html}; /// Format the startup greeting the bot sends to each room when it comes online. /// /// Uses the bot's configured display name so the message reads naturally -/// (e.g. "Timmy is online."). When `drift_count` is `Some(n)` and `n > 0`, -/// appends a drift warning so operators know to check the server logs. -pub fn format_startup_announcement(bot_name: &str, drift_count: Option) -> String { - match drift_count { - Some(n) if n > 0 => format!( - "{bot_name} is online. \u{26a0}\u{fe0f} {n} item(s) have CRDT/DB drift \u{2014} check server logs." - ), - _ => format!("{bot_name} is online."), - } +/// (e.g. "Timmy is online."). +pub fn format_startup_announcement(bot_name: &str) -> String { + format!("{bot_name} is online.") } /// Convert a Markdown string to an HTML string using pulldown-cmark. @@ -103,27 +97,13 @@ mod tests { #[test] fn startup_announcement_uses_bot_name() { - assert_eq!(format_startup_announcement("Timmy", None), "Timmy is online."); + assert_eq!(format_startup_announcement("Timmy"), "Timmy is online."); } #[test] fn startup_announcement_uses_configured_display_name_not_hardcoded() { - assert_eq!(format_startup_announcement("HAL", None), "HAL is online."); - assert_eq!(format_startup_announcement("Assistant", None), "Assistant is online."); - } - - #[test] - fn startup_announcement_with_zero_drift_omits_warning() { - assert_eq!(format_startup_announcement("Timmy", Some(0)), "Timmy is online."); - } - - #[test] - fn startup_announcement_with_drift_includes_count_and_warning() { - let msg = format_startup_announcement("Timmy", Some(3)); - assert!(msg.starts_with("Timmy is online."), "should start with online msg: {msg}"); - assert!(msg.contains('3'), "should include drift count: {msg}"); - assert!(msg.contains("drift"), "should mention drift: {msg}"); - assert!(msg.contains("server logs"), "should mention server logs: {msg}"); + assert_eq!(format_startup_announcement("HAL"), "HAL is online."); + assert_eq!(format_startup_announcement("Assistant"), "Assistant is online."); } #[test] diff --git a/server/src/chat/transport/matrix/bot/run.rs b/server/src/chat/transport/matrix/bot/run.rs index 6b8c5de7..a64fe41b 100644 --- a/server/src/chat/transport/matrix/bot/run.rs +++ b/server/src/chat/transport/matrix/bot/run.rs @@ -297,7 +297,7 @@ pub async fn run_bot( // bot is online. This runs once per process start — the sync loop handles // reconnects internally so this code is never reached again on a network // blip or sync resumption. - let announce_msg = format_startup_announcement(&announce_bot_name, crate::startup_reconcile::drift_count()); + let announce_msg = format_startup_announcement(&announce_bot_name); let announce_html = markdown_to_html(&announce_msg); slog!("[matrix-bot] Sending startup announcement: {announce_msg}"); for room_id in &announce_room_ids { diff --git a/server/src/config.rs b/server/src/config.rs index 22f12f9d..2b5f6025 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -53,14 +53,6 @@ pub struct ProjectConfig { /// so both machines see the same pipeline state in real-time. #[serde(default)] pub rendezvous: Option, - /// Whether to run the startup state-reconcile pass. - /// - /// The pass compares pipeline state across CRDT, `pipeline_items`, and - /// filesystem shadows after CRDT replay, logging any drift it finds. - /// Set to `false` to disable during the migration window if the pass - /// produces too much noise. Default: `true`. - #[serde(default = "default_reconcile_on_startup")] - pub reconcile_on_startup: bool, } /// Configuration for the filesystem watcher's sweep behaviour. @@ -108,9 +100,6 @@ fn default_rate_limit_notifications() -> bool { true } -fn default_reconcile_on_startup() -> bool { - true -} #[derive(Debug, Clone, Deserialize)] #[allow(dead_code)] @@ -209,8 +198,6 @@ struct LegacyProjectConfig { rate_limit_notifications: bool, #[serde(default)] timezone: Option, - #[serde(default = "default_reconcile_on_startup")] - reconcile_on_startup: bool, } impl Default for ProjectConfig { @@ -241,7 +228,6 @@ impl Default for ProjectConfig { rate_limit_notifications: default_rate_limit_notifications(), timezone: None, rendezvous: None, - reconcile_on_startup: default_reconcile_on_startup(), } } } @@ -318,7 +304,6 @@ impl ProjectConfig { rate_limit_notifications: legacy.rate_limit_notifications, timezone: legacy.timezone, rendezvous: None, - reconcile_on_startup: legacy.reconcile_on_startup, }; validate_agents(&config.agent)?; return Ok(config); @@ -347,7 +332,6 @@ impl ProjectConfig { rate_limit_notifications: legacy.rate_limit_notifications, timezone: legacy.timezone, rendezvous: None, - reconcile_on_startup: legacy.reconcile_on_startup, }; validate_agents(&config.agent)?; Ok(config) @@ -364,7 +348,6 @@ impl ProjectConfig { rate_limit_notifications: legacy.rate_limit_notifications, timezone: legacy.timezone, rendezvous: None, - reconcile_on_startup: legacy.reconcile_on_startup, }) } } diff --git a/server/src/main.rs b/server/src/main.rs index b3ba071f..1c7e3fdf 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -16,7 +16,6 @@ mod llm; pub mod log_buffer; pub mod rebuild; mod state; -mod startup_reconcile; mod store; mod workflow; pub(crate) mod pipeline_state; @@ -305,20 +304,6 @@ async fn main() -> Result<(), std::io::Error> { if let Err(e) = crdt_state::init(db_path).await { slog!("[crdt] Failed to initialise CRDT state layer: {e}"); } - // Run the startup drift-reconcile pass now that both the CRDT and DB are - // initialised. The pass is cheap (~100 stories < 1 s) and opt-out via - // `reconcile_on_startup = false` in project.toml. - let reconcile_enabled = db_path - .parent() - .and_then(|p| p.parent()) - .and_then(|root| config::ProjectConfig::load(root).ok()) - .map(|cfg| cfg.reconcile_on_startup) - .unwrap_or(true); - if reconcile_enabled - && let Some(project_root) = db_path.parent().and_then(|p| p.parent()) - { - startup_reconcile::reconcile_state(project_root, db_path).await; - } } // (CRDT state layer is initialised above alongside the legacy pipeline.db.) diff --git a/server/src/startup_reconcile.rs b/server/src/startup_reconcile.rs deleted file mode 100644 index 884bda8a..00000000 --- a/server/src/startup_reconcile.rs +++ /dev/null @@ -1,371 +0,0 @@ -//! Startup reconcile pass: detect drift between CRDT and `pipeline_items`. -//! -//! ## What is drift? -//! -//! Huskies maintains pipeline state in two places that must stay in sync: -//! -//! 1. **In-memory CRDT** (`crdt_state`) — reconstructed from `crdt_ops` on startup; the -//! primary source of truth for pipeline metadata. -//! 2. **`pipeline_items` table** — a shadow/materialised view written alongside CRDT updates; -//! used for fast DB queries without parsing CRDT ops. -//! -//! "Drift" means these sources disagree about a story's stage or existence: -//! -//! | Drift type | Meaning | -//! |-----------------|-----------------------------------------------------------------| -//! | `CRDT-only` | Story in CRDT but absent from `pipeline_items` | -//! | `DB-only` | Story in `pipeline_items` but absent from CRDT | -//! | `stage-mismatch`| Story present in CRDT and DB but with different stage values | -//! -//! ## Log format -//! -//! Each drift emits a structured log line: -//! ```text -//! [reconcile] DRIFT story=X crdt_stage=Y db_stage=Z -//! ``` -//! (`MISSING` is used where a source has no record for that story.) -//! -//! ## Opt-out -//! -//! Set `reconcile_on_startup = false` in `.huskies/project.toml` to disable the -//! pass during the migration window if it produces too much noise. - -use std::collections::{HashMap, HashSet}; -use std::path::Path; -use std::sync::OnceLock; - -use crate::slog; - -// ── Public API ────────────────────────────────────────────────────────────── - -/// Drift count from the last reconcile pass (set once at startup). -/// -/// Returns `None` if the pass has not run yet (e.g. disabled via config or -/// called before startup completes). -static DRIFT_COUNT: OnceLock = OnceLock::new(); - -/// Return the drift count detected during the startup reconcile pass. -pub fn drift_count() -> Option { - DRIFT_COUNT.get().copied() -} - -/// A story whose state is inconsistent across at least two sources. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct ItemDrift { - pub story_id: String, - /// Stage according to the in-memory CRDT, or `None` if absent. - pub crdt_stage: Option, - /// Stage according to the `pipeline_items` shadow table, or `None` if absent. - pub db_stage: Option, -} - -/// Summary returned by [`reconcile_state`]. -pub struct DriftReport { - /// Number of drifted items detected. Read by tests; in production the count - /// is accessed via the [`drift_count()`] free function (stored in `DRIFT_COUNT`). - #[allow(dead_code)] - pub drift_count: usize, -} - -// ── Main entry point ──────────────────────────────────────────────────────── - -/// Run the startup state-reconcile pass. -/// -/// Must be called **after** `crdt_state::init()` so the in-memory CRDT is -/// populated, and **after** `db::init()` so `pipeline_items` is accessible. -/// -/// The function is fast: a full scan with ~100 stories completes in well under -/// 1 second because it reads cached in-memory state (CRDT) and performs a -/// single `SELECT id, stage FROM pipeline_items` query. -/// -/// Returns the number of drifted items and stores the count in a process-wide -/// [`OnceLock`] so other subsystems (e.g. the Matrix bot startup announcement) -/// can read it without re-running the pass. -pub async fn reconcile_state(_project_root: &Path, db_path: &Path) -> DriftReport { - let crdt_stages: HashMap = crate::crdt_state::read_all_items() - .unwrap_or_default() - .into_iter() - .map(|v| (v.story_id, v.stage)) - .collect(); - - let db_stages = read_db_stages(db_path).await; - - slog!( - "[reconcile] Scanning {} CRDT / {} DB items for drift", - crdt_stages.len(), - db_stages.len(), - ); - - let drifts = detect_drift(&crdt_stages, &db_stages); - - for d in &drifts { - slog!( - "[reconcile] DRIFT story={} crdt_stage={} db_stage={}", - d.story_id, - d.crdt_stage.as_deref().unwrap_or("MISSING"), - d.db_stage.as_deref().unwrap_or("MISSING"), - ); - } - - let count = drifts.len(); - if count == 0 { - let total = { - let mut all: HashSet<&String> = HashSet::new(); - all.extend(crdt_stages.keys()); - all.extend(db_stages.keys()); - all.len() - }; - slog!("[reconcile] No drift detected across {} total items.", total); - } else { - slog!( - "[reconcile] DRIFT SUMMARY: {} item(s) with inconsistent state — check server logs.", - count - ); - } - - let _ = DRIFT_COUNT.set(count); - DriftReport { drift_count: count } -} - -// ── Core comparison logic (pure, fully testable) ──────────────────────────── - -/// Detect drift between CRDT and DB stage maps. -/// -/// A story is drifted when any of: -/// - Present in CRDT but absent from DB (`CRDT-only`) -/// - Present in DB but absent from CRDT (`DB-only`) -/// - Present in both CRDT and DB with different stage values (`stage-mismatch`) -pub(crate) fn detect_drift( - crdt: &HashMap, - db: &HashMap, -) -> Vec { - let all_ids: HashSet<&String> = crdt.keys().chain(db.keys()).collect(); - - let mut drifts: Vec = all_ids - .into_iter() - .filter_map(|id| { - let c = crdt.get(id).cloned(); - let d = db.get(id).cloned(); - - let is_drift = match (&c, &d) { - // Both present but stages differ → stage mismatch - (Some(cs), Some(ds)) if cs != ds => true, - // One present, other absent → single-source - (Some(_), None) | (None, Some(_)) => true, - // Both present, same stage → clean - _ => false, - }; - - if is_drift { - Some(ItemDrift { - story_id: id.clone(), - crdt_stage: c, - db_stage: d, - }) - } else { - None - } - }) - .collect(); - - // Sort for deterministic output (useful in tests and logs). - drifts.sort_by(|a, b| a.story_id.cmp(&b.story_id)); - drifts -} - -// ── Data loading helpers ───────────────────────────────────────────────────── - -/// Query `pipeline_items` for all `(id, stage)` rows. -/// -/// Opens a separate read-only connection so this function can run before or -/// after `db::init()` without conflicting with the write pool. -async fn read_db_stages(db_path: &Path) -> HashMap { - use sqlx::sqlite::SqliteConnectOptions; - use sqlx::SqlitePool; - - let options = SqliteConnectOptions::new() - .filename(db_path) - .read_only(true); - - let pool = match SqlitePool::connect_with(options).await { - Ok(p) => p, - Err(e) => { - slog!("[reconcile] Could not open pipeline.db for reconcile scan: {e}"); - return HashMap::new(); - } - }; - - let rows: Vec<(String, String)> = sqlx::query_as("SELECT id, stage FROM pipeline_items") - .fetch_all(&pool) - .await - .unwrap_or_default(); - - rows.into_iter().collect() -} - - -// ── Tests ──────────────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - - fn stages(pairs: &[(&str, &str)]) -> HashMap { - pairs - .iter() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect() - } - - // ── detect_drift unit tests (pure function) ───────────────────────────── - - #[test] - fn detect_drift_clean_state_no_items() { - // Both sources empty → no drift. - let empty = HashMap::new(); - let drifts = detect_drift(&empty, &empty); - assert!( - drifts.is_empty(), - "expected no drift for empty state, got: {drifts:?}" - ); - } - - #[test] - fn detect_drift_clean_state_matching_stages() { - // Same story, same stage in both CRDT and DB → no drift. - let crdt = stages(&[("42_story_foo", "2_current")]); - let db = stages(&[("42_story_foo", "2_current")]); - let drifts = detect_drift(&crdt, &db); - assert!( - drifts.is_empty(), - "expected no drift when all sources agree, got: {drifts:?}" - ); - } - - #[test] - fn detect_drift_crdt_only_story() { - // Story in CRDT but absent from DB → drift (CRDT-only). - let crdt = stages(&[("10_story_crdt_only", "2_current")]); - let db = HashMap::new(); - let drifts = detect_drift(&crdt, &db); - assert_eq!(drifts.len(), 1, "expected 1 drift, got: {drifts:?}"); - let d = &drifts[0]; - assert_eq!(d.story_id, "10_story_crdt_only"); - assert_eq!(d.crdt_stage.as_deref(), Some("2_current")); - assert!(d.db_stage.is_none(), "db_stage should be MISSING"); - } - - #[test] - fn detect_drift_db_only_story() { - // Story in DB but absent from CRDT → drift (DB-only). - let crdt = HashMap::new(); - let db = stages(&[("20_story_db_only", "1_backlog")]); - let drifts = detect_drift(&crdt, &db); - assert_eq!(drifts.len(), 1, "expected 1 drift, got: {drifts:?}"); - let d = &drifts[0]; - assert_eq!(d.story_id, "20_story_db_only"); - assert!(d.crdt_stage.is_none(), "crdt_stage should be MISSING"); - assert_eq!(d.db_stage.as_deref(), Some("1_backlog")); - } - - #[test] - fn detect_drift_stage_mismatch_crdt_vs_db() { - // Same story in CRDT and DB but at different stages → drift (stage mismatch). - let crdt = stages(&[("40_story_mismatch", "4_merge")]); - let db = stages(&[("40_story_mismatch", "5_done")]); - let drifts = detect_drift(&crdt, &db); - assert_eq!(drifts.len(), 1, "expected 1 drift, got: {drifts:?}"); - let d = &drifts[0]; - assert_eq!(d.crdt_stage.as_deref(), Some("4_merge")); - assert_eq!(d.db_stage.as_deref(), Some("5_done")); - } - - #[test] - fn detect_drift_multiple_stories_mixed() { - // Two clean stories, two drifted → exactly 2 drifts returned. - let crdt = stages(&[ - ("1_story_clean", "2_current"), - ("2_story_crdt_only", "2_current"), - ("3_story_mismatch", "3_qa"), - ("4_story_clean2", "1_backlog"), - ]); - let db = stages(&[ - ("1_story_clean", "2_current"), - ("3_story_mismatch", "4_merge"), // mismatch - ("4_story_clean2", "1_backlog"), - ]); - let drifts = detect_drift(&crdt, &db); - assert_eq!(drifts.len(), 2, "expected 2 drifts, got: {drifts:?}"); - let ids: Vec<&str> = drifts.iter().map(|d| d.story_id.as_str()).collect(); - assert!(ids.contains(&"2_story_crdt_only"), "missing CRDT-only drift"); - assert!(ids.contains(&"3_story_mismatch"), "missing mismatch drift"); - } - - // ── reconcile_state integration tests (temp DB + FS) ──────────────────── - - async fn setup_test_db(path: &Path) -> sqlx::SqlitePool { - use sqlx::sqlite::SqliteConnectOptions; - use sqlx::SqlitePool; - let opts = SqliteConnectOptions::new() - .filename(path) - .create_if_missing(true); - let pool = SqlitePool::connect_with(opts).await.unwrap(); - sqlx::query( - "CREATE TABLE IF NOT EXISTS pipeline_items ( - id TEXT PRIMARY KEY, - name TEXT, - stage TEXT NOT NULL, - agent TEXT, - retry_count INTEGER, - blocked INTEGER, - depends_on TEXT, - created_at TEXT NOT NULL, - updated_at TEXT NOT NULL - )", - ) - .execute(&pool) - .await - .unwrap(); - pool - } - - #[tokio::test] - async fn reconcile_empty_db_yields_no_drift() { - // Empty DB + empty CRDT map → 0 drift via detect_drift. - // We test detect_drift directly because the global CRDT OnceLock - // is shared across test threads and cannot be isolated. - let tmp = tempfile::tempdir().unwrap(); - let db_path = tmp.path().join("pipeline.db"); - setup_test_db(&db_path).await; - - let db_stages = read_db_stages(&db_path).await; - let crdt_stages: HashMap = HashMap::new(); - let drifts = detect_drift(&crdt_stages, &db_stages); - assert_eq!(drifts.len(), 0, "expected no drift for empty state"); - } - - #[tokio::test] - async fn reconcile_db_only_story_is_drift() { - // Story in DB but absent from CRDT map → 1 drift. - let tmp = tempfile::tempdir().unwrap(); - let db_path = tmp.path().join("pipeline.db"); - let pool = setup_test_db(&db_path).await; - - sqlx::query( - "INSERT INTO pipeline_items (id, name, stage, created_at, updated_at) - VALUES ('100_story_db_only', 'DB-only story', '2_current', '2026-01-01', '2026-01-01')", - ) - .execute(&pool) - .await - .unwrap(); - - let db_stages = read_db_stages(&db_path).await; - let crdt_stages: HashMap = HashMap::new(); - let drifts = detect_drift(&crdt_stages, &db_stages); - assert_eq!( - drifts.len(), 1, - "story in DB but not in CRDT should be counted as drift" - ); - } - -} diff --git a/server/src/worktree.rs b/server/src/worktree.rs index 072c9c3f..e8082d21 100644 --- a/server/src/worktree.rs +++ b/server/src/worktree.rs @@ -529,7 +529,6 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, - reconcile_on_startup: true, }; // Should complete without panic run_setup_commands(tmp.path(), &config).await; @@ -555,7 +554,6 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, - reconcile_on_startup: true, }; // Should complete without panic run_setup_commands(tmp.path(), &config).await; @@ -581,7 +579,6 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, - reconcile_on_startup: true, }; // Setup command failures are non-fatal — should not panic or propagate run_setup_commands(tmp.path(), &config).await; @@ -607,7 +604,6 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, - reconcile_on_startup: true, }; // Teardown failures are best-effort — should not propagate assert!(run_teardown_commands(tmp.path(), &config).await.is_ok()); @@ -632,7 +628,6 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, - reconcile_on_startup: true, }; let info = create_worktree(&project_root, "42_fresh_test", &config, 3001) .await @@ -664,7 +659,6 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, - reconcile_on_startup: true, }; // First creation let _info1 = create_worktree(&project_root, "43_reuse_test", &config, 3001) @@ -737,7 +731,6 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, - reconcile_on_startup: true, }; let result = remove_worktree_by_story_id(tmp.path(), "99_nonexistent", &config).await; @@ -768,7 +761,6 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, - reconcile_on_startup: true, }; create_worktree(&project_root, "88_remove_by_id", &config, 3001) .await @@ -846,7 +838,6 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, - reconcile_on_startup: true, }; // Even though setup commands fail, create_worktree must succeed // so the agent can start and fix the problem itself. @@ -880,7 +871,6 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, - reconcile_on_startup: true, }; // First creation — no setup commands, should succeed create_worktree(&project_root, "173_reuse_fail", &empty_config, 3001) @@ -904,7 +894,6 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, - reconcile_on_startup: true, }; // Second call — worktree exists, setup commands fail, must still succeed let result = create_worktree(&project_root, "173_reuse_fail", &failing_config, 3002).await; @@ -934,7 +923,6 @@ mod tests { rate_limit_notifications: true, timezone: None, rendezvous: None, - reconcile_on_startup: true, }; let info = create_worktree(&project_root, "77_remove_async", &config, 3001) .await