223 lines
8.4 KiB
Rust
223 lines
8.4 KiB
Rust
|
|
//! Pipeline transition event log — persists every `TransitionFired` event into
|
||
|
|
//! the CRDT so the log survives server restarts and replicates across nodes.
|
||
|
|
//!
|
||
|
|
//! ## Design
|
||
|
|
//!
|
||
|
|
//! Each [`TransitionFired`][crate::pipeline_state::TransitionFired] is written
|
||
|
|
//! as an [`EventLogEntryCrdt`][crate::crdt_state::EventLogEntryCrdt] entry in
|
||
|
|
//! the `PipelineDoc::event_log` grow-only list. Because the list is backed by
|
||
|
|
//! CRDT ops that are persisted to SQLite and replayed on startup, the log
|
||
|
|
//! survives `rebuild_and_restart` without any additional bookkeeping.
|
||
|
|
//!
|
||
|
|
//! A monotonic per-sled sequence number (`event_seq`) is computed atomically
|
||
|
|
//! while the CRDT lock is held, guaranteeing that no two entries from the same
|
||
|
|
//! sled share a sequence number and that the numbers are contiguous from 0.
|
||
|
|
|
||
|
|
#![allow(dead_code)]
|
||
|
|
|
||
|
|
use chrono::DateTime;
|
||
|
|
|
||
|
|
/// A snapshot of a single persisted pipeline transition event.
|
||
|
|
///
|
||
|
|
/// Constructed by [`read_event_log`] from the raw CRDT entries.
|
||
|
|
pub struct LoggedEvent {
|
||
|
|
/// Monotonic sequence number for `sled_id` (0-based, contiguous).
|
||
|
|
pub event_id: u64,
|
||
|
|
/// Hex-encoded Ed25519 public key of the sled that recorded this event.
|
||
|
|
pub sled_id: String,
|
||
|
|
/// UTC timestamp when the transition fired.
|
||
|
|
pub at: DateTime<chrono::Utc>,
|
||
|
|
/// Story ID of the work item that transitioned.
|
||
|
|
pub story_id: String,
|
||
|
|
/// Human-readable label of the stage before the transition.
|
||
|
|
pub from_stage: String,
|
||
|
|
/// Human-readable label of the stage after the transition.
|
||
|
|
pub to_stage: String,
|
||
|
|
/// String label of the `PipelineEvent` variant that triggered the transition.
|
||
|
|
pub pipeline_event: String,
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Write a single `TransitionFired` event into the CRDT event log.
|
||
|
|
///
|
||
|
|
/// Computes the next monotonic `event_seq` for this sled atomically inside
|
||
|
|
/// the CRDT write lock and appends the entry. No-ops when the CRDT is not
|
||
|
|
/// yet initialised (e.g. in gateway mode with no project root).
|
||
|
|
pub fn log_transition_event(fired: &crate::pipeline_state::TransitionFired) {
|
||
|
|
let sled_id = crate::crdt_state::our_node_id().unwrap_or_default();
|
||
|
|
let timestamp = fired.at.timestamp() as f64;
|
||
|
|
let from_stage = crate::pipeline_state::stage_label(&fired.before);
|
||
|
|
let to_stage = crate::pipeline_state::stage_label(&fired.after);
|
||
|
|
let pipeline_event = crate::pipeline_state::event_label(&fired.event);
|
||
|
|
|
||
|
|
crate::crdt_state::append_event_log_entry(
|
||
|
|
&sled_id,
|
||
|
|
timestamp,
|
||
|
|
&fired.story_id.0,
|
||
|
|
from_stage,
|
||
|
|
to_stage,
|
||
|
|
pipeline_event,
|
||
|
|
);
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Read all persisted events from the CRDT event log.
|
||
|
|
///
|
||
|
|
/// Entries are returned sorted by `(sled_id, event_id)` so that events from
|
||
|
|
/// each sled appear in monotonic order. Entries with malformed CRDT fields
|
||
|
|
/// are silently dropped.
|
||
|
|
pub fn read_event_log() -> Vec<LoggedEvent> {
|
||
|
|
let mut entries: Vec<LoggedEvent> = crate::crdt_state::read_all_event_log_entries()
|
||
|
|
.into_iter()
|
||
|
|
.map(|raw| LoggedEvent {
|
||
|
|
event_id: raw.event_seq,
|
||
|
|
sled_id: raw.sled_id,
|
||
|
|
at: DateTime::from_timestamp(raw.timestamp as i64, 0).unwrap_or_default(),
|
||
|
|
story_id: raw.story_id,
|
||
|
|
from_stage: raw.from_stage,
|
||
|
|
to_stage: raw.to_stage,
|
||
|
|
pipeline_event: raw.pipeline_event,
|
||
|
|
})
|
||
|
|
.collect();
|
||
|
|
entries.sort_by(|a, b| a.sled_id.cmp(&b.sled_id).then(a.event_id.cmp(&b.event_id)));
|
||
|
|
entries
|
||
|
|
}
|
||
|
|
|
||
|
|
/// Spawn a background task that persists every `TransitionFired` event to the CRDT.
|
||
|
|
///
|
||
|
|
/// Subscribes to the global `TransitionFired` broadcast channel and calls
|
||
|
|
/// [`log_transition_event`] for every received event without filtering.
|
||
|
|
/// Lagged events are warned about but do not cause the subscriber to exit.
|
||
|
|
pub fn spawn_event_log_subscriber() {
|
||
|
|
let mut rx = crate::pipeline_state::subscribe_transitions();
|
||
|
|
tokio::spawn(async move {
|
||
|
|
loop {
|
||
|
|
match rx.recv().await {
|
||
|
|
Ok(fired) => log_transition_event(&fired),
|
||
|
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||
|
|
crate::slog_warn!(
|
||
|
|
"[event-log] Subscriber lagged, skipped {n} event(s); \
|
||
|
|
some transitions may be absent from the persistent event log."
|
||
|
|
);
|
||
|
|
}
|
||
|
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
||
|
|
}
|
||
|
|
}
|
||
|
|
});
|
||
|
|
}
|
||
|
|
|
||
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crdt_state::PipelineDoc;
    use crate::pipeline_state::{PipelineEvent, PlanState, Stage, StoryId, TransitionFired};
    use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode, JsonValue, OpState};
    use bft_json_crdt::keypair::make_keypair;
    use bft_json_crdt::op::ROOT_ID;
    use serde_json::json;

    /// Build a Backlog -> Coding transition for story `test_{i}`, triggered by
    /// `DepsMet` and stamped with the current time.
    fn make_fired(i: u32) -> TransitionFired {
        let story_id = StoryId(format!("test_{i}"));
        let after = Stage::Coding {
            claim: None,
            plan: PlanState::Missing,
            retries: 0,
        };
        TransitionFired {
            story_id,
            before: Stage::Backlog,
            after,
            event: PipelineEvent::DepsMet,
            at: chrono::Utc::now(),
        }
    }

    /// AC4: write N event-log entries, simulate a restart by replaying the
    /// recorded ops on a fresh CRDT, and assert that all N entries are present
    /// in insertion order with monotonically increasing IDs.
    #[test]
    fn event_log_survives_crdt_reinit() {
        let keypair = make_keypair();
        let mut original = BaseCrdt::<PipelineDoc>::new(&keypair);
        let node_hex = crate::crdt_state::hex::encode(&original.id);

        let n = 5usize;
        let mut recorded_ops = Vec::new();
        // Each insert is anchored on the previous op's id so entries append to
        // the end; anchoring every insert on ROOT_ID would put each entry at
        // the front and reverse the sequence.
        let mut anchor = ROOT_ID;

        for i in 0..n {
            let payload: JsonValue = json!({
                "event_seq": i as f64,
                "sled_id": &node_hex,
                "timestamp": 1_000_000.0_f64 + i as f64,
                "story_id": format!("story_{i}"),
                "from_stage": "backlog",
                "to_stage": "coding",
                "pipeline_event": "DepsMet",
            })
            .into();
            let signed = original.doc.event_log.insert(anchor, payload).sign(&keypair);
            anchor = signed.inner.id;
            recorded_ops.push(signed.clone());
            assert_eq!(original.apply(signed), OpState::Ok);
        }

        assert_eq!(original.doc.event_log.view().len(), n);

        // Simulate restart: replay the same ops on a fresh CRDT instance.
        let mut replayed = BaseCrdt::<PipelineDoc>::new(&keypair);
        recorded_ops
            .into_iter()
            .for_each(|op| assert_eq!(replayed.apply(op), OpState::Ok));

        assert_eq!(
            replayed.doc.event_log.view().len(),
            n,
            "all {n} entries must survive CRDT re-init"
        );

        // Entries must appear in insertion order with monotonically increasing IDs.
        for i in 0..n {
            let stored = &replayed.doc.event_log[i];

            let seq = match stored.event_seq.view() {
                JsonValue::Number(v) => v as u64,
                other => panic!("expected numeric event_seq at index {i}, got {other:?}"),
            };
            assert_eq!(seq, i as u64, "event_seq must equal insertion index {i}");

            let want_story = JsonValue::String(format!("story_{i}"));
            assert_eq!(
                stored.story_id.view(),
                want_story,
                "story_id mismatch at index {i}"
            );

            let want_sled = JsonValue::String(node_hex.clone());
            assert_eq!(
                stored.sled_id.view(),
                want_sled,
                "sled_id mismatch at index {i}"
            );
        }
    }

    /// AC2: every `TransitionFired` event is written to the log without filtering.
    #[test]
    fn log_transition_event_appends_all_events() {
        crate::crdt_state::init_for_test();

        let n = 4u32;
        (0..n).for_each(|i| log_transition_event(&make_fired(i)));

        let logged = crate::crdt_state::read_all_event_log_entries();
        assert_eq!(
            logged.len(),
            n as usize,
            "expected {n} event log entries, got {}",
            logged.len()
        );

        // Verify monotonic sequence numbers 0..n-1, independent of read order.
        let mut seqs: Vec<u64> = logged.iter().map(|entry| entry.event_seq).collect();
        seqs.sort_unstable();
        let expected: Vec<u64> = (0..u64::from(n)).collect();
        assert_eq!(seqs, expected, "event_seq values must be 0..{n}");
    }
}
|