//! Pure event-buffer types — no side effects. //! //! `StoredEvent` and `EventBuffer` contain only data-transformation and //! structural logic; all I/O (clocks, spawned tasks) lives in `io.rs`. use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; /// Maximum number of events retained in the in-memory buffer. pub const MAX_BUFFER_SIZE: usize = 500; /// A pipeline event stored in the event buffer with a timestamp. #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum StoredEvent { /// A work item transitioned between pipeline stages. StageTransition { /// Work item ID (e.g. `"42_story_my_feature"`). story_id: String, /// The stage the item moved FROM (display name, e.g. `"Current"`). from_stage: String, /// The stage the item moved TO (directory key, e.g. `"3_qa"`). to_stage: String, /// Unix timestamp in milliseconds when this event was recorded. timestamp_ms: u64, }, /// A merge operation failed for a story. MergeFailure { /// Work item ID (e.g. `"42_story_my_feature"`). story_id: String, /// Human-readable description of the failure. reason: String, /// Unix timestamp in milliseconds when this event was recorded. timestamp_ms: u64, }, /// A story was blocked (e.g. retry limit exceeded). StoryBlocked { /// Work item ID (e.g. `"42_story_my_feature"`). story_id: String, /// Human-readable reason the story was blocked. reason: String, /// Unix timestamp in milliseconds when this event was recorded. timestamp_ms: u64, }, } impl StoredEvent { /// Returns the `timestamp_ms` field common to all event variants. pub fn timestamp_ms(&self) -> u64 { match self { StoredEvent::StageTransition { timestamp_ms, .. } => *timestamp_ms, StoredEvent::MergeFailure { timestamp_ms, .. } => *timestamp_ms, StoredEvent::StoryBlocked { timestamp_ms, .. } => *timestamp_ms, } } } /// Shared, thread-safe ring buffer of recent pipeline events. /// /// Wrapped in `Arc` so it can be shared between the background subscriber /// task and the HTTP handler. The inner `Mutex` guards the `VecDeque`. #[derive(Clone, Debug)] pub struct EventBuffer(Arc>>); impl EventBuffer { /// Create a new, empty event buffer. pub fn new() -> Self { EventBuffer(Arc::new(Mutex::new(VecDeque::new()))) } /// Append an event to the buffer, evicting the oldest entry if the buffer /// exceeds [`MAX_BUFFER_SIZE`]. pub fn push(&self, event: StoredEvent) { let mut buf = self.0.lock().unwrap(); if buf.len() >= MAX_BUFFER_SIZE { buf.pop_front(); } buf.push_back(event); } /// Return all events whose `timestamp_ms` is strictly greater than `since_ms`. pub fn events_since(&self, since_ms: u64) -> Vec { let buf = self.0.lock().unwrap(); buf.iter() .filter(|e| e.timestamp_ms() > since_ms) .cloned() .collect() } } impl Default for EventBuffer { fn default() -> Self { Self::new() } } #[cfg(test)] mod tests { use super::*; #[test] fn push_and_retrieve_events() { let buf = EventBuffer::new(); buf.push(StoredEvent::MergeFailure { story_id: "42_story_x".to_string(), reason: "conflict".to_string(), timestamp_ms: 1000, }); buf.push(StoredEvent::StoryBlocked { story_id: "43_story_y".to_string(), reason: "retry limit".to_string(), timestamp_ms: 2000, }); let all = buf.events_since(0); assert_eq!(all.len(), 2); let after_1000 = buf.events_since(1000); assert_eq!(after_1000.len(), 1); assert!(matches!(after_1000[0], StoredEvent::StoryBlocked { .. })); } #[test] fn evicts_oldest_when_full() { let buf = EventBuffer::new(); for i in 0..MAX_BUFFER_SIZE + 1 { buf.push(StoredEvent::MergeFailure { story_id: format!("{i}_story_x"), reason: "x".to_string(), timestamp_ms: i as u64, }); } assert_eq!(buf.events_since(0).len(), MAX_BUFFER_SIZE); assert!(buf.events_since(0).iter().all(|e| e.timestamp_ms() > 0)); } #[test] fn timestamp_ms_accessor_for_all_variants() { let variants = [ StoredEvent::StageTransition { story_id: "1".to_string(), from_stage: "2_current".to_string(), to_stage: "3_qa".to_string(), timestamp_ms: 100, }, StoredEvent::MergeFailure { story_id: "2".to_string(), reason: "x".to_string(), timestamp_ms: 200, }, StoredEvent::StoryBlocked { story_id: "3".to_string(), reason: "y".to_string(), timestamp_ms: 300, }, ]; assert_eq!(variants[0].timestamp_ms(), 100); assert_eq!(variants[1].timestamp_ms(), 200); assert_eq!(variants[2].timestamp_ms(), 300); } #[test] fn events_since_filters_by_timestamp() { let buf = EventBuffer::new(); for ts in [100u64, 200, 300] { buf.push(StoredEvent::MergeFailure { story_id: "x".to_string(), reason: "r".to_string(), timestamp_ms: ts, }); } // strictly greater than 100 let result = buf.events_since(100); assert_eq!(result.len(), 2); assert!(result.iter().all(|e| e.timestamp_ms() > 100)); } #[test] fn default_creates_empty_buffer() { let buf = EventBuffer::default(); assert_eq!(buf.events_since(0).len(), 0); } }