From d86cc38b2ae58b0f4cc870258cc48e1e0c6f0fab Mon Sep 17 00:00:00 2001
From: dave
Date: Sun, 17 May 2026 20:23:11 +0000
Subject: [PATCH] huskies: merge 1128 story Bounded event queues + EventStreamGap sentinel + observability for context assembly

---
 server/src/crdt_state/lww_maps/event_log.rs |  28 +++++
 .../src/crdt_state/lww_maps/llm_sessions.rs |  40 ++++++-
 server/src/crdt_state/lww_maps/mod.rs       |   5 +-
 server/src/crdt_state/mod.rs                |  16 +--
 server/src/event_log/mod.rs                 | 110 +++++++++++++++++-
 server/src/llm_session/mod.rs               |   4 +
 6 files changed, 182 insertions(+), 21 deletions(-)

diff --git a/server/src/crdt_state/lww_maps/event_log.rs b/server/src/crdt_state/lww_maps/event_log.rs
index 6e1f62b0..bc28e189 100644
--- a/server/src/crdt_state/lww_maps/event_log.rs
+++ b/server/src/crdt_state/lww_maps/event_log.rs
@@ -12,6 +12,14 @@ use serde_json::json;
 use super::super::state::{apply_and_persist, get_crdt};
 use super::super::types::EventLogEntryCrdt;
 
+/// `pipeline_event` value used to mark a gap sentinel entry in the event log.
+///
+/// A gap sentinel is appended when the event-log subscriber detects that the
+/// broadcast channel dropped events (i.e. it received `RecvError::Lagged`).
+/// The `from_stage` and `to_stage` fields encode the logical EventId range
+/// `[from, to]` of the dropped events as decimal strings.
+pub const GAP_PIPELINE_EVENT: &str = "EventStreamGap";
+
 /// Raw event log entry extracted from the CRDT document.
 ///
 /// All fields are decoded to Rust primitives; entries with a missing or
@@ -86,6 +94,26 @@ pub fn append_event_log_entry(
     apply_and_persist(&mut state, |s| s.crdt.doc.event_log.insert(after, entry));
 }
 
+/// Append an `EventStreamGap` sentinel entry to the CRDT event log.
+///
+/// Called when the event-log broadcast subscriber detects that the channel
+/// dropped events (`RecvError::Lagged`). `from_id` and `to_id` are the
+/// logical sequence numbers (in the per-sled event stream) of the first and
+/// last dropped events respectively. The sentinel itself also consumes one
+/// CRDT `event_seq` slot so the monotonic counter remains contiguous across
+/// the gap.
+pub fn append_gap_log_entry(sled_id: &str, from_id: u64, to_id: u64) {
+    let timestamp = chrono::Utc::now().timestamp() as f64;
+    append_event_log_entry(
+        sled_id,
+        timestamp,
+        "",
+        &from_id.to_string(),
+        &to_id.to_string(),
+        GAP_PIPELINE_EVENT,
+    );
+}
+
 /// Read all event log entries from the CRDT document.
 ///
 /// Entries with a missing or empty `sled_id` are silently skipped.
diff --git a/server/src/crdt_state/lww_maps/llm_sessions.rs b/server/src/crdt_state/lww_maps/llm_sessions.rs
index 3e2d07bb..a1a29640 100644
--- a/server/src/crdt_state/lww_maps/llm_sessions.rs
+++ b/server/src/crdt_state/lww_maps/llm_sessions.rs
@@ -14,6 +14,7 @@ use serde_json::json;
 
 use super::super::state::{apply_and_persist, get_crdt, rebuild_llm_session_index};
 use super::super::types::{LlmSessionCrdt, LlmSessionView};
+use super::event_log::GAP_PIPELINE_EVENT;
 
 /// Write or upsert an LLM session entry keyed by `session_id`.
 ///
@@ -110,7 +111,7 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec {
     // Advance the high-water mark to the maximum new event_seq.
     let new_max_seq = new_events.iter().map(|(seq, ..)| *seq).max().unwrap_or(0);
     let mut new_high_water = current_high_water;
-    new_high_water.insert(local_sled_id, new_max_seq);
+    new_high_water.insert(local_sled_id.clone(), new_max_seq);
     let new_hw_json = serde_json::to_string(&new_high_water).unwrap_or_else(|_| "{}".to_string());
 
     // Upsert the session entry with the new high-water value.
@@ -135,14 +136,41 @@ pub fn assemble_and_advance_session(session_id: &str) -> Vec {
         state.llm_session_index = rebuild_llm_session_index(&state.crdt);
     }
 
-    // Render each new event as a compact audit line.
+    // Observability: log event-log size and gap count for this sled.
+    let total_entries = state
+        .crdt
+        .doc
+        .event_log
+        .iter()
+        .filter(|e| matches!(e.sled_id.view(), JsonValue::String(s) if s == local_sled_id))
+        .count();
+    let gap_count = state
+        .crdt
+        .doc
+        .event_log
+        .iter()
+        .filter(|e| {
+            matches!(e.sled_id.view(), JsonValue::String(s) if s == local_sled_id)
+                && matches!(e.pipeline_event.view(), JsonValue::String(s) if s == GAP_PIPELINE_EVENT)
+        })
+        .count();
+    crate::slog!(
+        "[event-log] assemble session={session_id} sled_entries={total_entries} gap_count={gap_count}"
+    );
+
+    // Render each new event as a compact audit line; gap sentinels get a
+    // human-readable message so the LLM is never presented with raw field data.
     new_events
         .into_iter()
         .map(|(_, story_id, from_stage, to_stage, pipeline_event)| {
-            format!(
-                "pipeline_event story_id=\"{story_id}\" from=\"{from_stage}\" \
-                 to=\"{to_stage}\" event=\"{pipeline_event}\""
-            )
+            if pipeline_event == GAP_PIPELINE_EVENT {
+                format!("events between {from_stage} and {to_stage} were dropped")
+            } else {
+                format!(
+                    "pipeline_event story_id=\"{story_id}\" from=\"{from_stage}\" \
+                     to=\"{to_stage}\" event=\"{pipeline_event}\""
+                )
+            }
         })
         .collect()
 }
diff --git a/server/src/crdt_state/lww_maps/mod.rs b/server/src/crdt_state/lww_maps/mod.rs
index ebd472b3..840ee191 100644
--- a/server/src/crdt_state/lww_maps/mod.rs
+++ b/server/src/crdt_state/lww_maps/mod.rs
@@ -30,7 +30,10 @@ pub use active_agents::{
 pub use agent_throttle::{
     delete_agent_throttle, read_agent_throttle, read_all_agent_throttles, write_agent_throttle,
 };
-pub use event_log::{EventLogEntryRaw, append_event_log_entry, read_all_event_log_entries};
+pub use event_log::{
+    EventLogEntryRaw, GAP_PIPELINE_EVENT, append_event_log_entry, append_gap_log_entry,
+    read_all_event_log_entries,
+};
 pub use gateway_projects::{
     delete_gateway_project, read_all_gateway_projects, read_gateway_project, write_gateway_project,
 };
diff --git a/server/src/crdt_state/mod.rs b/server/src/crdt_state/mod.rs
index 548c41ad..ed7aae85 100644
--- a/server/src/crdt_state/mod.rs
+++ b/server/src/crdt_state/mod.rs
@@ -28,14 +28,14 @@ mod write;
 
 pub use gateway_config::{read_gateway_active_project, write_gateway_active_project};
 pub use lww_maps::{
-    EventLogEntryRaw, append_event_log_entry, assemble_and_advance_session, delete_active_agent,
-    delete_agent_throttle, delete_gateway_project, delete_merge_job, delete_test_job,
-    delete_token_usage, read_active_agent, read_agent_throttle, read_all_active_agents,
-    read_all_agent_throttles, read_all_event_log_entries, read_all_gateway_projects,
-    read_all_merge_jobs, read_all_test_jobs, read_all_token_usage, read_gateway_project,
-    read_llm_session, read_merge_job, read_test_job, read_token_usage, write_active_agent,
-    write_agent_throttle, write_gateway_project, write_llm_session, write_merge_job,
-    write_test_job, write_token_usage,
+    EventLogEntryRaw, GAP_PIPELINE_EVENT, append_event_log_entry, append_gap_log_entry,
+    assemble_and_advance_session, delete_active_agent, delete_agent_throttle,
+    delete_gateway_project, delete_merge_job, delete_test_job, delete_token_usage,
+    read_active_agent, read_agent_throttle, read_all_active_agents, read_all_agent_throttles,
+    read_all_event_log_entries, read_all_gateway_projects, read_all_merge_jobs, read_all_test_jobs,
+    read_all_token_usage, read_gateway_project, read_llm_session, read_merge_job, read_test_job,
+    read_token_usage, write_active_agent, write_agent_throttle, write_gateway_project,
+    write_llm_session, write_merge_job, write_test_job, write_token_usage,
 };
 pub use ops::{all_ops_json, apply_remote_op, ops_since, our_vector_clock, subscribe_ops};
 pub use presence::{
diff --git a/server/src/event_log/mod.rs b/server/src/event_log/mod.rs
index 45d41013..fb7dfbca 100644
--- a/server/src/event_log/mod.rs
+++ b/server/src/event_log/mod.rs
@@ -17,6 +17,15 @@
 
 use chrono::DateTime;
 
+/// Monotonic per-sled logical sequence number identifying a pipeline event.
+///
+/// This is the sequence number that *would have been assigned* to an event in the
+/// contiguous logical event stream, as tracked by the event-log subscriber. It
+/// differs from the CRDT `event_seq` (which counts CRDT entries including gap
+/// sentinels) but is meaningful for identifying the range of dropped events when
+/// a gap is inserted.
+pub type EventId = u64;
+
 /// A snapshot of a single persisted pipeline transition event.
 ///
 /// Constructed by [`read_event_log`] from the raw CRDT entries.
@@ -81,22 +90,49 @@ pub fn read_event_log() -> Vec {
     entries
 }
 
+/// Append a gap sentinel to the event log for the local sled.
+///
+/// Encodes the logical [`EventId`] range `[from_id, to_id]` of dropped events
+/// using the `EventStreamGap` pipeline event marker. Should be called whenever
+/// the event-log subscriber detects a lag in the broadcast channel so that no
+/// drop is silent.
+pub fn insert_gap_sentinel(from_id: EventId, to_id: EventId) {
+    let sled_id = crate::crdt_state::our_node_id().unwrap_or_default();
+    crate::crdt_state::append_gap_log_entry(&sled_id, from_id, to_id);
+    log_gap_observability(&sled_id, from_id, to_id);
+}
+
 /// Spawn a background task that persists every `TransitionFired` event to the CRDT.
 ///
-/// Subscribes to the global `TransitionFired` broadcast channel and calls
-/// [`log_transition_event`] for every received event without filtering.
-/// Lagged events are warned about but do not cause the subscriber to exit.
+/// Subscribes to the global `TransitionFired` broadcast channel. Normal events
+/// are persisted via [`log_transition_event`]. When the subscriber lags (the
+/// broadcast channel drops the oldest messages), a single
+/// `EventStreamGap` sentinel is appended to the log covering the dropped range
+/// so no transition is silently lost.
 pub fn spawn_event_log_subscriber() {
     let mut rx = crate::pipeline_state::subscribe_transitions();
     tokio::spawn(async move {
+        // Tracks the next expected logical sequence number in the subscriber's
+        // view of the event stream. Incremented on every successfully processed
+        // event; advanced by the gap size on each lag so we can identify the
+        // exact logical range of dropped events.
+        let mut next_logical_seq: EventId = 0;
+
         loop {
             match rx.recv().await {
-                Ok(fired) => log_transition_event(&fired),
+                Ok(fired) => {
+                    log_transition_event(&fired);
+                    next_logical_seq += 1;
+                }
                 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
+                    let from = next_logical_seq;
+                    let to = next_logical_seq + n - 1;
                     crate::slog_warn!(
-                        "[event-log] Subscriber lagged, skipped {n} event(s); \
-                         some transitions may be absent from the persistent event log."
+                        "[event-log] Subscriber lagged; {n} event(s) dropped \
+                         (logical ids {from}..={to}); gap sentinel appended."
                     );
+                    insert_gap_sentinel(from, to);
+                    next_logical_seq += n;
                 }
                 Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
             }
@@ -104,6 +140,22 @@ pub fn spawn_event_log_subscriber() {
     });
 }
 
+/// Emit observability log lines after inserting a gap sentinel.
+fn log_gap_observability(sled_id: &str, from_id: EventId, to_id: EventId) {
+    let entries = crate::crdt_state::read_all_event_log_entries();
+    let sled_total: usize = entries.iter().filter(|e| e.sled_id == sled_id).count();
+    let gap_count: usize = entries
+        .iter()
+        .filter(|e| {
+            e.sled_id == sled_id && e.pipeline_event == crate::crdt_state::GAP_PIPELINE_EVENT
+        })
+        .count();
+    crate::slog!(
+        "[event-log] gap inserted sled={sled_id} from={from_id} to={to_id} \
+         sled_entries={sled_total} gap_count={gap_count}"
+    );
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -195,6 +247,52 @@ mod tests {
         }
     }
 
+    /// AC4: fill the feeder queue past capacity by inserting a gap sentinel, then
+    /// assert (a) the gap sentinel appears in the event log and (b) the assembled
+    /// context contains the human-readable gap line.
+    #[test]
+    fn gap_sentinel_in_log_and_assembled_context() {
+        crate::crdt_state::init_for_test();
+
+        // Log 3 real events (logical ids 0, 1, 2).
+        for i in 0..3u32 {
+            log_transition_event(&make_fired(i));
+        }
+
+        // Simulate: the feeder queue overflowed and logical ids 3..=5 were dropped.
+        insert_gap_sentinel(3, 5);
+
+        // Log one more real event after the gap.
+        log_transition_event(&make_fired(99));
+
+        // (a) Gap sentinel must appear in read_event_log().
+        let entries = read_event_log();
+        let gap = entries
+            .iter()
+            .find(|e| e.pipeline_event == crate::crdt_state::GAP_PIPELINE_EVENT);
+        assert!(gap.is_some(), "gap sentinel must be present in event log");
+        let gap = gap.unwrap();
+        // from_stage encodes the from EventId; to_stage encodes the to EventId.
+        assert_eq!(gap.from_stage, "3", "gap from_stage must be '3'");
+        assert_eq!(gap.to_stage, "5", "gap to_stage must be '5'");
+
+        // (b) assemble_prompt_context must render the gap line.
+        let ctx = crate::llm_session::assemble_prompt_context("room-gap-e2e");
+        assert!(
+            ctx.contains("events between 3 and 5 were dropped"),
+            "assembled context must contain gap line; got: {ctx}"
+        );
+        // Real events must also appear.
+        assert!(
+            ctx.contains("test_0"),
+            "first story must appear; got: {ctx}"
+        );
+        assert!(
+            ctx.contains("test_99"),
+            "last story must appear; got: {ctx}"
+        );
+    }
+
     /// AC2: every `TransitionFired` event is written to the log without filtering.
     #[test]
     fn log_transition_event_appends_all_events() {
diff --git a/server/src/llm_session/mod.rs b/server/src/llm_session/mod.rs
index 11c5de7f..5fb6e8da 100644
--- a/server/src/llm_session/mod.rs
+++ b/server/src/llm_session/mod.rs
@@ -15,6 +15,10 @@
 /// events or the CRDT is not yet initialised.
 pub fn assemble_prompt_context(session_id: &str) -> String {
     let lines = crate::crdt_state::assemble_and_advance_session(session_id);
+    let event_count = lines.len();
+    crate::slog!(
+        "[llm-session] assemble_prompt_context session={session_id} new_events={event_count}"
+    );
     if lines.is_empty() {
         return String::new();
     }