huskies: merge 617_story_split_gateway_into_service_and_transport

This commit is contained in:
dave
2026-04-24 18:39:16 +00:00
parent 271f8ea6a8
commit 360bca45c8
12 changed files with 3016 additions and 2877 deletions
+190 -2854
View File
File diff suppressed because it is too large Load Diff
+3 -1
View File
@@ -8,7 +8,9 @@
//! Domain logic lives in `service::events`; this module is a thin HTTP
//! adapter: extract query params → call service → shape response.
pub use crate::service::events::{EventBuffer, StoredEvent, subscribe_to_watcher};
#[cfg(test)]
pub use crate::service::events::StoredEvent;
pub use crate::service::events::{EventBuffer, subscribe_to_watcher};
// MAX_BUFFER_SIZE is used in tests via `use super::*`.
#[cfg(test)]
pub use crate::service::events::MAX_BUFFER_SIZE;
File diff suppressed because it is too large Load Diff
+1
View File
@@ -18,6 +18,7 @@ pub mod settings;
pub(crate) mod test_helpers;
pub mod workflow;
pub mod gateway;
pub mod project;
pub mod wizard;
pub mod ws;
+136
View File
@@ -0,0 +1,136 @@
//! Gateway aggregation — pure functions for cross-project pipeline status.
//!
//! Formats aggregated pipeline data into compact text suitable for chat
//! transports (Matrix, Slack). Uses `service::pipeline::aggregate_pipeline_counts`
//! for per-project parsing.
use serde_json::Value;
use std::collections::BTreeMap;
/// Format an aggregated status map as a compact, one-line-per-project string
/// suitable for Matrix/Slack messages.
///
/// Healthy projects: `🟢 **name** — B:5 C:2 Q:1 M:0 D:12`
/// Blocked items appended on the same line: `| blocked: 42 [story]`
/// Unreachable projects: `🔴 **name** — UNREACHABLE`
pub fn format_aggregate_status_compact(statuses: &BTreeMap<String, Value>) -> String {
let mut lines: Vec<String> = Vec::new();
for (name, status) in statuses {
if let Some(err) = status.get("error").and_then(|e| e.as_str()) {
lines.push(format!("\u{1F534} **{name}** — UNREACHABLE: {err}"));
} else {
let counts = status.get("counts");
let b = counts
.and_then(|c| c.get("backlog"))
.and_then(|n| n.as_u64())
.unwrap_or(0);
let c = counts
.and_then(|c| c.get("current"))
.and_then(|n| n.as_u64())
.unwrap_or(0);
let q = counts
.and_then(|c| c.get("qa"))
.and_then(|n| n.as_u64())
.unwrap_or(0);
let m = counts
.and_then(|c| c.get("merge"))
.and_then(|n| n.as_u64())
.unwrap_or(0);
let d = counts
.and_then(|c| c.get("done"))
.and_then(|n| n.as_u64())
.unwrap_or(0);
let blocked_arr = status
.get("blocked")
.and_then(|a| a.as_array())
.cloned()
.unwrap_or_default();
let indicator = if blocked_arr.is_empty() {
"\u{1F7E2}" // 🟢
} else {
"\u{1F7E0}" // 🟠
};
let mut line = format!("{indicator} **{name}** — B:{b} C:{c} Q:{q} M:{m} D:{d}");
if !blocked_arr.is_empty() {
let ids: Vec<String> = blocked_arr
.iter()
.filter_map(|item| item.get("story_id").and_then(|s| s.as_str()))
.map(|s| s.to_string())
.collect();
line.push_str(&format!(" | blocked: {}", ids.join(", ")));
}
lines.push(line);
}
}
if lines.is_empty() {
return "No projects registered.".to_string();
}
format!("**All Projects**\n\n{}", lines.join("\n\n"))
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn format_healthy_project() {
let mut statuses = BTreeMap::new();
statuses.insert(
"huskies".to_string(),
json!({
"counts": { "backlog": 5, "current": 2, "qa": 1, "merge": 0, "done": 12 },
"blocked": []
}),
);
let output = format_aggregate_status_compact(&statuses);
assert!(output.contains("huskies"));
assert!(output.contains("B:5"));
assert!(output.contains("C:2"));
assert!(output.contains("Q:1"));
assert!(output.contains("D:12"));
assert!(!output.contains("blocked:"));
}
#[test]
fn format_unreachable_project() {
let mut statuses = BTreeMap::new();
statuses.insert(
"broken".to_string(),
json!({ "error": "connection refused" }),
);
let output = format_aggregate_status_compact(&statuses);
assert!(output.contains("broken"));
assert!(output.contains("UNREACHABLE"));
assert!(output.contains("connection refused"));
}
#[test]
fn format_blocked_items_shown() {
let mut statuses = BTreeMap::new();
statuses.insert(
"myproj".to_string(),
json!({
"counts": { "backlog": 0, "current": 1, "qa": 0, "merge": 0, "done": 0 },
"blocked": [{ "story_id": "42_story_x", "name": "X", "stage": "current", "reason": "blocked" }]
}),
);
let output = format_aggregate_status_compact(&statuses);
assert!(output.contains("blocked:"));
assert!(output.contains("42_story_x"));
}
#[test]
fn format_empty_projects() {
let statuses = BTreeMap::new();
let output = format_aggregate_status_compact(&statuses);
assert_eq!(output, "No projects registered.");
}
}
+191
View File
@@ -0,0 +1,191 @@
//! Gateway configuration types — pure parsing and validation.
//!
//! Contains `ProjectEntry`, `GatewayConfig`, and validation logic.
//! All filesystem I/O (loading from disk) lives in `io.rs`.
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
/// A single project entry in `projects.toml`.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ProjectEntry {
/// Base URL of the project's huskies container (e.g. `http://localhost:3001`).
pub url: String,
}
/// Top-level `projects.toml` config.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GatewayConfig {
/// Map of project name → container URL.
#[serde(default)]
pub projects: BTreeMap<String, ProjectEntry>,
}
/// Validate that a gateway config has at least one project.
///
/// Returns the name of the first project (alphabetically) on success,
/// or an error message if the config is empty.
pub fn validate_config(config: &GatewayConfig) -> Result<String, String> {
if config.projects.is_empty() {
return Err("projects.toml must define at least one project".to_string());
}
Ok(config.projects.keys().next().unwrap().clone())
}
/// Validate that a project name exists in the given project map.
///
/// Returns the project's URL on success.
pub fn validate_project_exists(
projects: &BTreeMap<String, ProjectEntry>,
name: &str,
) -> Result<String, String> {
projects.get(name).map(|p| p.url.clone()).ok_or_else(|| {
let available: Vec<&str> = projects.keys().map(|s| s.as_str()).collect();
format!(
"unknown project '{name}'. Available: {}",
available.join(", ")
)
})
}
/// Escape a string as a TOML quoted string.
pub fn toml_string(s: &str) -> String {
format!("\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\""))
}
/// Serialize a `bot.toml` content string from the given fields.
pub fn serialize_bot_config(
transport: &str,
homeserver: Option<&str>,
username: Option<&str>,
password: Option<&str>,
slack_bot_token: Option<&str>,
slack_signing_secret: Option<&str>,
) -> String {
match transport {
"slack" => {
format!(
"enabled = true\ntransport = \"slack\"\n\nslack_bot_token = {}\nslack_signing_secret = {}\nslack_channel_ids = []\n",
toml_string(slack_bot_token.unwrap_or("")),
toml_string(slack_signing_secret.unwrap_or("")),
)
}
_ => {
format!(
"enabled = true\ntransport = \"matrix\"\n\nhomeserver = {}\nusername = {}\npassword = {}\nroom_ids = []\nallowed_users = []\n",
toml_string(homeserver.unwrap_or("")),
toml_string(username.unwrap_or("")),
toml_string(password.unwrap_or("")),
)
}
}
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_valid_projects_toml() {
let toml_str = r#"
[projects.huskies]
url = "http://localhost:3001"
[projects.robot-studio]
url = "http://localhost:3002"
"#;
let config: GatewayConfig = toml::from_str(toml_str).unwrap();
assert_eq!(config.projects.len(), 2);
assert_eq!(config.projects["huskies"].url, "http://localhost:3001");
assert_eq!(config.projects["robot-studio"].url, "http://localhost:3002");
}
#[test]
fn parse_empty_projects_toml() {
let toml_str = "[projects]\n";
let config: GatewayConfig = toml::from_str(toml_str).unwrap();
assert!(config.projects.is_empty());
}
#[test]
fn validate_config_rejects_empty() {
let config = GatewayConfig {
projects: BTreeMap::new(),
};
assert!(validate_config(&config).is_err());
}
#[test]
fn validate_config_returns_first_project_name() {
let mut projects = BTreeMap::new();
projects.insert(
"beta".into(),
ProjectEntry {
url: "http://b".into(),
},
);
projects.insert(
"alpha".into(),
ProjectEntry {
url: "http://a".into(),
},
);
let config = GatewayConfig { projects };
assert_eq!(validate_config(&config).unwrap(), "alpha");
}
#[test]
fn validate_project_exists_succeeds() {
let mut projects = BTreeMap::new();
projects.insert(
"p1".into(),
ProjectEntry {
url: "http://p1".into(),
},
);
assert_eq!(
validate_project_exists(&projects, "p1").unwrap(),
"http://p1"
);
}
#[test]
fn validate_project_exists_fails() {
let projects = BTreeMap::new();
assert!(validate_project_exists(&projects, "missing").is_err());
}
#[test]
fn toml_string_escapes_quotes() {
assert_eq!(toml_string(r#"a"b"#), r#""a\"b""#);
}
#[test]
fn toml_string_escapes_backslashes() {
assert_eq!(toml_string(r"a\b"), r#""a\\b""#);
}
#[test]
fn serialize_bot_config_matrix() {
let content = serialize_bot_config(
"matrix",
Some("https://mx.io"),
Some("@bot:mx.io"),
Some("pass"),
None,
None,
);
assert!(content.contains("transport = \"matrix\""));
assert!(content.contains("homeserver = \"https://mx.io\""));
}
#[test]
fn serialize_bot_config_slack() {
let content =
serialize_bot_config("slack", None, None, None, Some("xoxb-123"), Some("secret"));
assert!(content.contains("transport = \"slack\""));
assert!(content.contains("slack_bot_token = \"xoxb-123\""));
}
}
+407
View File
@@ -0,0 +1,407 @@
//! Gateway I/O — the ONLY place in `service/gateway/` that may perform side effects.
//!
//! Side effects here include: reading/writing config and agent state files,
//! HTTP requests to project containers (proxying, health checks, polling),
//! spawning the Matrix bot task, and the notification poller background task.
use super::config::{GatewayConfig, ProjectEntry};
use super::registration::JoinedAgent;
pub use reqwest::Client;
use serde_json::{Value, json};
use std::collections::{BTreeMap, HashMap};
use std::path::Path;
// ── Config I/O ───────────────────────────────────────────────────────────────
/// Load gateway config from a `projects.toml` file.
pub fn load_config(path: &Path) -> Result<GatewayConfig, String> {
let contents = std::fs::read_to_string(path)
.map_err(|e| format!("cannot read {}: {e}", path.display()))?;
toml::from_str(&contents).map_err(|e| format!("invalid projects.toml: {e}"))
}
/// Load persisted agents from `<config_dir>/gateway_agents.json`.
/// Returns an empty list if the file does not exist or cannot be parsed.
pub fn load_agents(config_dir: &Path) -> Vec<JoinedAgent> {
let path = config_dir.join("gateway_agents.json");
match std::fs::read(&path) {
Ok(data) => serde_json::from_slice(&data).unwrap_or_default(),
Err(_) => Vec::new(),
}
}
/// Persist the current projects map to `<config_dir>/projects.toml`.
/// Silently ignores write errors or skips when `config_dir` is empty.
pub async fn save_config(projects: &BTreeMap<String, ProjectEntry>, config_dir: &Path) {
if config_dir.as_os_str().is_empty() {
return;
}
let path = config_dir.join("projects.toml");
let config = GatewayConfig {
projects: projects.clone(),
};
if let Ok(data) = toml::to_string_pretty(&config) {
let _ = tokio::fs::write(&path, data).await;
}
}
/// Persist the current agent list to `<config_dir>/gateway_agents.json`.
/// Silently ignores write errors.
pub async fn save_agents(agents: &[JoinedAgent], config_dir: &Path) {
if config_dir == Path::new("") {
return;
}
let path = config_dir.join("gateway_agents.json");
if let Ok(data) = serde_json::to_vec_pretty(agents) {
let _ = tokio::fs::write(&path, data).await;
}
}
// ── Bot config I/O ──────────────────────────────────────────────────────────
/// Read the current raw bot.toml as key/value pairs for the configuration UI.
/// Returns `None` values if the file does not exist.
pub fn read_bot_config_raw(config_dir: &Path) -> BotConfigFields {
let path = config_dir.join(".huskies").join("bot.toml");
let content = match std::fs::read_to_string(&path) {
Ok(c) => c,
Err(_) => return BotConfigFields::default(),
};
let table: toml::Value = match toml::from_str(&content) {
Ok(v) => v,
Err(_) => return BotConfigFields::default(),
};
let s = |key: &str| -> Option<String> {
table
.get(key)
.and_then(|v| v.as_str())
.map(|s| s.to_string())
};
BotConfigFields {
transport: s("transport").unwrap_or_else(|| "matrix".to_string()),
homeserver: s("homeserver"),
username: s("username"),
password: s("password"),
slack_bot_token: s("slack_bot_token"),
slack_signing_secret: s("slack_signing_secret"),
}
}
/// Raw bot.toml fields for the configuration UI.
#[derive(Default)]
pub struct BotConfigFields {
pub transport: String,
pub homeserver: Option<String>,
pub username: Option<String>,
pub password: Option<String>,
pub slack_bot_token: Option<String>,
pub slack_signing_secret: Option<String>,
}
/// Write a `bot.toml` from the given content string.
pub fn write_bot_config(config_dir: &Path, content: &str) -> Result<(), String> {
let huskies_dir = config_dir.join(".huskies");
std::fs::create_dir_all(&huskies_dir)
.map_err(|e| format!("cannot create .huskies dir: {e}"))?;
let path = huskies_dir.join("bot.toml");
std::fs::write(&path, content).map_err(|e| format!("cannot write bot.toml: {e}"))
}
// ── MCP proxy I/O ───────────────────────────────────────────────────────────
/// Proxy a raw MCP request body to the given project URL.
pub async fn proxy_mcp_call(
client: &Client,
base_url: &str,
request_bytes: &[u8],
) -> Result<Vec<u8>, String> {
let mcp_url = format!("{}/mcp", base_url.trim_end_matches('/'));
let resp = client
.post(&mcp_url)
.header("Content-Type", "application/json")
.body(request_bytes.to_vec())
.send()
.await
.map_err(|e| format!("failed to reach {mcp_url}: {e}"))?;
resp.bytes()
.await
.map(|b| b.to_vec())
.map_err(|e| format!("failed to read response from {mcp_url}: {e}"))
}
/// Fetch tools/list from a project's MCP endpoint.
pub async fn fetch_tools_list(client: &Client, base_url: &str) -> Result<Value, String> {
let mcp_url = format!("{}/mcp", base_url.trim_end_matches('/'));
let rpc_body = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/list",
"params": {}
});
let resp = client
.post(&mcp_url)
.json(&rpc_body)
.send()
.await
.map_err(|e| format!("failed to reach {mcp_url}: {e}"))?;
resp.json()
.await
.map_err(|e| format!("invalid JSON from upstream: {e}"))
}
/// Fetch and aggregate pipeline status for a single project URL.
pub async fn fetch_one_project_pipeline_status(url: &str, client: &Client) -> Value {
let mcp_url = format!("{}/mcp", url.trim_end_matches('/'));
let rpc_body = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "get_pipeline_status",
"arguments": {}
}
});
match client.post(&mcp_url).json(&rpc_body).send().await {
Ok(resp) => match resp.json::<Value>().await {
Ok(upstream) => {
if let Some(text) = upstream
.get("result")
.and_then(|r| r.get("content"))
.and_then(|c| c.get(0))
.and_then(|c| c.get("text"))
.and_then(|t| t.as_str())
{
match serde_json::from_str::<Value>(text) {
Ok(pipeline) => {
crate::service::pipeline::aggregate_pipeline_counts(&pipeline)
}
Err(_) => json!({ "error": "invalid pipeline JSON" }),
}
} else {
json!({ "error": "unexpected response shape" })
}
}
Err(e) => json!({ "error": format!("invalid response: {e}") }),
},
Err(e) => json!({ "error": format!("unreachable: {e}") }),
}
}
/// Fetch `get_pipeline_status` from every registered project URL in parallel.
pub async fn fetch_all_project_pipeline_statuses(
project_urls: &BTreeMap<String, String>,
client: &Client,
) -> BTreeMap<String, Value> {
use futures::future::join_all;
let futures: Vec<_> = project_urls
.iter()
.map(|(name, url)| {
let name = name.clone();
let url = url.clone();
let client = client.clone();
async move {
let result = fetch_one_project_pipeline_status(&url, &client).await;
(name, result)
}
})
.collect();
join_all(futures).await.into_iter().collect()
}
/// Fetch the pipeline status from a single project for the `gateway_status` tool.
pub async fn fetch_pipeline_status_for_project(
client: &Client,
base_url: &str,
) -> Result<Value, String> {
let mcp_url = format!("{}/mcp", base_url.trim_end_matches('/'));
let rpc_body = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "get_pipeline_status",
"arguments": {}
}
});
let resp = client
.post(&mcp_url)
.json(&rpc_body)
.send()
.await
.map_err(|e| format!("failed to reach {mcp_url}: {e}"))?;
resp.json()
.await
.map_err(|e| format!("invalid upstream response: {e}"))
}
/// Check health of a single project URL.
pub async fn check_project_health(client: &Client, base_url: &str) -> Result<bool, String> {
let health_url = format!("{}/health", base_url.trim_end_matches('/'));
match client.get(&health_url).send().await {
Ok(resp) => Ok(resp.status().is_success()),
Err(e) => Err(format!("unreachable: {e}")),
}
}
// ── Gateway MCP JSON ────────────────────────────────────────────────────────
/// Write (or overwrite) a `.mcp.json` in `config_dir` that points Claude Code
/// CLI at the gateway's own `/mcp` endpoint.
pub fn write_gateway_mcp_json(config_dir: &Path, port: u16) -> Result<(), std::io::Error> {
let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
let url = format!("http://{host}:{port}/mcp");
let content = json!({
"mcpServers": {
"huskies": {
"type": "http",
"url": url
}
}
});
let path = config_dir.join(".mcp.json");
std::fs::write(&path, serde_json::to_string_pretty(&content).unwrap())?;
crate::slog!("[gateway] Wrote {} pointing to {}", path.display(), url);
Ok(())
}
// ── Init project I/O ────────────────────────────────────────────────────────
/// Check if a path already has a `.huskies/` directory.
pub fn has_huskies_dir(path: &Path) -> bool {
path.join(".huskies").exists()
}
/// Create a directory (and parents) if it does not exist.
pub fn ensure_directory(path: &Path) -> Result<(), String> {
if !path.exists() {
std::fs::create_dir_all(path)
.map_err(|e| format!("failed to create directory '{}': {e}", path.display()))?;
}
Ok(())
}
/// Scaffold a huskies project at the given path.
pub fn scaffold_project(path: &Path) -> Result<(), String> {
crate::io::fs::scaffold::scaffold_story_kit(path, 3001)
}
/// Initialise wizard state at the given path.
pub fn init_wizard_state(path: &Path) {
crate::io::wizard::WizardState::init_if_missing(path);
}
// ── Notification poller ─────────────────────────────────────────────────────
/// Spawn a background task that polls events from all project servers.
pub fn spawn_gateway_notification_poller(
transport: std::sync::Arc<dyn crate::chat::ChatTransport>,
room_ids: Vec<String>,
project_urls: BTreeMap<String, String>,
poll_interval_secs: u64,
) {
tokio::spawn(async move {
let client = Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
.unwrap_or_else(|_| Client::new());
let interval = std::time::Duration::from_secs(poll_interval_secs.max(1));
let mut last_ts: HashMap<String, u64> = project_urls
.keys()
.map(|name| (name.clone(), 0u64))
.collect();
loop {
for (project_name, base_url) in &project_urls {
let since = last_ts.get(project_name).copied().unwrap_or(0);
let url = format!("{base_url}/api/events?since={since}");
let response = match client.get(&url).send().await {
Ok(r) => r,
Err(e) => {
crate::slog!(
"[gateway-poller] {project_name}: unreachable ({e}); skipping"
);
continue;
}
};
let events: Vec<crate::service::events::StoredEvent> = match response.json().await {
Ok(v) => v,
Err(e) => {
crate::slog!(
"[gateway-poller] {project_name}: failed to parse events: {e}"
);
continue;
}
};
for event in &events {
let ts = event.timestamp_ms();
if ts > *last_ts.get(project_name).unwrap_or(&0) {
last_ts.insert(project_name.clone(), ts);
}
let (plain, html) = super::polling::format_gateway_event(project_name, event);
for room_id in &room_ids {
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
crate::slog!(
"[gateway-poller] Failed to send notification to {room_id}: {e}"
);
}
}
}
}
tokio::time::sleep(interval).await;
}
});
}
// ── Gateway bot spawn ───────────────────────────────────────────────────────
/// Re-export type alias for the active project lock.
pub type ActiveProject = std::sync::Arc<tokio::sync::RwLock<String>>;
/// Attempt to spawn the Matrix bot against the gateway config directory.
pub fn spawn_gateway_bot(
config_dir: &Path,
active_project: ActiveProject,
gateway_projects: Vec<String>,
gateway_project_urls: BTreeMap<String, String>,
port: u16,
) -> Option<tokio::task::AbortHandle> {
use crate::agents::AgentPool;
use tokio::sync::{broadcast, mpsc};
let (watcher_tx, _) = broadcast::channel(16);
let (_perm_tx, perm_rx) = mpsc::unbounded_channel();
let perm_rx = std::sync::Arc::new(tokio::sync::Mutex::new(perm_rx));
let (shutdown_tx, shutdown_rx) =
tokio::sync::watch::channel::<Option<crate::rebuild::ShutdownReason>>(None);
std::mem::forget(shutdown_tx);
let agents = std::sync::Arc::new(AgentPool::new(port, watcher_tx.clone()));
crate::chat::transport::matrix::spawn_bot(
config_dir,
watcher_tx,
perm_rx,
agents,
shutdown_rx,
Some(active_project),
gateway_projects,
gateway_project_urls,
)
}
+580
View File
@@ -0,0 +1,580 @@
//! Gateway service — domain logic for the multi-project gateway.
//!
//! Follows the conventions in `docs/architecture/service-modules.md`:
//! - `mod.rs` (this file) — public API, typed [`Error`], orchestration, `GatewayState`
//! - `io.rs` — the ONLY place that performs side effects (filesystem, network, process spawn)
//! - `config.rs` — pure config types and validation
//! - `registration.rs` — pure agent registration logic
//! - `aggregation.rs` — pure cross-project pipeline formatting
//! - `polling.rs` — pure notification event formatting
pub mod aggregation;
pub mod config;
pub(crate) mod io;
pub mod polling;
pub mod registration;
pub use aggregation::format_aggregate_status_compact;
pub use config::{GatewayConfig, ProjectEntry};
pub use io::{fetch_all_project_pipeline_statuses, spawn_gateway_notification_poller};
pub use registration::JoinedAgent;
use io::Client;
use std::collections::{BTreeMap, HashMap};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex as TokioMutex;
use tokio::sync::RwLock;
// ── Error type ──────────────────────────────────────────────────────────────
/// Typed errors returned by `service::gateway` functions.
///
/// HTTP handlers map these to appropriate status codes:
/// - [`Error::ProjectNotFound`] → 404 Not Found
/// - [`Error::UnreachableProject`] → 502 Bad Gateway
/// - [`Error::DuplicateToken`] → 409 Conflict
/// - [`Error::InvalidAgent`] → 404 Not Found / 400 Bad Request
/// - [`Error::Config`] → 400 Bad Request
/// - [`Error::Upstream`] → 502 Bad Gateway
#[derive(Debug)]
pub enum Error {
/// A referenced project does not exist in the gateway config.
ProjectNotFound(String),
/// A project container is unreachable.
UnreachableProject(String),
/// A join token has already been consumed or a project name is taken.
DuplicateToken(String),
/// An agent ID is invalid or not found.
InvalidAgent(String),
/// A configuration value is invalid.
Config(String),
/// An upstream project container returned an unexpected response.
Upstream(String),
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ProjectNotFound(msg) => write!(f, "Project not found: {msg}"),
Self::UnreachableProject(msg) => write!(f, "Unreachable project: {msg}"),
Self::DuplicateToken(msg) => write!(f, "Duplicate token: {msg}"),
Self::InvalidAgent(msg) => write!(f, "Invalid agent: {msg}"),
Self::Config(msg) => write!(f, "Config error: {msg}"),
Self::Upstream(msg) => write!(f, "Upstream error: {msg}"),
}
}
}
// ── Gateway state ───────────────────────────────────────────────────────────
/// A one-time join token that has been generated but not yet consumed.
pub(crate) struct PendingToken {
#[allow(dead_code)]
pub(crate) created_at: f64,
}
/// Shared gateway state threaded through HTTP handlers.
#[derive(Clone)]
pub struct GatewayState {
/// The live set of registered projects (initially loaded from `projects.toml`).
pub projects: Arc<RwLock<BTreeMap<String, ProjectEntry>>>,
/// The currently active project name.
pub active_project: Arc<RwLock<String>>,
/// HTTP client for proxying requests to project containers.
pub client: Client,
/// Build agents that have joined this gateway.
pub joined_agents: Arc<RwLock<Vec<JoinedAgent>>>,
/// One-time join tokens that have been issued but not yet consumed.
pub(crate) pending_tokens: Arc<RwLock<HashMap<String, PendingToken>>>,
/// Directory containing `projects.toml` and the `.huskies/` subfolder.
pub config_dir: PathBuf,
/// HTTP port the gateway is listening on.
pub port: u16,
/// Abort handle for the running Matrix bot task (if any).
pub bot_handle: Arc<TokioMutex<Option<tokio::task::AbortHandle>>>,
}
impl GatewayState {
/// Create a new gateway state from a config and config directory.
///
/// The first project in the config becomes the active project by default.
/// Previously registered agents are loaded from `gateway_agents.json`.
pub fn new(
gateway_config: GatewayConfig,
config_dir: PathBuf,
port: u16,
) -> Result<Self, String> {
let first = config::validate_config(&gateway_config)?;
let agents = io::load_agents(&config_dir);
Ok(Self {
projects: Arc::new(RwLock::new(gateway_config.projects)),
active_project: Arc::new(RwLock::new(first)),
client: Client::new(),
joined_agents: Arc::new(RwLock::new(agents)),
pending_tokens: Arc::new(RwLock::new(HashMap::new())),
config_dir,
port,
bot_handle: Arc::new(TokioMutex::new(None)),
})
}
/// Get the URL of the currently active project.
pub async fn active_url(&self) -> Result<String, Error> {
let name = self.active_project.read().await.clone();
self.projects
.read()
.await
.get(&name)
.map(|p| p.url.clone())
.ok_or_else(|| {
Error::ProjectNotFound(format!("active project '{name}' not found in config"))
})
}
}
// ── Public API ──────────────────────────────────────────────────────────────
/// Switch the active project. Returns the project's URL on success.
pub async fn switch_project(state: &GatewayState, project: &str) -> Result<String, Error> {
if project.is_empty() {
return Err(Error::Config("missing required parameter: project".into()));
}
let url = {
let projects = state.projects.read().await;
config::validate_project_exists(&projects, project).map_err(Error::ProjectNotFound)?
};
*state.active_project.write().await = project.to_string();
Ok(url)
}
/// Generate a one-time join token. Returns the token string.
pub async fn generate_join_token(state: &GatewayState) -> String {
let token = uuid::Uuid::new_v4().to_string();
let now = chrono::Utc::now().timestamp() as f64;
state
.pending_tokens
.write()
.await
.insert(token.clone(), PendingToken { created_at: now });
crate::slog!("[gateway] Generated join token {:.8}…", &token);
token
}
/// Register a build agent with a join token.
pub async fn register_agent(
state: &GatewayState,
token: &str,
label: String,
address: String,
) -> Result<JoinedAgent, Error> {
// Validate and consume the token.
let mut tokens = state.pending_tokens.write().await;
if !tokens.contains_key(token) {
return Err(Error::DuplicateToken(
"invalid or already-used join token".into(),
));
}
tokens.remove(token);
drop(tokens);
let now = chrono::Utc::now().timestamp() as f64;
let agent = registration::create_agent(uuid::Uuid::new_v4().to_string(), label, address, now);
crate::slog!(
"[gateway] Agent '{}' registered (id={})",
agent.label,
agent.id
);
{
let mut agents = state.joined_agents.write().await;
agents.push(agent.clone());
io::save_agents(&agents, &state.config_dir).await;
}
Ok(agent)
}
/// Remove a registered agent by ID. Returns `true` if found and removed.
pub async fn remove_agent(state: &GatewayState, id: &str) -> bool {
let mut agents = state.joined_agents.write().await;
let removed = registration::remove_agent(&mut agents, id);
if removed {
io::save_agents(&agents, &state.config_dir).await;
crate::slog!("[gateway] Removed agent id={id}");
}
removed
}
/// Assign or unassign an agent to a project.
pub async fn assign_agent(
state: &GatewayState,
id: &str,
project: Option<String>,
) -> Result<JoinedAgent, Error> {
let project_clean = project.and_then(|p| if p.is_empty() { None } else { Some(p) });
let updated = {
let projects = state.projects.read().await;
let mut agents = state.joined_agents.write().await;
registration::assign_agent(&mut agents, id, project_clean, &projects)?
};
crate::slog!(
"[gateway] Agent '{}' (id={}) assigned to {:?}",
updated.label,
updated.id,
updated.assigned_project
);
let agents = state.joined_agents.read().await.clone();
io::save_agents(&agents, &state.config_dir).await;
Ok(updated)
}
/// Update an agent's heartbeat. Returns `true` if found.
pub async fn heartbeat_agent(state: &GatewayState, id: &str) -> bool {
let now = chrono::Utc::now().timestamp() as f64;
let mut agents = state.joined_agents.write().await;
registration::heartbeat(&mut agents, id, now)
}
/// Add a new project to the gateway config.
pub async fn add_project(state: &GatewayState, name: &str, url: &str) -> Result<(), Error> {
let name = name.trim().to_string();
let url = url.trim().to_string();
if name.is_empty() {
return Err(Error::Config("project name must not be empty".into()));
}
if url.is_empty() {
return Err(Error::Config("project url must not be empty".into()));
}
{
let mut projects = state.projects.write().await;
if projects.contains_key(&name) {
return Err(Error::DuplicateToken(format!(
"project '{name}' already exists"
)));
}
projects.insert(name.clone(), ProjectEntry { url: url.clone() });
}
let snapshot = state.projects.read().await.clone();
io::save_config(&snapshot, &state.config_dir).await;
crate::slog!("[gateway] Added project '{name}' ({url})");
Ok(())
}
/// Remove a project from the gateway config.
pub async fn remove_project(state: &GatewayState, name: &str) -> Result<(), Error> {
let active = state.active_project.read().await.clone();
{
let mut projects = state.projects.write().await;
if !projects.contains_key(name) {
return Err(Error::ProjectNotFound(format!(
"project '{name}' not found"
)));
}
if projects.len() == 1 {
return Err(Error::Config("cannot remove the last project".into()));
}
projects.remove(name);
}
let snapshot = state.projects.read().await.clone();
io::save_config(&snapshot, &state.config_dir).await;
// If the removed project was active, switch to the first remaining.
if active == name {
let first = state.projects.read().await.keys().next().cloned();
if let Some(new_active) = first {
*state.active_project.write().await = new_active;
}
}
crate::slog!("[gateway] Removed project '{name}'");
Ok(())
}
/// Initialise a new huskies project at the given path.
///
/// Optionally registers the project in the gateway's project map.
pub async fn init_project(
state: &GatewayState,
path_str: &str,
name: Option<&str>,
url: Option<&str>,
) -> Result<Option<String>, Error> {
let path_str = path_str.trim();
if path_str.is_empty() {
return Err(Error::Config("missing required parameter: path".into()));
}
let project_path = std::path::Path::new(path_str);
if io::has_huskies_dir(project_path) {
return Err(Error::Config(format!(
"path '{}' is already a huskies project (.huskies/ exists). \
Use wizard_status to check setup progress.",
project_path.display()
)));
}
io::ensure_directory(project_path).map_err(Error::Config)?;
io::scaffold_project(project_path)
.map_err(|e| Error::Config(format!("scaffold failed: {e}")))?;
io::init_wizard_state(project_path);
// Optionally register in projects.toml.
let registered_name: Option<String> = match (name, url) {
(Some(n), Some(u)) if !n.trim().is_empty() && !u.trim().is_empty() => {
let n = n.trim();
let u = u.trim();
let mut projects = state.projects.write().await;
if projects.contains_key(n) {
return Err(Error::DuplicateToken(format!(
"project '{n}' is already registered. Choose a different name or use switch_project."
)));
}
projects.insert(n.to_string(), ProjectEntry { url: u.to_string() });
io::save_config(&projects, &state.config_dir).await;
crate::slog!("[gateway] init_project: registered '{n}' ({u})");
Some(n.to_string())
}
_ => None,
};
Ok(registered_name)
}
/// Fetch aggregated health status across all projects.
pub async fn health_check_all(state: &GatewayState) -> (bool, BTreeMap<String, &'static str>) {
let mut all_healthy = true;
let mut statuses = BTreeMap::new();
let project_entries: Vec<(String, String)> = state
.projects
.read()
.await
.iter()
.map(|(n, e)| (n.clone(), e.url.clone()))
.collect();
for (name, url) in &project_entries {
let healthy = io::check_project_health(&state.client, url)
.await
.unwrap_or(false);
if !healthy {
all_healthy = false;
}
statuses.insert(name.clone(), if healthy { "ok" } else { "error" });
}
(all_healthy, statuses)
}
/// Save bot config and restart the bot.
pub async fn save_bot_config_and_restart(state: &GatewayState, content: &str) -> Result<(), Error> {
io::write_bot_config(&state.config_dir, content).map_err(Error::Config)?;
// Abort existing bot task and spawn a fresh one.
{
let mut handle = state.bot_handle.lock().await;
if let Some(h) = handle.take() {
h.abort();
}
let gateway_projects: Vec<String> = state.projects.read().await.keys().cloned().collect();
let gateway_project_urls: BTreeMap<String, String> = state
.projects
.read()
.await
.iter()
.map(|(name, entry)| (name.clone(), entry.url.clone()))
.collect();
let new_handle = io::spawn_gateway_bot(
&state.config_dir,
Arc::clone(&state.active_project),
gateway_projects,
gateway_project_urls,
state.port,
);
*handle = new_handle;
}
crate::slog!("[gateway] Bot configuration saved; bot restarted");
Ok(())
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
fn make_config(names: &[(&str, &str)]) -> GatewayConfig {
let mut projects = BTreeMap::new();
for (name, url) in names {
projects.insert(
name.to_string(),
ProjectEntry {
url: url.to_string(),
},
);
}
GatewayConfig { projects }
}
#[test]
fn gateway_state_rejects_empty_config() {
let config = GatewayConfig {
projects: BTreeMap::new(),
};
assert!(GatewayState::new(config, PathBuf::from("."), 3000).is_err());
}
#[test]
fn gateway_state_sets_first_project_active() {
let config = make_config(&[("alpha", "http://a:3001"), ("beta", "http://b:3002")]);
let state = GatewayState::new(config, PathBuf::from("."), 3000).unwrap();
let active = state.active_project.blocking_read().clone();
assert_eq!(active, "alpha");
}
#[tokio::test]
async fn switch_project_to_known_project() {
let config = make_config(&[("alpha", "http://a:3001"), ("beta", "http://b:3002")]);
let state = GatewayState::new(config, PathBuf::from("."), 3000).unwrap();
let url = switch_project(&state, "beta").await.unwrap();
assert_eq!(url, "http://b:3002");
assert_eq!(*state.active_project.read().await, "beta");
}
#[tokio::test]
async fn switch_project_to_unknown_fails() {
let config = make_config(&[("alpha", "http://a:3001")]);
let state = GatewayState::new(config, PathBuf::from("."), 3000).unwrap();
assert!(switch_project(&state, "nonexistent").await.is_err());
}
#[tokio::test]
async fn switch_project_empty_name_fails() {
let config = make_config(&[("alpha", "http://a:3001")]);
let state = GatewayState::new(config, PathBuf::from("."), 3000).unwrap();
assert!(switch_project(&state, "").await.is_err());
}
#[tokio::test]
async fn active_url_returns_correct_url() {
let config = make_config(&[("myproj", "http://my:3001")]);
let state = GatewayState::new(config, PathBuf::from("."), 3000).unwrap();
let url = state.active_url().await.unwrap();
assert_eq!(url, "http://my:3001");
}
#[test]
fn error_display_variants() {
assert!(
Error::ProjectNotFound("x".into())
.to_string()
.contains("Project not found")
);
assert!(
Error::UnreachableProject("x".into())
.to_string()
.contains("Unreachable")
);
assert!(
Error::DuplicateToken("x".into())
.to_string()
.contains("Duplicate")
);
assert!(
Error::InvalidAgent("x".into())
.to_string()
.contains("Invalid agent")
);
assert!(
Error::Config("x".into())
.to_string()
.contains("Config error")
);
assert!(Error::Upstream("x".into()).to_string().contains("Upstream"));
}
#[tokio::test]
async fn generate_and_register_agent() {
let config = make_config(&[("test", "http://test:3001")]);
let state = GatewayState::new(config, PathBuf::new(), 3000).unwrap();
let token = generate_join_token(&state).await;
let agent = register_agent(&state, &token, "test-agent".into(), "ws://a".into())
.await
.unwrap();
assert_eq!(agent.label, "test-agent");
assert!(state.pending_tokens.read().await.is_empty());
assert_eq!(state.joined_agents.read().await.len(), 1);
}
#[tokio::test]
async fn register_agent_invalid_token_fails() {
let config = make_config(&[("test", "http://test:3001")]);
let state = GatewayState::new(config, PathBuf::new(), 3000).unwrap();
let result = register_agent(&state, "bad-token", "a".into(), "ws://a".into()).await;
assert!(result.is_err());
}
#[tokio::test]
async fn remove_agent_success() {
let config = make_config(&[("test", "http://test:3001")]);
let state = GatewayState::new(config, PathBuf::new(), 3000).unwrap();
let token = generate_join_token(&state).await;
let agent = register_agent(&state, &token, "a".into(), "ws://a".into())
.await
.unwrap();
assert!(remove_agent(&state, &agent.id).await);
assert!(state.joined_agents.read().await.is_empty());
}
#[tokio::test]
async fn heartbeat_agent_updates_timestamp() {
let config = make_config(&[("test", "http://test:3001")]);
let state = GatewayState::new(config, PathBuf::new(), 3000).unwrap();
let token = generate_join_token(&state).await;
let agent = register_agent(&state, &token, "a".into(), "ws://a".into())
.await
.unwrap();
let old_ts = agent.last_seen;
// Small sleep to ensure timestamp differs.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
assert!(heartbeat_agent(&state, &agent.id).await);
let agents = state.joined_agents.read().await;
assert!(agents[0].last_seen >= old_ts);
}
#[tokio::test]
async fn init_project_scaffolds_directory() {
let dir = tempfile::tempdir().unwrap();
let config = make_config(&[("test", "http://test:3001")]);
let state = GatewayState::new(config, PathBuf::new(), 3000).unwrap();
let result = init_project(&state, dir.path().to_str().unwrap(), None, None).await;
assert!(result.is_ok());
assert!(dir.path().join(".huskies").exists());
}
#[tokio::test]
async fn init_project_already_exists_fails() {
let dir = tempfile::tempdir().unwrap();
std::fs::create_dir_all(dir.path().join(".huskies")).unwrap();
let config = make_config(&[("test", "http://test:3001")]);
let state = GatewayState::new(config, PathBuf::new(), 3000).unwrap();
let result = init_project(&state, dir.path().to_str().unwrap(), None, None).await;
assert!(result.is_err());
}
}
+91
View File
@@ -0,0 +1,91 @@
//! Gateway notification polling — pure event formatting.
//!
//! Formats pipeline events from project containers into gateway notifications
//! with `[project-name]` prefixes. The actual I/O (HTTP polling, spawning
//! tasks, sending messages) lives in `io.rs`.
use crate::service::events::StoredEvent;
use crate::service::notifications::{
format_blocked_notification, format_error_notification, format_stage_notification,
stage_display_name,
};
/// Format a [`StoredEvent`] from a project into a gateway notification.
///
/// Prefixes the message with `[project-name]` so users can distinguish which
/// project emitted the event.
pub fn format_gateway_event(project_name: &str, event: &StoredEvent) -> (String, String) {
let prefix = format!("[{project_name}] ");
match event {
StoredEvent::StageTransition {
story_id,
from_stage,
to_stage,
..
} => {
let from_display = stage_display_name(from_stage);
let to_display = stage_display_name(to_stage);
let (plain, html) = format_stage_notification(story_id, None, from_display, to_display);
(format!("{prefix}{plain}"), format!("{prefix}{html}"))
}
StoredEvent::MergeFailure {
story_id, reason, ..
} => {
let (plain, html) = format_error_notification(story_id, None, reason);
(format!("{prefix}{plain}"), format!("{prefix}{html}"))
}
StoredEvent::StoryBlocked {
story_id, reason, ..
} => {
let (plain, html) = format_blocked_notification(story_id, None, reason);
(format!("{prefix}{plain}"), format!("{prefix}{html}"))
}
}
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn stage_transition_prefixes_project_name() {
let event = StoredEvent::StageTransition {
story_id: "42_story_my_feature".to_string(),
from_stage: "2_current".to_string(),
to_stage: "3_qa".to_string(),
timestamp_ms: 1000,
};
let (plain, html) = format_gateway_event("huskies", &event);
assert!(plain.starts_with("[huskies] "));
assert!(html.starts_with("[huskies] "));
assert!(plain.contains("Current"));
assert!(plain.contains("QA"));
}
#[test]
fn merge_failure_prefixes_project_name() {
let event = StoredEvent::MergeFailure {
story_id: "42_story_my_feature".to_string(),
reason: "merge conflict".to_string(),
timestamp_ms: 1000,
};
let (plain, _html) = format_gateway_event("robot-studio", &event);
assert!(plain.starts_with("[robot-studio] "));
assert!(plain.contains("merge conflict"));
}
#[test]
fn story_blocked_prefixes_project_name() {
let event = StoredEvent::StoryBlocked {
story_id: "43_story_bar".to_string(),
reason: "retry limit exceeded".to_string(),
timestamp_ms: 2000,
};
let (plain, _html) = format_gateway_event("huskies", &event);
assert!(plain.starts_with("[huskies] "));
assert!(plain.contains("BLOCKED"));
}
}
+165
View File
@@ -0,0 +1,165 @@
//! Gateway agent registration — pure logic for managing build agents.
//!
//! Contains `JoinedAgent` and functions that validate and manipulate agent
//! state in memory. All persistence (disk I/O) lives in `io.rs`.
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use super::config::ProjectEntry;
/// A build agent that has registered with this gateway.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JoinedAgent {
/// Unique ID assigned by the gateway on registration.
pub id: String,
/// Human-readable label provided by the agent (e.g. `build-agent-abc123`).
pub label: String,
/// The agent's CRDT-sync WebSocket address (e.g. `ws://host:3001/crdt-sync`).
pub address: String,
/// Unix timestamp when the agent registered.
pub registered_at: f64,
/// Unix timestamp of the last heartbeat from this agent.
#[serde(default)]
pub last_seen: f64,
/// Project this agent is assigned to, if any.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub assigned_project: Option<String>,
}
/// Create a new `JoinedAgent` from registration data.
pub fn create_agent(id: String, label: String, address: String, now: f64) -> JoinedAgent {
JoinedAgent {
id,
label,
address,
registered_at: now,
last_seen: now,
assigned_project: None,
}
}
/// Remove an agent by ID from the list. Returns `true` if found and removed.
pub fn remove_agent(agents: &mut Vec<JoinedAgent>, id: &str) -> bool {
let before = agents.len();
agents.retain(|a| a.id != id);
agents.len() < before
}
/// Assign (or unassign) an agent to a project.
///
/// Returns the updated agent on success, or an error if the agent or project
/// is not found.
pub fn assign_agent(
agents: &mut [JoinedAgent],
id: &str,
project: Option<String>,
projects: &BTreeMap<String, ProjectEntry>,
) -> Result<JoinedAgent, super::Error> {
// Validate project exists if assigning.
if let Some(ref p) = project
&& !projects.contains_key(p.as_str())
{
return Err(super::Error::ProjectNotFound(format!(
"unknown project '{p}'"
)));
}
match agents.iter_mut().find(|a| a.id == id) {
None => Err(super::Error::InvalidAgent(format!("agent not found: {id}"))),
Some(a) => {
a.assigned_project = project;
Ok(a.clone())
}
}
}
/// Update an agent's last-seen timestamp. Returns `true` if the agent was found.
pub fn heartbeat(agents: &mut [JoinedAgent], id: &str, now: f64) -> bool {
match agents.iter_mut().find(|a| a.id == id) {
None => false,
Some(a) => {
a.last_seen = now;
true
}
}
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn create_agent_sets_fields() {
let agent = create_agent("id-1".into(), "lbl".into(), "ws://a".into(), 100.0);
assert_eq!(agent.id, "id-1");
assert_eq!(agent.label, "lbl");
assert_eq!(agent.address, "ws://a");
assert_eq!(agent.registered_at, 100.0);
assert_eq!(agent.last_seen, 100.0);
assert!(agent.assigned_project.is_none());
}
#[test]
fn remove_agent_by_id() {
let mut agents = vec![
create_agent("a".into(), "A".into(), "ws://a".into(), 0.0),
create_agent("b".into(), "B".into(), "ws://b".into(), 0.0),
];
assert!(remove_agent(&mut agents, "a"));
assert_eq!(agents.len(), 1);
assert_eq!(agents[0].id, "b");
}
#[test]
fn remove_agent_missing_returns_false() {
let mut agents = vec![];
assert!(!remove_agent(&mut agents, "x"));
}
#[test]
fn assign_agent_to_valid_project() {
let mut projects = BTreeMap::new();
projects.insert(
"proj".into(),
ProjectEntry {
url: "http://p".into(),
},
);
let mut agents = vec![create_agent("a".into(), "A".into(), "ws://a".into(), 0.0)];
let result = assign_agent(&mut agents, "a", Some("proj".into()), &projects);
assert!(result.is_ok());
assert_eq!(result.unwrap().assigned_project, Some("proj".into()));
}
#[test]
fn assign_agent_to_unknown_project_fails() {
let projects = BTreeMap::new();
let mut agents = vec![create_agent("a".into(), "A".into(), "ws://a".into(), 0.0)];
let result = assign_agent(&mut agents, "a", Some("nope".into()), &projects);
assert!(result.is_err());
}
#[test]
fn assign_agent_unknown_id_fails() {
let projects = BTreeMap::new();
let mut agents: Vec<JoinedAgent> = vec![];
let result = assign_agent(&mut agents, "x", None, &projects);
assert!(result.is_err());
}
#[test]
fn heartbeat_updates_last_seen() {
let mut agents = vec![create_agent("a".into(), "A".into(), "ws://a".into(), 0.0)];
assert!(heartbeat(&mut agents, "a", 999.0));
assert_eq!(agents[0].last_seen, 999.0);
}
#[test]
fn heartbeat_unknown_id_returns_false() {
let mut agents: Vec<JoinedAgent> = vec![];
assert!(!heartbeat(&mut agents, "x", 1.0));
}
}
+3
View File
@@ -18,3 +18,6 @@ pub mod settings;
pub mod timer;
pub mod wizard;
pub mod ws;
pub mod gateway;
pub mod pipeline;
+155
View File
@@ -0,0 +1,155 @@
//! Pipeline service — shared pipeline-domain logic.
//!
//! Contains pure functions for parsing and aggregating pipeline status data.
//! Used by the gateway service for cross-project aggregation and potentially
//! by other consumers that need to reason about pipeline stage counts.
use serde_json::{Value, json};
/// Parse a `get_pipeline_status` JSON payload and produce aggregated counts
/// plus a list of blocked/failing items.
pub fn aggregate_pipeline_counts(pipeline: &Value) -> Value {
let active = pipeline
.get("active")
.and_then(|a| a.as_array())
.cloned()
.unwrap_or_default();
let backlog_count = pipeline
.get("backlog_count")
.and_then(|n| n.as_u64())
.unwrap_or(0);
let mut current = 0u64;
let mut qa = 0u64;
let mut merge = 0u64;
let mut done = 0u64;
let mut blocked: Vec<Value> = Vec::new();
for item in &active {
let stage = item
.get("stage")
.and_then(|s| s.as_str())
.unwrap_or("unknown");
match stage {
"current" => current += 1,
"qa" => qa += 1,
"merge" => merge += 1,
"done" => done += 1,
_ => {}
}
let is_blocked = item
.get("blocked")
.and_then(|b| b.as_bool())
.unwrap_or(false);
let merge_failure = item.get("merge_failure");
let has_merge_failure = merge_failure
.map(|f| !f.is_null() && f != "")
.unwrap_or(false);
if is_blocked || has_merge_failure {
let story_id = item
.get("story_id")
.and_then(|s| s.as_str())
.unwrap_or("?")
.to_string();
let story_name = item
.get("name")
.and_then(|s| s.as_str())
.unwrap_or("")
.to_string();
let reason = if has_merge_failure {
format!(
"merge failure: {}",
merge_failure.and_then(|f| f.as_str()).unwrap_or("unknown")
)
} else {
let rc = item
.get("retry_count")
.and_then(|n| n.as_u64())
.unwrap_or(0);
format!("blocked after {rc} retries")
};
blocked.push(json!({
"story_id": story_id,
"name": story_name,
"stage": stage,
"reason": reason,
}));
}
}
json!({
"counts": {
"backlog": backlog_count,
"current": current,
"qa": qa,
"merge": merge,
"done": done,
},
"blocked": blocked,
})
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn aggregate_empty_pipeline() {
let pipeline = json!({ "active": [], "backlog": [], "backlog_count": 0 });
let result = aggregate_pipeline_counts(&pipeline);
assert_eq!(result["counts"]["backlog"], 0);
assert_eq!(result["counts"]["current"], 0);
assert_eq!(result["counts"]["qa"], 0);
assert_eq!(result["counts"]["merge"], 0);
assert_eq!(result["counts"]["done"], 0);
assert_eq!(result["blocked"].as_array().unwrap().len(), 0);
}
#[test]
fn aggregate_stage_counts_correct() {
let pipeline = json!({
"active": [
{ "story_id": "1_story_a", "name": "A", "stage": "current" },
{ "story_id": "2_story_b", "name": "B", "stage": "current" },
{ "story_id": "3_story_c", "name": "C", "stage": "qa" },
{ "story_id": "4_story_d", "name": "D", "stage": "done" },
],
"backlog": [{ "story_id": "5_story_e", "name": "E" }, { "story_id": "6_story_f", "name": "F" }],
"backlog_count": 2
});
let result = aggregate_pipeline_counts(&pipeline);
assert_eq!(result["counts"]["backlog"], 2);
assert_eq!(result["counts"]["current"], 2);
assert_eq!(result["counts"]["qa"], 1);
assert_eq!(result["counts"]["merge"], 0);
assert_eq!(result["counts"]["done"], 1);
assert_eq!(result["blocked"].as_array().unwrap().len(), 0);
}
#[test]
fn aggregate_blocked_items_captured() {
let pipeline = json!({
"active": [
{ "story_id": "10_story_blocked", "name": "Blocked", "stage": "current", "blocked": true, "retry_count": 3 },
{ "story_id": "20_story_ok", "name": "OK", "stage": "qa" },
],
"backlog": [],
"backlog_count": 0
});
let result = aggregate_pipeline_counts(&pipeline);
let blocked = result["blocked"].as_array().unwrap();
assert_eq!(blocked.len(), 1);
assert_eq!(blocked[0]["story_id"], "10_story_blocked");
assert_eq!(blocked[0]["stage"], "current");
assert!(
blocked[0]["reason"]
.as_str()
.unwrap()
.contains("blocked after 3 retries"),
);
}
}