//! Gateway I/O — the ONLY place in `service/gateway/` that may perform side effects. //! //! Side effects here include: reading/writing config and agent state files, //! HTTP requests to project containers (proxying, health checks, polling), //! spawning the Matrix bot task, and the notification poller background task. use super::config::{GatewayConfig, ProjectEntry}; pub use reqwest::Client; use serde_json::{Value, json}; use std::collections::{BTreeMap, HashMap}; use std::path::Path; // ── Config I/O ─────────────────────────────────────────────────────────────── /// Load gateway config from a `projects.toml` file. pub fn load_config(path: &Path) -> Result { let contents = std::fs::read_to_string(path) .map_err(|e| format!("cannot read {}: {e}", path.display()))?; toml::from_str(&contents).map_err(|e| format!("invalid projects.toml: {e}")) } /// Persist the current projects map to `/projects.toml`. /// Silently ignores write errors or skips when `config_dir` is empty. /// /// Existing `[sled_tokens]` entries are preserved so that adding or removing /// projects via the UI does not wipe the sled authentication tokens. pub async fn save_config(projects: &BTreeMap, config_dir: &Path) { if config_dir.as_os_str().is_empty() { return; } let path = config_dir.join("projects.toml"); let sled_tokens = tokio::fs::read_to_string(&path) .await .ok() .and_then(|data| toml::from_str::(&data).ok()) .map(|c| c.sled_tokens) .unwrap_or_default(); let config = GatewayConfig { projects: projects.clone(), sled_tokens, }; if let Ok(data) = toml::to_string_pretty(&config) { let _ = tokio::fs::write(&path, data).await; } } // ── Bot config I/O ────────────────────────────────────────────────────────── /// Read the current raw bot.toml as key/value pairs for the configuration UI. /// Returns `None` values if the file does not exist. pub fn read_bot_config_raw(config_dir: &Path) -> BotConfigFields { let path = config_dir.join(".huskies").join("bot.toml"); let content = match std::fs::read_to_string(&path) { Ok(c) => c, Err(_) => return BotConfigFields::default(), }; let table: toml::Value = match toml::from_str(&content) { Ok(v) => v, Err(_) => return BotConfigFields::default(), }; let s = |key: &str| -> Option { table .get(key) .and_then(|v| v.as_str()) .map(|s| s.to_string()) }; BotConfigFields { transport: s("transport").unwrap_or_else(|| "matrix".to_string()), homeserver: s("homeserver"), username: s("username"), password: s("password"), slack_bot_token: s("slack_bot_token"), slack_signing_secret: s("slack_signing_secret"), } } /// Raw bot.toml fields for the configuration UI. #[derive(Default)] pub struct BotConfigFields { pub transport: String, pub homeserver: Option, pub username: Option, pub password: Option, pub slack_bot_token: Option, pub slack_signing_secret: Option, } /// Write a `bot.toml` from the given content string. pub fn write_bot_config(config_dir: &Path, content: &str) -> Result<(), String> { let huskies_dir = config_dir.join(".huskies"); std::fs::create_dir_all(&huskies_dir) .map_err(|e| format!("cannot create .huskies dir: {e}"))?; let path = huskies_dir.join("bot.toml"); std::fs::write(&path, content).map_err(|e| format!("cannot write bot.toml: {e}")) } // ── MCP proxy I/O ─────────────────────────────────────────────────────────── /// Proxy a raw MCP request body to the given project URL. pub async fn proxy_mcp_call( client: &Client, base_url: &str, request_bytes: &[u8], ) -> Result, String> { let mcp_url = format!("{}/mcp", base_url.trim_end_matches('/')); let resp = client .post(&mcp_url) .header("Content-Type", "application/json") .body(request_bytes.to_vec()) .send() .await .map_err(|e| format!("failed to reach {mcp_url}: {e}"))?; resp.bytes() .await .map(|b| b.to_vec()) .map_err(|e| format!("failed to read response from {mcp_url}: {e}")) } /// Proxy an MCP `tools/call` request to the sled with `Accept: text/event-stream` /// and return the raw response for streaming. No per-request timeout is applied /// so long-running tool calls (e.g. `run_tests`, up to 1200 s) are not cut short. /// /// The caller reads `.bytes_stream()` from the returned response and re-emits /// each SSE `data:` line as a new event to the originating client. pub async fn proxy_mcp_call_sse( client: &Client, base_url: &str, request_bytes: &[u8], ) -> Result { let mcp_url = format!("{}/mcp", base_url.trim_end_matches('/')); client .post(&mcp_url) .header("Content-Type", "application/json") .header("Accept", "text/event-stream") .body(request_bytes.to_vec()) .send() .await .map_err(|e| format!("failed to reach {mcp_url}: {e}")) } /// Fetch and aggregate pipeline status for a single project URL. pub async fn fetch_one_project_pipeline_status(url: &str, client: &Client) -> Value { let mcp_url = format!("{}/mcp", url.trim_end_matches('/')); let rpc_body = json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "get_pipeline_status", "arguments": {} } }); match client.post(&mcp_url).json(&rpc_body).send().await { Ok(resp) => match resp.json::().await { Ok(upstream) => { if let Some(text) = upstream .get("result") .and_then(|r| r.get("content")) .and_then(|c| c.get(0)) .and_then(|c| c.get("text")) .and_then(|t| t.as_str()) { match serde_json::from_str::(text) { Ok(pipeline) => { crate::service::pipeline::aggregate_pipeline_counts(&pipeline) } Err(_) => json!({ "error": "invalid pipeline JSON" }), } } else { json!({ "error": "unexpected response shape" }) } } Err(e) => json!({ "error": format!("invalid response: {e}") }), }, Err(e) => json!({ "error": format!("unreachable: {e}") }), } } /// Fetch `get_pipeline_status` from every registered project URL in parallel. pub async fn fetch_all_project_pipeline_statuses( project_urls: &BTreeMap, client: &Client, ) -> BTreeMap { use futures::future::join_all; let futures: Vec<_> = project_urls .iter() .map(|(name, url)| { let name = name.clone(); let url = url.clone(); let client = client.clone(); async move { let result = fetch_one_project_pipeline_status(&url, &client).await; (name, result) } }) .collect(); join_all(futures).await.into_iter().collect() } /// Fetch pipeline items for a single project URL. /// /// Returns `{ "active": [...], "backlog_count": N }` preserving individual /// story items so the gateway UI can render them. On error returns /// `{ "error": "..." }`. This is distinct from /// `fetch_one_project_pipeline_status` which discards items and returns /// aggregated counts. pub async fn fetch_one_project_pipeline_items(url: &str, client: &Client) -> Value { let mcp_url = format!("{}/mcp", url.trim_end_matches('/')); let rpc_body = json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "get_pipeline_status", "arguments": {} } }); match client.post(&mcp_url).json(&rpc_body).send().await { Ok(resp) => match resp.json::().await { Ok(upstream) => { if let Some(text) = upstream .get("result") .and_then(|r| r.get("content")) .and_then(|c| c.get(0)) .and_then(|c| c.get("text")) .and_then(|t| t.as_str()) { match serde_json::from_str::(text) { Ok(pipeline) => { let active = pipeline.get("active").cloned().unwrap_or(json!([])); let backlog = pipeline.get("backlog").cloned().unwrap_or(json!([])); let backlog_count = pipeline .get("backlog_count") .and_then(|n| n.as_u64()) .unwrap_or(0); let archived = pipeline.get("archived").cloned().unwrap_or(json!([])); json!({ "active": active, "backlog": backlog, "backlog_count": backlog_count, "archived": archived }) } Err(_) => json!({ "error": "invalid pipeline JSON" }), } } else { json!({ "error": "unexpected response shape" }) } } Err(e) => json!({ "error": format!("invalid response: {e}") }), }, Err(e) => json!({ "error": format!("unreachable: {e}") }), } } /// Fetch pipeline items from every registered project URL in parallel. /// /// Returns per-project `{ "active": [...], "backlog_count": N }` objects /// suitable for the gateway web UI. pub async fn fetch_all_project_pipeline_items( project_urls: &BTreeMap, client: &Client, ) -> BTreeMap { use futures::future::join_all; let futures: Vec<_> = project_urls .iter() .map(|(name, url)| { let name = name.clone(); let url = url.clone(); let client = client.clone(); async move { let result = fetch_one_project_pipeline_items(&url, &client).await; (name, result) } }) .collect(); join_all(futures).await.into_iter().collect() } /// Fetch the pipeline status from a single project for the `gateway_status` tool. pub async fn fetch_pipeline_status_for_project( client: &Client, base_url: &str, ) -> Result { let mcp_url = format!("{}/mcp", base_url.trim_end_matches('/')); let rpc_body = json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "get_pipeline_status", "arguments": {} } }); let resp = client .post(&mcp_url) .json(&rpc_body) .send() .await .map_err(|e| format!("failed to reach {mcp_url}: {e}"))?; resp.json() .await .map_err(|e| format!("invalid upstream response: {e}")) } /// Check health of a single project URL via the read-RPC `health.check` method. /// /// Sends an RPC request to the project's `/mcp` endpoint. A successful /// response (HTTP 2xx) indicates the project container is reachable and /// serving requests. pub async fn check_project_health(client: &Client, base_url: &str) -> Result { let mcp_url = format!("{}/mcp", base_url.trim_end_matches('/')); let rpc_body = json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {} }); match client.post(&mcp_url).json(&rpc_body).send().await { Ok(resp) => Ok(resp.status().is_success()), Err(e) => Err(format!("unreachable: {e}")), } } // ── Gateway MCP JSON ──────────────────────────────────────────────────────── /// Write (or overwrite) a `.mcp.json` in `config_dir` that points Claude Code /// CLI at the gateway's own `/mcp` endpoint. pub fn write_gateway_mcp_json(config_dir: &Path, port: u16) -> Result<(), std::io::Error> { let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); let url = format!("http://{host}:{port}/mcp"); let content = json!({ "mcpServers": { "huskies": { "type": "http", "url": url } } }); let path = config_dir.join(".mcp.json"); std::fs::write(&path, serde_json::to_string_pretty(&content).unwrap())?; crate::slog!("[gateway] Wrote {} pointing to {}", path.display(), url); Ok(()) } // ── Init project I/O ──────────────────────────────────────────────────────── /// Check if a path already has a `.huskies/` directory. pub fn has_huskies_dir(path: &Path) -> bool { path.join(".huskies").exists() } /// Create a directory (and parents) if it does not exist. pub fn ensure_directory(path: &Path) -> Result<(), String> { if !path.exists() { std::fs::create_dir_all(path) .map_err(|e| format!("failed to create directory '{}': {e}", path.display()))?; } Ok(()) } /// Scaffold a huskies project at the given path. pub fn scaffold_project(path: &Path) -> Result<(), String> { crate::io::fs::scaffold::scaffold_story_kit(path, 3001) } /// Initialise wizard state at the given path. pub fn init_wizard_state(path: &Path) { crate::io::wizard::WizardState::init_if_missing(path); } // ── Notification poller ───────────────────────────────────────────────────── /// Spawn a background task that reads [`super::GatewayStatusEvent`]s from the /// gateway broadcast channel and forwards each one to the configured rooms via /// `transport`, formatted with a `[project-name]` prefix. /// /// Survives broadcaster back-pressure: when the receiver falls behind /// ([`tokio::sync::broadcast::error::RecvError::Lagged`]), the task /// re-subscribes so it does not permanently stall. /// /// The task exits cleanly when the broadcast channel is closed (i.e. when /// `GatewayState` is dropped). pub fn spawn_gateway_broadcaster_forwarder( transport: std::sync::Arc, room_ids: Vec, mut rx: tokio::sync::broadcast::Receiver, ) { tokio::spawn(async move { loop { match rx.recv().await { Ok(event) => { let (plain, html) = super::polling::format_gateway_event(&event.project, &event.event); for room_id in &room_ids { if let Err(e) = transport.send_message(room_id, &plain, &html).await { crate::slog!( "[gateway-forwarder] Failed to forward event to {room_id}: {e}" ); } } } Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => { crate::slog!( "[gateway-forwarder] Broadcaster lagged by {n} messages; resubscribing" ); rx = rx.resubscribe(); } Err(tokio::sync::broadcast::error::RecvError::Closed) => { crate::slog!("[gateway-forwarder] Broadcast channel closed; forwarder exiting"); break; } } } }); } /// Spawn a background task that polls events from all project servers. pub fn spawn_gateway_notification_poller( transport: std::sync::Arc, room_ids: Vec, project_urls: BTreeMap, poll_interval_secs: u64, ) { tokio::spawn(async move { let client = Client::builder() .timeout(std::time::Duration::from_secs(10)) .build() .unwrap_or_else(|_| Client::new()); let interval = std::time::Duration::from_secs(poll_interval_secs.max(1)); let mut last_ts: HashMap = project_urls .keys() .map(|name| (name.clone(), 0u64)) .collect(); loop { for (project_name, base_url) in &project_urls { let since = last_ts.get(project_name).copied().unwrap_or(0); let url = format!("{base_url}/api/events?since={since}"); let response = match client.get(&url).send().await { Ok(r) => r, Err(e) => { crate::slog!( "[gateway-poller] {project_name}: unreachable ({e}); skipping" ); continue; } }; let events: Vec = match response.json().await { Ok(v) => v, Err(e) => { crate::slog!( "[gateway-poller] {project_name}: failed to parse events: {e}" ); continue; } }; for event in &events { let ts = event.timestamp_ms(); if ts > *last_ts.get(project_name).unwrap_or(&0) { last_ts.insert(project_name.clone(), ts); } let (plain, html) = super::polling::format_gateway_event(project_name, event); for room_id in &room_ids { if let Err(e) = transport.send_message(room_id, &plain, &html).await { crate::slog!( "[gateway-poller] Failed to send notification to {room_id}: {e}" ); } } } } tokio::time::sleep(interval).await; } }); } // ── Gateway bot spawn ─────────────────────────────────────────────────────── /// Re-export type alias for the active project lock. pub type ActiveProject = std::sync::Arc>; /// Attempt to spawn the Matrix bot against the gateway config directory. /// /// `gateway_event_tx` — when `Some`, the bot will subscribe to the gateway /// status broadcaster and forward [`super::GatewayStatusEvent`]s to its /// configured Matrix rooms with a `[project-name]` prefix. /// /// Returns `(abort_handle, shutdown_tx)`. The caller **must** hold /// `shutdown_tx` for the bot's lifetime and send `Some(ShutdownReason)` on it /// before process exit so the bot can announce "going offline" to its rooms. #[allow(clippy::too_many_arguments)] pub fn spawn_gateway_bot( config_dir: &Path, active_project: ActiveProject, gateway_project_urls: BTreeMap, gateway_projects_store: std::sync::Arc>>, port: u16, gateway_event_tx: Option>, perm_rx: std::sync::Arc< tokio::sync::Mutex< tokio::sync::mpsc::UnboundedReceiver, >, >, ) -> ( Option, tokio::sync::watch::Sender>, ) { use crate::agents::AgentPool; use crate::services::Services; use tokio::sync::broadcast; let (watcher_tx, _) = broadcast::channel::(16); let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel::>(None); // shutdown_tx is intentionally NOT forgotten — the caller holds it and // sends Some(reason) on gateway shutdown so the bot announces "going offline". let agents = std::sync::Arc::new(AgentPool::new(port, watcher_tx.clone())); // Read the gateway's bot.toml so display_name, ambient_rooms, and // permission_timeout_secs are honoured (matches the standard-mode // initialisation in main.rs). Previously this path hardcoded // `bot_name = "Assistant"` regardless of bot.toml's display_name, // breaking @-addressing for users who configured a different name. let bot_cfg = crate::chat::transport::matrix::BotConfig::load(config_dir); let services = std::sync::Arc::new(Services { project_root: config_dir.to_path_buf(), status: agents.status_broadcaster(), agents, bot_name: bot_cfg .as_ref() .and_then(|c| c.display_name.clone()) .unwrap_or_else(|| "Assistant".to_string()), bot_user_id: String::new(), ambient_rooms: std::sync::Arc::new(std::sync::Mutex::new( bot_cfg .as_ref() .map(|c| c.ambient_rooms.iter().cloned().collect()) .unwrap_or_default(), )), perm_rx, pending_perm_replies: std::sync::Arc::new(tokio::sync::Mutex::new( std::collections::HashMap::new(), )), permission_timeout_secs: bot_cfg .as_ref() .map(|c| c.permission_timeout_secs) .unwrap_or(120), chat_dispatcher: std::sync::Arc::new(crate::chat::dispatcher::ChatDispatcher::new( bot_cfg .as_ref() .map(|c| c.coalesce_window_ms) .unwrap_or(1_500), )), }); let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load( config_dir.join(".huskies").join("timers.json"), )); let gateway_event_rx = gateway_event_tx.map(|tx| tx.subscribe()); let handle = crate::chat::transport::matrix::spawn_bot( config_dir, watcher_tx, services, shutdown_rx, Some(active_project), gateway_project_urls, Some(gateway_projects_store), timer_store, gateway_event_rx, ); (handle, shutdown_tx) } // ── Tests ──────────────────────────────────────────────────────────────────── #[cfg(test)] mod tests { use super::*; /// Regression test for story 918: `spawn_gateway_bot` must return a live /// `shutdown_tx` (not one dropped via `std::mem::forget`) so that callers /// can signal `Some(reason)` and the bot's shutdown watcher receives it. #[tokio::test] async fn spawn_gateway_bot_shutdown_tx_not_forgotten() { let tmp = tempfile::tempdir().unwrap(); let active = std::sync::Arc::new(tokio::sync::RwLock::new("proj".to_string())); let (event_tx, _) = tokio::sync::broadcast::channel(4); let (_perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel::(); let perm_rx = std::sync::Arc::new(tokio::sync::Mutex::new(perm_rx)); let projects_store = std::sync::Arc::new(tokio::sync::RwLock::new(std::collections::BTreeMap::new())); let (handle, shutdown_tx) = spawn_gateway_bot( tmp.path(), active, std::collections::BTreeMap::new(), projects_store, 3001, Some(event_tx), perm_rx, ); // No bot.toml in tmp → no abort handle spawned. assert!(handle.is_none()); // The shutdown_tx must be live: subscribe a receiver and verify that // sending Some(reason) is observed — this would fail if the sender // had been forgotten (channel closed, send returns Err). let rx = shutdown_tx.subscribe(); shutdown_tx .send(Some(crate::rebuild::ShutdownReason::Manual)) .expect("shutdown_tx must not be closed (was previously std::mem::forget'd)"); assert_eq!( *rx.borrow(), Some(crate::rebuild::ShutdownReason::Manual), "shutdown receiver must see the Manual reason" ); } }