huskies: merge 605_story_extract_events_and_health_services
This commit is contained in:
+10
-155
@@ -4,163 +4,18 @@
|
||||
//! server to aggregate cross-project pipeline notifications into a single
|
||||
//! gateway chat channel. Each project server buffers up to 500 events in
|
||||
//! memory and serves them via this endpoint.
|
||||
//!
|
||||
//! Domain logic lives in `service::events`; this module is a thin HTTP
|
||||
//! adapter: extract query params → call service → shape response.
|
||||
|
||||
pub use crate::service::events::{EventBuffer, StoredEvent, subscribe_to_watcher};
|
||||
// MAX_BUFFER_SIZE is used in tests via `use super::*`.
|
||||
#[cfg(test)]
|
||||
pub use crate::service::events::MAX_BUFFER_SIZE;
|
||||
|
||||
use crate::io::watcher::WatcherEvent;
|
||||
use poem::web::{Data, Query};
|
||||
use poem::{Response, handler, http::StatusCode};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
/// Maximum number of events retained in the in-memory buffer.
|
||||
const MAX_BUFFER_SIZE: usize = 500;
|
||||
|
||||
/// A pipeline event stored in the event buffer with a timestamp.
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum StoredEvent {
|
||||
/// A work item transitioned between pipeline stages.
|
||||
StageTransition {
|
||||
/// Work item ID (e.g. `"42_story_my_feature"`).
|
||||
story_id: String,
|
||||
/// The stage the item moved FROM (display name, e.g. `"Current"`).
|
||||
from_stage: String,
|
||||
/// The stage the item moved TO (directory key, e.g. `"3_qa"`).
|
||||
to_stage: String,
|
||||
/// Unix timestamp in milliseconds when this event was recorded.
|
||||
timestamp_ms: u64,
|
||||
},
|
||||
/// A merge operation failed for a story.
|
||||
MergeFailure {
|
||||
/// Work item ID (e.g. `"42_story_my_feature"`).
|
||||
story_id: String,
|
||||
/// Human-readable description of the failure.
|
||||
reason: String,
|
||||
/// Unix timestamp in milliseconds when this event was recorded.
|
||||
timestamp_ms: u64,
|
||||
},
|
||||
/// A story was blocked (e.g. retry limit exceeded).
|
||||
StoryBlocked {
|
||||
/// Work item ID (e.g. `"42_story_my_feature"`).
|
||||
story_id: String,
|
||||
/// Human-readable reason the story was blocked.
|
||||
reason: String,
|
||||
/// Unix timestamp in milliseconds when this event was recorded.
|
||||
timestamp_ms: u64,
|
||||
},
|
||||
}
|
||||
|
||||
impl StoredEvent {
|
||||
/// Returns the `timestamp_ms` field common to all event variants.
|
||||
pub fn timestamp_ms(&self) -> u64 {
|
||||
match self {
|
||||
StoredEvent::StageTransition { timestamp_ms, .. } => *timestamp_ms,
|
||||
StoredEvent::MergeFailure { timestamp_ms, .. } => *timestamp_ms,
|
||||
StoredEvent::StoryBlocked { timestamp_ms, .. } => *timestamp_ms,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared, thread-safe ring buffer of recent pipeline events.
|
||||
///
|
||||
/// Wrapped in `Arc` so it can be shared between the background subscriber
|
||||
/// task and the HTTP handler. The inner `Mutex` guards the `VecDeque`.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct EventBuffer(Arc<Mutex<VecDeque<StoredEvent>>>);
|
||||
|
||||
impl EventBuffer {
|
||||
/// Create a new, empty event buffer.
|
||||
pub fn new() -> Self {
|
||||
EventBuffer(Arc::new(Mutex::new(VecDeque::new())))
|
||||
}
|
||||
|
||||
/// Append an event to the buffer, evicting the oldest entry if the buffer
|
||||
/// exceeds [`MAX_BUFFER_SIZE`].
|
||||
pub fn push(&self, event: StoredEvent) {
|
||||
let mut buf = self.0.lock().unwrap();
|
||||
if buf.len() >= MAX_BUFFER_SIZE {
|
||||
buf.pop_front();
|
||||
}
|
||||
buf.push_back(event);
|
||||
}
|
||||
|
||||
/// Return all events whose `timestamp_ms` is strictly greater than `since_ms`.
|
||||
pub fn events_since(&self, since_ms: u64) -> Vec<StoredEvent> {
|
||||
let buf = self.0.lock().unwrap();
|
||||
buf.iter()
|
||||
.filter(|e| e.timestamp_ms() > since_ms)
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for EventBuffer {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the current Unix timestamp in milliseconds.
|
||||
fn now_ms() -> u64 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.map(|d| d.as_millis() as u64)
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Spawn a background task that consumes [`WatcherEvent`] broadcasts and
|
||||
/// stores relevant events in `buffer`.
|
||||
///
|
||||
/// Only [`WatcherEvent::WorkItem`] (with a known `from_stage`),
|
||||
/// [`WatcherEvent::MergeFailure`], and [`WatcherEvent::StoryBlocked`]
|
||||
/// variants are stored. All other variants are silently ignored.
|
||||
pub fn subscribe_to_watcher(buffer: EventBuffer, mut rx: broadcast::Receiver<WatcherEvent>) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(WatcherEvent::WorkItem {
|
||||
stage,
|
||||
item_id,
|
||||
from_stage,
|
||||
..
|
||||
}) => {
|
||||
// Only store genuine transitions (from_stage is known).
|
||||
if let Some(from) = from_stage {
|
||||
buffer.push(StoredEvent::StageTransition {
|
||||
story_id: item_id,
|
||||
from_stage: from,
|
||||
to_stage: stage,
|
||||
timestamp_ms: now_ms(),
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(WatcherEvent::MergeFailure { story_id, reason }) => {
|
||||
buffer.push(StoredEvent::MergeFailure {
|
||||
story_id,
|
||||
reason,
|
||||
timestamp_ms: now_ms(),
|
||||
});
|
||||
}
|
||||
Ok(WatcherEvent::StoryBlocked { story_id, reason }) => {
|
||||
buffer.push(StoredEvent::StoryBlocked {
|
||||
story_id,
|
||||
reason,
|
||||
timestamp_ms: now_ms(),
|
||||
});
|
||||
}
|
||||
Ok(_) => {} // Ignore all other event types.
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
crate::slog!("[events] Subscriber lagged, skipped {n} events");
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
crate::slog!("[events] Watcher channel closed; stopping event subscriber");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
use serde::Deserialize;
|
||||
|
||||
/// Query parameters for `GET /api/events`.
|
||||
#[derive(Deserialize)]
|
||||
@@ -181,7 +36,7 @@ pub fn events_handler(
|
||||
Query(params): Query<EventsQuery>,
|
||||
Data(buffer): Data<&EventBuffer>,
|
||||
) -> Response {
|
||||
let events = buffer.events_since(params.since);
|
||||
let events = crate::service::events::events_since(buffer, params.since);
|
||||
let body = serde_json::to_vec(&events).unwrap_or_else(|_| b"[]".to_vec());
|
||||
Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
|
||||
+10
-11
@@ -1,7 +1,13 @@
|
||||
//! Health check endpoint — returns a static "ok" response.
|
||||
//! Health check endpoint — thin HTTP adapter over `service::health`.
|
||||
//!
|
||||
//! Domain logic (the `HealthStatus` type and check function) lives in
|
||||
//! `service::health`; this module is a thin adapter: call service → shape
|
||||
//! response.
|
||||
|
||||
pub use crate::service::health::HealthStatus;
|
||||
|
||||
use poem::handler;
|
||||
use poem_openapi::{Object, OpenApi, Tags, payload::Json};
|
||||
use serde::Serialize;
|
||||
use poem_openapi::{OpenApi, Tags, payload::Json};
|
||||
|
||||
/// Health check endpoint.
|
||||
///
|
||||
@@ -16,11 +22,6 @@ enum HealthTags {
|
||||
Health,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Object)]
|
||||
pub struct HealthStatus {
|
||||
status: String,
|
||||
}
|
||||
|
||||
pub struct HealthApi;
|
||||
|
||||
#[OpenApi(tag = "HealthTags::Health")]
|
||||
@@ -30,9 +31,7 @@ impl HealthApi {
|
||||
/// Returns a JSON status object to confirm the server is running.
|
||||
#[oai(path = "/health", method = "get")]
|
||||
async fn health(&self) -> Json<HealthStatus> {
|
||||
Json(HealthStatus {
|
||||
status: "ok".to_string(),
|
||||
})
|
||||
Json(crate::service::health::check())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user