huskies: merge 716_story_statuseventbuffer_bounded_per_instance_buffer_over_services_status_broadcaster
@@ -0,0 +1,356 @@
//! [`StatusEventBuffer`] — bounded, per-instance accumulator over a
//! [`StatusBroadcaster`].
// Infrastructure module: public items are not yet wired to production call
// sites but are exercised by the unit tests below.
#![allow(dead_code)]
//!
//! A `StatusEventBuffer` subscribes to a [`StatusBroadcaster`] on construction
//! and silently accumulates incoming [`StatusEvent`]s into a bounded
//! [`VecDeque`]. Callers retrieve events later via [`StatusEventBuffer::drain`]
//! or discard them with [`StatusEventBuffer::clear`].
//!
//! # Capacity and overflow
//!
//! The buffer has a configurable capacity (default [`DEFAULT_CAPACITY`]). When
//! a new event would exceed the limit, the **oldest** event is evicted and a
//! truncation counter is incremented. The next call to
//! [`StatusEventBuffer::drain`] prepends a [`BufferedItem::Truncated`] marker
//! so that callers know events were lost.
//!
//! # No side effects on arrival
//!
//! Incoming events only update the internal [`VecDeque`]; no callbacks, model
//! calls, or other logic are invoked. The background receive task is spawned
//! on construction and aborted when the buffer is dropped.
//!
//! # Two-buffer independence
//!
//! Each buffer holds its own [`Subscription`] handle, so two buffers
//! constructed over the same broadcaster both receive every event
//! independently.
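//!
//! # Example
//!
//! A minimal usage sketch (fenced as `ignore`: the import path and async
//! runtime depend on the surrounding crate, and `broadcaster` is assumed to
//! come from the application):
//!
//! ```ignore
//! let buffer = StatusEventBuffer::new(&broadcaster);
//! // ... events published to `broadcaster` accumulate in the background ...
//! for item in buffer.drain() {
//!     match item {
//!         BufferedItem::Truncated(n) => eprintln!("{n} events were dropped"),
//!         BufferedItem::Event(event) => println!("{event:?}"),
//!     }
//! }
//! ```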

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

use super::{StatusBroadcaster, StatusEvent};

/// Default maximum number of events the buffer retains before evicting the oldest.
pub const DEFAULT_CAPACITY: usize = 100;

// ── BufferedItem ──────────────────────────────────────────────────────────────

/// An item returned by [`StatusEventBuffer::drain`].
#[derive(Debug)]
pub enum BufferedItem {
    /// One or more older events were evicted to make room; `N` is the total
    /// count of evicted events since the last [`drain`](StatusEventBuffer::drain).
    Truncated(usize),
    /// A pipeline status event accumulated by the buffer.
    Event(StatusEvent),
}

// ── Inner ─────────────────────────────────────────────────────────────────────

struct Inner {
    events: VecDeque<StatusEvent>,
    capacity: usize,
    /// Number of events evicted since the last `drain`.
    truncated: usize,
}

impl Inner {
    fn new(capacity: usize) -> Self {
        Self {
            events: VecDeque::new(),
            capacity,
            truncated: 0,
        }
    }

    fn push(&mut self, event: StatusEvent) {
        if self.events.len() >= self.capacity {
            // Full: evict the oldest event and remember that we dropped one.
            self.events.pop_front();
            self.truncated += 1;
        }
        self.events.push_back(event);
    }

    fn drain(&mut self) -> Vec<BufferedItem> {
        let extra = if self.truncated > 0 { 1 } else { 0 };
        let mut out = Vec::with_capacity(self.events.len() + extra);
        if self.truncated > 0 {
            out.push(BufferedItem::Truncated(self.truncated));
            self.truncated = 0;
        }
        out.extend(self.events.drain(..).map(BufferedItem::Event));
        out
    }

    fn clear(&mut self) {
        self.events.clear();
        self.truncated = 0;
    }
}

// ── StatusEventBuffer ─────────────────────────────────────────────────────────

/// A bounded, per-instance accumulator that subscribes to a
/// [`StatusBroadcaster`] and stores incoming events without triggering any
/// side effects.
///
/// Construct with [`StatusEventBuffer::new`] (default capacity) or
/// [`StatusEventBuffer::with_capacity`]. Retrieve accumulated events with
/// [`drain`](StatusEventBuffer::drain) or discard them with
/// [`clear`](StatusEventBuffer::clear).
pub struct StatusEventBuffer {
    inner: Arc<Mutex<Inner>>,
    // Held only to abort the background receive task on drop.
    _task: tokio::task::JoinHandle<()>,
}

impl StatusEventBuffer {
    /// Create a buffer with [`DEFAULT_CAPACITY`] subscribed to `broadcaster`.
    pub fn new(broadcaster: &StatusBroadcaster) -> Self {
        Self::with_capacity(broadcaster, DEFAULT_CAPACITY)
    }

    /// Create a buffer with a custom `capacity` subscribed to `broadcaster`.
    ///
    /// When the buffer is full and a new event arrives, the oldest stored event
    /// is dropped and the truncation counter is incremented.
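    ///
    /// # Example
    ///
    /// Illustrative sketch (fenced as `ignore`; `broadcaster` is assumed to
    /// come from the surrounding application):
    ///
    /// ```ignore
    /// // Capacity 3: if five events arrive before the next drain, the two
    /// // oldest are evicted and drain() begins with Truncated(2).
    /// let buffer = StatusEventBuffer::with_capacity(&broadcaster, 3);
    /// ```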
    pub fn with_capacity(broadcaster: &StatusBroadcaster, capacity: usize) -> Self {
        let inner = Arc::new(Mutex::new(Inner::new(capacity)));
        let inner_clone = Arc::clone(&inner);
        let mut sub = broadcaster.subscribe();

        let task = tokio::spawn(async move {
            while let Some(event) = sub.recv().await {
                // Hold the lock only long enough to push one event; the guard
                // is dropped at the end of each loop iteration.
                let mut guard = inner_clone
                    .lock()
                    .expect("StatusEventBuffer mutex poisoned");
                guard.push(event);
            }
        });

        Self { inner, _task: task }
    }

    /// Return and remove all accumulated events in arrival order.
    ///
    /// If any events were evicted due to overflow since the last call, the
    /// returned `Vec` is **prepended** with a [`BufferedItem::Truncated`] entry
    /// indicating how many events were lost.
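    ///
    /// # Example
    ///
    /// Illustrative sketch (fenced as `ignore`; `buffer` is assumed to exist):
    ///
    /// ```ignore
    /// let items = buffer.drain();
    /// // After overflow, the first item is the marker, e.g. Truncated(2),
    /// // followed by the surviving events in arrival order.
    /// if let Some(BufferedItem::Truncated(n)) = items.first() {
    ///     eprintln!("{n} events were lost to overflow");
    /// }
    /// ```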
    pub fn drain(&self) -> Vec<BufferedItem> {
        self.inner
            .lock()
            .expect("StatusEventBuffer mutex poisoned")
            .drain()
    }

    /// Discard all accumulated events and reset the truncation counter.
    pub fn clear(&self) {
        self.inner
            .lock()
            .expect("StatusEventBuffer mutex poisoned")
            .clear();
    }
}

impl Drop for StatusEventBuffer {
    fn drop(&mut self) {
        self._task.abort();
    }
}

// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{Duration, sleep};

    fn make_event(id: &str) -> StatusEvent {
        StatusEvent::MergeFailure {
            story_id: id.to_string(),
            story_name: None,
            reason: "test".to_string(),
        }
    }

    /// Helper: publish `n` events and wait briefly so the background task can
    /// process them.
    async fn publish_n(broadcaster: &StatusBroadcaster, n: usize) {
        for i in 0..n {
            broadcaster.publish(make_event(&format!("story_{i}")));
        }
        // Give the background receive task time to process the events.
        sleep(Duration::from_millis(20)).await;
    }

    // ── arrival accumulation ──────────────────────────────────────────────────

    #[tokio::test]
    async fn events_accumulate_in_order() {
        let bc = StatusBroadcaster::new();
        let buf = StatusEventBuffer::new(&bc);

        publish_n(&bc, 3).await;

        let items = buf.drain();
        assert_eq!(items.len(), 3);
        for (i, item) in items.iter().enumerate() {
            match item {
                BufferedItem::Event(StatusEvent::MergeFailure { story_id, .. }) => {
                    assert_eq!(story_id, &format!("story_{i}"));
                }
                other => panic!("unexpected item: {other:?}"),
            }
        }
    }

    // ── drain returns events then clears ──────────────────────────────────────

    #[tokio::test]
    async fn drain_clears_the_buffer() {
        let bc = StatusBroadcaster::new();
        let buf = StatusEventBuffer::new(&bc);

        publish_n(&bc, 2).await;

        let first = buf.drain();
        assert_eq!(first.len(), 2);

        // Second drain on an empty buffer returns nothing.
        let second = buf.drain();
        assert!(second.is_empty(), "drain should clear the buffer");
    }

    // ── overflow truncation marker ────────────────────────────────────────────

    #[tokio::test]
    async fn overflow_prepends_truncation_marker() {
        let bc = StatusBroadcaster::new();
        // Capacity 3: publishing 5 events should evict 2.
        let buf = StatusEventBuffer::with_capacity(&bc, 3);

        publish_n(&bc, 5).await;

        let items = buf.drain();
        // First item must be a Truncated(2) marker.
        match items.first() {
            Some(BufferedItem::Truncated(n)) => assert_eq!(*n, 2, "should report 2 truncated"),
            other => panic!("expected Truncated, got {other:?}"),
        }
        // Remaining 3 items are the last 3 events (story_2, story_3, story_4).
        assert_eq!(items.len(), 4, "truncation marker + 3 events");
        match &items[1] {
            BufferedItem::Event(StatusEvent::MergeFailure { story_id, .. }) => {
                assert_eq!(story_id, "story_2");
            }
            other => panic!("unexpected: {other:?}"),
        }
    }

    #[tokio::test]
    async fn truncation_counter_resets_after_drain() {
        let bc = StatusBroadcaster::new();
        let buf = StatusEventBuffer::with_capacity(&bc, 2);

        // Overflow: publish 4 events into a capacity-2 buffer.
        publish_n(&bc, 4).await;

        let first_drain = buf.drain();
        assert!(
            matches!(first_drain[0], BufferedItem::Truncated(_)),
            "first drain should have truncation marker"
        );

        // Publish 1 more event — no overflow this time.
        publish_n(&bc, 1).await;

        let second_drain = buf.drain();
        assert_eq!(second_drain.len(), 1, "one event, no truncation marker");
        assert!(matches!(second_drain[0], BufferedItem::Event(_)));
    }

    // ── no-trigger invariant ──────────────────────────────────────────────────

    /// Events accumulate in the buffer without invoking any side effect.
    /// We verify this by checking that `drain()` returns exactly the events
    /// published and no external state (represented here by a flag) was modified.
    #[tokio::test]
    async fn event_arrival_has_no_side_effects_beyond_accumulation() {
        let bc = StatusBroadcaster::new();
        let buf = StatusEventBuffer::new(&bc);

        // A shared flag that should NEVER be set by buffer internals.
        let side_effect_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));

        // Publish 10 events — buffer accumulates silently.
        publish_n(&bc, 10).await;

        // The flag was never touched.
        assert!(
            !side_effect_flag.load(std::sync::atomic::Ordering::Relaxed),
            "no side effects should have occurred"
        );

        // All 10 events accumulated.
        let items = buf.drain();
        assert_eq!(items.len(), 10);
        for item in &items {
            assert!(
                matches!(item, BufferedItem::Event(_)),
                "all items should be events"
            );
        }
    }

    // ── two-buffer independence ───────────────────────────────────────────────

    /// Two buffers on the same broadcaster each receive the same events independently.
    #[tokio::test]
    async fn two_buffers_on_same_broadcaster_see_same_events() {
        let bc = StatusBroadcaster::new();
        let buf_a = StatusEventBuffer::new(&bc);
        let buf_b = StatusEventBuffer::new(&bc);

        publish_n(&bc, 4).await;

        let items_a = buf_a.drain();
        let items_b = buf_b.drain();

        assert_eq!(items_a.len(), 4, "buf_a should see 4 events");
        assert_eq!(items_b.len(), 4, "buf_b should see 4 events");

        // Both buffers see events in the same order.
        for (a, b) in items_a.iter().zip(items_b.iter()) {
            match (a, b) {
                (
                    BufferedItem::Event(StatusEvent::MergeFailure { story_id: ia, .. }),
                    BufferedItem::Event(StatusEvent::MergeFailure { story_id: ib, .. }),
                ) => assert_eq!(ia, ib, "both buffers should see events in the same order"),
                _ => panic!("unexpected item types"),
            }
        }
    }

    // ── clear ─────────────────────────────────────────────────────────────────

    #[tokio::test]
    async fn clear_discards_accumulated_events_and_truncation_count() {
        let bc = StatusBroadcaster::new();
        let buf = StatusEventBuffer::with_capacity(&bc, 2);

        // Overflow to set truncation counter.
        publish_n(&bc, 4).await;

        buf.clear();

        // After clear, drain should return nothing.
        let items = buf.drain();
        assert!(
            items.is_empty(),
            "clear should discard events and truncation marker"
        );
    }
}
@@ -23,6 +23,7 @@
//! - Story 643 (Web UI): calls `subscribe()` once at startup.
//! - Story 644 (chat transports): calls `subscribe()` once per transport.

pub mod buffer;
pub mod format;

use chrono::{DateTime, Utc};