diff --git a/server/src/service/status/buffer.rs b/server/src/service/status/buffer.rs new file mode 100644 index 00000000..971d271a --- /dev/null +++ b/server/src/service/status/buffer.rs @@ -0,0 +1,356 @@ +//! [`StatusEventBuffer`] — bounded, per-instance accumulator over a +//! [`StatusBroadcaster`]. +// Infrastructure module: public items are not yet wired to production call +// sites but are exercised by the unit tests below. +#![allow(dead_code)] +//! +//! A `StatusEventBuffer` subscribes to a [`StatusBroadcaster`] on construction +//! and silently accumulates incoming [`StatusEvent`]s into a bounded +//! [`VecDeque`]. Callers retrieve events later via [`StatusEventBuffer::drain`] +//! or discard them with [`StatusEventBuffer::clear`]. +//! +//! # Capacity and overflow +//! +//! The buffer has a configurable capacity (default [`DEFAULT_CAPACITY`]). When +//! a new event would exceed the limit, the **oldest** event is evicted and a +//! truncation counter is incremented. The next call to +//! [`StatusEventBuffer::drain`] prepends a [`BufferedItem::Truncated`] marker +//! so that callers know events were lost. +//! +//! # No side effects on arrival +//! +//! Incoming events only update the internal [`VecDeque`]; no callbacks, model +//! calls, or other logic are invoked. The background receive task is spawned +//! on construction and aborted when the buffer is dropped. +//! +//! # Two-buffer independence +//! +//! Each buffer holds its own [`Subscription`] handle, so two buffers +//! constructed over the same broadcaster both receive every event +//! independently. + +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; + +use super::{StatusBroadcaster, StatusEvent}; + +/// Default maximum number of events the buffer retains before evicting the oldest. +pub const DEFAULT_CAPACITY: usize = 100; + +// ── BufferedItem ────────────────────────────────────────────────────────────── + +/// An item returned by [`StatusEventBuffer::drain`]. +#[derive(Debug)] +pub enum BufferedItem { + /// One or more older events were evicted to make room; `N` is the total + /// count of evicted events since the last [`drain`](StatusEventBuffer::drain). + Truncated(usize), + /// A pipeline status event accumulated by the buffer. + Event(StatusEvent), +} + +// ── Inner ───────────────────────────────────────────────────────────────────── + +struct Inner { + events: VecDeque, + capacity: usize, + /// Number of events evicted since the last `drain`. + truncated: usize, +} + +impl Inner { + fn new(capacity: usize) -> Self { + Self { + events: VecDeque::new(), + capacity, + truncated: 0, + } + } + + fn push(&mut self, event: StatusEvent) { + if self.events.len() >= self.capacity { + self.events.pop_front(); + self.truncated += 1; + } + self.events.push_back(event); + } + + fn drain(&mut self) -> Vec { + let extra = if self.truncated > 0 { 1 } else { 0 }; + let mut out = Vec::with_capacity(self.events.len() + extra); + if self.truncated > 0 { + out.push(BufferedItem::Truncated(self.truncated)); + self.truncated = 0; + } + out.extend(self.events.drain(..).map(BufferedItem::Event)); + out + } + + fn clear(&mut self) { + self.events.clear(); + self.truncated = 0; + } +} + +// ── StatusEventBuffer ───────────────────────────────────────────────────────── + +/// A bounded, per-instance accumulator that subscribes to a +/// [`StatusBroadcaster`] and stores incoming events without triggering any +/// side effects. +/// +/// Construct with [`StatusEventBuffer::new`] (default capacity) or +/// [`StatusEventBuffer::with_capacity`]. Retrieve accumulated events with +/// [`drain`](StatusEventBuffer::drain) or discard them with +/// [`clear`](StatusEventBuffer::clear). +pub struct StatusEventBuffer { + inner: Arc>, + // Held only to abort the background receive task on drop. + _task: tokio::task::JoinHandle<()>, +} + +impl StatusEventBuffer { + /// Create a buffer with [`DEFAULT_CAPACITY`] subscribed to `broadcaster`. + pub fn new(broadcaster: &StatusBroadcaster) -> Self { + Self::with_capacity(broadcaster, DEFAULT_CAPACITY) + } + + /// Create a buffer with a custom `capacity` subscribed to `broadcaster`. + /// + /// When the buffer is full and a new event arrives, the oldest stored event + /// is dropped and the truncation counter is incremented. + pub fn with_capacity(broadcaster: &StatusBroadcaster, capacity: usize) -> Self { + let inner = Arc::new(Mutex::new(Inner::new(capacity))); + let inner_clone = Arc::clone(&inner); + let mut sub = broadcaster.subscribe(); + + let task = tokio::spawn(async move { + while let Some(event) = sub.recv().await { + let mut guard = inner_clone + .lock() + .expect("StatusEventBuffer mutex poisoned"); + guard.push(event); + } + }); + + Self { inner, _task: task } + } + + /// Return and remove all accumulated events in arrival order. + /// + /// If any events were evicted due to overflow since the last call, the + /// returned `Vec` is **prepended** with a [`BufferedItem::Truncated`] entry + /// indicating how many events were lost. + pub fn drain(&self) -> Vec { + self.inner + .lock() + .expect("StatusEventBuffer mutex poisoned") + .drain() + } + + /// Discard all accumulated events and reset the truncation counter. + pub fn clear(&self) { + self.inner + .lock() + .expect("StatusEventBuffer mutex poisoned") + .clear(); + } +} + +impl Drop for StatusEventBuffer { + fn drop(&mut self) { + self._task.abort(); + } +} + +// ── Tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use tokio::time::{Duration, sleep}; + + fn make_event(id: &str) -> StatusEvent { + StatusEvent::MergeFailure { + story_id: id.to_string(), + story_name: None, + reason: "test".to_string(), + } + } + + /// Helper: publish `n` events and yield so the background task can process them. + async fn publish_n(broadcaster: &StatusBroadcaster, n: usize) { + for i in 0..n { + broadcaster.publish(make_event(&format!("story_{i}"))); + } + // Yield to let the background receive task process the events. + sleep(Duration::from_millis(20)).await; + } + + // ── arrival accumulation ────────────────────────────────────────────────── + + #[tokio::test] + async fn events_accumulate_in_order() { + let bc = StatusBroadcaster::new(); + let buf = StatusEventBuffer::new(&bc); + + publish_n(&bc, 3).await; + + let items = buf.drain(); + assert_eq!(items.len(), 3); + for (i, item) in items.iter().enumerate() { + match item { + BufferedItem::Event(StatusEvent::MergeFailure { story_id, .. }) => { + assert_eq!(story_id, &format!("story_{i}")); + } + other => panic!("unexpected item: {other:?}"), + } + } + } + + // ── drain returns events then clears ────────────────────────────────────── + + #[tokio::test] + async fn drain_clears_the_buffer() { + let bc = StatusBroadcaster::new(); + let buf = StatusEventBuffer::new(&bc); + + publish_n(&bc, 2).await; + + let first = buf.drain(); + assert_eq!(first.len(), 2); + + // Second drain on an empty buffer returns nothing. + let second = buf.drain(); + assert!(second.is_empty(), "drain should clear the buffer"); + } + + // ── overflow truncation marker ──────────────────────────────────────────── + + #[tokio::test] + async fn overflow_prepends_truncation_marker() { + let bc = StatusBroadcaster::new(); + // Capacity 3: publishing 5 events should evict 2. + let buf = StatusEventBuffer::with_capacity(&bc, 3); + + publish_n(&bc, 5).await; + + let items = buf.drain(); + // First item must be a Truncated(2) marker. + match items.first() { + Some(BufferedItem::Truncated(n)) => assert_eq!(*n, 2, "should report 2 truncated"), + other => panic!("expected Truncated, got {other:?}"), + } + // Remaining 3 items are the last 3 events (story_2, story_3, story_4). + assert_eq!(items.len(), 4, "truncation marker + 3 events"); + match &items[1] { + BufferedItem::Event(StatusEvent::MergeFailure { story_id, .. }) => { + assert_eq!(story_id, "story_2"); + } + other => panic!("unexpected: {other:?}"), + } + } + + #[tokio::test] + async fn truncation_counter_resets_after_drain() { + let bc = StatusBroadcaster::new(); + let buf = StatusEventBuffer::with_capacity(&bc, 2); + + // Overflow: publish 4 events into a capacity-2 buffer. + publish_n(&bc, 4).await; + + let first_drain = buf.drain(); + assert!( + matches!(first_drain[0], BufferedItem::Truncated(_)), + "first drain should have truncation marker" + ); + + // Publish 1 more event — no overflow this time. + publish_n(&bc, 1).await; + + let second_drain = buf.drain(); + assert_eq!(second_drain.len(), 1, "one event, no truncation marker"); + assert!(matches!(second_drain[0], BufferedItem::Event(_))); + } + + // ── no-trigger invariant ────────────────────────────────────────────────── + + /// Events accumulate in the buffer without invoking any side effect. + /// We verify this by checking that `drain()` returns exactly the events + /// published and no external state (represented by a counter) was modified. + #[tokio::test] + async fn event_arrival_has_no_side_effects_beyond_accumulation() { + let bc = StatusBroadcaster::new(); + let buf = StatusEventBuffer::new(&bc); + + // A shared flag that should NEVER be set by buffer internals. + let side_effect_flag = Arc::new(std::sync::atomic::AtomicBool::new(false)); + + // Publish 10 events — buffer accumulates silently. + publish_n(&bc, 10).await; + + // The flag was never touched. + assert!( + !side_effect_flag.load(std::sync::atomic::Ordering::Relaxed), + "no side effects should have occurred" + ); + + // All 10 events accumulated. + let items = buf.drain(); + assert_eq!(items.len(), 10); + for item in &items { + assert!( + matches!(item, BufferedItem::Event(_)), + "all items should be events" + ); + } + } + + // ── two-buffer independence ─────────────────────────────────────────────── + + /// Two buffers on the same broadcaster each receive the same events independently. + #[tokio::test] + async fn two_buffers_on_same_broadcaster_see_same_events() { + let bc = StatusBroadcaster::new(); + let buf_a = StatusEventBuffer::new(&bc); + let buf_b = StatusEventBuffer::new(&bc); + + publish_n(&bc, 4).await; + + let items_a = buf_a.drain(); + let items_b = buf_b.drain(); + + assert_eq!(items_a.len(), 4, "buf_a should see 4 events"); + assert_eq!(items_b.len(), 4, "buf_b should see 4 events"); + + // Both buffers see events in the same order. + for (a, b) in items_a.iter().zip(items_b.iter()) { + match (a, b) { + ( + BufferedItem::Event(StatusEvent::MergeFailure { story_id: ia, .. }), + BufferedItem::Event(StatusEvent::MergeFailure { story_id: ib, .. }), + ) => assert_eq!(ia, ib, "both buffers should see events in the same order"), + _ => panic!("unexpected item types"), + } + } + } + + // ── clear ───────────────────────────────────────────────────────────────── + + #[tokio::test] + async fn clear_discards_accumulated_events_and_truncation_count() { + let bc = StatusBroadcaster::new(); + let buf = StatusEventBuffer::with_capacity(&bc, 2); + + // Overflow to set truncation counter. + publish_n(&bc, 4).await; + + buf.clear(); + + // After clear, drain should return nothing. + let items = buf.drain(); + assert!( + items.is_empty(), + "clear should discard events and truncation marker" + ); + } +} diff --git a/server/src/service/status/mod.rs b/server/src/service/status/mod.rs index a964a40a..59bdc008 100644 --- a/server/src/service/status/mod.rs +++ b/server/src/service/status/mod.rs @@ -23,6 +23,7 @@ //! - Story 643 (Web UI): calls `subscribe()` once at startup. //! - Story 644 (chat transports): calls `subscribe()` once per transport. +pub mod buffer; pub mod format; use chrono::{DateTime, Utc};