diff --git a/server/src/http/events.rs b/server/src/http/events.rs index f06f60ab..5b0e3730 100644 --- a/server/src/http/events.rs +++ b/server/src/http/events.rs @@ -4,163 +4,18 @@ //! server to aggregate cross-project pipeline notifications into a single //! gateway chat channel. Each project server buffers up to 500 events in //! memory and serves them via this endpoint. +//! +//! Domain logic lives in `service::events`; this module is a thin HTTP +//! adapter: extract query params → call service → shape response. + +pub use crate::service::events::{EventBuffer, StoredEvent, subscribe_to_watcher}; +// MAX_BUFFER_SIZE is used in tests via `use super::*`. +#[cfg(test)] +pub use crate::service::events::MAX_BUFFER_SIZE; -use crate::io::watcher::WatcherEvent; use poem::web::{Data, Query}; use poem::{Response, handler, http::StatusCode}; -use serde::{Deserialize, Serialize}; -use std::collections::VecDeque; -use std::sync::{Arc, Mutex}; -use tokio::sync::broadcast; - -/// Maximum number of events retained in the in-memory buffer. -const MAX_BUFFER_SIZE: usize = 500; - -/// A pipeline event stored in the event buffer with a timestamp. -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum StoredEvent { - /// A work item transitioned between pipeline stages. - StageTransition { - /// Work item ID (e.g. `"42_story_my_feature"`). - story_id: String, - /// The stage the item moved FROM (display name, e.g. `"Current"`). - from_stage: String, - /// The stage the item moved TO (directory key, e.g. `"3_qa"`). - to_stage: String, - /// Unix timestamp in milliseconds when this event was recorded. - timestamp_ms: u64, - }, - /// A merge operation failed for a story. - MergeFailure { - /// Work item ID (e.g. `"42_story_my_feature"`). - story_id: String, - /// Human-readable description of the failure. - reason: String, - /// Unix timestamp in milliseconds when this event was recorded. - timestamp_ms: u64, - }, - /// A story was blocked (e.g. retry limit exceeded). - StoryBlocked { - /// Work item ID (e.g. `"42_story_my_feature"`). - story_id: String, - /// Human-readable reason the story was blocked. - reason: String, - /// Unix timestamp in milliseconds when this event was recorded. - timestamp_ms: u64, - }, -} - -impl StoredEvent { - /// Returns the `timestamp_ms` field common to all event variants. - pub fn timestamp_ms(&self) -> u64 { - match self { - StoredEvent::StageTransition { timestamp_ms, .. } => *timestamp_ms, - StoredEvent::MergeFailure { timestamp_ms, .. } => *timestamp_ms, - StoredEvent::StoryBlocked { timestamp_ms, .. } => *timestamp_ms, - } - } -} - -/// Shared, thread-safe ring buffer of recent pipeline events. -/// -/// Wrapped in `Arc` so it can be shared between the background subscriber -/// task and the HTTP handler. The inner `Mutex` guards the `VecDeque`. -#[derive(Clone, Debug)] -pub struct EventBuffer(Arc>>); - -impl EventBuffer { - /// Create a new, empty event buffer. - pub fn new() -> Self { - EventBuffer(Arc::new(Mutex::new(VecDeque::new()))) - } - - /// Append an event to the buffer, evicting the oldest entry if the buffer - /// exceeds [`MAX_BUFFER_SIZE`]. - pub fn push(&self, event: StoredEvent) { - let mut buf = self.0.lock().unwrap(); - if buf.len() >= MAX_BUFFER_SIZE { - buf.pop_front(); - } - buf.push_back(event); - } - - /// Return all events whose `timestamp_ms` is strictly greater than `since_ms`. - pub fn events_since(&self, since_ms: u64) -> Vec { - let buf = self.0.lock().unwrap(); - buf.iter() - .filter(|e| e.timestamp_ms() > since_ms) - .cloned() - .collect() - } -} - -impl Default for EventBuffer { - fn default() -> Self { - Self::new() - } -} - -/// Returns the current Unix timestamp in milliseconds. -fn now_ms() -> u64 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .map(|d| d.as_millis() as u64) - .unwrap_or(0) -} - -/// Spawn a background task that consumes [`WatcherEvent`] broadcasts and -/// stores relevant events in `buffer`. -/// -/// Only [`WatcherEvent::WorkItem`] (with a known `from_stage`), -/// [`WatcherEvent::MergeFailure`], and [`WatcherEvent::StoryBlocked`] -/// variants are stored. All other variants are silently ignored. -pub fn subscribe_to_watcher(buffer: EventBuffer, mut rx: broadcast::Receiver) { - tokio::spawn(async move { - loop { - match rx.recv().await { - Ok(WatcherEvent::WorkItem { - stage, - item_id, - from_stage, - .. - }) => { - // Only store genuine transitions (from_stage is known). - if let Some(from) = from_stage { - buffer.push(StoredEvent::StageTransition { - story_id: item_id, - from_stage: from, - to_stage: stage, - timestamp_ms: now_ms(), - }); - } - } - Ok(WatcherEvent::MergeFailure { story_id, reason }) => { - buffer.push(StoredEvent::MergeFailure { - story_id, - reason, - timestamp_ms: now_ms(), - }); - } - Ok(WatcherEvent::StoryBlocked { story_id, reason }) => { - buffer.push(StoredEvent::StoryBlocked { - story_id, - reason, - timestamp_ms: now_ms(), - }); - } - Ok(_) => {} // Ignore all other event types. - Err(broadcast::error::RecvError::Lagged(n)) => { - crate::slog!("[events] Subscriber lagged, skipped {n} events"); - } - Err(broadcast::error::RecvError::Closed) => { - crate::slog!("[events] Watcher channel closed; stopping event subscriber"); - break; - } - } - } - }); -} +use serde::Deserialize; /// Query parameters for `GET /api/events`. #[derive(Deserialize)] @@ -181,7 +36,7 @@ pub fn events_handler( Query(params): Query, Data(buffer): Data<&EventBuffer>, ) -> Response { - let events = buffer.events_since(params.since); + let events = crate::service::events::events_since(buffer, params.since); let body = serde_json::to_vec(&events).unwrap_or_else(|_| b"[]".to_vec()); Response::builder() .status(StatusCode::OK) diff --git a/server/src/http/health.rs b/server/src/http/health.rs index 64c3120b..c8800fe3 100644 --- a/server/src/http/health.rs +++ b/server/src/http/health.rs @@ -1,7 +1,13 @@ -//! Health check endpoint — returns a static "ok" response. +//! Health check endpoint — thin HTTP adapter over `service::health`. +//! +//! Domain logic (the `HealthStatus` type and check function) lives in +//! `service::health`; this module is a thin adapter: call service → shape +//! response. + +pub use crate::service::health::HealthStatus; + use poem::handler; -use poem_openapi::{Object, OpenApi, Tags, payload::Json}; -use serde::Serialize; +use poem_openapi::{OpenApi, Tags, payload::Json}; /// Health check endpoint. /// @@ -16,11 +22,6 @@ enum HealthTags { Health, } -#[derive(Serialize, Object)] -pub struct HealthStatus { - status: String, -} - pub struct HealthApi; #[OpenApi(tag = "HealthTags::Health")] @@ -30,9 +31,7 @@ impl HealthApi { /// Returns a JSON status object to confirm the server is running. #[oai(path = "/health", method = "get")] async fn health(&self) -> Json { - Json(HealthStatus { - status: "ok".to_string(), - }) + Json(crate::service::health::check()) } } diff --git a/server/src/service/events/buffer.rs b/server/src/service/events/buffer.rs new file mode 100644 index 00000000..20efdbe5 --- /dev/null +++ b/server/src/service/events/buffer.rs @@ -0,0 +1,184 @@ +//! Pure event-buffer types — no side effects. +//! +//! `StoredEvent` and `EventBuffer` contain only data-transformation and +//! structural logic; all I/O (clocks, spawned tasks) lives in `io.rs`. + +use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; + +/// Maximum number of events retained in the in-memory buffer. +pub const MAX_BUFFER_SIZE: usize = 500; + +/// A pipeline event stored in the event buffer with a timestamp. +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum StoredEvent { + /// A work item transitioned between pipeline stages. + StageTransition { + /// Work item ID (e.g. `"42_story_my_feature"`). + story_id: String, + /// The stage the item moved FROM (display name, e.g. `"Current"`). + from_stage: String, + /// The stage the item moved TO (directory key, e.g. `"3_qa"`). + to_stage: String, + /// Unix timestamp in milliseconds when this event was recorded. + timestamp_ms: u64, + }, + /// A merge operation failed for a story. + MergeFailure { + /// Work item ID (e.g. `"42_story_my_feature"`). + story_id: String, + /// Human-readable description of the failure. + reason: String, + /// Unix timestamp in milliseconds when this event was recorded. + timestamp_ms: u64, + }, + /// A story was blocked (e.g. retry limit exceeded). + StoryBlocked { + /// Work item ID (e.g. `"42_story_my_feature"`). + story_id: String, + /// Human-readable reason the story was blocked. + reason: String, + /// Unix timestamp in milliseconds when this event was recorded. + timestamp_ms: u64, + }, +} + +impl StoredEvent { + /// Returns the `timestamp_ms` field common to all event variants. + pub fn timestamp_ms(&self) -> u64 { + match self { + StoredEvent::StageTransition { timestamp_ms, .. } => *timestamp_ms, + StoredEvent::MergeFailure { timestamp_ms, .. } => *timestamp_ms, + StoredEvent::StoryBlocked { timestamp_ms, .. } => *timestamp_ms, + } + } +} + +/// Shared, thread-safe ring buffer of recent pipeline events. +/// +/// Wrapped in `Arc` so it can be shared between the background subscriber +/// task and the HTTP handler. The inner `Mutex` guards the `VecDeque`. +#[derive(Clone, Debug)] +pub struct EventBuffer(Arc>>); + +impl EventBuffer { + /// Create a new, empty event buffer. + pub fn new() -> Self { + EventBuffer(Arc::new(Mutex::new(VecDeque::new()))) + } + + /// Append an event to the buffer, evicting the oldest entry if the buffer + /// exceeds [`MAX_BUFFER_SIZE`]. + pub fn push(&self, event: StoredEvent) { + let mut buf = self.0.lock().unwrap(); + if buf.len() >= MAX_BUFFER_SIZE { + buf.pop_front(); + } + buf.push_back(event); + } + + /// Return all events whose `timestamp_ms` is strictly greater than `since_ms`. + pub fn events_since(&self, since_ms: u64) -> Vec { + let buf = self.0.lock().unwrap(); + buf.iter() + .filter(|e| e.timestamp_ms() > since_ms) + .cloned() + .collect() + } +} + +impl Default for EventBuffer { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn push_and_retrieve_events() { + let buf = EventBuffer::new(); + buf.push(StoredEvent::MergeFailure { + story_id: "42_story_x".to_string(), + reason: "conflict".to_string(), + timestamp_ms: 1000, + }); + buf.push(StoredEvent::StoryBlocked { + story_id: "43_story_y".to_string(), + reason: "retry limit".to_string(), + timestamp_ms: 2000, + }); + + let all = buf.events_since(0); + assert_eq!(all.len(), 2); + + let after_1000 = buf.events_since(1000); + assert_eq!(after_1000.len(), 1); + assert!(matches!(after_1000[0], StoredEvent::StoryBlocked { .. })); + } + + #[test] + fn evicts_oldest_when_full() { + let buf = EventBuffer::new(); + for i in 0..MAX_BUFFER_SIZE + 1 { + buf.push(StoredEvent::MergeFailure { + story_id: format!("{i}_story_x"), + reason: "x".to_string(), + timestamp_ms: i as u64, + }); + } + assert_eq!(buf.events_since(0).len(), MAX_BUFFER_SIZE); + assert!(buf.events_since(0).iter().all(|e| e.timestamp_ms() > 0)); + } + + #[test] + fn timestamp_ms_accessor_for_all_variants() { + let variants = [ + StoredEvent::StageTransition { + story_id: "1".to_string(), + from_stage: "2_current".to_string(), + to_stage: "3_qa".to_string(), + timestamp_ms: 100, + }, + StoredEvent::MergeFailure { + story_id: "2".to_string(), + reason: "x".to_string(), + timestamp_ms: 200, + }, + StoredEvent::StoryBlocked { + story_id: "3".to_string(), + reason: "y".to_string(), + timestamp_ms: 300, + }, + ]; + assert_eq!(variants[0].timestamp_ms(), 100); + assert_eq!(variants[1].timestamp_ms(), 200); + assert_eq!(variants[2].timestamp_ms(), 300); + } + + #[test] + fn events_since_filters_by_timestamp() { + let buf = EventBuffer::new(); + for ts in [100u64, 200, 300] { + buf.push(StoredEvent::MergeFailure { + story_id: "x".to_string(), + reason: "r".to_string(), + timestamp_ms: ts, + }); + } + // strictly greater than 100 + let result = buf.events_since(100); + assert_eq!(result.len(), 2); + assert!(result.iter().all(|e| e.timestamp_ms() > 100)); + } + + #[test] + fn default_creates_empty_buffer() { + let buf = EventBuffer::default(); + assert_eq!(buf.events_since(0).len(), 0); + } +} diff --git a/server/src/service/events/io.rs b/server/src/service/events/io.rs new file mode 100644 index 00000000..af8b17c8 --- /dev/null +++ b/server/src/service/events/io.rs @@ -0,0 +1,67 @@ +//! Events I/O wrappers — the ONLY place in `service/events/` that may perform +//! side effects such as reading the system clock or spawning async tasks. + +use crate::io::watcher::WatcherEvent; +use tokio::sync::broadcast; + +use super::buffer::{EventBuffer, StoredEvent}; + +/// Returns the current Unix timestamp in milliseconds. +pub(super) fn now_ms() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0) +} + +/// Spawn a background task that consumes [`WatcherEvent`] broadcasts and +/// stores relevant events in `buffer`. +/// +/// Only [`WatcherEvent::WorkItem`] (with a known `from_stage`), +/// [`WatcherEvent::MergeFailure`], and [`WatcherEvent::StoryBlocked`] +/// variants are stored. All other variants are silently ignored. +pub fn subscribe_to_watcher(buffer: EventBuffer, mut rx: broadcast::Receiver) { + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(WatcherEvent::WorkItem { + stage, + item_id, + from_stage, + .. + }) => { + if let Some(from) = from_stage { + buffer.push(StoredEvent::StageTransition { + story_id: item_id, + from_stage: from, + to_stage: stage, + timestamp_ms: now_ms(), + }); + } + } + Ok(WatcherEvent::MergeFailure { story_id, reason }) => { + buffer.push(StoredEvent::MergeFailure { + story_id, + reason, + timestamp_ms: now_ms(), + }); + } + Ok(WatcherEvent::StoryBlocked { story_id, reason }) => { + buffer.push(StoredEvent::StoryBlocked { + story_id, + reason, + timestamp_ms: now_ms(), + }); + } + Ok(_) => {} + Err(broadcast::error::RecvError::Lagged(n)) => { + crate::slog!("[events] Subscriber lagged, skipped {n} events"); + } + Err(broadcast::error::RecvError::Closed) => { + crate::slog!("[events] Watcher channel closed; stopping event subscriber"); + break; + } + } + } + }); +} diff --git a/server/src/service/events/mod.rs b/server/src/service/events/mod.rs new file mode 100644 index 00000000..53057402 --- /dev/null +++ b/server/src/service/events/mod.rs @@ -0,0 +1,45 @@ +//! Events service — public API for the events domain. +//! +//! This module re-exports the pure buffer types from `buffer.rs` and the +//! side-effectful watcher subscription from `io.rs`. HTTP handlers call +//! these exports instead of containing the logic inline. +//! +//! Conventions: `docs/architecture/service-modules.md` + +pub mod buffer; +pub(super) mod io; + +pub use buffer::{EventBuffer, StoredEvent}; +// Re-exported for tests (http::events uses it via `use super::*`). +#[allow(unused_imports)] +pub use buffer::MAX_BUFFER_SIZE; +pub use io::subscribe_to_watcher; + +// ── Error type ──────────────────────────────────────────────────────────────── + +/// Typed errors returned by `service::events` functions. +/// +/// Events operations on the in-memory buffer are infallible; this enum +/// exists to satisfy the module convention and to accommodate future +/// error cases (e.g. persistence). +#[allow(dead_code)] +#[derive(Debug)] +pub enum Error { + /// A serialisation or internal error occurred. + Internal(String), +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Internal(msg) => write!(f, "Events error: {msg}"), + } + } +} + +// ── Public API ──────────────────────────────────────────────────────────────── + +/// Return all events in `buffer` recorded after `since_ms` milliseconds. +pub fn events_since(buffer: &EventBuffer, since_ms: u64) -> Vec { + buffer.events_since(since_ms) +} diff --git a/server/src/service/health/check.rs b/server/src/service/health/check.rs new file mode 100644 index 00000000..5ea0b602 --- /dev/null +++ b/server/src/service/health/check.rs @@ -0,0 +1,38 @@ +//! Pure health-check logic — no side effects. + +use poem_openapi::Object; +use serde::Serialize; + +/// The JSON payload returned by the health check endpoint. +#[derive(Serialize, Object)] +pub struct HealthStatus { + /// Human-readable status string, always `"ok"` when the server is healthy. + pub status: String, +} + +/// Return a healthy status response. +pub fn ok() -> HealthStatus { + HealthStatus { + status: "ok".to_string(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ok_returns_status_ok() { + let s = ok(); + assert_eq!(s.status, "ok"); + } + + #[test] + fn health_status_serializes() { + let s = HealthStatus { + status: "ok".to_string(), + }; + let json = serde_json::to_value(&s).unwrap(); + assert_eq!(json["status"], "ok"); + } +} diff --git a/server/src/service/health/io.rs b/server/src/service/health/io.rs new file mode 100644 index 00000000..a525d8c0 --- /dev/null +++ b/server/src/service/health/io.rs @@ -0,0 +1,4 @@ +//! Health I/O wrappers. +//! +//! Health has no side effects; this file exists to satisfy the +//! service-module convention (`docs/architecture/service-modules.md`). diff --git a/server/src/service/health/mod.rs b/server/src/service/health/mod.rs new file mode 100644 index 00000000..9252509e --- /dev/null +++ b/server/src/service/health/mod.rs @@ -0,0 +1,39 @@ +//! Health service — public API for the health domain. +//! +//! Exposes a single `check()` function that returns a [`HealthStatus`]. +//! HTTP handlers call this instead of constructing the response inline. +//! +//! Conventions: `docs/architecture/service-modules.md` + +pub mod check; +pub(super) mod io; + +pub use check::HealthStatus; + +// ── Error type ──────────────────────────────────────────────────────────────── + +/// Typed errors returned by `service::health` functions. +/// +/// Health checks are currently infallible; this enum satisfies the module +/// convention and accommodates future error cases (e.g. dependency checks). +#[allow(dead_code)] +#[derive(Debug)] +pub enum Error { + /// An internal error occurred during the health check. + Internal(String), +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Internal(msg) => write!(f, "Health error: {msg}"), + } + } +} + +// ── Public API ──────────────────────────────────────────────────────────────── + +/// Perform a health check and return the status. +pub fn check() -> HealthStatus { + check::ok() +} diff --git a/server/src/service/mod.rs b/server/src/service/mod.rs index 003b2b3f..b4ea26c6 100644 --- a/server/src/service/mod.rs +++ b/server/src/service/mod.rs @@ -6,3 +6,5 @@ //! - `io.rs` is the only file that performs side effects //! - Topic-named pure files contain branching logic with no I/O pub mod agents; +pub mod events; +pub mod health;