//! Per-project event buffer and `GET /api/events` HTTP endpoint. //! //! The gateway polls `/api/events?since={ts_ms}` on each registered project //! server to aggregate cross-project pipeline notifications into a single //! gateway chat channel. Each project server buffers up to 500 events in //! memory and serves them via this endpoint. use crate::io::watcher::WatcherEvent; use poem::web::{Data, Query}; use poem::{Response, handler, http::StatusCode}; use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use std::sync::{Arc, Mutex}; use tokio::sync::broadcast; /// Maximum number of events retained in the in-memory buffer. const MAX_BUFFER_SIZE: usize = 500; /// A pipeline event stored in the event buffer with a timestamp. #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] pub enum StoredEvent { /// A work item transitioned between pipeline stages. StageTransition { /// Work item ID (e.g. `"42_story_my_feature"`). story_id: String, /// The stage the item moved FROM (display name, e.g. `"Current"`). from_stage: String, /// The stage the item moved TO (directory key, e.g. `"3_qa"`). to_stage: String, /// Unix timestamp in milliseconds when this event was recorded. timestamp_ms: u64, }, /// A merge operation failed for a story. MergeFailure { /// Work item ID (e.g. `"42_story_my_feature"`). story_id: String, /// Human-readable description of the failure. reason: String, /// Unix timestamp in milliseconds when this event was recorded. timestamp_ms: u64, }, /// A story was blocked (e.g. retry limit exceeded). StoryBlocked { /// Work item ID (e.g. `"42_story_my_feature"`). story_id: String, /// Human-readable reason the story was blocked. reason: String, /// Unix timestamp in milliseconds when this event was recorded. timestamp_ms: u64, }, } impl StoredEvent { /// Returns the `timestamp_ms` field common to all event variants. pub fn timestamp_ms(&self) -> u64 { match self { StoredEvent::StageTransition { timestamp_ms, .. } => *timestamp_ms, StoredEvent::MergeFailure { timestamp_ms, .. } => *timestamp_ms, StoredEvent::StoryBlocked { timestamp_ms, .. } => *timestamp_ms, } } } /// Shared, thread-safe ring buffer of recent pipeline events. /// /// Wrapped in `Arc` so it can be shared between the background subscriber /// task and the HTTP handler. The inner `Mutex` guards the `VecDeque`. #[derive(Clone, Debug)] pub struct EventBuffer(Arc>>); impl EventBuffer { /// Create a new, empty event buffer. pub fn new() -> Self { EventBuffer(Arc::new(Mutex::new(VecDeque::new()))) } /// Append an event to the buffer, evicting the oldest entry if the buffer /// exceeds [`MAX_BUFFER_SIZE`]. pub fn push(&self, event: StoredEvent) { let mut buf = self.0.lock().unwrap(); if buf.len() >= MAX_BUFFER_SIZE { buf.pop_front(); } buf.push_back(event); } /// Return all events whose `timestamp_ms` is strictly greater than `since_ms`. pub fn events_since(&self, since_ms: u64) -> Vec { let buf = self.0.lock().unwrap(); buf.iter() .filter(|e| e.timestamp_ms() > since_ms) .cloned() .collect() } } impl Default for EventBuffer { fn default() -> Self { Self::new() } } /// Returns the current Unix timestamp in milliseconds. fn now_ms() -> u64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .map(|d| d.as_millis() as u64) .unwrap_or(0) } /// Spawn a background task that consumes [`WatcherEvent`] broadcasts and /// stores relevant events in `buffer`. /// /// Only [`WatcherEvent::WorkItem`] (with a known `from_stage`), /// [`WatcherEvent::MergeFailure`], and [`WatcherEvent::StoryBlocked`] /// variants are stored. All other variants are silently ignored. pub fn subscribe_to_watcher(buffer: EventBuffer, mut rx: broadcast::Receiver) { tokio::spawn(async move { loop { match rx.recv().await { Ok(WatcherEvent::WorkItem { stage, item_id, from_stage, .. }) => { // Only store genuine transitions (from_stage is known). if let Some(from) = from_stage { buffer.push(StoredEvent::StageTransition { story_id: item_id, from_stage: from, to_stage: stage, timestamp_ms: now_ms(), }); } } Ok(WatcherEvent::MergeFailure { story_id, reason }) => { buffer.push(StoredEvent::MergeFailure { story_id, reason, timestamp_ms: now_ms(), }); } Ok(WatcherEvent::StoryBlocked { story_id, reason }) => { buffer.push(StoredEvent::StoryBlocked { story_id, reason, timestamp_ms: now_ms(), }); } Ok(_) => {} // Ignore all other event types. Err(broadcast::error::RecvError::Lagged(n)) => { crate::slog!("[events] Subscriber lagged, skipped {n} events"); } Err(broadcast::error::RecvError::Closed) => { crate::slog!("[events] Watcher channel closed; stopping event subscriber"); break; } } } }); } /// Query parameters for `GET /api/events`. #[derive(Deserialize)] pub struct EventsQuery { /// Return only events with `timestamp_ms` strictly greater than this value. /// Defaults to `0` (return all buffered events). #[serde(default)] pub since: u64, } /// `GET /api/events?since={ts_ms}` /// /// Returns a JSON array of [`StoredEvent`] objects recorded after `since` ms. /// The gateway polls this endpoint on each registered project server to build /// an aggregated cross-project notification stream. #[handler] pub fn events_handler( Query(params): Query, Data(buffer): Data<&EventBuffer>, ) -> Response { let events = buffer.events_since(params.since); let body = serde_json::to_vec(&events).unwrap_or_else(|_| b"[]".to_vec()); Response::builder() .status(StatusCode::OK) .header(poem::http::header::CONTENT_TYPE, "application/json") .body(body) } #[cfg(test)] mod tests { use super::*; use tokio::sync::broadcast; #[test] fn event_buffer_push_and_retrieve() { let buf = EventBuffer::new(); buf.push(StoredEvent::MergeFailure { story_id: "42_story_x".to_string(), reason: "conflict".to_string(), timestamp_ms: 1000, }); buf.push(StoredEvent::StoryBlocked { story_id: "43_story_y".to_string(), reason: "retry limit".to_string(), timestamp_ms: 2000, }); let all = buf.events_since(0); assert_eq!(all.len(), 2); let after_1000 = buf.events_since(1000); assert_eq!(after_1000.len(), 1); assert!(matches!(after_1000[0], StoredEvent::StoryBlocked { .. })); } #[test] fn event_buffer_evicts_oldest_when_full() { let buf = EventBuffer::new(); for i in 0..MAX_BUFFER_SIZE + 1 { buf.push(StoredEvent::MergeFailure { story_id: format!("{i}_story_x"), reason: "x".to_string(), timestamp_ms: i as u64, }); } // Buffer must not exceed MAX_BUFFER_SIZE. assert_eq!(buf.events_since(0).len(), MAX_BUFFER_SIZE); // Oldest entry (timestamp_ms == 0) should have been evicted. assert!(buf.events_since(0).iter().all(|e| e.timestamp_ms() > 0)); } #[test] fn stage_transition_timestamp_ms_accessor() { let e = StoredEvent::StageTransition { story_id: "1".to_string(), from_stage: "2_current".to_string(), to_stage: "3_qa".to_string(), timestamp_ms: 9999, }; assert_eq!(e.timestamp_ms(), 9999); } #[tokio::test] async fn subscribe_to_watcher_stores_work_item_with_from_stage() { let buf = EventBuffer::new(); let (tx, rx) = broadcast::channel(16); subscribe_to_watcher(buf.clone(), rx); tx.send(crate::io::watcher::WatcherEvent::WorkItem { stage: "3_qa".to_string(), item_id: "42_story_foo".to_string(), action: "qa".to_string(), commit_msg: "huskies: qa 42_story_foo".to_string(), from_stage: Some("2_current".to_string()), }) .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(50)).await; let events = buf.events_since(0); assert_eq!(events.len(), 1); assert!(matches!(events[0], StoredEvent::StageTransition { .. })); if let StoredEvent::StageTransition { ref story_id, ref from_stage, ref to_stage, .. } = events[0] { assert_eq!(story_id, "42_story_foo"); assert_eq!(from_stage, "2_current"); assert_eq!(to_stage, "3_qa"); } } #[tokio::test] async fn subscribe_to_watcher_ignores_work_item_without_from_stage() { let buf = EventBuffer::new(); let (tx, rx) = broadcast::channel(16); subscribe_to_watcher(buf.clone(), rx); // Synthetic event: no from_stage. tx.send(crate::io::watcher::WatcherEvent::WorkItem { stage: "2_current".to_string(), item_id: "99_story_syn".to_string(), action: "start".to_string(), commit_msg: "huskies: start 99_story_syn".to_string(), from_stage: None, }) .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(50)).await; assert_eq!(buf.events_since(0).len(), 0); } #[tokio::test] async fn subscribe_to_watcher_stores_merge_failure() { let buf = EventBuffer::new(); let (tx, rx) = broadcast::channel(16); subscribe_to_watcher(buf.clone(), rx); tx.send(crate::io::watcher::WatcherEvent::MergeFailure { story_id: "42_story_foo".to_string(), reason: "merge conflict".to_string(), }) .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(50)).await; let events = buf.events_since(0); assert_eq!(events.len(), 1); assert!(matches!(events[0], StoredEvent::MergeFailure { .. })); } #[tokio::test] async fn subscribe_to_watcher_stores_story_blocked() { let buf = EventBuffer::new(); let (tx, rx) = broadcast::channel(16); subscribe_to_watcher(buf.clone(), rx); tx.send(crate::io::watcher::WatcherEvent::StoryBlocked { story_id: "43_story_bar".to_string(), reason: "retry limit exceeded".to_string(), }) .unwrap(); tokio::time::sleep(std::time::Duration::from_millis(50)).await; let events = buf.events_since(0); assert_eq!(events.len(), 1); assert!(matches!(events[0], StoredEvent::StoryBlocked { .. })); } }