//! Pipeline transition event log — persists every `TransitionFired` event into //! the CRDT so the log survives server restarts and replicates across nodes. //! //! ## Design //! //! Each [`TransitionFired`][crate::pipeline_state::TransitionFired] is written //! as an [`EventLogEntryCrdt`][crate::crdt_state::EventLogEntryCrdt] entry in //! the `PipelineDoc::event_log` grow-only list. Because the list is backed by //! CRDT ops that are persisted to SQLite and replayed on startup, the log //! survives `rebuild_and_restart` without any additional bookkeeping. //! //! A monotonic per-sled sequence number (`event_seq`) is computed atomically //! while the CRDT lock is held, guaranteeing that no two entries from the same //! sled share a sequence number and that the numbers are contiguous from 0. #![allow(dead_code)] use chrono::DateTime; /// Monotonic per-sled logical sequence number identifying a pipeline event. /// /// This is the sequence number that *would have been assigned* to an event in the /// contiguous logical event stream, as tracked by the event-log subscriber. It /// differs from the CRDT `event_seq` (which counts CRDT entries including gap /// sentinels) but is meaningful for identifying the range of dropped events when /// a gap is inserted. pub type EventId = u64; /// A snapshot of a single persisted pipeline transition event. /// /// Constructed by [`read_event_log`] from the raw CRDT entries. pub struct LoggedEvent { /// Monotonic sequence number for `sled_id` (0-based, contiguous). pub event_id: u64, /// Hex-encoded Ed25519 public key of the sled that recorded this event. pub sled_id: String, /// UTC timestamp when the transition fired. pub at: DateTime, /// Story ID of the work item that transitioned. pub story_id: String, /// Human-readable label of the stage before the transition. pub from_stage: String, /// Human-readable label of the stage after the transition. 
pub to_stage: String, /// String label of the `PipelineEvent` variant that triggered the transition. pub pipeline_event: String, } /// Write a single `TransitionFired` event into the CRDT event log. /// /// Computes the next monotonic `event_seq` for this sled atomically inside /// the CRDT write lock and appends the entry. No-ops when the CRDT is not /// yet initialised (e.g. in gateway mode with no project root). pub fn log_transition_event(fired: &crate::pipeline_state::TransitionFired) { let sled_id = crate::crdt_state::our_node_id().unwrap_or_default(); let timestamp = fired.at.timestamp() as f64; let from_stage = crate::pipeline_state::stage_label(&fired.before); let to_stage = crate::pipeline_state::stage_label(&fired.after); let pipeline_event = crate::pipeline_state::event_label(&fired.event); crate::crdt_state::append_event_log_entry( &sled_id, timestamp, &fired.story_id.0, from_stage, to_stage, pipeline_event, ); } /// Read all persisted events from the CRDT event log. /// /// Entries are returned sorted by `(sled_id, event_id)` so that events from /// each sled appear in monotonic order. Entries with malformed CRDT fields /// are silently dropped. pub fn read_event_log() -> Vec { let mut entries: Vec = crate::crdt_state::read_all_event_log_entries() .into_iter() .map(|raw| LoggedEvent { event_id: raw.event_seq, sled_id: raw.sled_id, at: DateTime::from_timestamp(raw.timestamp as i64, 0).unwrap_or_default(), story_id: raw.story_id, from_stage: raw.from_stage, to_stage: raw.to_stage, pipeline_event: raw.pipeline_event, }) .collect(); entries.sort_by(|a, b| a.sled_id.cmp(&b.sled_id).then(a.event_id.cmp(&b.event_id))); entries } /// Append a gap sentinel to the event log for the local sled. /// /// Encodes the logical [`EventId`] range `[from_id, to_id]` of dropped events /// using the `EventStreamGap` pipeline event marker. Should be called whenever /// the event-log subscriber detects a lag in the broadcast channel so that no /// drop is silent. 
pub fn insert_gap_sentinel(from_id: EventId, to_id: EventId) {
    // The sentinel is recorded under the local node id, falling back to the
    // default value when no node id is available.
    let local_sled = crate::crdt_state::our_node_id().unwrap_or_default();
    crate::crdt_state::append_gap_log_entry(&local_sled, from_id, to_id);
    log_gap_observability(&local_sled, from_id, to_id);
}

/// Start the background task that mirrors the transition broadcast into the CRDT log.
///
/// The task subscribes to the global `TransitionFired` broadcast channel and
/// hands every received event to [`log_transition_event`]. If the receiver
/// falls behind and the channel reports skipped messages, one
/// `EventStreamGap` sentinel covering the skipped logical id range is written
/// via [`insert_gap_sentinel`], so dropped transitions remain visible in the
/// log. The task ends when the channel is closed.
pub fn spawn_event_log_subscriber() {
    use tokio::sync::broadcast::error::RecvError;

    let mut transitions = crate::pipeline_state::subscribe_transitions();
    tokio::spawn(async move {
        // Logical id the subscriber expects to see next. It moves forward by
        // one per persisted event and by the skipped count on every lag, which
        // lets each gap be described as an exact id range.
        let mut next_logical_seq: EventId = 0;
        loop {
            let n = match transitions.recv().await {
                Ok(fired) => {
                    log_transition_event(&fired);
                    next_logical_seq += 1;
                    continue;
                }
                Err(RecvError::Closed) => break,
                Err(RecvError::Lagged(skipped)) => skipped,
            };

            // Only the lagged case reaches this point.
            let from = next_logical_seq;
            let to = next_logical_seq + n - 1;
            crate::slog_warn!(
                "[event-log] Subscriber lagged; {n} event(s) dropped \
                 (logical ids {from}..={to}); gap sentinel appended."
            );
            insert_gap_sentinel(from, to);
            next_logical_seq += n;
        }
    });
}

/// Emit an observability log line after a gap sentinel has been inserted.
fn log_gap_observability(sled_id: &str, from_id: EventId, to_id: EventId) { let entries = crate::crdt_state::read_all_event_log_entries(); let sled_total: usize = entries.iter().filter(|e| e.sled_id == sled_id).count(); let gap_count: usize = entries .iter() .filter(|e| { e.sled_id == sled_id && e.pipeline_event == crate::crdt_state::GAP_PIPELINE_EVENT }) .count(); crate::slog!( "[event-log] gap inserted sled={sled_id} from={from_id} to={to_id} \ sled_entries={sled_total} gap_count={gap_count}" ); } #[cfg(test)] mod tests { use super::*; use crate::crdt_state::PipelineDoc; use crate::pipeline_state::{PipelineEvent, PlanState, Stage, StoryId, TransitionFired}; use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue, OpState}; use bft_json_crdt::keypair::make_keypair; use bft_json_crdt::op::ROOT_ID; use serde_json::json; fn make_fired(i: u32) -> TransitionFired { TransitionFired { story_id: StoryId(format!("test_{i}")), before: Stage::Backlog, after: Stage::Coding { claim: None, plan: PlanState::Missing, retries: 0, }, event: PipelineEvent::DepsMet, at: chrono::Utc::now(), } } /// AC4: fire N `TransitionFired` events, simulate a restart by re-initialising /// the CRDT (replaying all ops on a fresh doc), assert all N entries appear in /// the log in insertion order with monotonically increasing IDs. #[test] fn event_log_survives_crdt_reinit() { let kp = make_keypair(); let mut crdt1 = BaseCrdt::::new(&kp); let sled_id = crate::crdt_state::hex::encode(&crdt1.id); let n = 5usize; let mut ops = Vec::new(); // Track the last OpId so each entry appends to the end (insert after // ROOT_ID would place each entry at the front, reversing the sequence). 
let mut last_id = ROOT_ID; for i in 0..n { let entry: JsonValue = json!({ "event_seq": i as f64, "sled_id": &sled_id, "timestamp": 1_000_000.0_f64 + i as f64, "story_id": format!("story_{i}"), "from_stage": "backlog", "to_stage": "coding", "pipeline_event": "DepsMet", }) .into(); let op = crdt1.doc.event_log.insert(last_id, entry).sign(&kp); last_id = op.inner.id; assert_eq!(crdt1.apply(op.clone()), OpState::Ok); ops.push(op); } assert_eq!(crdt1.doc.event_log.view().len(), n); // Simulate restart: replay the same ops on a fresh CRDT instance. let mut crdt2 = BaseCrdt::::new(&kp); for op in ops { assert_eq!(crdt2.apply(op), OpState::Ok); } assert_eq!( crdt2.doc.event_log.view().len(), n, "all {n} entries must survive CRDT re-init" ); // Entries must appear in insertion order with monotonically increasing IDs. for i in 0..n { let entry = &crdt2.doc.event_log[i]; let seq = match entry.event_seq.view() { JsonValue::Number(v) => v as u64, other => panic!("expected numeric event_seq at index {i}, got {other:?}"), }; assert_eq!(seq, i as u64, "event_seq must equal insertion index {i}"); assert_eq!( entry.story_id.view(), JsonValue::String(format!("story_{i}")), "story_id mismatch at index {i}" ); assert_eq!( entry.sled_id.view(), JsonValue::String(sled_id.clone()), "sled_id mismatch at index {i}" ); } } /// AC4: fill the feeder queue past capacity by inserting a gap sentinel, then /// assert (a) the gap sentinel appears in the event log and (b) the assembled /// context contains the human-readable gap line. #[test] fn gap_sentinel_in_log_and_assembled_context() { crate::crdt_state::init_for_test(); // Log 3 real events (logical ids 0, 1, 2). for i in 0..3u32 { log_transition_event(&make_fired(i)); } // Simulate: the feeder queue overflowed and logical ids 3..=5 were dropped. insert_gap_sentinel(3, 5); // Log one more real event after the gap. log_transition_event(&make_fired(99)); // (a) Gap sentinel must appear in read_event_log(). 
let entries = read_event_log(); let gap = entries .iter() .find(|e| e.pipeline_event == crate::crdt_state::GAP_PIPELINE_EVENT); assert!(gap.is_some(), "gap sentinel must be present in event log"); let gap = gap.unwrap(); // from_stage encodes the from EventId; to_stage encodes the to EventId. assert_eq!(gap.from_stage, "3", "gap from_stage must be '3'"); assert_eq!(gap.to_stage, "5", "gap to_stage must be '5'"); // (b) assemble_prompt_context must render the gap line. let ctx = crate::llm_session::assemble_prompt_context("room-gap-e2e"); assert!( ctx.contains("events between 3 and 5 were dropped"), "assembled context must contain gap line; got: {ctx}" ); // Real events must also appear. assert!( ctx.contains("test_0"), "first story must appear; got: {ctx}" ); assert!( ctx.contains("test_99"), "last story must appear; got: {ctx}" ); } /// AC2: every `TransitionFired` event is written to the log without filtering. #[test] fn log_transition_event_appends_all_events() { crate::crdt_state::init_for_test(); let n = 4u32; for i in 0..n { log_transition_event(&make_fired(i)); } let entries = crate::crdt_state::read_all_event_log_entries(); assert_eq!( entries.len(), n as usize, "expected {n} event log entries, got {}", entries.len() ); // Verify monotonic sequence numbers 0..n-1. let mut seqs: Vec = entries.iter().map(|e| e.event_seq).collect(); seqs.sort_unstable(); let expected: Vec = (0..u64::from(n)).collect(); assert_eq!(seqs, expected, "event_seq values must be 0..{n}"); } }