huskies: merge 513_story_startup_reconcile_pass_that_detects_drift_between_crdt_pipeline_items_and_filesystem_shadows
This commit is contained in:
@@ -89,6 +89,48 @@ script/release 0.7.1
|
|||||||
|
|
||||||
This bumps version in `Cargo.toml` and `package.json`, builds macOS arm64 and Linux amd64 binaries, tags the repo, and publishes a Gitea release with changelog and binaries attached.
|
This bumps version in `Cargo.toml` and `package.json`, builds macOS arm64 and Linux amd64 binaries, tags the repo, and publishes a Gitea release with changelog and binaries attached.
|
||||||
|
|
||||||
|
## Startup reconcile pass
|
||||||
|
|
||||||
|
On startup, after CRDT replay and database initialisation, huskies runs a
|
||||||
|
**reconcile pass** that compares pipeline state across three sources:
|
||||||
|
|
||||||
|
1. **In-memory CRDT** — the primary source of truth, reconstructed from
|
||||||
|
`crdt_ops` on startup.
|
||||||
|
2. **`pipeline_items` table** — a shadow/materialised view written alongside
|
||||||
|
CRDT updates, used for fast DB queries.
|
||||||
|
3. **Filesystem shadows** (`.huskies/work/N_stage/*.md`) — legacy rendering
|
||||||
|
still written by some paths and read by agent worktrees.
|
||||||
|
|
||||||
|
Any disagreement between these sources is **drift**. The reconcile pass logs a
|
||||||
|
structured line for each drifted item:
|
||||||
|
|
||||||
|
```
|
||||||
|
[reconcile] DRIFT story=X crdt_stage=Y db_stage=Z fs_stage=W
|
||||||
|
```
|
||||||
|
|
||||||
|
(`MISSING` is used where a source has no record for that story.)
|
||||||
|
|
||||||
|
### Drift types
|
||||||
|
|
||||||
|
| Type | Meaning |
|
||||||
|
|------|---------|
|
||||||
|
| `CRDT-only` | Story present in CRDT but absent from `pipeline_items` |
|
||||||
|
| `DB-only` | Story present in `pipeline_items` but absent from CRDT |
|
||||||
|
| `FS-only` | Story on the filesystem but absent from both CRDT and DB |
|
||||||
|
| `stage-mismatch` | Story present in both CRDT and DB but with different stage values |
|
||||||
|
|
||||||
|
Note: a filesystem shadow that lags behind the CRDT/DB stage (both of which
|
||||||
|
agree) is **not** treated as drift — the FS is a best-effort rendering and is
|
||||||
|
allowed to lag.
|
||||||
|
|
||||||
|
If any drift is detected, the Matrix/Slack/WhatsApp bot startup announcement
|
||||||
|
includes a count and a suggestion to check the server logs.
|
||||||
|
|
||||||
|
### Opt-out
|
||||||
|
|
||||||
|
Set `reconcile_on_startup = false` in `.huskies/project.toml` to disable the
|
||||||
|
pass during the migration window if it produces noise.
|
||||||
|
|
||||||
## License
|
## License
|
||||||
|
|
||||||
GPL-3.0. See [LICENSE](LICENSE).
|
GPL-3.0. See [LICENSE](LICENSE).
|
||||||
|
|||||||
@@ -3,9 +3,15 @@ use pulldown_cmark::{Options, Parser, html};
|
|||||||
/// Format the startup greeting the bot sends to each room when it comes online.
|
/// Format the startup greeting the bot sends to each room when it comes online.
|
||||||
///
|
///
|
||||||
/// Uses the bot's configured display name so the message reads naturally
|
/// Uses the bot's configured display name so the message reads naturally
|
||||||
/// (e.g. "Timmy is online.").
|
/// (e.g. "Timmy is online."). When `drift_count` is `Some(n)` and `n > 0`,
|
||||||
pub fn format_startup_announcement(bot_name: &str) -> String {
|
/// appends a drift warning so operators know to check the server logs.
|
||||||
format!("{bot_name} is online.")
|
pub fn format_startup_announcement(bot_name: &str, drift_count: Option<usize>) -> String {
|
||||||
|
match drift_count {
|
||||||
|
Some(n) if n > 0 => format!(
|
||||||
|
"{bot_name} is online. \u{26a0}\u{fe0f} {n} item(s) have CRDT/DB drift \u{2014} check server logs."
|
||||||
|
),
|
||||||
|
_ => format!("{bot_name} is online."),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Convert a Markdown string to an HTML string using pulldown-cmark.
|
/// Convert a Markdown string to an HTML string using pulldown-cmark.
|
||||||
@@ -97,13 +103,27 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn startup_announcement_uses_bot_name() {
|
fn startup_announcement_uses_bot_name() {
|
||||||
assert_eq!(format_startup_announcement("Timmy"), "Timmy is online.");
|
assert_eq!(format_startup_announcement("Timmy", None), "Timmy is online.");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn startup_announcement_uses_configured_display_name_not_hardcoded() {
|
fn startup_announcement_uses_configured_display_name_not_hardcoded() {
|
||||||
assert_eq!(format_startup_announcement("HAL"), "HAL is online.");
|
assert_eq!(format_startup_announcement("HAL", None), "HAL is online.");
|
||||||
assert_eq!(format_startup_announcement("Assistant"), "Assistant is online.");
|
assert_eq!(format_startup_announcement("Assistant", None), "Assistant is online.");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn startup_announcement_with_zero_drift_omits_warning() {
|
||||||
|
assert_eq!(format_startup_announcement("Timmy", Some(0)), "Timmy is online.");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn startup_announcement_with_drift_includes_count_and_warning() {
|
||||||
|
let msg = format_startup_announcement("Timmy", Some(3));
|
||||||
|
assert!(msg.starts_with("Timmy is online."), "should start with online msg: {msg}");
|
||||||
|
assert!(msg.contains('3'), "should include drift count: {msg}");
|
||||||
|
assert!(msg.contains("drift"), "should mention drift: {msg}");
|
||||||
|
assert!(msg.contains("server logs"), "should mention server logs: {msg}");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
@@ -297,7 +297,7 @@ pub async fn run_bot(
|
|||||||
// bot is online. This runs once per process start — the sync loop handles
|
// bot is online. This runs once per process start — the sync loop handles
|
||||||
// reconnects internally so this code is never reached again on a network
|
// reconnects internally so this code is never reached again on a network
|
||||||
// blip or sync resumption.
|
// blip or sync resumption.
|
||||||
let announce_msg = format_startup_announcement(&announce_bot_name);
|
let announce_msg = format_startup_announcement(&announce_bot_name, crate::startup_reconcile::drift_count());
|
||||||
let announce_html = markdown_to_html(&announce_msg);
|
let announce_html = markdown_to_html(&announce_msg);
|
||||||
slog!("[matrix-bot] Sending startup announcement: {announce_msg}");
|
slog!("[matrix-bot] Sending startup announcement: {announce_msg}");
|
||||||
for room_id in &announce_room_ids {
|
for room_id in &announce_room_ids {
|
||||||
|
|||||||
@@ -53,6 +53,14 @@ pub struct ProjectConfig {
|
|||||||
/// so both machines see the same pipeline state in real-time.
|
/// so both machines see the same pipeline state in real-time.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub rendezvous: Option<String>,
|
pub rendezvous: Option<String>,
|
||||||
|
/// Whether to run the startup state-reconcile pass.
|
||||||
|
///
|
||||||
|
/// The pass compares pipeline state across CRDT, `pipeline_items`, and
|
||||||
|
/// filesystem shadows after CRDT replay, logging any drift it finds.
|
||||||
|
/// Set to `false` to disable during the migration window if the pass
|
||||||
|
/// produces too much noise. Default: `true`.
|
||||||
|
#[serde(default = "default_reconcile_on_startup")]
|
||||||
|
pub reconcile_on_startup: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Configuration for the filesystem watcher's sweep behaviour.
|
/// Configuration for the filesystem watcher's sweep behaviour.
|
||||||
@@ -100,6 +108,10 @@ fn default_rate_limit_notifications() -> bool {
|
|||||||
true
|
true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn default_reconcile_on_startup() -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub struct ComponentConfig {
|
pub struct ComponentConfig {
|
||||||
@@ -197,6 +209,8 @@ struct LegacyProjectConfig {
|
|||||||
rate_limit_notifications: bool,
|
rate_limit_notifications: bool,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
timezone: Option<String>,
|
timezone: Option<String>,
|
||||||
|
#[serde(default = "default_reconcile_on_startup")]
|
||||||
|
reconcile_on_startup: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for ProjectConfig {
|
impl Default for ProjectConfig {
|
||||||
@@ -227,6 +241,7 @@ impl Default for ProjectConfig {
|
|||||||
rate_limit_notifications: default_rate_limit_notifications(),
|
rate_limit_notifications: default_rate_limit_notifications(),
|
||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: default_reconcile_on_startup(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -303,6 +318,7 @@ impl ProjectConfig {
|
|||||||
rate_limit_notifications: legacy.rate_limit_notifications,
|
rate_limit_notifications: legacy.rate_limit_notifications,
|
||||||
timezone: legacy.timezone,
|
timezone: legacy.timezone,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: legacy.reconcile_on_startup,
|
||||||
};
|
};
|
||||||
validate_agents(&config.agent)?;
|
validate_agents(&config.agent)?;
|
||||||
return Ok(config);
|
return Ok(config);
|
||||||
@@ -331,6 +347,7 @@ impl ProjectConfig {
|
|||||||
rate_limit_notifications: legacy.rate_limit_notifications,
|
rate_limit_notifications: legacy.rate_limit_notifications,
|
||||||
timezone: legacy.timezone,
|
timezone: legacy.timezone,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: legacy.reconcile_on_startup,
|
||||||
};
|
};
|
||||||
validate_agents(&config.agent)?;
|
validate_agents(&config.agent)?;
|
||||||
Ok(config)
|
Ok(config)
|
||||||
@@ -347,6 +364,7 @@ impl ProjectConfig {
|
|||||||
rate_limit_notifications: legacy.rate_limit_notifications,
|
rate_limit_notifications: legacy.rate_limit_notifications,
|
||||||
timezone: legacy.timezone,
|
timezone: legacy.timezone,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: legacy.reconcile_on_startup,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ mod llm;
|
|||||||
pub mod log_buffer;
|
pub mod log_buffer;
|
||||||
pub mod rebuild;
|
pub mod rebuild;
|
||||||
mod state;
|
mod state;
|
||||||
|
mod startup_reconcile;
|
||||||
mod store;
|
mod store;
|
||||||
mod workflow;
|
mod workflow;
|
||||||
pub(crate) mod pipeline_state;
|
pub(crate) mod pipeline_state;
|
||||||
@@ -303,6 +304,20 @@ async fn main() -> Result<(), std::io::Error> {
|
|||||||
if let Err(e) = crdt_state::init(db_path).await {
|
if let Err(e) = crdt_state::init(db_path).await {
|
||||||
slog!("[crdt] Failed to initialise CRDT state layer: {e}");
|
slog!("[crdt] Failed to initialise CRDT state layer: {e}");
|
||||||
}
|
}
|
||||||
|
// Run the startup drift-reconcile pass now that both the CRDT and DB are
|
||||||
|
// initialised. The pass is cheap (~100 stories < 1 s) and can be disabled via
|
||||||
|
// `reconcile_on_startup = false` in project.toml.
|
||||||
|
let reconcile_enabled = db_path
|
||||||
|
.parent()
|
||||||
|
.and_then(|p| p.parent())
|
||||||
|
.and_then(|root| config::ProjectConfig::load(root).ok())
|
||||||
|
.map(|cfg| cfg.reconcile_on_startup)
|
||||||
|
.unwrap_or(true);
|
||||||
|
if reconcile_enabled
|
||||||
|
&& let Some(project_root) = db_path.parent().and_then(|p| p.parent())
|
||||||
|
{
|
||||||
|
startup_reconcile::reconcile_state(project_root, db_path).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Import any existing .huskies/work/ stories into the DB content store.
|
// Import any existing .huskies/work/ stories into the DB content store.
|
||||||
|
|||||||
@@ -0,0 +1,512 @@
|
|||||||
|
//! Startup reconcile pass: detect drift between CRDT, `pipeline_items`, and filesystem shadows.
|
||||||
|
//!
|
||||||
|
//! ## What is drift?
|
||||||
|
//!
|
||||||
|
//! Huskies maintains pipeline state in three places that must stay in sync:
|
||||||
|
//!
|
||||||
|
//! 1. **In-memory CRDT** (`crdt_state`) — reconstructed from `crdt_ops` on startup; the
|
||||||
|
//! primary source of truth for pipeline metadata.
|
||||||
|
//! 2. **`pipeline_items` table** — a shadow/materialised view written alongside CRDT updates;
|
||||||
|
//! used for fast DB queries without parsing CRDT ops.
|
||||||
|
//! 3. **Filesystem shadows** (`.huskies/work/N_stage/*.md`) — legacy rendering still written
|
||||||
|
//! by some paths and read by agent worktrees.
|
||||||
|
//!
|
||||||
|
//! "Drift" means these sources disagree about a story's stage or existence:
|
||||||
|
//!
|
||||||
|
//! | Drift type | Meaning |
|
||||||
|
//! |-----------------|-----------------------------------------------------------------|
|
||||||
|
//! | `CRDT-only` | Story in CRDT but absent from `pipeline_items` |
|
||||||
|
//! | `DB-only` | Story in `pipeline_items` but absent from CRDT |
|
||||||
|
//! | `FS-only` | Story on filesystem but absent from both CRDT and `pipeline_items` |
|
||||||
|
//! | `stage-mismatch`| Story present in CRDT and DB but with different stage values |
|
||||||
|
//!
|
||||||
|
//! ## Log format
|
||||||
|
//!
|
||||||
|
//! Each drift emits a structured log line:
|
||||||
|
//! ```text
|
||||||
|
//! [reconcile] DRIFT story=X crdt_stage=Y db_stage=Z fs_stage=W
|
||||||
|
//! ```
|
||||||
|
//! (`MISSING` is used where a source has no record for that story.)
|
||||||
|
//!
|
||||||
|
//! ## Opt-out
|
||||||
|
//!
|
||||||
|
//! Set `reconcile_on_startup = false` in `.huskies/project.toml` to disable the
|
||||||
|
//! pass during the migration window if it produces too much noise.
|
||||||
|
|
||||||
|
use std::collections::{HashMap, HashSet};
|
||||||
|
use std::path::Path;
|
||||||
|
use std::sync::OnceLock;
|
||||||
|
|
||||||
|
use crate::slog;
|
||||||
|
|
||||||
|
// ── Public API ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Drift count recorded by the startup reconcile pass.
///
/// Written at most once per process, by [`reconcile_state`]. It stays unset
/// when the pass never runs (e.g. it was disabled via config).
static DRIFT_COUNT: OnceLock<usize> = OnceLock::new();

/// Return the drift count detected during the startup reconcile pass.
///
/// Returns `None` if the pass has not run yet (e.g. disabled via config or
/// called before startup completes). Once the pass has run this returns
/// `Some(n)`, where `Some(0)` means the pass ran and found no drift.
#[must_use]
pub fn drift_count() -> Option<usize> {
    DRIFT_COUNT.get().copied()
}
|
||||||
|
|
||||||
|
/// One story on which the state sources disagree.
///
/// Each `*_stage` field holds the stage that source reports for the story,
/// or `None` when that source has no record of it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ItemDrift {
    /// Identifier of the drifted story.
    pub story_id: String,
    /// Stage held by the in-memory CRDT; `None` if the CRDT has no entry.
    pub crdt_stage: Option<String>,
    /// Stage held by the `pipeline_items` shadow table; `None` if there is no row.
    pub db_stage: Option<String>,
    /// Stage implied by the filesystem shadow; `None` if there is no file.
    pub fs_stage: Option<String>,
}
|
||||||
|
|
||||||
|
/// Summary returned by [`reconcile_state`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DriftReport {
    /// Number of drifted items detected. Read by tests; in production the count
    /// is accessed via the [`drift_count()`] free function (stored in `DRIFT_COUNT`).
    // Only tests read this field, so non-test builds would otherwise warn.
    #[allow(dead_code)]
    pub drift_count: usize,
}
|
||||||
|
|
||||||
|
// ── Main entry point ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Run the startup state-reconcile pass.
|
||||||
|
///
|
||||||
|
/// Must be called **after** `crdt_state::init()` so the in-memory CRDT is
|
||||||
|
/// populated, and **after** `db::init()` so `pipeline_items` is accessible.
|
||||||
|
///
|
||||||
|
/// The function is fast: a full scan with ~100 stories completes in well under
|
||||||
|
/// 1 second because it reads cached in-memory state (CRDT) and performs a
|
||||||
|
/// single `SELECT id, stage FROM pipeline_items` query.
|
||||||
|
///
|
||||||
|
/// Returns the number of drifted items and stores the count in a process-wide
|
||||||
|
/// [`OnceLock`] so other subsystems (e.g. the Matrix bot startup announcement)
|
||||||
|
/// can read it without re-running the pass.
|
||||||
|
pub async fn reconcile_state(project_root: &Path, db_path: &Path) -> DriftReport {
|
||||||
|
let crdt_stages: HashMap<String, String> = crate::crdt_state::read_all_items()
|
||||||
|
.unwrap_or_default()
|
||||||
|
.into_iter()
|
||||||
|
.map(|v| (v.story_id, v.stage))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let db_stages = read_db_stages(db_path).await;
|
||||||
|
let fs_stages = scan_fs_stages(project_root);
|
||||||
|
|
||||||
|
slog!(
|
||||||
|
"[reconcile] Scanning {} CRDT / {} DB / {} FS items for drift",
|
||||||
|
crdt_stages.len(),
|
||||||
|
db_stages.len(),
|
||||||
|
fs_stages.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
let drifts = detect_drift(&crdt_stages, &db_stages, &fs_stages);
|
||||||
|
|
||||||
|
for d in &drifts {
|
||||||
|
slog!(
|
||||||
|
"[reconcile] DRIFT story={} crdt_stage={} db_stage={} fs_stage={}",
|
||||||
|
d.story_id,
|
||||||
|
d.crdt_stage.as_deref().unwrap_or("MISSING"),
|
||||||
|
d.db_stage.as_deref().unwrap_or("MISSING"),
|
||||||
|
d.fs_stage.as_deref().unwrap_or("MISSING"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let count = drifts.len();
|
||||||
|
if count == 0 {
|
||||||
|
let total = {
|
||||||
|
let mut all: HashSet<&String> = HashSet::new();
|
||||||
|
all.extend(crdt_stages.keys());
|
||||||
|
all.extend(db_stages.keys());
|
||||||
|
all.extend(fs_stages.keys());
|
||||||
|
all.len()
|
||||||
|
};
|
||||||
|
slog!("[reconcile] No drift detected across {} total items.", total);
|
||||||
|
} else {
|
||||||
|
slog!(
|
||||||
|
"[reconcile] DRIFT SUMMARY: {} item(s) with inconsistent state — check server logs.",
|
||||||
|
count
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = DRIFT_COUNT.set(count);
|
||||||
|
DriftReport { drift_count: count }
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Core comparison logic (pure, fully testable) ────────────────────────────
|
||||||
|
|
||||||
|
/// Detect drift between three stage maps.
|
||||||
|
///
|
||||||
|
/// A story is drifted when any of:
|
||||||
|
/// - Present in CRDT but absent from DB (`CRDT-only`)
|
||||||
|
/// - Present in DB but absent from CRDT (`DB-only`)
|
||||||
|
/// - Present in both CRDT and DB with different stage values (`stage-mismatch`)
|
||||||
|
/// - Present in filesystem but absent from both CRDT and DB (`FS-only`)
|
||||||
|
///
|
||||||
|
/// FS stage vs CRDT/DB stage disagreement is noted in the drift record's
|
||||||
|
/// `fs_stage` field but does **not** independently trigger a drift entry —
|
||||||
|
/// the filesystem shadow is allowed to lag.
|
||||||
|
pub(crate) fn detect_drift(
|
||||||
|
crdt: &HashMap<String, String>,
|
||||||
|
db: &HashMap<String, String>,
|
||||||
|
fs: &HashMap<String, String>,
|
||||||
|
) -> Vec<ItemDrift> {
|
||||||
|
let all_ids: HashSet<&String> = crdt.keys().chain(db.keys()).chain(fs.keys()).collect();
|
||||||
|
|
||||||
|
let mut drifts: Vec<ItemDrift> = all_ids
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|id| {
|
||||||
|
let c = crdt.get(id).cloned();
|
||||||
|
let d = db.get(id).cloned();
|
||||||
|
let f = fs.get(id).cloned();
|
||||||
|
|
||||||
|
let is_drift = match (&c, &d) {
|
||||||
|
// Both present but stages differ → stage mismatch
|
||||||
|
(Some(cs), Some(ds)) if cs != ds => true,
|
||||||
|
// One present, other absent → single-source
|
||||||
|
(Some(_), None) | (None, Some(_)) => true,
|
||||||
|
// Both absent but FS present → FS-only
|
||||||
|
(None, None) => f.is_some(),
|
||||||
|
// Both present, same stage → clean
|
||||||
|
_ => false,
|
||||||
|
};
|
||||||
|
|
||||||
|
if is_drift {
|
||||||
|
Some(ItemDrift {
|
||||||
|
story_id: id.clone(),
|
||||||
|
crdt_stage: c,
|
||||||
|
db_stage: d,
|
||||||
|
fs_stage: f,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Sort for deterministic output (useful in tests and logs).
|
||||||
|
drifts.sort_by(|a, b| a.story_id.cmp(&b.story_id));
|
||||||
|
drifts
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Data loading helpers ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// Query `pipeline_items` for all `(id, stage)` rows.
|
||||||
|
///
|
||||||
|
/// Opens a separate read-only connection so this function can run before or
|
||||||
|
/// after `db::init()` without conflicting with the write pool.
|
||||||
|
async fn read_db_stages(db_path: &Path) -> HashMap<String, String> {
|
||||||
|
use sqlx::sqlite::SqliteConnectOptions;
|
||||||
|
use sqlx::SqlitePool;
|
||||||
|
|
||||||
|
let options = SqliteConnectOptions::new()
|
||||||
|
.filename(db_path)
|
||||||
|
.read_only(true);
|
||||||
|
|
||||||
|
let pool = match SqlitePool::connect_with(options).await {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(e) => {
|
||||||
|
slog!("[reconcile] Could not open pipeline.db for reconcile scan: {e}");
|
||||||
|
return HashMap::new();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let rows: Vec<(String, String)> = sqlx::query_as("SELECT id, stage FROM pipeline_items")
|
||||||
|
.fetch_all(&pool)
|
||||||
|
.await
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
rows.into_iter().collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Walk `.huskies/work/` directories to build a stage map from filesystem shadows.
///
/// Every regular file `<stage>/<story_id>.md` under `<project_root>/.huskies/work/`
/// contributes a `story_id → stage` entry. Non-`.md` entries and directories
/// (even ones whose name ends in `.md`) are ignored, as are stage directories
/// that are missing or unreadable.
///
/// If the same story id appears under several stage directories, the entry
/// from the later stage in `STAGE_DIRS` order wins.
fn scan_fs_stages(project_root: &Path) -> HashMap<String, String> {
    const STAGE_DIRS: &[&str] = &[
        "1_backlog",
        "2_current",
        "3_qa",
        "4_merge",
        "5_done",
        "6_archived",
    ];

    let work_root = project_root.join(".huskies").join("work");
    let mut map = HashMap::new();

    for stage in STAGE_DIRS {
        // `read_dir` fails for a missing or non-directory path, so no separate
        // existence check is needed.
        let Ok(entries) = std::fs::read_dir(work_root.join(stage)) else {
            continue;
        };
        for path in entries.flatten().map(|entry| entry.path()) {
            let is_markdown = path.extension().and_then(|ext| ext.to_str()) == Some("md");
            // Only regular files are story shadows; a directory that happens to
            // be named `*.md` must not be reported as a story.
            if !is_markdown || !path.is_file() {
                continue;
            }
            if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
                map.insert(stem.to_string(), (*stage).to_string());
            }
        }
    }

    map
}
|
||||||
|
|
||||||
|
// ── Tests ────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an owned stage map from `(story_id, stage)` literals.
    fn stages(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        let mut map = HashMap::with_capacity(pairs.len());
        for &(id, stage) in pairs {
            map.insert(id.to_string(), stage.to_string());
        }
        map
    }

    // ── detect_drift unit tests (pure function) ─────────────────────────────

    /// Three empty sources contain nothing that could disagree.
    #[test]
    fn detect_drift_clean_state_no_items() {
        let empty = HashMap::new();
        let drifts = detect_drift(&empty, &empty, &empty);
        assert!(
            drifts.is_empty(),
            "expected no drift for empty state, got: {drifts:?}"
        );
    }

    /// A story at the same stage in CRDT, DB and FS is clean.
    #[test]
    fn detect_drift_clean_state_matching_stages() {
        let crdt = stages(&[("42_story_foo", "2_current")]);
        let db = stages(&[("42_story_foo", "2_current")]);
        let fs = stages(&[("42_story_foo", "2_current")]);
        let drifts = detect_drift(&crdt, &db, &fs);
        assert!(
            drifts.is_empty(),
            "expected no drift when all sources agree, got: {drifts:?}"
        );
    }

    /// Present in CRDT, absent from DB and FS → CRDT-only drift.
    #[test]
    fn detect_drift_crdt_only_story() {
        let crdt = stages(&[("10_story_crdt_only", "2_current")]);
        let db = HashMap::new();
        let fs = HashMap::new();
        let drifts = detect_drift(&crdt, &db, &fs);
        assert_eq!(drifts.len(), 1, "expected 1 drift, got: {drifts:?}");
        let d = &drifts[0];
        assert_eq!(d.story_id, "10_story_crdt_only");
        assert_eq!(d.crdt_stage.as_deref(), Some("2_current"));
        assert!(d.db_stage.is_none(), "db_stage should be MISSING");
    }

    /// Present in DB, absent from CRDT → DB-only drift.
    #[test]
    fn detect_drift_db_only_story() {
        let crdt = HashMap::new();
        let db = stages(&[("20_story_db_only", "1_backlog")]);
        let fs = HashMap::new();
        let drifts = detect_drift(&crdt, &db, &fs);
        assert_eq!(drifts.len(), 1, "expected 1 drift, got: {drifts:?}");
        let d = &drifts[0];
        assert_eq!(d.story_id, "20_story_db_only");
        assert!(d.crdt_stage.is_none(), "crdt_stage should be MISSING");
        assert_eq!(d.db_stage.as_deref(), Some("1_backlog"));
    }

    /// Present only on the filesystem → FS-only drift.
    #[test]
    fn detect_drift_fs_only_story() {
        let crdt = HashMap::new();
        let db = HashMap::new();
        let fs = stages(&[("30_story_fs_only", "3_qa")]);
        let drifts = detect_drift(&crdt, &db, &fs);
        assert_eq!(drifts.len(), 1, "expected 1 drift, got: {drifts:?}");
        let d = &drifts[0];
        assert_eq!(d.story_id, "30_story_fs_only");
        assert!(d.crdt_stage.is_none(), "crdt_stage should be MISSING");
        assert!(d.db_stage.is_none(), "db_stage should be MISSING");
        assert_eq!(d.fs_stage.as_deref(), Some("3_qa"));
    }

    /// CRDT and DB both know the story but disagree on its stage.
    #[test]
    fn detect_drift_stage_mismatch_crdt_vs_db() {
        let crdt = stages(&[("40_story_mismatch", "4_merge")]);
        let db = stages(&[("40_story_mismatch", "5_done")]);
        let fs = stages(&[("40_story_mismatch", "4_merge")]);
        let drifts = detect_drift(&crdt, &db, &fs);
        assert_eq!(drifts.len(), 1, "expected 1 drift, got: {drifts:?}");
        let d = &drifts[0];
        assert_eq!(d.crdt_stage.as_deref(), Some("4_merge"));
        assert_eq!(d.db_stage.as_deref(), Some("5_done"));
        // The FS stage is carried along even though it did not trigger the drift.
        assert_eq!(d.fs_stage.as_deref(), Some("4_merge"));
    }

    /// A lagging filesystem shadow is tolerated while CRDT and DB agree.
    #[test]
    fn detect_drift_fs_lag_does_not_cause_drift() {
        let crdt = stages(&[("50_story_fs_lag", "5_done")]);
        let db = stages(&[("50_story_fs_lag", "5_done")]);
        let fs = stages(&[("50_story_fs_lag", "4_merge")]); // FS is behind
        let drifts = detect_drift(&crdt, &db, &fs);
        assert!(
            drifts.is_empty(),
            "FS lag alone should not trigger drift when CRDT == DB, got: {drifts:?}"
        );
    }

    /// Two clean stories and two drifted ones → exactly the two drifted ids.
    #[test]
    fn detect_drift_multiple_stories_mixed() {
        let crdt = stages(&[
            ("1_story_clean", "2_current"),
            ("2_story_crdt_only", "2_current"),
            ("3_story_mismatch", "3_qa"),
            ("4_story_clean2", "1_backlog"),
        ]);
        let db = stages(&[
            ("1_story_clean", "2_current"),
            ("3_story_mismatch", "4_merge"), // mismatch
            ("4_story_clean2", "1_backlog"),
        ]);
        let fs: HashMap<String, String> = HashMap::new();
        let drifts = detect_drift(&crdt, &db, &fs);
        assert_eq!(drifts.len(), 2, "expected 2 drifts, got: {drifts:?}");
        let ids: Vec<&str> = drifts.iter().map(|d| d.story_id.as_str()).collect();
        assert!(ids.contains(&"2_story_crdt_only"), "missing CRDT-only drift");
        assert!(ids.contains(&"3_story_mismatch"), "missing mismatch drift");
    }

    // ── scan_fs_stages unit test ─────────────────────────────────────────────

    /// Markdown files are picked up from every stage directory that exists.
    #[test]
    fn scan_fs_stages_finds_stories_in_each_stage() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        let stages_to_create = [
            ("1_backlog", "10_story_a"),
            ("2_current", "20_story_b"),
            ("5_done", "50_story_c"),
        ];
        for (stage, name) in &stages_to_create {
            let dir = root.join(".huskies").join("work").join(stage);
            std::fs::create_dir_all(&dir).unwrap();
            std::fs::write(dir.join(format!("{name}.md")), "# test\n").unwrap();
        }

        let map = scan_fs_stages(root);
        for (stage, name) in &stages_to_create {
            assert_eq!(map.get(*name).map(String::as_str), Some(*stage));
        }
    }

    /// Files without the `.md` extension never become stories.
    #[test]
    fn scan_fs_stages_ignores_non_md_files() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let dir = root.join(".huskies").join("work").join("1_backlog");
        std::fs::create_dir_all(&dir).unwrap();
        std::fs::write(dir.join("not_a_story.txt"), "ignored").unwrap();
        std::fs::write(dir.join("99_story_real.md"), "# real").unwrap();

        let map = scan_fs_stages(root);
        assert!(!map.contains_key("not_a_story"), "txt file should be ignored");
        assert!(map.contains_key("99_story_real"), "md file should be found");
    }

    // ── reconcile_state integration tests (temp DB + FS) ────────────────────

    /// Create a SQLite file at `path` containing an empty `pipeline_items` table.
    async fn setup_test_db(path: &Path) -> sqlx::SqlitePool {
        use sqlx::sqlite::SqliteConnectOptions;
        use sqlx::SqlitePool;

        let opts = SqliteConnectOptions::new()
            .filename(path)
            .create_if_missing(true);
        let pool = SqlitePool::connect_with(opts).await.unwrap();
        sqlx::query(
            "CREATE TABLE IF NOT EXISTS pipeline_items (
                id TEXT PRIMARY KEY,
                name TEXT,
                stage TEXT NOT NULL,
                agent TEXT,
                retry_count INTEGER,
                blocked INTEGER,
                depends_on TEXT,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )",
        )
        .execute(&pool)
        .await
        .unwrap();
        pool
    }

    /// Empty DB + empty FS + uninitialised CRDT → 0 drift.
    #[tokio::test]
    async fn reconcile_state_no_drift_empty_sources() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("pipeline.db");
        setup_test_db(&db_path).await;

        let report = reconcile_state(tmp.path(), &db_path).await;
        assert_eq!(
            report.drift_count, 0,
            "expected no drift for empty state"
        );
    }

    /// Story in DB but CRDT not initialised (treated as empty) → 1 drift.
    #[tokio::test]
    async fn reconcile_state_db_only_story_is_drift() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("pipeline.db");
        let pool = setup_test_db(&db_path).await;

        sqlx::query(
            "INSERT INTO pipeline_items (id, name, stage, created_at, updated_at)
             VALUES ('100_story_db_only', 'DB-only story', '2_current', '2026-01-01', '2026-01-01')",
        )
        .execute(&pool)
        .await
        .unwrap();

        let report = reconcile_state(tmp.path(), &db_path).await;
        assert_eq!(
            report.drift_count, 1,
            "story in DB but not in CRDT should be counted as drift"
        );
    }

    /// Story on filesystem, nothing in DB or CRDT → 1 drift (FS-only).
    #[tokio::test]
    async fn reconcile_state_fs_only_story_is_drift() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("pipeline.db");
        setup_test_db(&db_path).await;

        let backlog = tmp.path().join(".huskies").join("work").join("1_backlog");
        std::fs::create_dir_all(&backlog).unwrap();
        std::fs::write(backlog.join("200_story_fs_only.md"), "---\nname: FS-only\n---\n").unwrap();

        let report = reconcile_state(tmp.path(), &db_path).await;
        assert_eq!(
            report.drift_count, 1,
            "story on FS but absent from CRDT and DB should be counted as drift"
        );
    }
}
|
||||||
@@ -529,6 +529,7 @@ mod tests {
|
|||||||
rate_limit_notifications: true,
|
rate_limit_notifications: true,
|
||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: true,
|
||||||
};
|
};
|
||||||
// Should complete without panic
|
// Should complete without panic
|
||||||
run_setup_commands(tmp.path(), &config).await;
|
run_setup_commands(tmp.path(), &config).await;
|
||||||
@@ -554,6 +555,7 @@ mod tests {
|
|||||||
rate_limit_notifications: true,
|
rate_limit_notifications: true,
|
||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: true,
|
||||||
};
|
};
|
||||||
// Should complete without panic
|
// Should complete without panic
|
||||||
run_setup_commands(tmp.path(), &config).await;
|
run_setup_commands(tmp.path(), &config).await;
|
||||||
@@ -579,6 +581,7 @@ mod tests {
|
|||||||
rate_limit_notifications: true,
|
rate_limit_notifications: true,
|
||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: true,
|
||||||
};
|
};
|
||||||
// Setup command failures are non-fatal — should not panic or propagate
|
// Setup command failures are non-fatal — should not panic or propagate
|
||||||
run_setup_commands(tmp.path(), &config).await;
|
run_setup_commands(tmp.path(), &config).await;
|
||||||
@@ -604,6 +607,7 @@ mod tests {
|
|||||||
rate_limit_notifications: true,
|
rate_limit_notifications: true,
|
||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: true,
|
||||||
};
|
};
|
||||||
// Teardown failures are best-effort — should not propagate
|
// Teardown failures are best-effort — should not propagate
|
||||||
assert!(run_teardown_commands(tmp.path(), &config).await.is_ok());
|
assert!(run_teardown_commands(tmp.path(), &config).await.is_ok());
|
||||||
@@ -628,6 +632,7 @@ mod tests {
|
|||||||
rate_limit_notifications: true,
|
rate_limit_notifications: true,
|
||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: true,
|
||||||
};
|
};
|
||||||
let info = create_worktree(&project_root, "42_fresh_test", &config, 3001)
|
let info = create_worktree(&project_root, "42_fresh_test", &config, 3001)
|
||||||
.await
|
.await
|
||||||
@@ -659,6 +664,7 @@ mod tests {
|
|||||||
rate_limit_notifications: true,
|
rate_limit_notifications: true,
|
||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: true,
|
||||||
};
|
};
|
||||||
// First creation
|
// First creation
|
||||||
let _info1 = create_worktree(&project_root, "43_reuse_test", &config, 3001)
|
let _info1 = create_worktree(&project_root, "43_reuse_test", &config, 3001)
|
||||||
@@ -731,6 +737,7 @@ mod tests {
|
|||||||
rate_limit_notifications: true,
|
rate_limit_notifications: true,
|
||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: true,
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = remove_worktree_by_story_id(tmp.path(), "99_nonexistent", &config).await;
|
let result = remove_worktree_by_story_id(tmp.path(), "99_nonexistent", &config).await;
|
||||||
@@ -761,6 +768,7 @@ mod tests {
|
|||||||
rate_limit_notifications: true,
|
rate_limit_notifications: true,
|
||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: true,
|
||||||
};
|
};
|
||||||
create_worktree(&project_root, "88_remove_by_id", &config, 3001)
|
create_worktree(&project_root, "88_remove_by_id", &config, 3001)
|
||||||
.await
|
.await
|
||||||
@@ -838,6 +846,7 @@ mod tests {
|
|||||||
rate_limit_notifications: true,
|
rate_limit_notifications: true,
|
||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: true,
|
||||||
};
|
};
|
||||||
// Even though setup commands fail, create_worktree must succeed
|
// Even though setup commands fail, create_worktree must succeed
|
||||||
// so the agent can start and fix the problem itself.
|
// so the agent can start and fix the problem itself.
|
||||||
@@ -871,6 +880,7 @@ mod tests {
|
|||||||
rate_limit_notifications: true,
|
rate_limit_notifications: true,
|
||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: true,
|
||||||
};
|
};
|
||||||
// First creation — no setup commands, should succeed
|
// First creation — no setup commands, should succeed
|
||||||
create_worktree(&project_root, "173_reuse_fail", &empty_config, 3001)
|
create_worktree(&project_root, "173_reuse_fail", &empty_config, 3001)
|
||||||
@@ -894,6 +904,7 @@ mod tests {
|
|||||||
rate_limit_notifications: true,
|
rate_limit_notifications: true,
|
||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: true,
|
||||||
};
|
};
|
||||||
// Second call — worktree exists, setup commands fail, must still succeed
|
// Second call — worktree exists, setup commands fail, must still succeed
|
||||||
let result = create_worktree(&project_root, "173_reuse_fail", &failing_config, 3002).await;
|
let result = create_worktree(&project_root, "173_reuse_fail", &failing_config, 3002).await;
|
||||||
@@ -923,6 +934,7 @@ mod tests {
|
|||||||
rate_limit_notifications: true,
|
rate_limit_notifications: true,
|
||||||
timezone: None,
|
timezone: None,
|
||||||
rendezvous: None,
|
rendezvous: None,
|
||||||
|
reconcile_on_startup: true,
|
||||||
};
|
};
|
||||||
let info = create_worktree(&project_root, "77_remove_async", &config, 3001)
|
let info = create_worktree(&project_root, "77_remove_async", &config, 3001)
|
||||||
.await
|
.await
|
||||||
|
|||||||
Reference in New Issue
Block a user