huskies: merge 491_story_watcher_fires_on_crdt_state_transitions_instead_of_filesystem_events
This commit is contained in:
+187
-4
@@ -1,9 +1,13 @@
|
||||
/// CRDT state layer for pipeline state, backed by SQLite.
|
||||
///
|
||||
/// Replaces the filesystem as the primary source of truth for pipeline item
|
||||
/// The CRDT document is the primary source of truth for pipeline item
|
||||
/// metadata (stage, name, agent, etc.). CRDT ops are persisted to SQLite so
|
||||
/// state survives restarts. The filesystem `.huskies/work/` directories are
|
||||
/// still updated as a secondary output for backwards compatibility.
|
||||
///
|
||||
/// Stage transitions detected by `write_item()` are broadcast as [`CrdtEvent`]s
|
||||
/// so subscribers (auto-assign, WebSocket, notifications) can react without
|
||||
/// polling the filesystem.
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Mutex, OnceLock};
|
||||
|
||||
@@ -18,10 +22,34 @@ use serde_json::json;
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
use sqlx::SqlitePool;
|
||||
use std::path::Path;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
|
||||
use crate::slog;
|
||||
|
||||
// ── CRDT events ─────────────────────────────────────────────────────
|
||||
|
||||
/// An event emitted when a pipeline item's stage changes in the CRDT document.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct CrdtEvent {
|
||||
/// Work item ID (e.g. `"42_story_my_feature"`).
|
||||
pub story_id: String,
|
||||
/// The stage the item was in before this transition, or `None` for new items.
|
||||
pub from_stage: Option<String>,
|
||||
/// The stage the item is now in.
|
||||
pub to_stage: String,
|
||||
/// Human-readable story name from the CRDT document.
|
||||
pub name: Option<String>,
|
||||
}
|
||||
|
||||
/// Subscribe to CRDT state transition events.
|
||||
///
|
||||
/// Returns `None` if the CRDT layer has not been initialised yet.
|
||||
pub fn subscribe() -> Option<broadcast::Receiver<CrdtEvent>> {
|
||||
CRDT_EVENT_TX.get().map(|tx| tx.subscribe())
|
||||
}
|
||||
|
||||
static CRDT_EVENT_TX: OnceLock<broadcast::Sender<CrdtEvent>> = OnceLock::new();
|
||||
|
||||
// ── CRDT document types ──────────────────────────────────────────────
|
||||
|
||||
#[add_crdt_fields]
|
||||
@@ -156,6 +184,11 @@ pub async fn init(db_path: &Path) -> Result<(), sqlx::Error> {
|
||||
};
|
||||
|
||||
let _ = CRDT_STATE.set(Mutex::new(state));
|
||||
|
||||
// Initialise the CRDT event broadcast channel.
|
||||
let (event_tx, _) = broadcast::channel::<CrdtEvent>(256);
|
||||
let _ = CRDT_EVENT_TX.set(event_tx);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -214,6 +247,9 @@ where
|
||||
///
|
||||
/// If the item exists, updates its registers. If not, inserts a new item
|
||||
/// into the list. All ops are signed and persisted to SQLite.
|
||||
///
|
||||
/// When the stage changes (or a new item is created), a [`CrdtEvent`] is
|
||||
/// broadcast so subscribers can react to the transition.
|
||||
pub fn write_item(
|
||||
story_id: &str,
|
||||
stage: &str,
|
||||
@@ -231,9 +267,13 @@ pub fn write_item(
|
||||
};
|
||||
|
||||
if let Some(&idx) = state.index.get(story_id) {
|
||||
// Capture the old stage before updating so we can detect transitions.
|
||||
let old_stage = match state.crdt.doc.items[idx].stage.view() {
|
||||
JsonValue::String(s) => Some(s),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
// Update existing item registers.
|
||||
// Each op is created, signed, applied, and persisted in a block so
|
||||
// borrows do not overlap between &mut crdt (set) and &keypair (sign).
|
||||
apply_and_persist(&mut state, |s| {
|
||||
s.crdt.doc.items[idx].stage.set(stage.to_string())
|
||||
});
|
||||
@@ -263,6 +303,22 @@ pub fn write_item(
|
||||
s.crdt.doc.items[idx].depends_on.set(d.to_string())
|
||||
});
|
||||
}
|
||||
|
||||
// Broadcast a CrdtEvent if the stage actually changed.
|
||||
let stage_changed = old_stage.as_deref() != Some(stage);
|
||||
if stage_changed {
|
||||
// Read the current name from the CRDT document for the event.
|
||||
let current_name = match state.crdt.doc.items[idx].name.view() {
|
||||
JsonValue::String(s) if !s.is_empty() => Some(s),
|
||||
_ => None,
|
||||
};
|
||||
emit_event(CrdtEvent {
|
||||
story_id: story_id.to_string(),
|
||||
from_stage: old_stage,
|
||||
to_stage: stage.to_string(),
|
||||
name: current_name,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
// Insert new item.
|
||||
let item_json: JsonValue = json!({
|
||||
@@ -282,6 +338,21 @@ pub fn write_item(
|
||||
|
||||
// Rebuild index after insertion (indices may shift).
|
||||
state.index = rebuild_index(&state.crdt);
|
||||
|
||||
// Broadcast a CrdtEvent for the new item.
|
||||
emit_event(CrdtEvent {
|
||||
story_id: story_id.to_string(),
|
||||
from_stage: None,
|
||||
to_stage: stage.to_string(),
|
||||
name: name.map(String::from),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Broadcast a CRDT event to all subscribers.
|
||||
fn emit_event(event: CrdtEvent) {
|
||||
if let Some(tx) = CRDT_EVENT_TX.get() {
|
||||
let _ = tx.send(event);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -356,6 +427,40 @@ fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemView> {
|
||||
})
|
||||
}
|
||||
|
||||
/// Check whether a dependency (by numeric ID prefix) is in `5_done` or `6_archived`
|
||||
/// according to CRDT state.
|
||||
///
|
||||
/// Returns `true` if the dependency is satisfied (item found in a done stage).
|
||||
pub fn dep_is_done_crdt(dep_number: u32) -> bool {
|
||||
let prefix = format!("{dep_number}_");
|
||||
if let Some(items) = read_all_items() {
|
||||
items.iter().any(|item| {
|
||||
item.story_id.starts_with(&prefix)
|
||||
&& matches!(item.stage.as_str(), "5_done" | "6_archived")
|
||||
})
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Check unmet dependencies for a story by reading its `depends_on` from the
|
||||
/// CRDT document and checking each dependency against CRDT state.
|
||||
///
|
||||
/// Returns the list of dependency numbers that are NOT in `5_done` or `6_archived`.
|
||||
pub fn check_unmet_deps_crdt(story_id: &str) -> Vec<u32> {
|
||||
let item = match read_item(story_id) {
|
||||
Some(i) => i,
|
||||
None => return Vec::new(),
|
||||
};
|
||||
let deps = match item.depends_on {
|
||||
Some(d) => d,
|
||||
None => return Vec::new(),
|
||||
};
|
||||
deps.into_iter()
|
||||
.filter(|&dep| !dep_is_done_crdt(dep))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Hex-encode a byte slice (no external dep needed).
|
||||
mod hex {
|
||||
pub fn encode(bytes: &[u8]) -> String {
|
||||
@@ -610,4 +715,82 @@ mod tests {
|
||||
assert_eq!(op.id(), deserialized.id());
|
||||
assert_eq!(op.inner.seq, deserialized.inner.seq);
|
||||
}
|
||||
|
||||
// ── CrdtEvent tests ─────────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn crdt_event_has_expected_fields() {
|
||||
let evt = CrdtEvent {
|
||||
story_id: "42_story_foo".to_string(),
|
||||
from_stage: Some("1_backlog".to_string()),
|
||||
to_stage: "2_current".to_string(),
|
||||
name: Some("Foo Feature".to_string()),
|
||||
};
|
||||
assert_eq!(evt.story_id, "42_story_foo");
|
||||
assert_eq!(evt.from_stage.as_deref(), Some("1_backlog"));
|
||||
assert_eq!(evt.to_stage, "2_current");
|
||||
assert_eq!(evt.name.as_deref(), Some("Foo Feature"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn crdt_event_clone_preserves_data() {
|
||||
let evt = CrdtEvent {
|
||||
story_id: "10_story_bar".to_string(),
|
||||
from_stage: None,
|
||||
to_stage: "1_backlog".to_string(),
|
||||
name: None,
|
||||
};
|
||||
let cloned = evt.clone();
|
||||
assert_eq!(cloned.story_id, "10_story_bar");
|
||||
assert!(cloned.from_stage.is_none());
|
||||
assert!(cloned.name.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn emit_event_is_noop_when_channel_not_initialised() {
|
||||
// Before CRDT_EVENT_TX is set, emit_event should not panic.
|
||||
// This test verifies the guard clause works. In test binaries the
|
||||
// OnceLock may already be set by another test, so we just verify
|
||||
// the function doesn't panic regardless.
|
||||
emit_event(CrdtEvent {
|
||||
story_id: "99_story_noop".to_string(),
|
||||
from_stage: None,
|
||||
to_stage: "1_backlog".to_string(),
|
||||
name: None,
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn crdt_event_broadcast_channel_round_trip() {
|
||||
let (tx, mut rx) = broadcast::channel::<CrdtEvent>(16);
|
||||
let evt = CrdtEvent {
|
||||
story_id: "70_story_broadcast".to_string(),
|
||||
from_stage: Some("1_backlog".to_string()),
|
||||
to_stage: "2_current".to_string(),
|
||||
name: Some("Broadcast Test".to_string()),
|
||||
};
|
||||
tx.send(evt).unwrap();
|
||||
|
||||
let received = rx.try_recv().unwrap();
|
||||
assert_eq!(received.story_id, "70_story_broadcast");
|
||||
assert_eq!(received.from_stage.as_deref(), Some("1_backlog"));
|
||||
assert_eq!(received.to_stage, "2_current");
|
||||
assert_eq!(received.name.as_deref(), Some("Broadcast Test"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dep_is_done_crdt_returns_false_when_no_crdt_state() {
|
||||
// When the global CRDT state is not initialised (or in a test environment),
|
||||
// dep_is_done_crdt should return false rather than panicking.
|
||||
// Note: in the test binary the global may or may not be initialised,
|
||||
// but the function should never panic either way.
|
||||
let _ = dep_is_done_crdt(9999);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn check_unmet_deps_crdt_returns_empty_when_item_not_found() {
|
||||
// Non-existent story should return empty deps.
|
||||
let result = check_unmet_deps_crdt("nonexistent_story");
|
||||
assert!(result.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user