huskies: merge 490_story_crdt_state_layer_backed_by_sqlite

CRDT state layer backed by SQLite for pipeline state. Integrates the
BFT JSON CRDT crate with SQLite persistence via sqlx. Ops are persisted
and replayed on startup. Node identity via Ed25519 keypair.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
dave
2026-04-07 16:12:19 +00:00
parent c621bca7b1
commit c73153dd4e
8 changed files with 1990 additions and 69 deletions
Generated
+1239 -46
View File
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -1,5 +1,5 @@
[workspace]
members = ["server", "crates/bft-json-crdt"]
resolver = "3"
[workspace.dependencies]
+3
View File
@@ -40,6 +40,9 @@ tokio-tungstenite = { workspace = true }
libsqlite3-sys = { version = "0.35.0", features = ["bundled"] }
sqlx = { workspace = true }
wait-timeout = "0.2.1"
bft-json-crdt = { path = "../crates/bft-json-crdt" }
fastcrypto = "0.1.8"
indexmap = { version = "2.2.6", features = ["serde"] }
[target.'cfg(unix)'.dependencies] [target.'cfg(unix)'.dependencies]
libc = { workspace = true } libc = { workspace = true }
@@ -0,0 +1,14 @@
-- Stores serialized CRDT SignedOps for pipeline state persistence.
-- On startup, all ops are replayed in sequence order to reconstruct the CRDT document.
CREATE TABLE IF NOT EXISTS crdt_ops (
    op_id TEXT PRIMARY KEY,   -- hex-encoded op id, used for dedup (ON CONFLICT DO NOTHING)
    seq INTEGER NOT NULL,     -- per-node sequence number; replay order
    op_json TEXT NOT NULL,    -- serialized SignedOp (JSON)
    created_at TEXT NOT NULL  -- RFC 3339 insertion timestamp
);

-- Startup replay runs `SELECT op_json FROM crdt_ops ORDER BY seq ASC`;
-- index seq so that query does not full-scan and sort on every boot.
CREATE INDEX IF NOT EXISTS idx_crdt_ops_seq ON crdt_ops (seq);

-- Stores the node keypair seed so the same identity survives restarts.
CREATE TABLE IF NOT EXISTS crdt_node_identity (
    id INTEGER PRIMARY KEY CHECK (id = 1),  -- CHECK enforces a singleton row
    seed BLOB NOT NULL                      -- Ed25519 private-key bytes
);
+613
View File
@@ -0,0 +1,613 @@
/// CRDT state layer for pipeline state, backed by SQLite.
///
/// Replaces the filesystem as the primary source of truth for pipeline item
/// metadata (stage, name, agent, etc.). CRDT ops are persisted to SQLite so
/// state survives restarts. The filesystem `.huskies/work/` directories are
/// still updated as a secondary output for backwards compatibility.
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};
use bft_json_crdt::json_crdt::*;
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::list_crdt::ListCrdt;
use bft_json_crdt::lww_crdt::LwwRegisterCrdt;
use bft_json_crdt::op::ROOT_ID;
use fastcrypto::ed25519::Ed25519KeyPair;
use fastcrypto::traits::ToFromBytes;
use serde_json::json;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::SqlitePool;
use std::path::Path;
use tokio::sync::mpsc;
use crate::slog;
// ── CRDT document types ──────────────────────────────────────────────
/// Root CRDT document: the ordered list of all pipeline items.
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Debug)]
pub struct PipelineDoc {
    /// All pipeline items, in CRDT list order (not grouped by stage).
    pub items: ListCrdt<PipelineItemCrdt>,
}
/// CRDT representation of a single pipeline item. Every field is a
/// last-writer-wins register, so concurrent updates to different fields
/// merge without conflict.
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Debug)]
pub struct PipelineItemCrdt {
    /// Stable item identifier (e.g. "10_story_test"). Empty means unset —
    /// `extract_item_view` skips items with an empty story_id.
    pub story_id: LwwRegisterCrdt<String>,
    /// Stage directory name, e.g. "1_backlog" … "5_done".
    pub stage: LwwRegisterCrdt<String>,
    pub name: LwwRegisterCrdt<String>,
    pub agent: LwwRegisterCrdt<String>,
    /// Stored as f64 — presumably because the CRDT JSON value model has a
    /// single number type (TODO confirm); converted back to i64 on read.
    pub retry_count: LwwRegisterCrdt<f64>,
    pub blocked: LwwRegisterCrdt<bool>,
    /// JSON-encoded id list, e.g. "[10,20]"; parsed to `Vec<u32>` on read.
    pub depends_on: LwwRegisterCrdt<String>,
}
// ── Read-side view types ─────────────────────────────────────────────
/// A snapshot of a single pipeline item derived from the CRDT document.
#[derive(Clone, Debug)]
pub struct PipelineItemView {
    pub story_id: String,
    pub stage: String,
    /// `None` when the register held an empty string.
    pub name: Option<String>,
    /// `None` when the register held an empty string.
    pub agent: Option<String>,
    /// `None` when the stored count is zero or missing.
    pub retry_count: Option<i64>,
    pub blocked: Option<bool>,
    /// Parsed from the JSON-encoded `depends_on` register (e.g. "[10,20]");
    /// `None` when empty or unparsable.
    pub depends_on: Option<Vec<u32>>,
}
// ── Internal state ───────────────────────────────────────────────────
/// Global, mutex-guarded state shared by all read/write entry points.
struct CrdtState {
    /// The in-memory CRDT document — source of truth while running.
    crdt: BaseCrdt<PipelineDoc>,
    /// This node's Ed25519 signing identity; its seed is persisted in the
    /// `crdt_node_identity` table so identity survives restarts.
    keypair: Ed25519KeyPair,
    /// Maps story_id → index in the ListCrdt for O(1) lookup.
    index: HashMap<String, usize>,
    /// Channel sender for fire-and-forget op persistence.
    persist_tx: mpsc::UnboundedSender<SignedOp>,
}
static CRDT_STATE: OnceLock<Mutex<CrdtState>> = OnceLock::new();
// ── Initialisation ───────────────────────────────────────────────────
/// Initialise the CRDT state layer.
///
/// Opens the SQLite database at `db_path`, loads or creates a node keypair,
/// replays any persisted ops (in `seq` order) to reconstruct the document,
/// and spawns a background persistence task. Safe to call only once;
/// subsequent calls are no-ops.
///
/// # Errors
/// Returns any `sqlx::Error` from opening the pool, running migrations, or
/// loading stored state. Persistence failures *after* init are logged by the
/// background task, never returned.
pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
    // NOTE(review): this check-then-set is not atomic — two concurrent first
    // calls could both run the body; `OnceLock::set` below keeps only one
    // winner. Acceptable while init is called once from main.
    if CRDT_STATE.get().is_some() {
        return Ok(());
    }
    let options = SqliteConnectOptions::new()
        .filename(db_path)
        .create_if_missing(true);
    let pool = SqlitePool::connect_with(options).await?;
    sqlx::migrate!("./migrations").run(&pool).await?;
    // Load or create the node keypair.
    let keypair = load_or_create_keypair(&pool).await?;
    let mut crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
    // Replay persisted ops to reconstruct state. An op that fails to
    // deserialize is skipped (logged) rather than aborting startup.
    let rows: Vec<(String,)> =
        sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC")
            .fetch_all(&pool)
            .await?;
    for (op_json,) in &rows {
        if let Ok(signed_op) = serde_json::from_str::<SignedOp>(op_json) {
            crdt.apply(signed_op);
        } else {
            slog!("[crdt] Warning: failed to deserialize stored op");
        }
    }
    // Build the story_id → index map from the reconstructed state.
    let index = rebuild_index(&crdt);
    slog!(
        "[crdt] Initialised: {} ops replayed, {} items indexed",
        rows.len(),
        index.len()
    );
    // Spawn background persistence task. Inserts use ON CONFLICT DO NOTHING,
    // so a re-sent op is harmless.
    let (persist_tx, mut persist_rx) = mpsc::unbounded_channel::<SignedOp>();
    tokio::spawn(async move {
        while let Some(op) = persist_rx.recv().await {
            let op_json = match serde_json::to_string(&op) {
                Ok(j) => j,
                Err(e) => {
                    slog!("[crdt] Failed to serialize op: {e}");
                    continue;
                }
            };
            let op_id = hex::encode(&op.id());
            let seq = op.inner.seq as i64;
            let now = chrono::Utc::now().to_rfc3339();
            let result = sqlx::query(
                "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) \
                 VALUES (?1, ?2, ?3, ?4) \
                 ON CONFLICT(op_id) DO NOTHING",
            )
            .bind(&op_id)
            .bind(seq)
            .bind(&op_json)
            .bind(&now)
            .execute(&pool)
            .await;
            if let Err(e) = result {
                // Checked slice: `&op_id[..12]` would panic if an op id ever
                // hex-encodes to fewer than 12 chars; fall back to the full id.
                let id_prefix = op_id.get(..12).unwrap_or(&op_id);
                slog!("[crdt] Failed to persist op {id_prefix}: {e}");
            }
        }
    });
    let state = CrdtState {
        crdt,
        keypair,
        index,
        persist_tx,
    };
    // If a concurrent init won the race, our state is simply dropped.
    let _ = CRDT_STATE.set(Mutex::new(state));
    Ok(())
}
/// Load this node's Ed25519 keypair from `crdt_node_identity`, minting and
/// persisting a fresh one when no seed is stored or the stored seed cannot
/// be parsed back into a keypair.
async fn load_or_create_keypair(pool: &SqlitePool) -> Result<Ed25519KeyPair, sqlx::Error> {
    let stored: Option<(Vec<u8>,)> =
        sqlx::query_as("SELECT seed FROM crdt_node_identity WHERE id = 1")
            .fetch_optional(pool)
            .await?;
    if let Some((seed_bytes,)) = stored {
        // Happy path: rebuild the identity from the stored private-key bytes.
        match Ed25519KeyPair::from_bytes(&seed_bytes) {
            Ok(existing) => return Ok(existing),
            Err(_) => slog!("[crdt] Stored keypair invalid, regenerating"),
        }
    }
    // No usable identity on disk: generate one and upsert its seed so the
    // same identity is reused on the next startup.
    let fresh = make_keypair();
    let fresh_seed = fresh.as_bytes().to_vec();
    sqlx::query("INSERT INTO crdt_node_identity (id, seed) VALUES (1, ?1) ON CONFLICT(id) DO UPDATE SET seed = excluded.seed")
        .bind(&fresh_seed)
        .execute(pool)
        .await?;
    Ok(fresh)
}
/// Rebuild the story_id → list index mapping from the current CRDT state.
/// Items whose `story_id` register does not hold a string are skipped.
fn rebuild_index(crdt: &BaseCrdt<PipelineDoc>) -> HashMap<String, usize> {
    crdt.doc
        .items
        .iter()
        .enumerate()
        .filter_map(|(pos, entry)| match entry.story_id.view() {
            JsonValue::String(ref sid) => Some((sid.clone(), pos)),
            _ => None,
        })
        .collect()
}
// ── Write path ───────────────────────────────────────────────────────
/// Build a CRDT op via `op_fn`, sign it with the node keypair, apply it to
/// the local document, and hand a copy to the persistence channel. The
/// closure receives `&mut CrdtState` so it can mutate the CRDT document;
/// signing afterwards only borrows `&keypair`.
fn apply_and_persist<F>(state: &mut CrdtState, op_fn: F)
where
    F: FnOnce(&mut CrdtState) -> bft_json_crdt::op::Op<JsonValue>,
{
    let signed_op = op_fn(state).sign(&state.keypair);
    state.crdt.apply(signed_op.clone());
    // Send errors are ignored: the persistence task may have exited.
    let _ = state.persist_tx.send(signed_op);
}
/// Write a pipeline item state through CRDT operations.
///
/// If the item exists, updates its registers. If not, inserts a new item
/// into the list. All ops are signed and persisted to SQLite.
///
/// Fire-and-forget: when the CRDT layer is uninitialised or its mutex is
/// poisoned this is a complete no-op. `retry_count` is stored as f64 (the
/// register type); `depends_on` is the JSON-encoded id list, e.g. "[10,20]".
pub fn write_item(
    story_id: &str,
    stage: &str,
    name: Option<&str>,
    agent: Option<&str>,
    retry_count: Option<i64>,
    blocked: Option<bool>,
    depends_on: Option<&str>,
) {
    // Layer never initialised (e.g. tests, first run): silently skip.
    let Some(state_mutex) = CRDT_STATE.get() else {
        return;
    };
    // Poisoned mutex means a writer panicked; skip rather than propagate.
    let Ok(mut state) = state_mutex.lock() else {
        return;
    };
    if let Some(&idx) = state.index.get(story_id) {
        // Update existing item registers.
        // Each op is created, signed, applied, and persisted in a block so
        // borrows do not overlap between &mut crdt (set) and &keypair (sign).
        // `stage` is always written; each remaining register only when the
        // caller supplied Some(_) — None leaves the register untouched.
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.items[idx].stage.set(stage.to_string())
        });
        if let Some(n) = name {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].name.set(n.to_string())
            });
        }
        if let Some(a) = agent {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].agent.set(a.to_string())
            });
        }
        if let Some(rc) = retry_count {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].retry_count.set(rc as f64)
            });
        }
        if let Some(b) = blocked {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].blocked.set(b)
            });
        }
        if let Some(d) = depends_on {
            apply_and_persist(&mut state, |s| {
                s.crdt.doc.items[idx].depends_on.set(d.to_string())
            });
        }
    } else {
        // Insert new item. Missing optional fields get their "unset"
        // sentinels (empty string / 0 / false) to match the read side.
        let item_json: JsonValue = json!({
            "story_id": story_id,
            "stage": stage,
            "name": name.unwrap_or(""),
            "agent": agent.unwrap_or(""),
            "retry_count": retry_count.unwrap_or(0) as f64,
            "blocked": blocked.unwrap_or(false),
            "depends_on": depends_on.unwrap_or(""),
        })
        .into();
        apply_and_persist(&mut state, |s| {
            s.crdt.doc.items.insert(ROOT_ID, item_json)
        });
        // Rebuild index after insertion (indices may shift).
        state.index = rebuild_index(&state.crdt);
    }
}
// ── Read path ────────────────────────────────────────────────────────
/// Read the full pipeline state from the CRDT document.
///
/// Returns a flat list of item snapshots (callers group them by `stage`),
/// or `None` if the CRDT layer is not initialised or its mutex is poisoned.
/// Items without a valid story_id/stage are omitted.
pub fn read_all_items() -> Option<Vec<PipelineItemView>> {
    let guard = CRDT_STATE.get()?.lock().ok()?;
    let views: Vec<PipelineItemView> = guard
        .crdt
        .doc
        .items
        .iter()
        .filter_map(|item_crdt| extract_item_view(item_crdt))
        .collect();
    Some(views)
}
/// Read a single pipeline item by story_id. Returns `None` when the layer
/// is uninitialised, the mutex is poisoned, the id is not indexed, or the
/// stored item lacks a valid story_id/stage.
pub fn read_item(story_id: &str) -> Option<PipelineItemView> {
    let guard = CRDT_STATE.get()?.lock().ok()?;
    let position = *guard.index.get(story_id)?;
    extract_item_view(&guard.crdt.doc.items[position])
}
/// Extract a `PipelineItemView` from a `PipelineItemCrdt`.
fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemView> {
let story_id = match item.story_id.view() {
JsonValue::String(s) if !s.is_empty() => s,
_ => return None,
};
let stage = match item.stage.view() {
JsonValue::String(s) if !s.is_empty() => s,
_ => return None,
};
let name = match item.name.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let agent = match item.agent.view() {
JsonValue::String(s) if !s.is_empty() => Some(s),
_ => None,
};
let retry_count = match item.retry_count.view() {
JsonValue::Number(n) if n > 0.0 => Some(n as i64),
_ => None,
};
let blocked = match item.blocked.view() {
JsonValue::Bool(b) => Some(b),
_ => None,
};
let depends_on = match item.depends_on.view() {
JsonValue::String(s) if !s.is_empty() => {
serde_json::from_str::<Vec<u32>>(&s).ok()
}
_ => None,
};
Some(PipelineItemView {
story_id,
stage,
name,
agent,
retry_count,
blocked,
depends_on,
})
}
/// Hex-encode a byte slice (no external dep needed).
mod hex {
    use std::fmt::Write;

    /// Lowercase hex encoding: each byte becomes two `[0-9a-f]` chars.
    ///
    /// Preallocates the output (2 chars per byte) and writes into a single
    /// buffer, instead of allocating a fresh `String` per byte via
    /// `format!` + `collect`.
    pub fn encode(bytes: &[u8]) -> String {
        let mut out = String::with_capacity(bytes.len() * 2);
        for b in bytes {
            // Writing into a String is infallible; the Result is formal.
            let _ = write!(out, "{b:02x}");
        }
        out
    }
}
// ── Tests ────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::*;
    use bft_json_crdt::json_crdt::OpState;

    /// Inserting one item via a signed op makes it visible through the list
    /// view and its individual registers.
    #[test]
    fn crdt_doc_insert_and_view() {
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        let item_json: JsonValue = json!({
            "story_id": "10_story_test",
            "stage": "2_current",
            "name": "Test Story",
            "agent": "coder-opus",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
        })
        .into();
        let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp);
        assert_eq!(crdt.apply(op), OpState::Ok);
        let view = crdt.doc.items.view();
        assert_eq!(view.len(), 1);
        let item = &crdt.doc.items[0];
        assert_eq!(item.story_id.view(), JsonValue::String("10_story_test".to_string()));
        assert_eq!(item.stage.view(), JsonValue::String("2_current".to_string()));
    }

    /// A later `set` on the stage register overrides the inserted value.
    #[test]
    fn crdt_doc_update_stage() {
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        let item_json: JsonValue = json!({
            "story_id": "20_story_move",
            "stage": "1_backlog",
            "name": "Move Me",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
        })
        .into();
        let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp);
        crdt.apply(insert_op);
        // Update stage
        let stage_op = crdt.doc.items[0].stage.set("2_current".to_string()).sign(&kp);
        crdt.apply(stage_op);
        assert_eq!(
            crdt.doc.items[0].stage.view(),
            JsonValue::String("2_current".to_string())
        );
    }

    /// Replaying the same op sequence on a fresh CRDT converges to the same
    /// state — the property the SQLite replay-on-startup path relies on.
    #[test]
    fn crdt_ops_replay_reconstructs_state() {
        let kp = make_keypair();
        let mut crdt1 = BaseCrdt::<PipelineDoc>::new(&kp);
        // Build state with a series of ops.
        let item_json: JsonValue = json!({
            "story_id": "30_story_replay",
            "stage": "1_backlog",
            "name": "Replay Test",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
        })
        .into();
        let op1 = crdt1.doc.items.insert(ROOT_ID, item_json).sign(&kp);
        crdt1.apply(op1.clone());
        let op2 = crdt1.doc.items[0].stage.set("2_current".to_string()).sign(&kp);
        crdt1.apply(op2.clone());
        let op3 = crdt1.doc.items[0].name.set("Updated Name".to_string()).sign(&kp);
        crdt1.apply(op3.clone());
        // Replay ops on a fresh CRDT.
        let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
        crdt2.apply(op1);
        crdt2.apply(op2);
        crdt2.apply(op3);
        assert_eq!(
            crdt1.doc.items[0].stage.view(),
            crdt2.doc.items[0].stage.view()
        );
        assert_eq!(
            crdt1.doc.items[0].name.view(),
            crdt2.doc.items[0].name.view()
        );
    }

    /// `extract_item_view` maps registers to the view type, including the
    /// JSON-encoded depends_on list and the f64 → i64 retry count.
    #[test]
    fn extract_item_view_parses_crdt_item() {
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        let item_json: JsonValue = json!({
            "story_id": "40_story_view",
            "stage": "3_qa",
            "name": "View Test",
            "agent": "coder-1",
            "retry_count": 2.0,
            "blocked": true,
            "depends_on": "[10,20]",
        })
        .into();
        let op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&kp);
        crdt.apply(op);
        let view = extract_item_view(&crdt.doc.items[0]).unwrap();
        assert_eq!(view.story_id, "40_story_view");
        assert_eq!(view.stage, "3_qa");
        assert_eq!(view.name.as_deref(), Some("View Test"));
        assert_eq!(view.agent.as_deref(), Some("coder-1"));
        assert_eq!(view.retry_count, Some(2));
        assert_eq!(view.blocked, Some(true));
        assert_eq!(view.depends_on, Some(vec![10, 20]));
    }

    /// `rebuild_index` produces one entry per item, keyed by story_id.
    #[test]
    fn rebuild_index_maps_story_ids() {
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        for (sid, stage) in &[("10_story_a", "1_backlog"), ("20_story_b", "2_current")] {
            let item: JsonValue = json!({
                "story_id": sid,
                "stage": stage,
                "name": "",
                "agent": "",
                "retry_count": 0.0,
                "blocked": false,
                "depends_on": "",
            })
            .into();
            let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
            crdt.apply(op);
        }
        let index = rebuild_index(&crdt);
        assert_eq!(index.len(), 2);
        assert!(index.contains_key("10_story_a"));
        assert!(index.contains_key("20_story_b"));
    }

    /// End-to-end: persist a signed op to a real temp SQLite database, then
    /// reload and replay it, mirroring what `init` does on startup.
    #[tokio::test]
    async fn init_and_write_read_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let db_path = tmp.path().join("crdt_test.db");
        // Init directly (not via the global singleton, for test isolation).
        let options = SqliteConnectOptions::new()
            .filename(&db_path)
            .create_if_missing(true);
        let pool = SqlitePool::connect_with(options).await.unwrap();
        sqlx::migrate!("./migrations").run(&pool).await.unwrap();
        let keypair = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&keypair);
        // Insert and update like write_item does.
        let item_json: JsonValue = json!({
            "story_id": "50_story_roundtrip",
            "stage": "1_backlog",
            "name": "Roundtrip",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
        })
        .into();
        let insert_op = crdt.doc.items.insert(ROOT_ID, item_json).sign(&keypair);
        crdt.apply(insert_op.clone());
        // Persist the op (same schema/columns as the background task).
        let op_json = serde_json::to_string(&insert_op).unwrap();
        let op_id = hex::encode(&insert_op.id());
        let now = chrono::Utc::now().to_rfc3339();
        sqlx::query(
            "INSERT INTO crdt_ops (op_id, seq, op_json, created_at) VALUES (?1, ?2, ?3, ?4)",
        )
        .bind(&op_id)
        .bind(insert_op.inner.seq as i64)
        .bind(&op_json)
        .bind(&now)
        .execute(&pool)
        .await
        .unwrap();
        // Reconstruct from DB.
        let rows: Vec<(String,)> =
            sqlx::query_as("SELECT op_json FROM crdt_ops ORDER BY seq ASC")
                .fetch_all(&pool)
                .await
                .unwrap();
        let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&keypair);
        for (json_str,) in &rows {
            let op: SignedOp = serde_json::from_str(json_str).unwrap();
            crdt2.apply(op);
        }
        let view = extract_item_view(&crdt2.doc.items[0]).unwrap();
        assert_eq!(view.story_id, "50_story_roundtrip");
        assert_eq!(view.stage, "1_backlog");
        assert_eq!(view.name.as_deref(), Some("Roundtrip"));
    }

    /// A SignedOp survives a JSON serialize → deserialize round trip with
    /// id and sequence number intact (what the crdt_ops table stores).
    #[test]
    fn signed_op_serialization_roundtrip() {
        let kp = make_keypair();
        let mut crdt = BaseCrdt::<PipelineDoc>::new(&kp);
        let item: JsonValue = json!({
            "story_id": "60_story_serde",
            "stage": "1_backlog",
            "name": "Serde Test",
            "agent": "",
            "retry_count": 0.0,
            "blocked": false,
            "depends_on": "",
        })
        .into();
        let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp);
        let json_str = serde_json::to_string(&op).unwrap();
        let deserialized: SignedOp = serde_json::from_str(&json_str).unwrap();
        assert_eq!(op.id(), deserialized.id());
        assert_eq!(op.inner.seq, deserialized.inner.seq);
    }
}
+18 -10
View File
@@ -87,17 +87,13 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
Ok(()) Ok(())
} }
/// Shadow-write a pipeline item move to SQLite. /// Write a pipeline item state to both the CRDT layer and the legacy SQLite
/// shadow table.
/// ///
/// Reads front matter from `file_path` (the post-move location) to extract /// Reads front matter from `file_path` (the post-move location) to extract
/// metadata. The write is fire-and-forget — errors are logged but never /// metadata. The CRDT layer is the primary write path; the legacy shadow
/// propagate to the caller. If the database has not been initialised this is a /// table is kept for backwards compatibility. Both writes are fire-and-forget.
/// complete no-op.
pub fn shadow_write(story_id: &str, stage: &str, file_path: &Path) { pub fn shadow_write(story_id: &str, stage: &str, file_path: &Path) {
let Some(db) = PIPELINE_DB.get() else {
return;
};
let (name, agent, retry_count, blocked, depends_on) = let (name, agent, retry_count, blocked, depends_on) =
match std::fs::read_to_string(file_path) { match std::fs::read_to_string(file_path) {
Ok(contents) => match parse_front_matter(&contents) { Ok(contents) => match parse_front_matter(&contents) {
@@ -113,6 +109,19 @@ pub fn shadow_write(story_id: &str, stage: &str, file_path: &Path) {
Err(_) => (None, None, None, None, None), Err(_) => (None, None, None, None, None),
}; };
// Primary: write through CRDT ops (persisted to SQLite crdt_ops table).
crate::crdt_state::write_item(
story_id,
stage,
name.as_deref(),
agent.as_deref(),
retry_count,
blocked,
depends_on.as_deref(),
);
// Legacy: fire-and-forget to the pipeline_items shadow table.
if let Some(db) = PIPELINE_DB.get() {
let msg = PipelineWriteMsg { let msg = PipelineWriteMsg {
story_id: story_id.to_string(), story_id: story_id.to_string(),
stage: stage.to_string(), stage: stage.to_string(),
@@ -122,9 +131,8 @@ pub fn shadow_write(story_id: &str, stage: &str, file_path: &Path) {
blocked, blocked,
depends_on, depends_on,
}; };
// Ignore send errors: the background task may have exited (e.g. in tests).
let _ = db.tx.send(msg); let _ = db.tx.send(msg);
}
} }
#[cfg(test)] #[cfg(test)]
+86
View File
@@ -72,8 +72,62 @@ pub struct PipelineState {
} }
/// Load the full pipeline state (all 5 active stages). /// Load the full pipeline state (all 5 active stages).
///
/// Reads from the CRDT document when available, falling back to the
/// filesystem for any items not yet in the CRDT (e.g. first run before
/// migration). Agent assignments are always overlaid from the in-memory
/// agent pool.
pub fn load_pipeline_state(ctx: &AppContext) -> Result<PipelineState, String> { pub fn load_pipeline_state(ctx: &AppContext) -> Result<PipelineState, String> {
let agent_map = build_active_agent_map(ctx); let agent_map = build_active_agent_map(ctx);
// Try CRDT-first read.
if let Some(crdt_items) = crate::crdt_state::read_all_items() {
let mut state = PipelineState {
backlog: Vec::new(),
current: Vec::new(),
qa: Vec::new(),
merge: Vec::new(),
done: Vec::new(),
};
for item in crdt_items {
let agent = agent_map.get(&item.story_id).cloned();
let story = UpcomingStory {
story_id: item.story_id,
name: item.name,
error: None,
merge_failure: None,
agent,
review_hold: None,
qa: None,
retry_count: item.retry_count.map(|r| r as u32),
blocked: item.blocked,
depends_on: item.depends_on,
};
match item.stage.as_str() {
"1_backlog" => state.backlog.push(story),
"2_current" => state.current.push(story),
"3_qa" => state.qa.push(story),
"4_merge" => state.merge.push(story),
"5_done" => state.done.push(story),
_ => {} // ignore archived or unknown stages
}
}
// Sort each stage for deterministic output.
state.backlog.sort_by(|a, b| a.story_id.cmp(&b.story_id));
state.current.sort_by(|a, b| a.story_id.cmp(&b.story_id));
state.qa.sort_by(|a, b| a.story_id.cmp(&b.story_id));
state.merge.sort_by(|a, b| a.story_id.cmp(&b.story_id));
state.done.sort_by(|a, b| a.story_id.cmp(&b.story_id));
// Merge in any filesystem-only items not yet in the CRDT.
merge_filesystem_items(ctx, &mut state, &agent_map)?;
return Ok(state);
}
// Fallback: filesystem-only read (CRDT not initialised).
Ok(PipelineState { Ok(PipelineState {
backlog: load_stage_items(ctx, "1_backlog", &HashMap::new())?, backlog: load_stage_items(ctx, "1_backlog", &HashMap::new())?,
current: load_stage_items(ctx, "2_current", &agent_map)?, current: load_stage_items(ctx, "2_current", &agent_map)?,
@@ -83,6 +137,38 @@ pub fn load_pipeline_state(ctx: &AppContext) -> Result<PipelineState, String> {
}) })
} }
/// Merge filesystem items that are not already present in the CRDT state.
fn merge_filesystem_items(
ctx: &AppContext,
state: &mut PipelineState,
agent_map: &HashMap<String, AgentAssignment>,
) -> Result<(), String> {
let stages = [
("1_backlog", &mut state.backlog),
("2_current", &mut state.current),
("3_qa", &mut state.qa),
("4_merge", &mut state.merge),
("5_done", &mut state.done),
];
for (stage_dir, stage_vec) in stages {
let empty_map = HashMap::new();
let map = if stage_dir == "2_current" || stage_dir == "3_qa" || stage_dir == "4_merge" {
agent_map
} else {
&empty_map
};
let fs_items = load_stage_items(ctx, stage_dir, map)?;
for fs_item in fs_items {
if !stage_vec.iter().any(|s| s.story_id == fs_item.story_id) {
stage_vec.push(fs_item);
}
}
stage_vec.sort_by(|a, b| a.story_id.cmp(&b.story_id));
}
Ok(())
}
/// Build a map from story_id → AgentAssignment for all pending/running agents. /// Build a map from story_id → AgentAssignment for all pending/running agents.
fn build_active_agent_map(ctx: &AppContext) -> HashMap<String, AgentAssignment> { fn build_active_agent_map(ctx: &AppContext) -> HashMap<String, AgentAssignment> {
let agents = match ctx.agents.list_agents() { let agents = match ctx.agents.list_agents() {
+8 -4
View File
@@ -6,6 +6,7 @@ mod agent_log;
mod agents; mod agents;
mod chat; mod chat;
mod config; mod config;
pub mod crdt_state;
mod db; mod db;
mod http; mod http;
mod io; mod io;
@@ -283,7 +284,7 @@ async fn main() -> Result<(), std::io::Error> {
log_buffer::global().set_log_file(log_dir.join("server.log")); log_buffer::global().set_log_file(log_dir.join("server.log"));
} }
// Initialise the SQLite pipeline shadow-write database. // Initialise the SQLite pipeline shadow-write database and CRDT state layer.
// Clone the path out before the await so we don't hold the MutexGuard across // Clone the path out before the await so we don't hold the MutexGuard across
// an await point. // an await point.
let pipeline_db_path = app_state let pipeline_db_path = app_state
@@ -292,11 +293,14 @@ async fn main() -> Result<(), std::io::Error> {
.unwrap() .unwrap()
.as_ref() .as_ref()
.map(|root| root.join(".huskies").join("pipeline.db")); .map(|root| root.join(".huskies").join("pipeline.db"));
if let Some(db_path) = pipeline_db_path if let Some(ref db_path) = pipeline_db_path {
&& let Err(e) = db::init(&db_path).await if let Err(e) = db::init(db_path).await {
{
slog!("[db] Failed to initialise pipeline.db: {e}"); slog!("[db] Failed to initialise pipeline.db: {e}");
} }
if let Err(e) = crdt_state::init(db_path).await {
slog!("[crdt] Failed to initialise CRDT state layer: {e}");
}
}
let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default())); let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default()));