// NOTE(review): removed web-viewer chrome that was pasted into the source
// ("Files", "T", "199 lines", "6.5 KiB", "Rust", "Raw Normal View History")
// — it is not valid Rust and breaks compilation. Original file path:
// huskies/server/src/http/events.rs
//! Per-project event buffer and `GET /api/events` HTTP endpoint.
//!
//! The gateway polls `/api/events?since={ts_ms}` on each registered project
//! server to aggregate cross-project pipeline notifications into a single
//! gateway chat channel. Each project server buffers up to 500 events in
//! memory and serves them via this endpoint.
//!
//! Domain logic lives in `service::events`; this module is a thin HTTP
//! adapter: extract query params → call service → shape response.
#[cfg(test)]
pub use crate::service::events::StoredEvent;
pub use crate::service::events::{EventBuffer, subscribe_to_watcher};
// MAX_BUFFER_SIZE is used in tests via `use super::*`.
#[cfg(test)]
pub use crate::service::events::MAX_BUFFER_SIZE;
use poem::web::{Data, Query};
use poem::{Response, handler, http::StatusCode};
use serde::Deserialize;
/// Query parameters for `GET /api/events`.
///
/// Deserialized by poem from the request's query string.
// `Debug` added: public API types should be debug-printable so requests can
// be logged/inspected; it does not affect (de)serialization or callers.
#[derive(Debug, Deserialize)]
pub struct EventsQuery {
    /// Return only events with `timestamp_ms` strictly greater than this value.
    /// Defaults to `0` (return all buffered events).
    #[serde(default)]
    pub since: u64,
}
/// `GET /api/events?since={ts_ms}`
///
/// Returns a JSON array of [`StoredEvent`] objects recorded after `since` ms.
/// The gateway polls this endpoint on each registered project server to build
/// an aggregated cross-project notification stream.
#[handler]
pub fn events_handler(
    Query(params): Query<EventsQuery>,
    Data(buffer): Data<&EventBuffer>,
) -> Response {
    // Thin HTTP adapter: filtering lives in the service layer.
    let recent = crate::service::events::events_since(buffer, params.since);
    // Serializing plain event data should not fail; if it somehow does,
    // degrade to an empty JSON array instead of a 500 so the gateway's
    // polling loop keeps running.
    let body = match serde_json::to_vec(&recent) {
        Ok(bytes) => bytes,
        Err(_) => b"[]".to_vec(),
    };
    Response::builder()
        .status(StatusCode::OK)
        .header(poem::http::header::CONTENT_TYPE, "application/json")
        .body(body)
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::broadcast;

    /// Poll `buf` until it holds at least `n` events or ~1 s elapses, then
    /// return whatever is buffered.
    ///
    /// Replaces fixed 50 ms sleeps before the assertions: a bounded poll is
    /// both faster in the common case and far less flaky on loaded CI
    /// machines, where the subscriber task may not be scheduled promptly.
    async fn wait_for_events(buf: &EventBuffer, n: usize) -> Vec<StoredEvent> {
        for _ in 0..200 {
            let events = buf.events_since(0);
            if events.len() >= n {
                return events;
            }
            tokio::time::sleep(std::time::Duration::from_millis(5)).await;
        }
        // Timed out — return what we have and let the caller's assert fail
        // with a useful count.
        buf.events_since(0)
    }

    #[test]
    fn event_buffer_push_and_retrieve() {
        let buf = EventBuffer::new();
        buf.push(StoredEvent::MergeFailure {
            story_id: "42_story_x".to_string(),
            reason: "conflict".to_string(),
            timestamp_ms: 1000,
        });
        buf.push(StoredEvent::StoryBlocked {
            story_id: "43_story_y".to_string(),
            reason: "retry limit".to_string(),
            timestamp_ms: 2000,
        });
        // `since == 0` returns everything buffered.
        let all = buf.events_since(0);
        assert_eq!(all.len(), 2);
        // The filter is strictly-greater-than, so the ts==1000 event is excluded.
        let after_1000 = buf.events_since(1000);
        assert_eq!(after_1000.len(), 1);
        assert!(matches!(after_1000[0], StoredEvent::StoryBlocked { .. }));
    }

    #[test]
    fn event_buffer_evicts_oldest_when_full() {
        let buf = EventBuffer::new();
        // Push exactly one more event than the buffer holds.
        for i in 0..MAX_BUFFER_SIZE + 1 {
            buf.push(StoredEvent::MergeFailure {
                story_id: format!("{i}_story_x"),
                reason: "x".to_string(),
                timestamp_ms: i as u64,
            });
        }
        // Buffer must not exceed MAX_BUFFER_SIZE.
        assert_eq!(buf.events_since(0).len(), MAX_BUFFER_SIZE);
        // Oldest entry (timestamp_ms == 0) should have been evicted.
        assert!(buf.events_since(0).iter().all(|e| e.timestamp_ms() > 0));
    }

    #[test]
    fn stage_transition_timestamp_ms_accessor() {
        let e = StoredEvent::StageTransition {
            story_id: "1".to_string(),
            from_stage: "2_current".to_string(),
            to_stage: "3_qa".to_string(),
            timestamp_ms: 9999,
        };
        assert_eq!(e.timestamp_ms(), 9999);
    }

    #[tokio::test]
    async fn subscribe_to_watcher_stores_work_item_with_from_stage() {
        let buf = EventBuffer::new();
        let (tx, rx) = broadcast::channel(16);
        subscribe_to_watcher(buf.clone(), rx);
        tx.send(crate::io::watcher::WatcherEvent::WorkItem {
            stage: "3_qa".to_string(),
            item_id: "42_story_foo".to_string(),
            action: "qa".to_string(),
            commit_msg: "huskies: qa 42_story_foo".to_string(),
            from_stage: Some("2_current".to_string()),
        })
        .unwrap();
        let events = wait_for_events(&buf, 1).await;
        assert_eq!(events.len(), 1);
        // `let … else` both asserts the variant and binds its fields; the old
        // `if let` silently skipped the field assertions on a wrong variant.
        let StoredEvent::StageTransition {
            story_id,
            from_stage,
            to_stage,
            ..
        } = &events[0]
        else {
            panic!("expected StageTransition variant");
        };
        assert_eq!(story_id, "42_story_foo");
        assert_eq!(from_stage, "2_current");
        assert_eq!(to_stage, "3_qa");
    }

    #[tokio::test]
    async fn subscribe_to_watcher_ignores_work_item_without_from_stage() {
        let buf = EventBuffer::new();
        let (tx, rx) = broadcast::channel(16);
        subscribe_to_watcher(buf.clone(), rx);
        // Synthetic event: no from_stage.
        tx.send(crate::io::watcher::WatcherEvent::WorkItem {
            stage: "2_current".to_string(),
            item_id: "99_story_syn".to_string(),
            action: "start".to_string(),
            commit_msg: "huskies: start 99_story_syn".to_string(),
            from_stage: None,
        })
        .unwrap();
        // Asserting *absence* cannot use a poll loop; give the subscriber
        // task a fixed window in which to (incorrectly) record the event.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        assert_eq!(buf.events_since(0).len(), 0);
    }

    #[tokio::test]
    async fn subscribe_to_watcher_stores_merge_failure() {
        let buf = EventBuffer::new();
        let (tx, rx) = broadcast::channel(16);
        subscribe_to_watcher(buf.clone(), rx);
        tx.send(crate::io::watcher::WatcherEvent::MergeFailure {
            story_id: "42_story_foo".to_string(),
            reason: "merge conflict".to_string(),
        })
        .unwrap();
        let events = wait_for_events(&buf, 1).await;
        assert_eq!(events.len(), 1);
        assert!(matches!(events[0], StoredEvent::MergeFailure { .. }));
    }

    #[tokio::test]
    async fn subscribe_to_watcher_stores_story_blocked() {
        let buf = EventBuffer::new();
        let (tx, rx) = broadcast::channel(16);
        subscribe_to_watcher(buf.clone(), rx);
        tx.send(crate::io::watcher::WatcherEvent::StoryBlocked {
            story_id: "43_story_bar".to_string(),
            reason: "retry limit exceeded".to_string(),
        })
        .unwrap();
        let events = wait_for_events(&buf, 1).await;
        assert_eq!(events.len(), 1);
        assert!(matches!(events[0], StoredEvent::StoryBlocked { .. }));
    }
}