huskies: merge 1124 story Persist TransitionFired into a per-sled CRDT event log
This commit is contained in:
@@ -0,0 +1,148 @@
|
||||
//! Read/write helpers for the `event_log` append-only list in the CRDT document.
|
||||
//!
|
||||
//! Every pipeline stage transition is appended as an [`EventLogEntryCrdt`][super::super::types::EventLogEntryCrdt]
|
||||
//! entry. Entries are never updated or tombstoned — the list is strictly grow-only.
|
||||
//! Monotonic sequencing is computed at write time while holding the CRDT lock,
|
||||
//! so `event_seq` values for a given sled are always contiguous and gap-free.
|
||||
|
||||
use bft_json_crdt::json_crdt::{JsonValue, *};
|
||||
use bft_json_crdt::op::ROOT_ID;
|
||||
use serde_json::json;
|
||||
|
||||
use super::super::state::{apply_and_persist, get_crdt};
|
||||
use super::super::types::EventLogEntryCrdt;
|
||||
|
||||
/// Raw event log entry extracted from the CRDT document.
|
||||
///
|
||||
/// All fields are decoded to Rust primitives; entries with a missing or
|
||||
/// malformed `sled_id` are silently dropped by [`read_all_event_log_entries`].
|
||||
pub struct EventLogEntryRaw {
|
||||
/// Monotonic sequence number for the recording sled (0-based).
|
||||
pub event_seq: u64,
|
||||
/// Hex-encoded Ed25519 public key of the sled that wrote this entry.
|
||||
pub sled_id: String,
|
||||
/// Unix timestamp (seconds) when the transition fired.
|
||||
pub timestamp: f64,
|
||||
/// Story ID of the work item that transitioned.
|
||||
pub story_id: String,
|
||||
/// Human-readable label of the stage before the transition.
|
||||
pub from_stage: String,
|
||||
/// Human-readable label of the stage after the transition.
|
||||
pub to_stage: String,
|
||||
/// String label of the `PipelineEvent` variant.
|
||||
pub pipeline_event: String,
|
||||
}
|
||||
|
||||
/// Append a new event log entry to the CRDT, computing the monotonic `event_seq`
|
||||
/// atomically while the CRDT lock is held.
|
||||
///
|
||||
/// No-ops silently when the CRDT is not yet initialised.
|
||||
pub fn append_event_log_entry(
|
||||
sled_id: &str,
|
||||
timestamp: f64,
|
||||
story_id: &str,
|
||||
from_stage: &str,
|
||||
to_stage: &str,
|
||||
pipeline_event: &str,
|
||||
) {
|
||||
let Some(state_mutex) = get_crdt() else {
|
||||
return;
|
||||
};
|
||||
let Ok(mut state) = state_mutex.lock() else {
|
||||
return;
|
||||
};
|
||||
|
||||
// Count existing entries for this sled while holding the lock so the seq
|
||||
// is computed and used in the same critical section — no TOCTOU gap.
|
||||
let event_seq = state
|
||||
.crdt
|
||||
.doc
|
||||
.event_log
|
||||
.iter()
|
||||
.filter(|e| matches!(e.sled_id.view(), JsonValue::String(s) if s == sled_id))
|
||||
.count() as f64;
|
||||
|
||||
// Append after the last existing entry so the list stays in insertion order.
|
||||
// Inserting after ROOT_ID would place each entry at the front (RGA semantics),
|
||||
// reversing the sequence; inserting after the current tail preserves order.
|
||||
let total_len = state.crdt.doc.event_log.view().len();
|
||||
let after = if total_len > 0 {
|
||||
super::list_id_at(&state.crdt.doc.event_log, total_len - 1).unwrap_or(ROOT_ID)
|
||||
} else {
|
||||
ROOT_ID
|
||||
};
|
||||
|
||||
let entry: JsonValue = json!({
|
||||
"event_seq": event_seq,
|
||||
"sled_id": sled_id,
|
||||
"timestamp": timestamp,
|
||||
"story_id": story_id,
|
||||
"from_stage": from_stage,
|
||||
"to_stage": to_stage,
|
||||
"pipeline_event": pipeline_event,
|
||||
})
|
||||
.into();
|
||||
|
||||
apply_and_persist(&mut state, |s| s.crdt.doc.event_log.insert(after, entry));
|
||||
}
|
||||
|
||||
/// Read all event log entries from the CRDT document.
|
||||
///
|
||||
/// Entries with a missing or empty `sled_id` are silently skipped.
|
||||
/// Order reflects CRDT insertion order (RGA list semantics).
|
||||
pub fn read_all_event_log_entries() -> Vec<EventLogEntryRaw> {
|
||||
let Some(state_mutex) = get_crdt() else {
|
||||
return Vec::new();
|
||||
};
|
||||
let Ok(state) = state_mutex.lock() else {
|
||||
return Vec::new();
|
||||
};
|
||||
state
|
||||
.crdt
|
||||
.doc
|
||||
.event_log
|
||||
.iter()
|
||||
.filter_map(extract_entry)
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Convert a CRDT event log entry to its read-side representation.
|
||||
fn extract_entry(e: &EventLogEntryCrdt) -> Option<EventLogEntryRaw> {
|
||||
let event_seq = match e.event_seq.view() {
|
||||
JsonValue::Number(n) => n as u64,
|
||||
_ => return None,
|
||||
};
|
||||
let sled_id = match e.sled_id.view() {
|
||||
JsonValue::String(s) if !s.is_empty() => s,
|
||||
_ => return None,
|
||||
};
|
||||
let timestamp = match e.timestamp.view() {
|
||||
JsonValue::Number(n) => n,
|
||||
_ => 0.0,
|
||||
};
|
||||
let story_id = match e.story_id.view() {
|
||||
JsonValue::String(s) => s,
|
||||
_ => String::new(),
|
||||
};
|
||||
let from_stage = match e.from_stage.view() {
|
||||
JsonValue::String(s) => s,
|
||||
_ => String::new(),
|
||||
};
|
||||
let to_stage = match e.to_stage.view() {
|
||||
JsonValue::String(s) => s,
|
||||
_ => String::new(),
|
||||
};
|
||||
let pipeline_event = match e.pipeline_event.view() {
|
||||
JsonValue::String(s) => s,
|
||||
_ => String::new(),
|
||||
};
|
||||
Some(EventLogEntryRaw {
|
||||
event_seq,
|
||||
sled_id,
|
||||
timestamp,
|
||||
story_id,
|
||||
from_stage,
|
||||
to_stage,
|
||||
pipeline_event,
|
||||
})
|
||||
}
|
||||
@@ -14,6 +14,7 @@ use bft_json_crdt::op::OpId;
|
||||
|
||||
mod active_agents;
|
||||
mod agent_throttle;
|
||||
mod event_log;
|
||||
mod gateway_projects;
|
||||
mod merge_jobs;
|
||||
mod test_jobs;
|
||||
@@ -28,6 +29,7 @@ pub use active_agents::{
|
||||
pub use agent_throttle::{
|
||||
delete_agent_throttle, read_agent_throttle, read_all_agent_throttles, write_agent_throttle,
|
||||
};
|
||||
pub use event_log::{EventLogEntryRaw, append_event_log_entry, read_all_event_log_entries};
|
||||
pub use gateway_projects::{
|
||||
delete_gateway_project, read_all_gateway_projects, read_gateway_project, write_gateway_project,
|
||||
};
|
||||
|
||||
@@ -28,12 +28,13 @@ mod write;
|
||||
|
||||
pub use gateway_config::{read_gateway_active_project, write_gateway_active_project};
|
||||
pub use lww_maps::{
|
||||
delete_active_agent, delete_agent_throttle, delete_gateway_project, delete_merge_job,
|
||||
delete_test_job, delete_token_usage, read_active_agent, read_agent_throttle,
|
||||
read_all_active_agents, read_all_agent_throttles, read_all_gateway_projects,
|
||||
read_all_merge_jobs, read_all_test_jobs, read_all_token_usage, read_gateway_project,
|
||||
read_merge_job, read_test_job, read_token_usage, write_active_agent, write_agent_throttle,
|
||||
write_gateway_project, write_merge_job, write_test_job, write_token_usage,
|
||||
EventLogEntryRaw, append_event_log_entry, delete_active_agent, delete_agent_throttle,
|
||||
delete_gateway_project, delete_merge_job, delete_test_job, delete_token_usage,
|
||||
read_active_agent, read_agent_throttle, read_all_active_agents, read_all_agent_throttles,
|
||||
read_all_event_log_entries, read_all_gateway_projects, read_all_merge_jobs, read_all_test_jobs,
|
||||
read_all_token_usage, read_gateway_project, read_merge_job, read_test_job, read_token_usage,
|
||||
write_active_agent, write_agent_throttle, write_gateway_project, write_merge_job,
|
||||
write_test_job, write_token_usage,
|
||||
};
|
||||
pub use ops::{all_ops_json, apply_remote_op, ops_since, our_vector_clock, subscribe_ops};
|
||||
pub use presence::{
|
||||
@@ -49,9 +50,9 @@ pub(crate) use state::flush_persistence;
|
||||
pub use state::{init, subscribe};
|
||||
pub use types::{
|
||||
ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent, EpicId,
|
||||
GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, MergeJobView,
|
||||
NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView,
|
||||
TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem,
|
||||
EventLogEntryCrdt, GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt,
|
||||
MergeJobView, NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt,
|
||||
PipelineItemView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem,
|
||||
};
|
||||
pub use write::{
|
||||
bump_retry_count, migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs,
|
||||
|
||||
@@ -46,6 +46,34 @@ pub struct PipelineDoc {
|
||||
pub agent_throttle: ListCrdt<AgentThrottleCrdt>,
|
||||
pub gateway_projects: ListCrdt<GatewayProjectCrdt>,
|
||||
pub gateway_config: GatewayConfigCrdt,
|
||||
/// Append-only log of every pipeline transition, persisted as CRDT ops.
|
||||
pub event_log: ListCrdt<EventLogEntryCrdt>,
|
||||
}
|
||||
|
||||
/// CRDT entry representing a single persisted pipeline stage-transition event.
|
||||
///
|
||||
/// Entries are append-only; once written they are never updated or tombstoned.
|
||||
/// The `event_seq` field is a per-sled monotonic counter computed at write time
|
||||
/// (count of existing entries for that sled), giving deterministic ordering for
|
||||
/// all transitions recorded by a single node even after CRDT replay on restart.
|
||||
#[add_crdt_fields]
|
||||
#[derive(Clone, CrdtNode, Debug)]
|
||||
pub struct EventLogEntryCrdt {
|
||||
/// Monotonic sequence number for this sled (0, 1, 2, …). Stored as `f64`
|
||||
/// because all CRDT scalar registers use JSON numbers.
|
||||
pub event_seq: LwwRegisterCrdt<f64>,
|
||||
/// Hex-encoded Ed25519 public key of the sled that recorded this event.
|
||||
pub sled_id: LwwRegisterCrdt<String>,
|
||||
/// Unix timestamp (seconds) when the transition fired.
|
||||
pub timestamp: LwwRegisterCrdt<f64>,
|
||||
/// Story ID of the work item that transitioned (e.g. `"42_story_foo"`).
|
||||
pub story_id: LwwRegisterCrdt<String>,
|
||||
/// Human-readable label of the stage before the transition.
|
||||
pub from_stage: LwwRegisterCrdt<String>,
|
||||
/// Human-readable label of the stage after the transition.
|
||||
pub to_stage: LwwRegisterCrdt<String>,
|
||||
/// String label of the `PipelineEvent` variant that triggered the transition.
|
||||
pub pipeline_event: LwwRegisterCrdt<String>,
|
||||
}
|
||||
|
||||
/// CRDT sub-document representing a single pipeline work item with LWW fields for stage, agent, etc.
|
||||
|
||||
@@ -0,0 +1,222 @@
|
||||
//! Pipeline transition event log — persists every `TransitionFired` event into
|
||||
//! the CRDT so the log survives server restarts and replicates across nodes.
|
||||
//!
|
||||
//! ## Design
|
||||
//!
|
||||
//! Each [`TransitionFired`][crate::pipeline_state::TransitionFired] is written
|
||||
//! as an [`EventLogEntryCrdt`][crate::crdt_state::EventLogEntryCrdt] entry in
|
||||
//! the `PipelineDoc::event_log` grow-only list. Because the list is backed by
|
||||
//! CRDT ops that are persisted to SQLite and replayed on startup, the log
|
||||
//! survives `rebuild_and_restart` without any additional bookkeeping.
|
||||
//!
|
||||
//! A monotonic per-sled sequence number (`event_seq`) is computed atomically
|
||||
//! while the CRDT lock is held, guaranteeing that no two entries from the same
|
||||
//! sled share a sequence number and that the numbers are contiguous from 0.
|
||||
|
||||
#![allow(dead_code)]
|
||||
|
||||
use chrono::DateTime;
|
||||
|
||||
/// A snapshot of a single persisted pipeline transition event.
|
||||
///
|
||||
/// Constructed by [`read_event_log`] from the raw CRDT entries.
|
||||
pub struct LoggedEvent {
|
||||
/// Monotonic sequence number for `sled_id` (0-based, contiguous).
|
||||
pub event_id: u64,
|
||||
/// Hex-encoded Ed25519 public key of the sled that recorded this event.
|
||||
pub sled_id: String,
|
||||
/// UTC timestamp when the transition fired.
|
||||
pub at: DateTime<chrono::Utc>,
|
||||
/// Story ID of the work item that transitioned.
|
||||
pub story_id: String,
|
||||
/// Human-readable label of the stage before the transition.
|
||||
pub from_stage: String,
|
||||
/// Human-readable label of the stage after the transition.
|
||||
pub to_stage: String,
|
||||
/// String label of the `PipelineEvent` variant that triggered the transition.
|
||||
pub pipeline_event: String,
|
||||
}
|
||||
|
||||
/// Write a single `TransitionFired` event into the CRDT event log.
|
||||
///
|
||||
/// Computes the next monotonic `event_seq` for this sled atomically inside
|
||||
/// the CRDT write lock and appends the entry. No-ops when the CRDT is not
|
||||
/// yet initialised (e.g. in gateway mode with no project root).
|
||||
pub fn log_transition_event(fired: &crate::pipeline_state::TransitionFired) {
|
||||
let sled_id = crate::crdt_state::our_node_id().unwrap_or_default();
|
||||
let timestamp = fired.at.timestamp() as f64;
|
||||
let from_stage = crate::pipeline_state::stage_label(&fired.before);
|
||||
let to_stage = crate::pipeline_state::stage_label(&fired.after);
|
||||
let pipeline_event = crate::pipeline_state::event_label(&fired.event);
|
||||
|
||||
crate::crdt_state::append_event_log_entry(
|
||||
&sled_id,
|
||||
timestamp,
|
||||
&fired.story_id.0,
|
||||
from_stage,
|
||||
to_stage,
|
||||
pipeline_event,
|
||||
);
|
||||
}
|
||||
|
||||
/// Read all persisted events from the CRDT event log.
|
||||
///
|
||||
/// Entries are returned sorted by `(sled_id, event_id)` so that events from
|
||||
/// each sled appear in monotonic order. Entries with malformed CRDT fields
|
||||
/// are silently dropped.
|
||||
pub fn read_event_log() -> Vec<LoggedEvent> {
|
||||
let mut entries: Vec<LoggedEvent> = crate::crdt_state::read_all_event_log_entries()
|
||||
.into_iter()
|
||||
.map(|raw| LoggedEvent {
|
||||
event_id: raw.event_seq,
|
||||
sled_id: raw.sled_id,
|
||||
at: DateTime::from_timestamp(raw.timestamp as i64, 0).unwrap_or_default(),
|
||||
story_id: raw.story_id,
|
||||
from_stage: raw.from_stage,
|
||||
to_stage: raw.to_stage,
|
||||
pipeline_event: raw.pipeline_event,
|
||||
})
|
||||
.collect();
|
||||
entries.sort_by(|a, b| a.sled_id.cmp(&b.sled_id).then(a.event_id.cmp(&b.event_id)));
|
||||
entries
|
||||
}
|
||||
|
||||
/// Spawn a background task that persists every `TransitionFired` event to the CRDT.
|
||||
///
|
||||
/// Subscribes to the global `TransitionFired` broadcast channel and calls
|
||||
/// [`log_transition_event`] for every received event without filtering.
|
||||
/// Lagged events are warned about but do not cause the subscriber to exit.
|
||||
pub fn spawn_event_log_subscriber() {
|
||||
let mut rx = crate::pipeline_state::subscribe_transitions();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(fired) => log_transition_event(&fired),
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||
crate::slog_warn!(
|
||||
"[event-log] Subscriber lagged, skipped {n} event(s); \
|
||||
some transitions may be absent from the persistent event log."
|
||||
);
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::crdt_state::PipelineDoc;
|
||||
use crate::pipeline_state::{PipelineEvent, PlanState, Stage, StoryId, TransitionFired};
|
||||
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue, OpState};
|
||||
use bft_json_crdt::keypair::make_keypair;
|
||||
use bft_json_crdt::op::ROOT_ID;
|
||||
use serde_json::json;
|
||||
|
||||
fn make_fired(i: u32) -> TransitionFired {
|
||||
TransitionFired {
|
||||
story_id: StoryId(format!("test_{i}")),
|
||||
before: Stage::Backlog,
|
||||
after: Stage::Coding {
|
||||
claim: None,
|
||||
plan: PlanState::Missing,
|
||||
retries: 0,
|
||||
},
|
||||
event: PipelineEvent::DepsMet,
|
||||
at: chrono::Utc::now(),
|
||||
}
|
||||
}
|
||||
|
||||
/// AC4: fire N `TransitionFired` events, simulate a restart by re-initialising
|
||||
/// the CRDT (replaying all ops on a fresh doc), assert all N entries appear in
|
||||
/// the log in insertion order with monotonically increasing IDs.
|
||||
#[test]
|
||||
fn event_log_survives_crdt_reinit() {
|
||||
let kp = make_keypair();
|
||||
let mut crdt1 = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
let sled_id = crate::crdt_state::hex::encode(&crdt1.id);
|
||||
|
||||
let n = 5usize;
|
||||
let mut ops = Vec::new();
|
||||
// Track the last OpId so each entry appends to the end (insert after
|
||||
// ROOT_ID would place each entry at the front, reversing the sequence).
|
||||
let mut last_id = ROOT_ID;
|
||||
|
||||
for i in 0..n {
|
||||
let entry: JsonValue = json!({
|
||||
"event_seq": i as f64,
|
||||
"sled_id": &sled_id,
|
||||
"timestamp": 1_000_000.0_f64 + i as f64,
|
||||
"story_id": format!("story_{i}"),
|
||||
"from_stage": "backlog",
|
||||
"to_stage": "coding",
|
||||
"pipeline_event": "DepsMet",
|
||||
})
|
||||
.into();
|
||||
let op = crdt1.doc.event_log.insert(last_id, entry).sign(&kp);
|
||||
last_id = op.inner.id;
|
||||
assert_eq!(crdt1.apply(op.clone()), OpState::Ok);
|
||||
ops.push(op);
|
||||
}
|
||||
|
||||
assert_eq!(crdt1.doc.event_log.view().len(), n);
|
||||
|
||||
// Simulate restart: replay the same ops on a fresh CRDT instance.
|
||||
let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
|
||||
for op in ops {
|
||||
assert_eq!(crdt2.apply(op), OpState::Ok);
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
crdt2.doc.event_log.view().len(),
|
||||
n,
|
||||
"all {n} entries must survive CRDT re-init"
|
||||
);
|
||||
|
||||
// Entries must appear in insertion order with monotonically increasing IDs.
|
||||
for i in 0..n {
|
||||
let entry = &crdt2.doc.event_log[i];
|
||||
let seq = match entry.event_seq.view() {
|
||||
JsonValue::Number(v) => v as u64,
|
||||
other => panic!("expected numeric event_seq at index {i}, got {other:?}"),
|
||||
};
|
||||
assert_eq!(seq, i as u64, "event_seq must equal insertion index {i}");
|
||||
assert_eq!(
|
||||
entry.story_id.view(),
|
||||
JsonValue::String(format!("story_{i}")),
|
||||
"story_id mismatch at index {i}"
|
||||
);
|
||||
assert_eq!(
|
||||
entry.sled_id.view(),
|
||||
JsonValue::String(sled_id.clone()),
|
||||
"sled_id mismatch at index {i}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// AC2: every `TransitionFired` event is written to the log without filtering.
|
||||
#[test]
|
||||
fn log_transition_event_appends_all_events() {
|
||||
crate::crdt_state::init_for_test();
|
||||
|
||||
let n = 4u32;
|
||||
for i in 0..n {
|
||||
log_transition_event(&make_fired(i));
|
||||
}
|
||||
|
||||
let entries = crate::crdt_state::read_all_event_log_entries();
|
||||
assert_eq!(
|
||||
entries.len(),
|
||||
n as usize,
|
||||
"expected {n} event log entries, got {}",
|
||||
entries.len()
|
||||
);
|
||||
|
||||
// Verify monotonic sequence numbers 0..n-1.
|
||||
let mut seqs: Vec<u64> = entries.iter().map(|e| e.event_seq).collect();
|
||||
seqs.sort_unstable();
|
||||
let expected: Vec<u64> = (0..u64::from(n)).collect();
|
||||
assert_eq!(seqs, expected, "event_seq values must be 0..{n}");
|
||||
}
|
||||
}
|
||||
@@ -20,6 +20,8 @@ pub mod crdt_sync;
|
||||
/// CRDT wire format — on-wire message types for the crdt-sync protocol.
|
||||
pub mod crdt_wire;
|
||||
mod db;
|
||||
/// Event log — CRDT-persisted append-only log of every pipeline stage transition.
|
||||
pub(crate) mod event_log;
|
||||
/// Gateway mode — multi-project reverse proxy that fronts multiple project containers.
|
||||
pub mod gateway;
|
||||
mod gateway_relay;
|
||||
|
||||
@@ -28,6 +28,10 @@ pub(crate) fn spawn_event_bridges(
|
||||
// Audit log subscriber: write one structured line per pipeline transition.
|
||||
crate::pipeline_state::spawn_audit_log_subscriber();
|
||||
|
||||
// Event log subscriber: persist every transition to the CRDT event log so
|
||||
// the history survives rebuild_and_restart and replicates across nodes.
|
||||
crate::event_log::spawn_event_log_subscriber();
|
||||
|
||||
// CRDT → watcher bridge: translate CRDT stage-transition events into
|
||||
// WatcherEvent::WorkItem so downstream consumers (WebSocket, auto-assign)
|
||||
// see a uniform stream regardless of whether the event originated from the
|
||||
|
||||
Reference in New Issue
Block a user