Files
huskies/server/src/event_log/mod.rs
T

331 lines
13 KiB
Rust
Raw Normal View History

//! Pipeline transition event log — persists every `TransitionFired` event into
//! the CRDT so the log survives server restarts and replicates across nodes.
//!
//! ## Design
//!
//! Each [`TransitionFired`][crate::pipeline_state::TransitionFired] is written
//! as an [`EventLogEntryCrdt`][crate::crdt_state::EventLogEntryCrdt] entry in
//! the `PipelineDoc::event_log` grow-only list. Because the list is backed by
//! CRDT ops that are persisted to SQLite and replayed on startup, the log
//! survives `rebuild_and_restart` without any additional bookkeeping.
//!
//! A monotonic per-sled sequence number (`event_seq`) is computed atomically
//! while the CRDT lock is held, guaranteeing that no two entries from the same
//! sled share a sequence number and that the numbers are contiguous from 0.
#![allow(dead_code)]
use chrono::DateTime;
/// Monotonic per-sled logical sequence number identifying a pipeline event.
///
/// This is the sequence number that *would have been assigned* to an event in the
/// contiguous logical event stream, as tracked by the event-log subscriber. It
/// differs from the CRDT `event_seq` (which counts CRDT entries including gap
/// sentinels) but is meaningful for identifying the range of dropped events when
/// a gap is inserted.
///
/// [`insert_gap_sentinel`] takes an inclusive `[from_id, to_id]` range of these
/// ids, and the task started by [`spawn_event_log_subscriber`] keeps a running
/// counter of this type.
pub type EventId = u64;
/// A snapshot of a single persisted pipeline transition event.
///
/// Constructed by [`read_event_log`] from the raw CRDT entries.
///
/// Gap sentinels appended by [`insert_gap_sentinel`] are surfaced through this
/// same type: their `pipeline_event` equals `crdt_state::GAP_PIPELINE_EVENT`
/// and, as exercised by this module's tests, `from_stage` / `to_stage` then
/// carry the first and last dropped [`EventId`] rendered as decimal strings.
///
/// `Debug`, `Clone` and `PartialEq` are derived so snapshots can be logged,
/// copied out of the returned `Vec`, and compared directly in assertions.
#[derive(Debug, Clone, PartialEq)]
pub struct LoggedEvent {
    /// Monotonic sequence number for `sled_id` (0-based, contiguous).
    ///
    /// This is the CRDT `event_seq` of the entry, not a logical [`EventId`].
    pub event_id: u64,
    /// Hex-encoded Ed25519 public key of the sled that recorded this event.
    pub sled_id: String,
    /// UTC timestamp when the transition fired (whole-second resolution, since
    /// the persisted value is converted with zero nanoseconds).
    pub at: DateTime<chrono::Utc>,
    /// Story ID of the work item that transitioned.
    pub story_id: String,
    /// Human-readable label of the stage before the transition.
    pub from_stage: String,
    /// Human-readable label of the stage after the transition.
    pub to_stage: String,
    /// String label of the `PipelineEvent` variant that triggered the transition.
    pub pipeline_event: String,
}
/// Persist one `TransitionFired` event to the CRDT event log and publish it on
/// the pipeline event bus.
///
/// Steps, in order:
///
/// 1. Resolve the local sled id through `crdt_state::our_node_id()`; when no id
///    is available the default (empty) id is used instead.
/// 2. Append an entry via `crdt_state::append_event_log_entry`, passing the
///    transition time as whole seconds since the Unix epoch plus the story id
///    and the before/after/event labels. Assigning the next per-sled
///    `event_seq` is the callee's job (see the module docs).
/// 3. Broadcast a matching `BusEvent` for real-time subscribers.
///
/// The broadcast in step 3 is unconditional in this function.
/// NOTE(review): the previous docs state that the append is a no-op while the
/// CRDT is uninitialised (e.g. gateway mode with no project root) — that
/// behaviour lives in `append_event_log_entry`; confirm there.
pub fn log_transition_event(fired: &crate::pipeline_state::TransitionFired) {
    let local_sled = crate::crdt_state::our_node_id().unwrap_or_default();

    // Sub-second precision is intentionally not carried into the log entry.
    let epoch_secs = fired.at.timestamp() as f64;

    crate::crdt_state::append_event_log_entry(
        &local_sled,
        epoch_secs,
        &fired.story_id.0,
        crate::pipeline_state::stage_label(&fired.before),
        crate::pipeline_state::stage_label(&fired.after),
        crate::pipeline_state::event_label(&fired.event),
    );

    // Real-time push to per-persona WebSocket subscribers. The bus event owns
    // its strings, so the labels are rendered again as owned values here.
    let bus_event = crate::pipeline_event_bus::BusEvent {
        sled_id: local_sled,
        story_id: fired.story_id.0.clone(),
        from_stage: crate::pipeline_state::stage_label(&fired.before).to_string(),
        to_stage: crate::pipeline_state::stage_label(&fired.after).to_string(),
        pipeline_event: crate::pipeline_state::event_label(&fired.event).to_string(),
    };
    crate::pipeline_event_bus::broadcast(bus_event);
}
/// Load every persisted event from the CRDT event log as [`LoggedEvent`]s.
///
/// Raw entries come from `crdt_state::read_all_event_log_entries()`. Each raw
/// `timestamp` is truncated to whole seconds and converted to a UTC
/// `DateTime`; if the value cannot be represented, the default `DateTime` is
/// used rather than discarding the entry.
///
/// The returned vector is ordered by `(sled_id, event_id)`, so events from each
/// sled appear in monotonic order. The sort is stable: entries that compare
/// equal keep the order in which they were read.
///
/// NOTE(review): the previous docs state that entries with malformed CRDT
/// fields are silently dropped; no filtering happens in this function, so that
/// must occur inside `read_all_event_log_entries` — confirm there.
pub fn read_event_log() -> Vec<LoggedEvent> {
    let mut events: Vec<LoggedEvent> = crate::crdt_state::read_all_event_log_entries()
        .into_iter()
        .map(|raw| {
            let whole_secs = raw.timestamp as i64;
            let fired_at = DateTime::from_timestamp(whole_secs, 0).unwrap_or_default();
            LoggedEvent {
                event_id: raw.event_seq,
                sled_id: raw.sled_id,
                at: fired_at,
                story_id: raw.story_id,
                from_stage: raw.from_stage,
                to_stage: raw.to_stage,
                pipeline_event: raw.pipeline_event,
            }
        })
        .collect();

    // Lexicographic tuple comparison: sled id first, then sequence number.
    events.sort_by(|lhs, rhs| {
        (lhs.sled_id.as_str(), lhs.event_id).cmp(&(rhs.sled_id.as_str(), rhs.event_id))
    });
    events
}
/// Record a gap sentinel in the event log on behalf of the local sled.
///
/// `from_id` and `to_id` are the first and last logical [`EventId`]s of the
/// dropped range (inclusive); they are handed to
/// `crdt_state::append_gap_log_entry`, which stores them under the
/// `EventStreamGap` pipeline event marker. Call this whenever the event-log
/// subscriber detects a lag in the broadcast channel so that no drop is
/// silent.
///
/// After the append, a summary line is emitted through
/// `log_gap_observability`. When no local node id is available the default
/// (empty) sled id is used.
pub fn insert_gap_sentinel(from_id: EventId, to_id: EventId) {
    let local_sled = crate::crdt_state::our_node_id().unwrap_or_default();

    // Persist first, then report, so the summary counts include this sentinel
    // whenever the append took effect.
    crate::crdt_state::append_gap_log_entry(&local_sled, from_id, to_id);
    log_gap_observability(&local_sled, from_id, to_id);
}
/// Spawn a background task that persists every `TransitionFired` event to the CRDT.
///
/// Subscribes to the global `TransitionFired` broadcast channel. Normal events
/// are persisted via [`log_transition_event`]. When the subscriber lags (the
/// broadcast channel drops the oldest messages), a single
/// `EventStreamGap` sentinel is appended to the log covering the dropped range
/// so no transition is silently lost.
pub fn spawn_event_log_subscriber() {
let mut rx = crate::pipeline_state::subscribe_transitions();
tokio::spawn(async move {
// Tracks the next expected logical sequence number in the subscriber's
// view of the event stream. Incremented on every successfully processed
// event; advanced by the gap size on each lag so we can identify the
// exact logical range of dropped events.
let mut next_logical_seq: EventId = 0;
loop {
match rx.recv().await {
Ok(fired) => {
// log_transition_event also broadcasts to the pipeline_event_bus.
log_transition_event(&fired);
next_logical_seq += 1;
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
let from = next_logical_seq;
let to = next_logical_seq + n - 1;
crate::slog_warn!(
"[event-log] Subscriber lagged; {n} event(s) dropped \
(logical ids {from}..={to}); gap sentinel appended."
);
insert_gap_sentinel(from, to);
next_logical_seq += n;
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
}
/// Log a one-line summary after a gap sentinel has been inserted.
///
/// Re-reads the full event log and reports, for `sled_id`, the total number of
/// entries and how many of them are gap sentinels (entries whose
/// `pipeline_event` equals `crdt_state::GAP_PIPELINE_EVENT`), alongside the
/// `from_id..to_id` range of the gap that was just inserted.
fn log_gap_observability(sled_id: &str, from_id: EventId, to_id: EventId) {
    let entries = crate::crdt_state::read_all_event_log_entries();

    // One pass over the log: every gap sentinel for this sled is also counted
    // as one of the sled's entries.
    let mut sled_total: usize = 0;
    let mut gap_count: usize = 0;
    for entry in entries.iter() {
        if entry.sled_id == sled_id {
            sled_total += 1;
            if entry.pipeline_event == crate::crdt_state::GAP_PIPELINE_EVENT {
                gap_count += 1;
            }
        }
    }

    crate::slog!(
        "[event-log] gap inserted sled={sled_id} from={from_id} to={to_id} \
         sled_entries={sled_total} gap_count={gap_count}"
    );
}
#[cfg(test)]
mod tests {
// Unit tests for the event log: durability across CRDT replay, visibility of
// gap sentinels, and unfiltered persistence of transitions.
use super::*;
use crate::crdt_state::PipelineDoc;
use crate::pipeline_state::{PipelineEvent, PlanState, Stage, StoryId, TransitionFired};
use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue, OpState};
use bft_json_crdt::keypair::make_keypair;
use bft_json_crdt::op::ROOT_ID;
use serde_json::json;
/// Build a `TransitionFired` fixture for story `test_{i}`: a Backlog → Coding
/// transition (no claim, missing plan, zero retries) triggered by `DepsMet`
/// and stamped with the current UTC time.
fn make_fired(i: u32) -> TransitionFired {
TransitionFired {
story_id: StoryId(format!("test_{i}")),
before: Stage::Backlog,
after: Stage::Coding {
claim: None,
plan: PlanState::Missing,
retries: 0,
},
event: PipelineEvent::DepsMet,
at: chrono::Utc::now(),
}
}
/// AC4: fire N `TransitionFired` events, simulate a restart by re-initialising
/// the CRDT (replaying all ops on a fresh doc), assert all N entries appear in
/// the log in insertion order with monotonically increasing IDs.
///
/// This test drives local `BaseCrdt` instances directly: entries are built as
/// JSON and inserted into `event_log` by hand rather than through
/// `log_transition_event`.
#[test]
fn event_log_survives_crdt_reinit() {
let kp = make_keypair();
let mut crdt1 = BaseCrdt::<PipelineDoc>::new(&kp);
// The hex-encoded id of the first CRDT instance doubles as the sled id
// written into every entry.
let sled_id = crate::crdt_state::hex::encode(&crdt1.id);
let n = 5usize;
// Signed ops are collected so they can be replayed on a second instance.
let mut ops = Vec::new();
// Track the last OpId so each entry appends to the end (insert after
// ROOT_ID would place each entry at the front, reversing the sequence).
let mut last_id = ROOT_ID;
for i in 0..n {
let entry: JsonValue = json!({
"event_seq": i as f64,
"sled_id": &sled_id,
"timestamp": 1_000_000.0_f64 + i as f64,
"story_id": format!("story_{i}"),
"from_stage": "backlog",
"to_stage": "coding",
"pipeline_event": "DepsMet",
})
.into();
let op = crdt1.doc.event_log.insert(last_id, entry).sign(&kp);
// Chain the next insert after this op.
last_id = op.inner.id;
assert_eq!(crdt1.apply(op.clone()), OpState::Ok);
ops.push(op);
}
assert_eq!(crdt1.doc.event_log.view().len(), n);
// Simulate restart: replay the same ops on a fresh CRDT instance.
let mut crdt2 = BaseCrdt::<PipelineDoc>::new(&kp);
for op in ops {
assert_eq!(crdt2.apply(op), OpState::Ok);
}
assert_eq!(
crdt2.doc.event_log.view().len(),
n,
"all {n} entries must survive CRDT re-init"
);
// Entries must appear in insertion order with monotonically increasing IDs.
for i in 0..n {
let entry = &crdt2.doc.event_log[i];
// `event_seq` was written as a JSON number; read it back as an integer.
let seq = match entry.event_seq.view() {
JsonValue::Number(v) => v as u64,
other => panic!("expected numeric event_seq at index {i}, got {other:?}"),
};
assert_eq!(seq, i as u64, "event_seq must equal insertion index {i}");
assert_eq!(
entry.story_id.view(),
JsonValue::String(format!("story_{i}")),
"story_id mismatch at index {i}"
);
assert_eq!(
entry.sled_id.view(),
JsonValue::String(sled_id.clone()),
"sled_id mismatch at index {i}"
);
}
}
/// AC4: simulate a feeder-queue overflow by inserting a gap sentinel directly,
/// then assert (a) the gap sentinel appears in the event log and (b) the
/// assembled context contains the human-readable gap line.
#[test]
fn gap_sentinel_in_log_and_assembled_context() {
// NOTE(review): presumably installs CRDT state usable by this test —
// confirm how it isolates tests from one another.
crate::crdt_state::init_for_test();
// Log 3 real events (logical ids 0, 1, 2).
for i in 0..3u32 {
log_transition_event(&make_fired(i));
}
// Simulate: the feeder queue overflowed and logical ids 3..=5 were dropped.
insert_gap_sentinel(3, 5);
// Log one more real event after the gap.
log_transition_event(&make_fired(99));
// (a) Gap sentinel must appear in read_event_log().
let entries = read_event_log();
let gap = entries
.iter()
.find(|e| e.pipeline_event == crate::crdt_state::GAP_PIPELINE_EVENT);
assert!(gap.is_some(), "gap sentinel must be present in event log");
let gap = gap.unwrap();
// from_stage encodes the from EventId; to_stage encodes the to EventId.
assert_eq!(gap.from_stage, "3", "gap from_stage must be '3'");
assert_eq!(gap.to_stage, "5", "gap to_stage must be '5'");
// (b) assemble_prompt_context must render the gap line.
// NOTE(review): the argument looks like a room identifier — confirm.
let ctx = crate::llm_session::assemble_prompt_context("room-gap-e2e");
assert!(
ctx.contains("events between 3 and 5 were dropped"),
"assembled context must contain gap line; got: {ctx}"
);
// Real events must also appear.
assert!(
ctx.contains("test_0"),
"first story must appear; got: {ctx}"
);
assert!(
ctx.contains("test_99"),
"last story must appear; got: {ctx}"
);
}
/// AC2: every `TransitionFired` event is written to the log without filtering.
#[test]
fn log_transition_event_appends_all_events() {
// NOTE(review): presumably installs CRDT state usable by this test —
// the exact-count assertion below relies on the log starting empty.
crate::crdt_state::init_for_test();
let n = 4u32;
for i in 0..n {
log_transition_event(&make_fired(i));
}
// Read the raw CRDT entries rather than going through read_event_log().
let entries = crate::crdt_state::read_all_event_log_entries();
assert_eq!(
entries.len(),
n as usize,
"expected {n} event log entries, got {}",
entries.len()
);
// Verify monotonic sequence numbers 0..n-1. The values are sorted first so
// the check does not depend on the order in which entries are returned.
let mut seqs: Vec<u64> = entries.iter().map(|e| e.event_seq).collect();
seqs.sort_unstable();
let expected: Vec<u64> = (0..u64::from(n)).collect();
assert_eq!(seqs, expected, "event_seq values must be 0..{n}");
}
}