diff --git a/server/src/crdt_state/lww_maps/event_log.rs b/server/src/crdt_state/lww_maps/event_log.rs new file mode 100644 index 00000000..6e1f62b0 --- /dev/null +++ b/server/src/crdt_state/lww_maps/event_log.rs @@ -0,0 +1,148 @@ +//! Read/write helpers for the `event_log` append-only list in the CRDT document. +//! +//! Every pipeline stage transition is appended as an [`EventLogEntryCrdt`][super::super::types::EventLogEntryCrdt] +//! entry. Entries are never updated or tombstoned — the list is strictly grow-only. +//! Monotonic sequencing is computed at write time while holding the CRDT lock, +//! so `event_seq` values for a given sled are always contiguous and gap-free. + +use bft_json_crdt::json_crdt::{JsonValue, *}; +use bft_json_crdt::op::ROOT_ID; +use serde_json::json; + +use super::super::state::{apply_and_persist, get_crdt}; +use super::super::types::EventLogEntryCrdt; + +/// Raw event log entry extracted from the CRDT document. +/// +/// All fields are decoded to Rust primitives; entries with a missing or +/// malformed `sled_id` are silently dropped by [`read_all_event_log_entries`]. +pub struct EventLogEntryRaw { + /// Monotonic sequence number for the recording sled (0-based). + pub event_seq: u64, + /// Hex-encoded Ed25519 public key of the sled that wrote this entry. + pub sled_id: String, + /// Unix timestamp (seconds) when the transition fired. + pub timestamp: f64, + /// Story ID of the work item that transitioned. + pub story_id: String, + /// Human-readable label of the stage before the transition. + pub from_stage: String, + /// Human-readable label of the stage after the transition. + pub to_stage: String, + /// String label of the `PipelineEvent` variant. + pub pipeline_event: String, +} + +/// Append a new event log entry to the CRDT, computing the monotonic `event_seq` +/// atomically while the CRDT lock is held. +/// +/// No-ops silently when the CRDT is not yet initialised. +pub fn append_event_log_entry( + sled_id: &str, + timestamp: f64, + story_id: &str, + from_stage: &str, + to_stage: &str, + pipeline_event: &str, +) { + let Some(state_mutex) = get_crdt() else { + return; + }; + let Ok(mut state) = state_mutex.lock() else { + return; + }; + + // Count existing entries for this sled while holding the lock so the seq + // is computed and used in the same critical section — no TOCTOU gap. + let event_seq = state + .crdt + .doc + .event_log + .iter() + .filter(|e| matches!(e.sled_id.view(), JsonValue::String(s) if s == sled_id)) + .count() as f64; + + // Append after the last existing entry so the list stays in insertion order. + // Inserting after ROOT_ID would place each entry at the front (RGA semantics), + // reversing the sequence; inserting after the current tail preserves order. + let total_len = state.crdt.doc.event_log.view().len(); + let after = if total_len > 0 { + super::list_id_at(&state.crdt.doc.event_log, total_len - 1).unwrap_or(ROOT_ID) + } else { + ROOT_ID + }; + + let entry: JsonValue = json!({ + "event_seq": event_seq, + "sled_id": sled_id, + "timestamp": timestamp, + "story_id": story_id, + "from_stage": from_stage, + "to_stage": to_stage, + "pipeline_event": pipeline_event, + }) + .into(); + + apply_and_persist(&mut state, |s| s.crdt.doc.event_log.insert(after, entry)); +} + +/// Read all event log entries from the CRDT document. +/// +/// Entries with a missing or empty `sled_id` are silently skipped. +/// Order reflects CRDT insertion order (RGA list semantics). +pub fn read_all_event_log_entries() -> Vec { + let Some(state_mutex) = get_crdt() else { + return Vec::new(); + }; + let Ok(state) = state_mutex.lock() else { + return Vec::new(); + }; + state + .crdt + .doc + .event_log + .iter() + .filter_map(extract_entry) + .collect() +} + +/// Convert a CRDT event log entry to its read-side representation. +fn extract_entry(e: &EventLogEntryCrdt) -> Option { + let event_seq = match e.event_seq.view() { + JsonValue::Number(n) => n as u64, + _ => return None, + }; + let sled_id = match e.sled_id.view() { + JsonValue::String(s) if !s.is_empty() => s, + _ => return None, + }; + let timestamp = match e.timestamp.view() { + JsonValue::Number(n) => n, + _ => 0.0, + }; + let story_id = match e.story_id.view() { + JsonValue::String(s) => s, + _ => String::new(), + }; + let from_stage = match e.from_stage.view() { + JsonValue::String(s) => s, + _ => String::new(), + }; + let to_stage = match e.to_stage.view() { + JsonValue::String(s) => s, + _ => String::new(), + }; + let pipeline_event = match e.pipeline_event.view() { + JsonValue::String(s) => s, + _ => String::new(), + }; + Some(EventLogEntryRaw { + event_seq, + sled_id, + timestamp, + story_id, + from_stage, + to_stage, + pipeline_event, + }) +} diff --git a/server/src/crdt_state/lww_maps/mod.rs b/server/src/crdt_state/lww_maps/mod.rs index d25bc656..b605a4f1 100644 --- a/server/src/crdt_state/lww_maps/mod.rs +++ b/server/src/crdt_state/lww_maps/mod.rs @@ -14,6 +14,7 @@ use bft_json_crdt::op::OpId; mod active_agents; mod agent_throttle; +mod event_log; mod gateway_projects; mod merge_jobs; mod test_jobs; @@ -28,6 +29,7 @@ pub use active_agents::{ pub use agent_throttle::{ delete_agent_throttle, read_agent_throttle, read_all_agent_throttles, write_agent_throttle, }; +pub use event_log::{EventLogEntryRaw, append_event_log_entry, read_all_event_log_entries}; pub use gateway_projects::{ delete_gateway_project, read_all_gateway_projects, read_gateway_project, write_gateway_project, }; diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs index d11bd39d..37f4db81 100644 --- a/server/src/crdt_state/mod.rs +++ b/server/src/crdt_state/mod.rs @@ -28,12 +28,13 @@ mod write; pub use gateway_config::{read_gateway_active_project, write_gateway_active_project}; pub use lww_maps::{ - delete_active_agent, delete_agent_throttle, delete_gateway_project, delete_merge_job, - delete_test_job, delete_token_usage, read_active_agent, read_agent_throttle, - read_all_active_agents, read_all_agent_throttles, read_all_gateway_projects, - read_all_merge_jobs, read_all_test_jobs, read_all_token_usage, read_gateway_project, - read_merge_job, read_test_job, read_token_usage, write_active_agent, write_agent_throttle, - write_gateway_project, write_merge_job, write_test_job, write_token_usage, + EventLogEntryRaw, append_event_log_entry, delete_active_agent, delete_agent_throttle, + delete_gateway_project, delete_merge_job, delete_test_job, delete_token_usage, + read_active_agent, read_agent_throttle, read_all_active_agents, read_all_agent_throttles, + read_all_event_log_entries, read_all_gateway_projects, read_all_merge_jobs, read_all_test_jobs, + read_all_token_usage, read_gateway_project, read_merge_job, read_test_job, read_token_usage, + write_active_agent, write_agent_throttle, write_gateway_project, write_merge_job, + write_test_job, write_token_usage, }; pub use ops::{all_ops_json, apply_remote_op, ops_since, our_vector_clock, subscribe_ops}; pub use presence::{ @@ -49,9 +50,9 @@ pub(crate) use state::flush_persistence; pub use state::{init, subscribe}; pub use types::{ ActiveAgentCrdt, ActiveAgentView, AgentThrottleCrdt, AgentThrottleView, CrdtEvent, EpicId, - GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, MergeJobView, - NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, PipelineItemView, - TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem, + EventLogEntryCrdt, GatewayConfigCrdt, GatewayProjectCrdt, GatewayProjectView, MergeJobCrdt, + MergeJobView, NodePresenceCrdt, NodePresenceView, PipelineDoc, PipelineItemCrdt, + PipelineItemView, TestJobCrdt, TestJobView, TokenUsageCrdt, TokenUsageView, WorkItem, }; pub use write::{ bump_retry_count, migrate_legacy_stage_strings, migrate_merge_job, migrate_names_from_slugs, diff --git a/server/src/crdt_state/types.rs b/server/src/crdt_state/types.rs index 42e8a496..01e75aac 100644 --- a/server/src/crdt_state/types.rs +++ b/server/src/crdt_state/types.rs @@ -46,6 +46,34 @@ pub struct PipelineDoc { pub agent_throttle: ListCrdt, pub gateway_projects: ListCrdt, pub gateway_config: GatewayConfigCrdt, + /// Append-only log of every pipeline transition, persisted as CRDT ops. + pub event_log: ListCrdt, +} + +/// CRDT entry representing a single persisted pipeline stage-transition event. +/// +/// Entries are append-only; once written they are never updated or tombstoned. +/// The `event_seq` field is a per-sled monotonic counter computed at write time +/// (count of existing entries for that sled), giving deterministic ordering for +/// all transitions recorded by a single node even after CRDT replay on restart. +#[add_crdt_fields] +#[derive(Clone, CrdtNode, Debug)] +pub struct EventLogEntryCrdt { + /// Monotonic sequence number for this sled (0, 1, 2, …). Stored as `f64` + /// because all CRDT scalar registers use JSON numbers. + pub event_seq: LwwRegisterCrdt, + /// Hex-encoded Ed25519 public key of the sled that recorded this event. + pub sled_id: LwwRegisterCrdt, + /// Unix timestamp (seconds) when the transition fired. + pub timestamp: LwwRegisterCrdt, + /// Story ID of the work item that transitioned (e.g. `"42_story_foo"`). + pub story_id: LwwRegisterCrdt, + /// Human-readable label of the stage before the transition. + pub from_stage: LwwRegisterCrdt, + /// Human-readable label of the stage after the transition. + pub to_stage: LwwRegisterCrdt, + /// String label of the `PipelineEvent` variant that triggered the transition. + pub pipeline_event: LwwRegisterCrdt, } /// CRDT sub-document representing a single pipeline work item with LWW fields for stage, agent, etc. diff --git a/server/src/event_log/mod.rs b/server/src/event_log/mod.rs new file mode 100644 index 00000000..45d41013 --- /dev/null +++ b/server/src/event_log/mod.rs @@ -0,0 +1,222 @@ +//! Pipeline transition event log — persists every `TransitionFired` event into +//! the CRDT so the log survives server restarts and replicates across nodes. +//! +//! ## Design +//! +//! Each [`TransitionFired`][crate::pipeline_state::TransitionFired] is written +//! as an [`EventLogEntryCrdt`][crate::crdt_state::EventLogEntryCrdt] entry in +//! the `PipelineDoc::event_log` grow-only list. Because the list is backed by +//! CRDT ops that are persisted to SQLite and replayed on startup, the log +//! survives `rebuild_and_restart` without any additional bookkeeping. +//! +//! A monotonic per-sled sequence number (`event_seq`) is computed atomically +//! while the CRDT lock is held, guaranteeing that no two entries from the same +//! sled share a sequence number and that the numbers are contiguous from 0. + +#![allow(dead_code)] + +use chrono::DateTime; + +/// A snapshot of a single persisted pipeline transition event. +/// +/// Constructed by [`read_event_log`] from the raw CRDT entries. +pub struct LoggedEvent { + /// Monotonic sequence number for `sled_id` (0-based, contiguous). + pub event_id: u64, + /// Hex-encoded Ed25519 public key of the sled that recorded this event. + pub sled_id: String, + /// UTC timestamp when the transition fired. + pub at: DateTime, + /// Story ID of the work item that transitioned. + pub story_id: String, + /// Human-readable label of the stage before the transition. + pub from_stage: String, + /// Human-readable label of the stage after the transition. + pub to_stage: String, + /// String label of the `PipelineEvent` variant that triggered the transition. + pub pipeline_event: String, +} + +/// Write a single `TransitionFired` event into the CRDT event log. +/// +/// Computes the next monotonic `event_seq` for this sled atomically inside +/// the CRDT write lock and appends the entry. No-ops when the CRDT is not +/// yet initialised (e.g. in gateway mode with no project root). +pub fn log_transition_event(fired: &crate::pipeline_state::TransitionFired) { + let sled_id = crate::crdt_state::our_node_id().unwrap_or_default(); + let timestamp = fired.at.timestamp() as f64; + let from_stage = crate::pipeline_state::stage_label(&fired.before); + let to_stage = crate::pipeline_state::stage_label(&fired.after); + let pipeline_event = crate::pipeline_state::event_label(&fired.event); + + crate::crdt_state::append_event_log_entry( + &sled_id, + timestamp, + &fired.story_id.0, + from_stage, + to_stage, + pipeline_event, + ); +} + +/// Read all persisted events from the CRDT event log. +/// +/// Entries are returned sorted by `(sled_id, event_id)` so that events from +/// each sled appear in monotonic order. Entries with malformed CRDT fields +/// are silently dropped. +pub fn read_event_log() -> Vec { + let mut entries: Vec = crate::crdt_state::read_all_event_log_entries() + .into_iter() + .map(|raw| LoggedEvent { + event_id: raw.event_seq, + sled_id: raw.sled_id, + at: DateTime::from_timestamp(raw.timestamp as i64, 0).unwrap_or_default(), + story_id: raw.story_id, + from_stage: raw.from_stage, + to_stage: raw.to_stage, + pipeline_event: raw.pipeline_event, + }) + .collect(); + entries.sort_by(|a, b| a.sled_id.cmp(&b.sled_id).then(a.event_id.cmp(&b.event_id))); + entries +} + +/// Spawn a background task that persists every `TransitionFired` event to the CRDT. +/// +/// Subscribes to the global `TransitionFired` broadcast channel and calls +/// [`log_transition_event`] for every received event without filtering. +/// Lagged events are warned about but do not cause the subscriber to exit. +pub fn spawn_event_log_subscriber() { + let mut rx = crate::pipeline_state::subscribe_transitions(); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(fired) => log_transition_event(&fired), + Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { + crate::slog_warn!( + "[event-log] Subscriber lagged, skipped {n} event(s); \ + some transitions may be absent from the persistent event log." + ); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::crdt_state::PipelineDoc; + use crate::pipeline_state::{PipelineEvent, PlanState, Stage, StoryId, TransitionFired}; + use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue, OpState}; + use bft_json_crdt::keypair::make_keypair; + use bft_json_crdt::op::ROOT_ID; + use serde_json::json; + + fn make_fired(i: u32) -> TransitionFired { + TransitionFired { + story_id: StoryId(format!("test_{i}")), + before: Stage::Backlog, + after: Stage::Coding { + claim: None, + plan: PlanState::Missing, + retries: 0, + }, + event: PipelineEvent::DepsMet, + at: chrono::Utc::now(), + } + } + + /// AC4: fire N `TransitionFired` events, simulate a restart by re-initialising + /// the CRDT (replaying all ops on a fresh doc), assert all N entries appear in + /// the log in insertion order with monotonically increasing IDs. + #[test] + fn event_log_survives_crdt_reinit() { + let kp = make_keypair(); + let mut crdt1 = BaseCrdt::::new(&kp); + let sled_id = crate::crdt_state::hex::encode(&crdt1.id); + + let n = 5usize; + let mut ops = Vec::new(); + // Track the last OpId so each entry appends to the end (insert after + // ROOT_ID would place each entry at the front, reversing the sequence). + let mut last_id = ROOT_ID; + + for i in 0..n { + let entry: JsonValue = json!({ + "event_seq": i as f64, + "sled_id": &sled_id, + "timestamp": 1_000_000.0_f64 + i as f64, + "story_id": format!("story_{i}"), + "from_stage": "backlog", + "to_stage": "coding", + "pipeline_event": "DepsMet", + }) + .into(); + let op = crdt1.doc.event_log.insert(last_id, entry).sign(&kp); + last_id = op.inner.id; + assert_eq!(crdt1.apply(op.clone()), OpState::Ok); + ops.push(op); + } + + assert_eq!(crdt1.doc.event_log.view().len(), n); + + // Simulate restart: replay the same ops on a fresh CRDT instance. + let mut crdt2 = BaseCrdt::::new(&kp); + for op in ops { + assert_eq!(crdt2.apply(op), OpState::Ok); + } + + assert_eq!( + crdt2.doc.event_log.view().len(), + n, + "all {n} entries must survive CRDT re-init" + ); + + // Entries must appear in insertion order with monotonically increasing IDs. + for i in 0..n { + let entry = &crdt2.doc.event_log[i]; + let seq = match entry.event_seq.view() { + JsonValue::Number(v) => v as u64, + other => panic!("expected numeric event_seq at index {i}, got {other:?}"), + }; + assert_eq!(seq, i as u64, "event_seq must equal insertion index {i}"); + assert_eq!( + entry.story_id.view(), + JsonValue::String(format!("story_{i}")), + "story_id mismatch at index {i}" + ); + assert_eq!( + entry.sled_id.view(), + JsonValue::String(sled_id.clone()), + "sled_id mismatch at index {i}" + ); + } + } + + /// AC2: every `TransitionFired` event is written to the log without filtering. + #[test] + fn log_transition_event_appends_all_events() { + crate::crdt_state::init_for_test(); + + let n = 4u32; + for i in 0..n { + log_transition_event(&make_fired(i)); + } + + let entries = crate::crdt_state::read_all_event_log_entries(); + assert_eq!( + entries.len(), + n as usize, + "expected {n} event log entries, got {}", + entries.len() + ); + + // Verify monotonic sequence numbers 0..n-1. + let mut seqs: Vec = entries.iter().map(|e| e.event_seq).collect(); + seqs.sort_unstable(); + let expected: Vec = (0..u64::from(n)).collect(); + assert_eq!(seqs, expected, "event_seq values must be 0..{n}"); + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 5ef3ccf6..e21fbc16 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -20,6 +20,8 @@ pub mod crdt_sync; /// CRDT wire format — on-wire message types for the crdt-sync protocol. pub mod crdt_wire; mod db; +/// Event log — CRDT-persisted append-only log of every pipeline stage transition. +pub(crate) mod event_log; /// Gateway mode — multi-project reverse proxy that fronts multiple project containers. pub mod gateway; mod gateway_relay; diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs index 4621ed84..954eaf2f 100644 --- a/server/src/startup/tick_loop.rs +++ b/server/src/startup/tick_loop.rs @@ -28,6 +28,10 @@ pub(crate) fn spawn_event_bridges( // Audit log subscriber: write one structured line per pipeline transition. crate::pipeline_state::spawn_audit_log_subscriber(); + // Event log subscriber: persist every transition to the CRDT event log so + // the history survives rebuild_and_restart and replicates across nodes. + crate::event_log::spawn_event_log_subscriber(); + // CRDT → watcher bridge: translate CRDT stage-transition events into // WatcherEvent::WorkItem so downstream consumers (WebSocket, auto-assign) // see a uniform stream regardless of whether the event originated from the