huskies: merge 532_story_remove_startup_reconcile_pass_and_drift_notification_no_filesystem_to_reconcile_against

This commit is contained in:
dave
2026-04-10 16:36:40 +00:00
parent b88857c2e4
commit a59f4fc1a5
6 changed files with 7 additions and 442 deletions
+6 -26
View File
@@ -3,15 +3,9 @@ use pulldown_cmark::{Options, Parser, html};
/// Format the startup greeting the bot sends to each room when it comes online.
///
/// Uses the bot's configured display name so the message reads naturally
/// (e.g. "Timmy is online."). When `drift_count` is `Some(n)` and `n > 0`,
/// appends a drift warning so operators know to check the server logs.
pub fn format_startup_announcement(bot_name: &str, drift_count: Option<usize>) -> String {
match drift_count {
Some(n) if n > 0 => format!(
"{bot_name} is online. \u{26a0}\u{fe0f} {n} item(s) have CRDT/DB drift \u{2014} check server logs."
),
_ => format!("{bot_name} is online."),
}
/// (e.g. "Timmy is online.").
pub fn format_startup_announcement(bot_name: &str) -> String {
format!("{bot_name} is online.")
}
/// Convert a Markdown string to an HTML string using pulldown-cmark.
@@ -103,27 +97,13 @@ mod tests {
#[test]
fn startup_announcement_uses_bot_name() {
assert_eq!(format_startup_announcement("Timmy", None), "Timmy is online.");
assert_eq!(format_startup_announcement("Timmy"), "Timmy is online.");
}
#[test]
fn startup_announcement_uses_configured_display_name_not_hardcoded() {
assert_eq!(format_startup_announcement("HAL", None), "HAL is online.");
assert_eq!(format_startup_announcement("Assistant", None), "Assistant is online.");
}
#[test]
fn startup_announcement_with_zero_drift_omits_warning() {
assert_eq!(format_startup_announcement("Timmy", Some(0)), "Timmy is online.");
}
#[test]
fn startup_announcement_with_drift_includes_count_and_warning() {
let msg = format_startup_announcement("Timmy", Some(3));
assert!(msg.starts_with("Timmy is online."), "should start with online msg: {msg}");
assert!(msg.contains('3'), "should include drift count: {msg}");
assert!(msg.contains("drift"), "should mention drift: {msg}");
assert!(msg.contains("server logs"), "should mention server logs: {msg}");
assert_eq!(format_startup_announcement("HAL"), "HAL is online.");
assert_eq!(format_startup_announcement("Assistant"), "Assistant is online.");
}
#[test]
+1 -1
View File
@@ -297,7 +297,7 @@ pub async fn run_bot(
// bot is online. This runs once per process start — the sync loop handles
// reconnects internally so this code is never reached again on a network
// blip or sync resumption.
let announce_msg = format_startup_announcement(&announce_bot_name, crate::startup_reconcile::drift_count());
let announce_msg = format_startup_announcement(&announce_bot_name);
let announce_html = markdown_to_html(&announce_msg);
slog!("[matrix-bot] Sending startup announcement: {announce_msg}");
for room_id in &announce_room_ids {
-17
View File
@@ -53,14 +53,6 @@ pub struct ProjectConfig {
/// so both machines see the same pipeline state in real-time.
#[serde(default)]
pub rendezvous: Option<String>,
/// Whether to run the startup state-reconcile pass.
///
/// The pass compares pipeline state across CRDT, `pipeline_items`, and
/// filesystem shadows after CRDT replay, logging any drift it finds.
/// Set to `false` to disable during the migration window if the pass
/// produces too much noise. Default: `true`.
#[serde(default = "default_reconcile_on_startup")]
pub reconcile_on_startup: bool,
}
/// Configuration for the filesystem watcher's sweep behaviour.
@@ -108,9 +100,6 @@ fn default_rate_limit_notifications() -> bool {
true
}
fn default_reconcile_on_startup() -> bool {
true
}
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)]
@@ -209,8 +198,6 @@ struct LegacyProjectConfig {
rate_limit_notifications: bool,
#[serde(default)]
timezone: Option<String>,
#[serde(default = "default_reconcile_on_startup")]
reconcile_on_startup: bool,
}
impl Default for ProjectConfig {
@@ -241,7 +228,6 @@ impl Default for ProjectConfig {
rate_limit_notifications: default_rate_limit_notifications(),
timezone: None,
rendezvous: None,
reconcile_on_startup: default_reconcile_on_startup(),
}
}
}
@@ -318,7 +304,6 @@ impl ProjectConfig {
rate_limit_notifications: legacy.rate_limit_notifications,
timezone: legacy.timezone,
rendezvous: None,
reconcile_on_startup: legacy.reconcile_on_startup,
};
validate_agents(&config.agent)?;
return Ok(config);
@@ -347,7 +332,6 @@ impl ProjectConfig {
rate_limit_notifications: legacy.rate_limit_notifications,
timezone: legacy.timezone,
rendezvous: None,
reconcile_on_startup: legacy.reconcile_on_startup,
};
validate_agents(&config.agent)?;
Ok(config)
@@ -364,7 +348,6 @@ impl ProjectConfig {
rate_limit_notifications: legacy.rate_limit_notifications,
timezone: legacy.timezone,
rendezvous: None,
reconcile_on_startup: legacy.reconcile_on_startup,
})
}
}
-15
View File
@@ -16,7 +16,6 @@ mod llm;
pub mod log_buffer;
pub mod rebuild;
mod state;
mod startup_reconcile;
mod store;
mod workflow;
pub(crate) mod pipeline_state;
@@ -305,20 +304,6 @@ async fn main() -> Result<(), std::io::Error> {
if let Err(e) = crdt_state::init(db_path).await {
slog!("[crdt] Failed to initialise CRDT state layer: {e}");
}
// Run the startup drift-reconcile pass now that both the CRDT and DB are
// initialised. The pass is cheap (~100 stories < 1 s) and opt-out via
// `reconcile_on_startup = false` in project.toml.
let reconcile_enabled = db_path
.parent()
.and_then(|p| p.parent())
.and_then(|root| config::ProjectConfig::load(root).ok())
.map(|cfg| cfg.reconcile_on_startup)
.unwrap_or(true);
if reconcile_enabled
&& let Some(project_root) = db_path.parent().and_then(|p| p.parent())
{
startup_reconcile::reconcile_state(project_root, db_path).await;
}
}
// (CRDT state layer is initialised above alongside the legacy pipeline.db.)
-371
View File
@@ -1,371 +0,0 @@
//! Startup reconcile pass: detect drift between CRDT and `pipeline_items`.
//!
//! ## What is drift?
//!
//! Huskies maintains pipeline state in two places that must stay in sync:
//!
//! 1. **In-memory CRDT** (`crdt_state`) — reconstructed from `crdt_ops` on startup; the
//! primary source of truth for pipeline metadata.
//! 2. **`pipeline_items` table** — a shadow/materialised view written alongside CRDT updates;
//! used for fast DB queries without parsing CRDT ops.
//!
//! "Drift" means these sources disagree about a story's stage or existence:
//!
//! | Drift type | Meaning |
//! |-----------------|-----------------------------------------------------------------|
//! | `CRDT-only` | Story in CRDT but absent from `pipeline_items` |
//! | `DB-only` | Story in `pipeline_items` but absent from CRDT |
//! | `stage-mismatch`| Story present in CRDT and DB but with different stage values |
//!
//! ## Log format
//!
//! Each drift emits a structured log line:
//! ```text
//! [reconcile] DRIFT story=X crdt_stage=Y db_stage=Z
//! ```
//! (`MISSING` is used where a source has no record for that story.)
//!
//! ## Opt-out
//!
//! Set `reconcile_on_startup = false` in `.huskies/project.toml` to disable the
//! pass during the migration window if it produces too much noise.
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::OnceLock;
use crate::slog;
// ── Public API ──────────────────────────────────────────────────────────────
/// Drift count from the last reconcile pass (set once at startup).
///
/// Returns `None` if the pass has not run yet (e.g. disabled via config or
/// called before startup completes).
static DRIFT_COUNT: OnceLock<usize> = OnceLock::new();
/// Return the drift count detected during the startup reconcile pass.
pub fn drift_count() -> Option<usize> {
DRIFT_COUNT.get().copied()
}
/// A story whose state is inconsistent across at least two sources.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ItemDrift {
pub story_id: String,
/// Stage according to the in-memory CRDT, or `None` if absent.
pub crdt_stage: Option<String>,
/// Stage according to the `pipeline_items` shadow table, or `None` if absent.
pub db_stage: Option<String>,
}
/// Summary returned by [`reconcile_state`].
pub struct DriftReport {
/// Number of drifted items detected. Read by tests; in production the count
/// is accessed via the [`drift_count()`] free function (stored in `DRIFT_COUNT`).
#[allow(dead_code)]
pub drift_count: usize,
}
// ── Main entry point ────────────────────────────────────────────────────────
/// Run the startup state-reconcile pass.
///
/// Must be called **after** `crdt_state::init()` so the in-memory CRDT is
/// populated, and **after** `db::init()` so `pipeline_items` is accessible.
///
/// The function is fast: a full scan with ~100 stories completes in well under
/// 1 second because it reads cached in-memory state (CRDT) and performs a
/// single `SELECT id, stage FROM pipeline_items` query.
///
/// Returns the number of drifted items and stores the count in a process-wide
/// [`OnceLock`] so other subsystems (e.g. the Matrix bot startup announcement)
/// can read it without re-running the pass.
pub async fn reconcile_state(_project_root: &Path, db_path: &Path) -> DriftReport {
let crdt_stages: HashMap<String, String> = crate::crdt_state::read_all_items()
.unwrap_or_default()
.into_iter()
.map(|v| (v.story_id, v.stage))
.collect();
let db_stages = read_db_stages(db_path).await;
slog!(
"[reconcile] Scanning {} CRDT / {} DB items for drift",
crdt_stages.len(),
db_stages.len(),
);
let drifts = detect_drift(&crdt_stages, &db_stages);
for d in &drifts {
slog!(
"[reconcile] DRIFT story={} crdt_stage={} db_stage={}",
d.story_id,
d.crdt_stage.as_deref().unwrap_or("MISSING"),
d.db_stage.as_deref().unwrap_or("MISSING"),
);
}
let count = drifts.len();
if count == 0 {
let total = {
let mut all: HashSet<&String> = HashSet::new();
all.extend(crdt_stages.keys());
all.extend(db_stages.keys());
all.len()
};
slog!("[reconcile] No drift detected across {} total items.", total);
} else {
slog!(
"[reconcile] DRIFT SUMMARY: {} item(s) with inconsistent state — check server logs.",
count
);
}
let _ = DRIFT_COUNT.set(count);
DriftReport { drift_count: count }
}
// ── Core comparison logic (pure, fully testable) ────────────────────────────
/// Detect drift between CRDT and DB stage maps.
///
/// A story is drifted when any of:
/// - Present in CRDT but absent from DB (`CRDT-only`)
/// - Present in DB but absent from CRDT (`DB-only`)
/// - Present in both CRDT and DB with different stage values (`stage-mismatch`)
pub(crate) fn detect_drift(
crdt: &HashMap<String, String>,
db: &HashMap<String, String>,
) -> Vec<ItemDrift> {
let all_ids: HashSet<&String> = crdt.keys().chain(db.keys()).collect();
let mut drifts: Vec<ItemDrift> = all_ids
.into_iter()
.filter_map(|id| {
let c = crdt.get(id).cloned();
let d = db.get(id).cloned();
let is_drift = match (&c, &d) {
// Both present but stages differ → stage mismatch
(Some(cs), Some(ds)) if cs != ds => true,
// One present, other absent → single-source
(Some(_), None) | (None, Some(_)) => true,
// Both present, same stage → clean
_ => false,
};
if is_drift {
Some(ItemDrift {
story_id: id.clone(),
crdt_stage: c,
db_stage: d,
})
} else {
None
}
})
.collect();
// Sort for deterministic output (useful in tests and logs).
drifts.sort_by(|a, b| a.story_id.cmp(&b.story_id));
drifts
}
// ── Data loading helpers ─────────────────────────────────────────────────────
/// Query `pipeline_items` for all `(id, stage)` rows.
///
/// Opens a separate read-only connection so this function can run before or
/// after `db::init()` without conflicting with the write pool.
async fn read_db_stages(db_path: &Path) -> HashMap<String, String> {
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
let options = SqliteConnectOptions::new()
.filename(db_path)
.read_only(true);
let pool = match SqlitePool::connect_with(options).await {
Ok(p) => p,
Err(e) => {
slog!("[reconcile] Could not open pipeline.db for reconcile scan: {e}");
return HashMap::new();
}
};
let rows: Vec<(String, String)> = sqlx::query_as("SELECT id, stage FROM pipeline_items")
.fetch_all(&pool)
.await
.unwrap_or_default();
rows.into_iter().collect()
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
fn stages(pairs: &[(&str, &str)]) -> HashMap<String, String> {
pairs
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect()
}
// ── detect_drift unit tests (pure function) ─────────────────────────────
#[test]
fn detect_drift_clean_state_no_items() {
// Both sources empty → no drift.
let empty = HashMap::new();
let drifts = detect_drift(&empty, &empty);
assert!(
drifts.is_empty(),
"expected no drift for empty state, got: {drifts:?}"
);
}
#[test]
fn detect_drift_clean_state_matching_stages() {
// Same story, same stage in both CRDT and DB → no drift.
let crdt = stages(&[("42_story_foo", "2_current")]);
let db = stages(&[("42_story_foo", "2_current")]);
let drifts = detect_drift(&crdt, &db);
assert!(
drifts.is_empty(),
"expected no drift when all sources agree, got: {drifts:?}"
);
}
#[test]
fn detect_drift_crdt_only_story() {
// Story in CRDT but absent from DB → drift (CRDT-only).
let crdt = stages(&[("10_story_crdt_only", "2_current")]);
let db = HashMap::new();
let drifts = detect_drift(&crdt, &db);
assert_eq!(drifts.len(), 1, "expected 1 drift, got: {drifts:?}");
let d = &drifts[0];
assert_eq!(d.story_id, "10_story_crdt_only");
assert_eq!(d.crdt_stage.as_deref(), Some("2_current"));
assert!(d.db_stage.is_none(), "db_stage should be MISSING");
}
#[test]
fn detect_drift_db_only_story() {
// Story in DB but absent from CRDT → drift (DB-only).
let crdt = HashMap::new();
let db = stages(&[("20_story_db_only", "1_backlog")]);
let drifts = detect_drift(&crdt, &db);
assert_eq!(drifts.len(), 1, "expected 1 drift, got: {drifts:?}");
let d = &drifts[0];
assert_eq!(d.story_id, "20_story_db_only");
assert!(d.crdt_stage.is_none(), "crdt_stage should be MISSING");
assert_eq!(d.db_stage.as_deref(), Some("1_backlog"));
}
#[test]
fn detect_drift_stage_mismatch_crdt_vs_db() {
// Same story in CRDT and DB but at different stages → drift (stage mismatch).
let crdt = stages(&[("40_story_mismatch", "4_merge")]);
let db = stages(&[("40_story_mismatch", "5_done")]);
let drifts = detect_drift(&crdt, &db);
assert_eq!(drifts.len(), 1, "expected 1 drift, got: {drifts:?}");
let d = &drifts[0];
assert_eq!(d.crdt_stage.as_deref(), Some("4_merge"));
assert_eq!(d.db_stage.as_deref(), Some("5_done"));
}
#[test]
fn detect_drift_multiple_stories_mixed() {
// Two clean stories, two drifted → exactly 2 drifts returned.
let crdt = stages(&[
("1_story_clean", "2_current"),
("2_story_crdt_only", "2_current"),
("3_story_mismatch", "3_qa"),
("4_story_clean2", "1_backlog"),
]);
let db = stages(&[
("1_story_clean", "2_current"),
("3_story_mismatch", "4_merge"), // mismatch
("4_story_clean2", "1_backlog"),
]);
let drifts = detect_drift(&crdt, &db);
assert_eq!(drifts.len(), 2, "expected 2 drifts, got: {drifts:?}");
let ids: Vec<&str> = drifts.iter().map(|d| d.story_id.as_str()).collect();
assert!(ids.contains(&"2_story_crdt_only"), "missing CRDT-only drift");
assert!(ids.contains(&"3_story_mismatch"), "missing mismatch drift");
}
// ── reconcile_state integration tests (temp DB + FS) ────────────────────
async fn setup_test_db(path: &Path) -> sqlx::SqlitePool {
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
let opts = SqliteConnectOptions::new()
.filename(path)
.create_if_missing(true);
let pool = SqlitePool::connect_with(opts).await.unwrap();
sqlx::query(
"CREATE TABLE IF NOT EXISTS pipeline_items (
id TEXT PRIMARY KEY,
name TEXT,
stage TEXT NOT NULL,
agent TEXT,
retry_count INTEGER,
blocked INTEGER,
depends_on TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)",
)
.execute(&pool)
.await
.unwrap();
pool
}
#[tokio::test]
async fn reconcile_empty_db_yields_no_drift() {
// Empty DB + empty CRDT map → 0 drift via detect_drift.
// We test detect_drift directly because the global CRDT OnceLock
// is shared across test threads and cannot be isolated.
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
setup_test_db(&db_path).await;
let db_stages = read_db_stages(&db_path).await;
let crdt_stages: HashMap<String, String> = HashMap::new();
let drifts = detect_drift(&crdt_stages, &db_stages);
assert_eq!(drifts.len(), 0, "expected no drift for empty state");
}
#[tokio::test]
async fn reconcile_db_only_story_is_drift() {
// Story in DB but absent from CRDT map → 1 drift.
let tmp = tempfile::tempdir().unwrap();
let db_path = tmp.path().join("pipeline.db");
let pool = setup_test_db(&db_path).await;
sqlx::query(
"INSERT INTO pipeline_items (id, name, stage, created_at, updated_at)
VALUES ('100_story_db_only', 'DB-only story', '2_current', '2026-01-01', '2026-01-01')",
)
.execute(&pool)
.await
.unwrap();
let db_stages = read_db_stages(&db_path).await;
let crdt_stages: HashMap<String, String> = HashMap::new();
let drifts = detect_drift(&crdt_stages, &db_stages);
assert_eq!(
drifts.len(), 1,
"story in DB but not in CRDT should be counted as drift"
);
}
}
-12
View File
@@ -529,7 +529,6 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
reconcile_on_startup: true,
};
// Should complete without panic
run_setup_commands(tmp.path(), &config).await;
@@ -555,7 +554,6 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
reconcile_on_startup: true,
};
// Should complete without panic
run_setup_commands(tmp.path(), &config).await;
@@ -581,7 +579,6 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
reconcile_on_startup: true,
};
// Setup command failures are non-fatal — should not panic or propagate
run_setup_commands(tmp.path(), &config).await;
@@ -607,7 +604,6 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
reconcile_on_startup: true,
};
// Teardown failures are best-effort — should not propagate
assert!(run_teardown_commands(tmp.path(), &config).await.is_ok());
@@ -632,7 +628,6 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
reconcile_on_startup: true,
};
let info = create_worktree(&project_root, "42_fresh_test", &config, 3001)
.await
@@ -664,7 +659,6 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
reconcile_on_startup: true,
};
// First creation
let _info1 = create_worktree(&project_root, "43_reuse_test", &config, 3001)
@@ -737,7 +731,6 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
reconcile_on_startup: true,
};
let result = remove_worktree_by_story_id(tmp.path(), "99_nonexistent", &config).await;
@@ -768,7 +761,6 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
reconcile_on_startup: true,
};
create_worktree(&project_root, "88_remove_by_id", &config, 3001)
.await
@@ -846,7 +838,6 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
reconcile_on_startup: true,
};
// Even though setup commands fail, create_worktree must succeed
// so the agent can start and fix the problem itself.
@@ -880,7 +871,6 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
reconcile_on_startup: true,
};
// First creation — no setup commands, should succeed
create_worktree(&project_root, "173_reuse_fail", &empty_config, 3001)
@@ -904,7 +894,6 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
reconcile_on_startup: true,
};
// Second call — worktree exists, setup commands fail, must still succeed
let result = create_worktree(&project_root, "173_reuse_fail", &failing_config, 3002).await;
@@ -934,7 +923,6 @@ mod tests {
rate_limit_notifications: true,
timezone: None,
rendezvous: None,
reconcile_on_startup: true,
};
let info = create_worktree(&project_root, "77_remove_async", &config, 3001)
.await