Files
huskies/server/src/service/gateway/io.rs
T
2026-05-14 16:20:07 +00:00

626 lines
24 KiB
Rust

//! Gateway I/O — the ONLY place in `service/gateway/` that may perform side effects.
//!
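//! Side effects here include: reading/writing config and state files
//! (`projects.toml`, `.huskies/bot.toml`, `.mcp.json`, `.huskies/timers.json`),
//! HTTP requests to project containers (proxying, health checks, polling),
//! project scaffolding and wizard-state initialisation, spawning the Matrix bot
//! task, and the notification poller and broadcaster-forwarder background tasks.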
use super::config::{GatewayConfig, ProjectEntry};
pub use reqwest::Client;
use serde_json::{Value, json};
use std::collections::{BTreeMap, HashMap};
use std::path::Path;
// ── Config I/O ───────────────────────────────────────────────────────────────
/// Parse a [`GatewayConfig`] from the `projects.toml` file located at `path`.
///
/// # Errors
/// Returns a human-readable message when the file cannot be read, or when its
/// contents are not valid TOML for a [`GatewayConfig`].
pub fn load_config(path: &Path) -> Result<GatewayConfig, String> {
    let raw = match std::fs::read_to_string(path) {
        Ok(text) => text,
        Err(e) => return Err(format!("cannot read {}: {e}", path.display())),
    };
    toml::from_str::<GatewayConfig>(&raw).map_err(|e| format!("invalid projects.toml: {e}"))
}
/// Persist the current projects map to `<config_dir>/projects.toml`.
/// Silently ignores write errors or skips when `config_dir` is empty.
///
/// Existing `[sled_tokens]` entries are preserved so that adding or removing
/// projects via the UI does not wipe the sled authentication tokens.
pub async fn save_config(projects: &BTreeMap<String, ProjectEntry>, config_dir: &Path) {
if config_dir.as_os_str().is_empty() {
return;
}
let path = config_dir.join("projects.toml");
let sled_tokens = tokio::fs::read_to_string(&path)
.await
.ok()
.and_then(|data| toml::from_str::<GatewayConfig>(&data).ok())
.map(|c| c.sled_tokens)
.unwrap_or_default();
let config = GatewayConfig {
projects: projects.clone(),
sled_tokens,
};
if let Ok(data) = toml::to_string_pretty(&config) {
let _ = tokio::fs::write(&path, data).await;
}
}
// ── Bot config I/O ──────────────────────────────────────────────────────────
/// Read the current raw `.huskies/bot.toml` under `config_dir` as key/value
/// pairs for the configuration UI.
///
/// Each optional field is `None` when the file is missing, unreadable, or not
/// valid TOML, and also when its key is absent or is not a string. `transport`
/// falls back to `"matrix"` in every one of those cases. The UI therefore sees
/// the same default whether or not a `bot.toml` exists.
pub fn read_bot_config_raw(config_dir: &Path) -> BotConfigFields {
    // Transport assumed whenever bot.toml does not name one.
    const DEFAULT_TRANSPORT: &str = "matrix";

    let path = config_dir.join(".huskies").join("bot.toml");
    let parsed = std::fs::read_to_string(&path)
        .ok()
        .and_then(|content| toml::from_str::<toml::Value>(&content).ok());
    let Some(table) = parsed else {
        // Keep the transport default consistent with the "key missing" case below.
        return BotConfigFields {
            transport: DEFAULT_TRANSPORT.to_string(),
            ..BotConfigFields::default()
        };
    };

    // String value for `key`, or None if absent / not a string.
    let s = |key: &str| table.get(key).and_then(|v| v.as_str()).map(str::to_string);
    BotConfigFields {
        transport: s("transport").unwrap_or_else(|| DEFAULT_TRANSPORT.to_string()),
        homeserver: s("homeserver"),
        username: s("username"),
        password: s("password"),
        slack_bot_token: s("slack_bot_token"),
        slack_signing_secret: s("slack_signing_secret"),
    }
}
/// Raw, unvalidated `bot.toml` values surfaced to the configuration UI.
///
/// The derived `Default` yields an empty `transport` string and `None` for
/// every optional field.
#[derive(Default)]
pub struct BotConfigFields {
    /// Value of the `transport` key.
    pub transport: String,
    /// Value of the `homeserver` key, if present as a string.
    pub homeserver: Option<String>,
    /// Value of the `username` key, if present as a string.
    pub username: Option<String>,
    /// Value of the `password` key, if present as a string (a secret).
    pub password: Option<String>,
    /// Value of the `slack_bot_token` key, if present as a string (a secret).
    pub slack_bot_token: Option<String>,
    /// Value of the `slack_signing_secret` key, if present as a string (a secret).
    pub slack_signing_secret: Option<String>,
}
/// Write `content` verbatim to `<config_dir>/.huskies/bot.toml`, creating the
/// `.huskies` directory (and any parents) first if needed.
///
/// # Errors
/// Returns a message if the directory cannot be created or the file cannot be
/// written.
pub fn write_bot_config(config_dir: &Path, content: &str) -> Result<(), String> {
    let dir = config_dir.join(".huskies");
    if let Err(e) = std::fs::create_dir_all(&dir) {
        return Err(format!("cannot create .huskies dir: {e}"));
    }
    let target = dir.join("bot.toml");
    match std::fs::write(&target, content) {
        Ok(()) => Ok(()),
        Err(e) => Err(format!("cannot write bot.toml: {e}")),
    }
}
// ── MCP proxy I/O ───────────────────────────────────────────────────────────
/// Forward a raw MCP JSON-RPC request body to `<base_url>/mcp` (a trailing `/`
/// on `base_url` is ignored) and return the upstream response body bytes.
///
/// The upstream HTTP status is not inspected; whatever body comes back is
/// returned as-is.
///
/// # Errors
/// Returns a message when the request cannot be sent or the response body
/// cannot be read.
pub async fn proxy_mcp_call(
    client: &Client,
    base_url: &str,
    request_bytes: &[u8],
) -> Result<Vec<u8>, String> {
    let mcp_url = format!("{}/mcp", base_url.trim_end_matches('/'));
    let request = client
        .post(&mcp_url)
        .header("Content-Type", "application/json")
        .body(request_bytes.to_vec());
    let response = match request.send().await {
        Ok(r) => r,
        Err(e) => return Err(format!("failed to reach {mcp_url}: {e}")),
    };
    match response.bytes().await {
        Ok(body) => Ok(body.to_vec()),
        Err(e) => Err(format!("failed to read response from {mcp_url}: {e}")),
    }
}
/// Forward an MCP `tools/call` request body to `<base_url>/mcp`, asking for a
/// server-sent-event stream (`Accept: text/event-stream`), and hand back the
/// raw response without reading its body.
///
/// No per-request timeout is set here, so long-running tool calls (e.g.
/// `run_tests`, up to 1200 s) are not cut short by this request. Any timeout
/// configured on `client` itself still applies.
///
/// The caller reads `.bytes_stream()` from the returned response and re-emits
/// each SSE `data:` line as a new event to the originating client.
///
/// # Errors
/// Returns a message when the request cannot be sent.
pub async fn proxy_mcp_call_sse(
    client: &Client,
    base_url: &str,
    request_bytes: &[u8],
) -> Result<reqwest::Response, String> {
    let mcp_url = format!("{}/mcp", base_url.trim_end_matches('/'));
    let request = client
        .post(&mcp_url)
        .header("Content-Type", "application/json")
        .header("Accept", "text/event-stream")
        .body(request_bytes.to_vec());
    match request.send().await {
        Ok(response) => Ok(response),
        Err(e) => Err(format!("failed to reach {mcp_url}: {e}")),
    }
}
/// Fetch and aggregate pipeline status for a single project URL.
pub async fn fetch_one_project_pipeline_status(url: &str, client: &Client) -> Value {
let mcp_url = format!("{}/mcp", url.trim_end_matches('/'));
let rpc_body = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "get_pipeline_status",
"arguments": {}
}
});
match client.post(&mcp_url).json(&rpc_body).send().await {
Ok(resp) => match resp.json::<Value>().await {
Ok(upstream) => {
if let Some(text) = upstream
.get("result")
.and_then(|r| r.get("content"))
.and_then(|c| c.get(0))
.and_then(|c| c.get("text"))
.and_then(|t| t.as_str())
{
match serde_json::from_str::<Value>(text) {
Ok(pipeline) => {
crate::service::pipeline::aggregate_pipeline_counts(&pipeline)
}
Err(_) => json!({ "error": "invalid pipeline JSON" }),
}
} else {
json!({ "error": "unexpected response shape" })
}
}
Err(e) => json!({ "error": format!("invalid response: {e}") }),
},
Err(e) => json!({ "error": format!("unreachable: {e}") }),
}
}
/// Query `get_pipeline_status` on every project in `project_urls` concurrently.
///
/// Each entry of the returned map holds the result of
/// `fetch_one_project_pipeline_status` for that project name: aggregated counts,
/// or an `{ "error": ... }` value.
pub async fn fetch_all_project_pipeline_statuses(
    project_urls: &BTreeMap<String, String>,
    client: &Client,
) -> BTreeMap<String, Value> {
    // Borrow the URLs and client directly: every future is awaited before this
    // function returns, so nothing has to be cloned up front.
    let requests = project_urls.iter().map(|(name, url)| async move {
        let status = fetch_one_project_pipeline_status(url, client).await;
        (name.clone(), status)
    });
    futures::future::join_all(requests)
        .await
        .into_iter()
        .collect()
}
/// Fetch pipeline items for a single project URL.
///
/// Returns `{ "active": [...], "backlog_count": N }` preserving individual
/// story items so the gateway UI can render them. On error returns
/// `{ "error": "..." }`. This is distinct from
/// `fetch_one_project_pipeline_status` which discards items and returns
/// aggregated counts.
pub async fn fetch_one_project_pipeline_items(url: &str, client: &Client) -> Value {
let mcp_url = format!("{}/mcp", url.trim_end_matches('/'));
let rpc_body = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "get_pipeline_status",
"arguments": {}
}
});
match client.post(&mcp_url).json(&rpc_body).send().await {
Ok(resp) => match resp.json::<Value>().await {
Ok(upstream) => {
if let Some(text) = upstream
.get("result")
.and_then(|r| r.get("content"))
.and_then(|c| c.get(0))
.and_then(|c| c.get("text"))
.and_then(|t| t.as_str())
{
match serde_json::from_str::<Value>(text) {
Ok(pipeline) => {
let active = pipeline.get("active").cloned().unwrap_or(json!([]));
let backlog = pipeline.get("backlog").cloned().unwrap_or(json!([]));
let backlog_count = pipeline
.get("backlog_count")
.and_then(|n| n.as_u64())
.unwrap_or(0);
let archived = pipeline.get("archived").cloned().unwrap_or(json!([]));
json!({ "active": active, "backlog": backlog, "backlog_count": backlog_count, "archived": archived })
}
Err(_) => json!({ "error": "invalid pipeline JSON" }),
}
} else {
json!({ "error": "unexpected response shape" })
}
}
Err(e) => json!({ "error": format!("invalid response: {e}") }),
},
Err(e) => json!({ "error": format!("unreachable: {e}") }),
}
}
/// Fetch pipeline items from every registered project URL in parallel.
///
/// Each entry of the returned map holds the result of
/// `fetch_one_project_pipeline_items` for that project name. That result is
/// either a `{ "active": [...], "backlog": [...], "backlog_count": N,
/// "archived": [...] }` object suitable for the gateway web UI, or an
/// `{ "error": ... }` value.
pub async fn fetch_all_project_pipeline_items(
    project_urls: &BTreeMap<String, String>,
    client: &Client,
) -> BTreeMap<String, Value> {
    // Borrow the URLs and client directly: every future is awaited before this
    // function returns, so nothing has to be cloned up front.
    let requests = project_urls.iter().map(|(name, url)| async move {
        let items = fetch_one_project_pipeline_items(url, client).await;
        (name.clone(), items)
    });
    futures::future::join_all(requests)
        .await
        .into_iter()
        .collect()
}
/// Call the `get_pipeline_status` MCP tool on `<base_url>/mcp` for the
/// `gateway_status` tool and return the upstream JSON-RPC reply unmodified.
///
/// The HTTP status is not inspected; any JSON body is returned.
///
/// # Errors
/// Returns a message when the request cannot be sent or the body is not JSON.
pub async fn fetch_pipeline_status_for_project(
    client: &Client,
    base_url: &str,
) -> Result<Value, String> {
    let mcp_url = format!("{}/mcp", base_url.trim_end_matches('/'));
    let request = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {
            "name": "get_pipeline_status",
            "arguments": {}
        }
    });
    let response = match client.post(&mcp_url).json(&request).send().await {
        Ok(r) => r,
        Err(e) => return Err(format!("failed to reach {mcp_url}: {e}")),
    };
    let reply: Value = response
        .json()
        .await
        .map_err(|e| format!("invalid upstream response: {e}"))?;
    Ok(reply)
}
/// Check whether a project container is reachable by sending a JSON-RPC
/// `tools/list` request to its `/mcp` endpoint.
///
/// Returns `Ok(true)` for an HTTP 2xx response. Any other status returns
/// `Ok(false)`: the container answered, but not successfully. The response
/// body is not inspected.
///
/// # Errors
/// Returns `"unreachable: ..."` when the request could not be sent at all.
pub async fn check_project_health(client: &Client, base_url: &str) -> Result<bool, String> {
    let mcp_url = format!("{}/mcp", base_url.trim_end_matches('/'));
    let rpc_body = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/list",
        "params": {}
    });
    client
        .post(&mcp_url)
        .json(&rpc_body)
        .send()
        .await
        .map(|resp| resp.status().is_success())
        .map_err(|e| format!("unreachable: {e}"))
}
// ── Gateway MCP JSON ────────────────────────────────────────────────────────
/// Write (or overwrite) a `.mcp.json` in `config_dir` that points Claude Code
/// CLI at the gateway's own `/mcp` endpoint.
///
/// The host is taken from `HUSKIES_HOST`. An unset or blank value falls back to
/// `127.0.0.1`. Bare IPv6 literals are wrapped in `[...]` so the resulting URL
/// is valid.
///
/// # Errors
/// Returns the I/O error if the file cannot be written (or, in theory, if the
/// JSON cannot be serialised).
pub fn write_gateway_mcp_json(config_dir: &Path, port: u16) -> Result<(), std::io::Error> {
    // A blank HUSKIES_HOST would otherwise produce the unusable `http://:PORT/mcp`.
    let host = std::env::var("HUSKIES_HOST")
        .ok()
        .map(|h| h.trim().to_string())
        .filter(|h| !h.is_empty())
        .unwrap_or_else(|| "127.0.0.1".to_string());
    // IPv6 literals (e.g. `::1`) must be bracketed inside a URL authority.
    let authority = if host.contains(':') && !host.starts_with('[') {
        format!("[{host}]")
    } else {
        host
    };
    let url = format!("http://{authority}:{port}/mcp");
    let content = json!({
        "mcpServers": {
            "huskies": {
                "type": "http",
                "url": url
            }
        }
    });
    let path = config_dir.join(".mcp.json");
    // serde_json::Error converts into io::Error, so `?` replaces the old unwrap.
    std::fs::write(&path, serde_json::to_string_pretty(&content)?)?;
    crate::slog!("[gateway] Wrote {} pointing to {}", path.display(), url);
    Ok(())
}
// ── Init project I/O ────────────────────────────────────────────────────────
/// Report whether `path` already contains a `.huskies` entry.
///
/// Any filesystem entry named `.huskies` counts (it is not required to be a
/// directory). Errors while querying it, such as permission denied or a broken
/// symlink, are reported as `false`.
pub fn has_huskies_dir(path: &Path) -> bool {
    let marker = path.join(".huskies");
    std::fs::metadata(&marker).is_ok()
}
/// Ensure `path` exists as a directory, creating it and any missing parents.
///
/// Succeeds when the directory already exists. A bare `exists()` check would
/// also accept a regular file at `path`; this instead fails when `path` exists
/// but is not a directory. It also avoids a check-then-create race.
///
/// # Errors
/// Returns `"failed to create directory '<path>': <io error>"` when creation
/// fails or `path` is occupied by a non-directory.
pub fn ensure_directory(path: &Path) -> Result<(), String> {
    // `create_dir_all` is already a no-op for an existing directory.
    std::fs::create_dir_all(path)
        .map_err(|e| format!("failed to create directory '{}': {e}", path.display()))
}
/// Scaffold a huskies project at `path` by delegating to
/// `crate::io::fs::scaffold::scaffold_story_kit`.
///
/// The port argument is fixed at `3001`. NOTE(review): presumably the default
/// project server port; confirm against `scaffold_story_kit`.
pub fn scaffold_project(path: &Path) -> Result<(), String> {
    use crate::io::fs::scaffold::scaffold_story_kit;
    let port = 3001;
    scaffold_story_kit(path, port)
}
/// Initialise wizard state at `path` via `WizardState::init_if_missing`.
///
/// If `init_if_missing` returns a value, it is discarded here.
pub fn init_wizard_state(path: &Path) {
    use crate::io::wizard::WizardState;
    WizardState::init_if_missing(path);
}
// ── Notification poller ─────────────────────────────────────────────────────
/// Spawn a background task that reads [`super::GatewayStatusEvent`]s from the
/// gateway broadcast channel and forwards each one to the configured rooms via
/// `transport`, formatted with a `[project-name]` prefix.
///
/// Survives broadcaster back-pressure: when the receiver falls behind
/// ([`tokio::sync::broadcast::error::RecvError::Lagged`]), the task
/// re-subscribes so it does not permanently stall.
///
/// The task exits cleanly when the broadcast channel is closed (i.e. when
/// `GatewayState` is dropped).
pub fn spawn_gateway_broadcaster_forwarder(
transport: std::sync::Arc<dyn crate::chat::ChatTransport>,
room_ids: Vec<String>,
mut rx: tokio::sync::broadcast::Receiver<super::GatewayStatusEvent>,
) {
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => {
let (plain, html) =
super::polling::format_gateway_event(&event.project, &event.event);
for room_id in &room_ids {
if let Err(e) = transport.send_message(room_id, &plain, &html).await {
crate::slog!(
"[gateway-forwarder] Failed to forward event to {room_id}: {e}"
);
}
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
crate::slog!(
"[gateway-forwarder] Broadcaster lagged by {n} messages; resubscribing"
);
rx = rx.resubscribe();
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
crate::slog!("[gateway-forwarder] Broadcast channel closed; forwarder exiting");
break;
}
}
}
});
}
/// Spawn a background task that polls `GET <base_url>/api/events?since=<ts>` on
/// every project in `project_urls` and forwards each returned event to all
/// `room_ids` via `transport`.
///
/// Each project keeps its own high-water mark, sent as `since` on the next
/// poll. The mark is the largest `timestamp_ms()` seen so far and starts at
/// `0`. Unreachable projects and unparseable responses are logged and skipped
/// for that round. After visiting every project the task sleeps for
/// `poll_interval_secs`, clamped to at least one second.
///
/// The HTTP client uses a 10-second timeout and falls back to a default client
/// if that builder fails. The task loops forever.
pub fn spawn_gateway_notification_poller(
    transport: std::sync::Arc<dyn crate::chat::ChatTransport>,
    room_ids: Vec<String>,
    project_urls: BTreeMap<String, String>,
    poll_interval_secs: u64,
) {
    tokio::spawn(async move {
        let client = Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .unwrap_or_else(|_| Client::new());
        let interval = std::time::Duration::from_secs(poll_interval_secs.max(1));
        let mut last_ts: HashMap<String, u64> = project_urls
            .keys()
            .map(|name| (name.clone(), 0u64))
            .collect();
        loop {
            for (project_name, base_url) in &project_urls {
                let since = last_ts.get(project_name).copied().unwrap_or(0);
                // Trim a trailing '/' like every other URL builder in this module;
                // otherwise the request path becomes `//api/events`.
                let url = format!(
                    "{}/api/events?since={since}",
                    base_url.trim_end_matches('/')
                );
                let response = match client.get(&url).send().await {
                    Ok(r) => r,
                    Err(e) => {
                        crate::slog!(
                            "[gateway-poller] {project_name}: unreachable ({e}); skipping"
                        );
                        continue;
                    }
                };
                let events: Vec<crate::service::events::StoredEvent> = match response.json().await {
                    Ok(v) => v,
                    Err(e) => {
                        crate::slog!(
                            "[gateway-poller] {project_name}: failed to parse events: {e}"
                        );
                        continue;
                    }
                };
                // Advance the high-water mark once per batch.
                if let Some(newest) = events.iter().map(|event| event.timestamp_ms()).max() {
                    let mark = last_ts.entry(project_name.clone()).or_insert(0);
                    *mark = (*mark).max(newest);
                }
                for event in &events {
                    let (plain, html) = super::polling::format_gateway_event(project_name, event);
                    for room_id in &room_ids {
                        if let Err(e) = transport.send_message(room_id, &plain, &html).await {
                            crate::slog!(
                                "[gateway-poller] Failed to send notification to {room_id}: {e}"
                            );
                        }
                    }
                }
            }
            tokio::time::sleep(interval).await;
        }
    });
}
// ── Gateway bot spawn ───────────────────────────────────────────────────────
/// Shared, async-lockable name of the gateway's currently active project.
///
/// This is a type alias defined here, not a re-export. It is passed to
/// `spawn_gateway_bot`, which hands it on to the Matrix bot.
pub type ActiveProject = std::sync::Arc<tokio::sync::RwLock<String>>;
/// Attempt to spawn the Matrix bot against the gateway config directory.
///
/// `gateway_event_tx` — when `Some`, the bot will subscribe to the gateway
/// status broadcaster and forward [`super::GatewayStatusEvent`]s to its
/// configured Matrix rooms with a `[project-name]` prefix.
///
/// Returns `(abort_handle, shutdown_tx)`. The caller **must** hold
/// `shutdown_tx` for the bot's lifetime and send `Some(ShutdownReason)` on it
/// before process exit so the bot can announce "going offline" to its rooms.
pub fn spawn_gateway_bot(
config_dir: &Path,
active_project: ActiveProject,
gateway_projects: Vec<String>,
gateway_project_urls: BTreeMap<String, String>,
port: u16,
gateway_event_tx: Option<tokio::sync::broadcast::Sender<super::GatewayStatusEvent>>,
perm_rx: std::sync::Arc<
tokio::sync::Mutex<
tokio::sync::mpsc::UnboundedReceiver<crate::http::context::PermissionForward>,
>,
>,
) -> (
Option<tokio::task::AbortHandle>,
tokio::sync::watch::Sender<Option<crate::rebuild::ShutdownReason>>,
) {
use crate::agents::AgentPool;
use crate::services::Services;
use tokio::sync::broadcast;
let (watcher_tx, _) = broadcast::channel::<crate::io::watcher::WatcherEvent>(16);
let (shutdown_tx, shutdown_rx) =
tokio::sync::watch::channel::<Option<crate::rebuild::ShutdownReason>>(None);
// shutdown_tx is intentionally NOT forgotten — the caller holds it and
// sends Some(reason) on gateway shutdown so the bot announces "going offline".
let agents = std::sync::Arc::new(AgentPool::new(port, watcher_tx.clone()));
// Read the gateway's bot.toml so display_name, ambient_rooms, and
// permission_timeout_secs are honoured (matches the standard-mode
// initialisation in main.rs). Previously this path hardcoded
// `bot_name = "Assistant"` regardless of bot.toml's display_name,
// breaking @-addressing for users who configured a different name.
let bot_cfg = crate::chat::transport::matrix::BotConfig::load(config_dir);
let services = std::sync::Arc::new(Services {
project_root: config_dir.to_path_buf(),
status: agents.status_broadcaster(),
agents,
bot_name: bot_cfg
.as_ref()
.and_then(|c| c.display_name.clone())
.unwrap_or_else(|| "Assistant".to_string()),
bot_user_id: String::new(),
ambient_rooms: std::sync::Arc::new(std::sync::Mutex::new(
bot_cfg
.as_ref()
.map(|c| c.ambient_rooms.iter().cloned().collect())
.unwrap_or_default(),
)),
perm_rx,
pending_perm_replies: std::sync::Arc::new(tokio::sync::Mutex::new(
std::collections::HashMap::new(),
)),
permission_timeout_secs: bot_cfg
.as_ref()
.map(|c| c.permission_timeout_secs)
.unwrap_or(120),
});
let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load(
config_dir.join(".huskies").join("timers.json"),
));
let gateway_event_rx = gateway_event_tx.map(|tx| tx.subscribe());
let handle = crate::chat::transport::matrix::spawn_bot(
config_dir,
watcher_tx,
services,
shutdown_rx,
Some(active_project),
gateway_projects,
gateway_project_urls,
timer_store,
gateway_event_rx,
);
(handle, shutdown_tx)
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::*;

    /// Regression test for story 918: `spawn_gateway_bot` must return a live
    /// `shutdown_tx` (not one dropped via `std::mem::forget`). Callers can then
    /// signal `Some(reason)` and the bot's shutdown watcher receives it.
    #[tokio::test]
    async fn spawn_gateway_bot_shutdown_tx_not_forgotten() {
        let dir = tempfile::tempdir().unwrap();
        let active_project = std::sync::Arc::new(tokio::sync::RwLock::new(String::from("proj")));
        let (gateway_tx, _) = tokio::sync::broadcast::channel(4);
        // Keep the permission sender alive so the receiver's channel stays open.
        let (_perm_tx, raw_perm_rx) =
            tokio::sync::mpsc::unbounded_channel::<crate::http::context::PermissionForward>();
        let shared_perm_rx = std::sync::Arc::new(tokio::sync::Mutex::new(raw_perm_rx));

        let (abort_handle, shutdown_tx) = spawn_gateway_bot(
            dir.path(),
            active_project,
            vec![String::from("proj")],
            std::collections::BTreeMap::new(),
            3001,
            Some(gateway_tx),
            shared_perm_rx,
        );

        // The temp dir has no bot.toml, so no bot task (and no abort handle).
        assert!(abort_handle.is_none());

        // A forgotten sender would leave the channel closed and make `send`
        // return Err; a live one must deliver the Manual reason to a subscriber.
        let observer = shutdown_tx.subscribe();
        shutdown_tx
            .send(Some(crate::rebuild::ShutdownReason::Manual))
            .expect("shutdown_tx must not be closed (was previously std::mem::forget'd)");
        assert_eq!(
            *observer.borrow(),
            Some(crate::rebuild::ShutdownReason::Manual),
            "shutdown receiver must see the Manual reason"
        );
    }
}