diff --git a/server/src/config/mod.rs b/server/src/config/mod.rs
index 54633e59..ae45c170 100644
--- a/server/src/config/mod.rs
+++ b/server/src/config/mod.rs
@@ -110,6 +110,20 @@ pub struct ProjectConfig {
     /// Default: 3. Set to 0 to disable mesh discovery entirely.
     #[serde(default = "default_max_mesh_peers")]
     pub max_mesh_peers: usize,
+    /// Base URL of the gateway this project should push status events to.
+    ///
+    /// When set, a relay task is started that connects to the gateway's
+    /// `/gateway/events/push` WebSocket and forwards every [`StatusEvent`] from
+    /// the local broadcaster. Example: `gateway_url = "http://gateway:3000"`.
+    /// Disabled when absent. Also readable from the `HUSKIES_GATEWAY_URL`
+    /// environment variable.
+    #[serde(default)]
+    pub gateway_url: Option<String>,
+    /// Project name this instance reports when pushing events to the gateway.
+    /// Falls back to `HUSKIES_GATEWAY_PROJECT`, then the project root directory
+    /// name. Example: `gateway_project = "huskies"`.
+    #[serde(default)]
+    pub gateway_project: Option<String>,
 }
 
 /// Configuration for the filesystem watcher's sweep behaviour.
@@ -317,6 +331,8 @@ impl Default for ProjectConfig {
             crdt_require_token: false,
             crdt_tokens: Vec::new(),
             max_mesh_peers: default_max_mesh_peers(),
+            gateway_url: None,
+            gateway_project: None,
         }
     }
 }
@@ -403,6 +419,8 @@ impl ProjectConfig {
                 crdt_require_token: false,
                 crdt_tokens: Vec::new(),
                 max_mesh_peers: default_max_mesh_peers(),
+                gateway_url: None,
+                gateway_project: None,
             };
             validate_agents(&config.agent)?;
             return Ok(config);
@@ -440,6 +458,8 @@ impl ProjectConfig {
             crdt_require_token: false,
             crdt_tokens: Vec::new(),
             max_mesh_peers: default_max_mesh_peers(),
+            gateway_url: None,
+            gateway_project: None,
         };
         validate_agents(&config.agent)?;
         Ok(config)
@@ -465,6 +485,8 @@ impl ProjectConfig {
             crdt_require_token: false,
             crdt_tokens: Vec::new(),
             max_mesh_peers: default_max_mesh_peers(),
+            gateway_url: None,
+            gateway_project: None,
         })
     }
 }
diff --git a/server/src/gateway_relay.rs b/server/src/gateway_relay.rs
new file mode 100644
index 00000000..dcc166f7
--- /dev/null
+++ b/server/src/gateway_relay.rs
@@ -0,0 +1,347 @@
+//! Gateway relay task — pushes project status events to the gateway via WebSocket.
+//!
+//! When `gateway_url` is configured in `project.toml` (or the
+//! `HUSKIES_GATEWAY_URL` environment variable is set), this module spawns a
+//! background task that:
+//!
+//! 1. Obtains a one-time join token from the gateway via `POST /gateway/tokens`.
+//! 2. Connects to the gateway's `/gateway/events/push` WebSocket endpoint.
+//! 3. Forwards every [`StatusEvent`] from the local broadcaster as a
+//!    JSON-encoded [`StoredEvent`] text frame.
+//! 4. Reconnects with exponential back-off when the connection drops.
+
+use crate::service::events::StoredEvent;
+use crate::service::status::{StatusBroadcaster, StatusEvent};
+use crate::slog;
+use futures::SinkExt as _;
+use futures::StreamExt as _;
+use std::sync::Arc;
+use tokio_tungstenite::tungstenite::Message as WsMessage;
+
+// ── Back-off constants ────────────────────────────────────────────────────────
+
+/// Initial reconnect delay in seconds.
+const INITIAL_BACKOFF_SECS: u64 = 1;
+/// Maximum reconnect delay cap in seconds.
+const MAX_BACKOFF_SECS: u64 = 60;
+/// Multiplier applied after each failed attempt.
+const BACKOFF_MULTIPLIER: u64 = 2;
+/// A session lasting at least this many seconds counts as healthy: the next
+/// reconnect starts again from [`INITIAL_BACKOFF_SECS`].
+const BACKOFF_RESET_AFTER_SECS: u64 = 60;
+/// Upper bound in seconds on the token request, so a stalled gateway cannot
+/// hang the relay indefinitely.
+const TOKEN_REQUEST_TIMEOUT_SECS: u64 = 30;
+
+// ── Public API ────────────────────────────────────────────────────────────────
+
+/// Spawn the gateway relay background task.
+///
+/// Does nothing when `gateway_url` is empty. When running, the task holds a
+/// persistent WebSocket connection to `{gateway_url}/gateway/events/push` and
+/// forwards every [`StatusEvent`] the local broadcaster emits as a
+/// JSON-encoded [`StoredEvent`] text frame. On disconnect the task
+/// reconnects automatically with exponential back-off (initial 1 s, cap 60 s).
+/// The delay returns to its initial value once a session has stayed up for
+/// [`BACKOFF_RESET_AFTER_SECS`] seconds.
+pub fn spawn_relay_task(
+    gateway_url: String,
+    project_name: String,
+    broadcaster: Arc<StatusBroadcaster>,
+    client: reqwest::Client,
+) {
+    if gateway_url.is_empty() {
+        return;
+    }
+    slog!("[relay] Spawning gateway relay task (project={project_name}, gateway={gateway_url})");
+    tokio::spawn(async move {
+        let mut backoff = INITIAL_BACKOFF_SECS;
+        loop {
+            let started = std::time::Instant::now();
+            let outcome = relay_once(&gateway_url, &project_name, &broadcaster, &client).await;
+            // A long-lived session means the gateway was healthy, so delays
+            // accumulated during an earlier outage must not slow this reconnect.
+            if started.elapsed() >= std::time::Duration::from_secs(BACKOFF_RESET_AFTER_SECS) {
+                backoff = INITIAL_BACKOFF_SECS;
+            }
+            match outcome {
+                Ok(()) => {
+                    slog!("[relay] Gateway connection closed cleanly; reconnecting in {backoff}s");
+                }
+                Err(e) => {
+                    slog!("[relay] Relay error: {e}; reconnecting in {backoff}s");
+                }
+            }
+            tokio::time::sleep(std::time::Duration::from_secs(backoff)).await;
+            backoff = next_backoff(backoff);
+        }
+    });
+}
+
+// ── Private helpers ───────────────────────────────────────────────────────────
+
+/// Return the delay to use after `current`: multiplied by
+/// [`BACKOFF_MULTIPLIER`] and capped at [`MAX_BACKOFF_SECS`].
+fn next_backoff(current: u64) -> u64 {
+    current
+        .saturating_mul(BACKOFF_MULTIPLIER)
+        .min(MAX_BACKOFF_SECS)
+}
+
+/// Run a single relay session: obtain a token, connect, forward events until
+/// disconnect or broadcaster close.
+///
+/// Returns `Ok(())` when the broadcaster closes or the gateway closes the
+/// socket, and `Err` with a description for any HTTP, URL, serialisation or
+/// WebSocket failure.
+async fn relay_once(
+    gateway_url: &str,
+    project_name: &str,
+    broadcaster: &StatusBroadcaster,
+    client: &reqwest::Client,
+) -> Result<(), String> {
+    // Subscribe before initiating the network round-trip so no events are
+    // missed during the connection setup window.
+    let mut sub = broadcaster.subscribe();
+
+    // Step 1: obtain a one-time join token from the gateway.
+    let token_url = format!("{}/gateway/tokens", gateway_url.trim_end_matches('/'));
+    let resp = client
+        .post(&token_url)
+        .timeout(std::time::Duration::from_secs(TOKEN_REQUEST_TIMEOUT_SECS))
+        .send()
+        .await
+        .map_err(|e| format!("token request: {e}"))?;
+    if !resp.status().is_success() {
+        return Err(format!("token request returned HTTP {}", resp.status()));
+    }
+    let body: serde_json::Value = resp.json().await.map_err(|e| format!("token parse: {e}"))?;
+    let token = body
+        .get("token")
+        .and_then(|t| t.as_str())
+        .ok_or_else(|| "no 'token' field in gateway response".to_string())?;
+
+    // Step 2: connect to the WebSocket push endpoint.
+    let ws_url = build_push_url(gateway_url, token, project_name)?;
+    slog!("[relay] Connecting to gateway events endpoint (project={project_name})");
+
+    let (ws_stream, _) = tokio_tungstenite::connect_async(ws_url.as_str())
+        .await
+        .map_err(|e| format!("WebSocket connect: {e}"))?;
+
+    let (mut sink, mut incoming) = ws_stream.split();
+    slog!("[relay] Connected to gateway events endpoint (project={project_name})");
+
+    // Step 3: forward StatusEvents until the broadcaster or connection closes.
+    // The read half is polled as well: that is what lets tungstenite answer
+    // pings and what surfaces a gateway-side close without waiting for the
+    // next outgoing event.
+    loop {
+        tokio::select! {
+            // NOTE(review): assumes `recv` is cancel-safe (as tokio channel
+            // receivers are) — confirm against the subscription type.
+            event = sub.recv() => {
+                let Some(event) = event else {
+                    // Broadcaster was dropped — server is shutting down.
+                    return Ok(());
+                };
+                let Some(stored) = status_to_stored(event) else {
+                    continue;
+                };
+                let text = serde_json::to_string(&stored).map_err(|e| format!("serialise: {e}"))?;
+                sink.send(WsMessage::Text(text.into()))
+                    .await
+                    .map_err(|e| format!("WebSocket send: {e}"))?;
+            }
+            frame = incoming.next() => match frame {
+                // Gateway closed the socket; the caller reconnects.
+                None | Some(Ok(WsMessage::Close(_))) => return Ok(()),
+                // The gateway is not expected to send data on this endpoint.
+                Some(Ok(_)) => {}
+                Some(Err(e)) => return Err(format!("WebSocket receive: {e}")),
+            },
+        }
+    }
+}
+
+/// Build the `ws://`/`wss://` push URL for `gateway_url`, with `token` and
+/// `project_name` form-encoded into the query string.
+///
+/// Encoding matters because the project name may come from a directory name
+/// and can contain spaces, `&` or `#`.
+fn build_push_url(
+    gateway_url: &str,
+    token: &str,
+    project_name: &str,
+) -> Result<reqwest::Url, String> {
+    let ws_base = to_ws_url(gateway_url.trim_end_matches('/'));
+    reqwest::Url::parse_with_params(
+        &format!("{ws_base}/gateway/events/push"),
+        &[("token", token), ("project", project_name)],
+    )
+    .map_err(|e| format!("invalid gateway URL: {e}"))
+}
+
+/// Convert a [`StatusEvent`] to a [`StoredEvent`] stamped with the current
+/// wall-clock time, or `None` when the event has no `StoredEvent` equivalent
+/// (e.g. rate-limit variants).
+fn status_to_stored(event: StatusEvent) -> Option<StoredEvent> {
+    // Milliseconds since the Unix epoch; 0 if the clock reads before the
+    // epoch, saturating rather than truncating on overflow.
+    let now_ms = std::time::SystemTime::now()
+        .duration_since(std::time::UNIX_EPOCH)
+        .map(|elapsed| u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX))
+        .unwrap_or_default();
+    match event {
+        StatusEvent::StageTransition {
+            story_id,
+            from_stage,
+            to_stage,
+            ..
+        } => Some(StoredEvent::StageTransition {
+            story_id,
+            from_stage,
+            to_stage,
+            timestamp_ms: now_ms,
+        }),
+        StatusEvent::MergeFailure {
+            story_id, reason, ..
+        } => Some(StoredEvent::MergeFailure {
+            story_id,
+            reason,
+            timestamp_ms: now_ms,
+        }),
+        StatusEvent::StoryBlocked {
+            story_id, reason, ..
+        } => Some(StoredEvent::StoryBlocked {
+            story_id,
+            reason,
+            timestamp_ms: now_ms,
+        }),
+        // Rate-limit events have no StoredEvent equivalent — skip them.
+        StatusEvent::RateLimitWarning { .. } | StatusEvent::RateLimitHardBlock { .. } => None,
+    }
+}
+
+/// Convert an `http://` or `https://` base URL to its `ws://` / `wss://`
+/// equivalent. Returns the input unchanged if it does not start with `http`.
+fn to_ws_url(base: &str) -> String {
+    if let Some(rest) = base.strip_prefix("https://") {
+        format!("wss://{rest}")
+    } else if let Some(rest) = base.strip_prefix("http://") {
+        format!("ws://{rest}")
+    } else {
+        base.to_string()
+    }
+}
+
+// ── Tests ─────────────────────────────────────────────────────────────────────
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn to_ws_url_converts_http() {
+        assert_eq!(to_ws_url("http://gateway:3000"), "ws://gateway:3000");
+    }
+
+    #[test]
+    fn to_ws_url_converts_https() {
+        assert_eq!(to_ws_url("https://gateway:3000"), "wss://gateway:3000");
+    }
+
+    #[test]
+    fn to_ws_url_passes_through_ws() {
+        assert_eq!(to_ws_url("ws://already:3000"), "ws://already:3000");
+    }
+
+    #[test]
+    fn next_backoff_doubles_until_cap() {
+        assert_eq!(next_backoff(INITIAL_BACKOFF_SECS), 2);
+        assert_eq!(next_backoff(32), 60);
+        assert_eq!(next_backoff(MAX_BACKOFF_SECS), MAX_BACKOFF_SECS);
+        assert_eq!(next_backoff(u64::MAX), MAX_BACKOFF_SECS);
+    }
+
+    #[test]
+    fn build_push_url_encodes_query_values() {
+        let url = build_push_url("http://gateway:3000/", "a&b=c", "my project").unwrap();
+        assert_eq!(url.scheme(), "ws");
+        assert_eq!(url.path(), "/gateway/events/push");
+        let pairs: Vec<(String, String)> = url.query_pairs().into_owned().collect();
+        assert_eq!(
+            pairs,
+            vec![
+                ("token".to_string(), "a&b=c".to_string()),
+                ("project".to_string(), "my project".to_string()),
+            ]
+        );
+    }
+
+    #[test]
+    fn status_to_stored_stage_transition() {
+        let ev = StatusEvent::StageTransition {
+            story_id: "42".into(),
+            story_name: None,
+            from_stage: "1_backlog".into(),
+            to_stage: "2_current".into(),
+        };
+        let stored = status_to_stored(ev).unwrap();
+        assert!(
+            matches!(stored, StoredEvent::StageTransition { story_id, .. } if story_id == "42")
+        );
+    }
+
+    #[test]
+    fn status_to_stored_merge_failure() {
+        let ev = StatusEvent::MergeFailure {
+            story_id: "7".into(),
+            story_name: None,
+            reason: "conflict".into(),
+        };
+        let stored = status_to_stored(ev).unwrap();
+        assert!(matches!(stored, StoredEvent::MergeFailure { story_id, .. } if story_id == "7"));
+    }
+
+    #[test]
+    fn status_to_stored_story_blocked() {
+        let ev = StatusEvent::StoryBlocked {
+            story_id: "3".into(),
+            story_name: None,
+            reason: "retry limit".into(),
+        };
+        let stored = status_to_stored(ev).unwrap();
+        assert!(matches!(stored, StoredEvent::StoryBlocked { story_id, .. } if story_id == "3"));
+    }
+
+    #[test]
+    fn status_to_stored_rate_limit_warning_is_none() {
+        let ev = StatusEvent::RateLimitWarning {
+            story_id: "1".into(),
+            story_name: None,
+            agent_name: "coder".into(),
+        };
+        assert!(status_to_stored(ev).is_none());
+    }
+
+    #[test]
+    fn status_to_stored_rate_limit_hard_block_is_none() {
+        let ev = StatusEvent::RateLimitHardBlock {
+            story_id: "2".into(),
+            story_name: None,
+            agent_name: "coder".into(),
+            reset_at: chrono::Utc::now(),
+        };
+        assert!(status_to_stored(ev).is_none());
+    }
+
+    #[test]
+    fn spawn_relay_task_noop_when_url_empty() {
+        // Should not panic or spawn anything meaningful.
+        let broadcaster = Arc::new(StatusBroadcaster::new());
+        let client = reqwest::Client::new();
+        spawn_relay_task(String::new(), "test".into(), broadcaster, client);
+        // If we reach here without panic, the guard worked.
+    }
+}
diff --git a/server/src/main.rs b/server/src/main.rs
index cbbbad6f..f95fe5b0 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -15,6 +15,7 @@ pub mod crdt_sync;
 pub mod crdt_wire;
 mod db;
 pub mod gateway;
+mod gateway_relay;
 mod http;
 mod io;
 mod llm;
@@ -699,6 +700,44 @@ async fn main() -> Result<(), std::io::Error> {
     let event_buffer = crate::http::events::EventBuffer::new();
     crate::http::events::subscribe_to_watcher(event_buffer.clone(), watcher_rx_for_events);
 
+    // ── Gateway relay task ───────────────────────────────────────────────────
+    //
+    // When `gateway_url` is configured (via project.toml or HUSKIES_GATEWAY_URL)
+    // start a background task that pushes StatusEvents to the gateway's
+    // /gateway/events/push WebSocket endpoint. The project name sent to the
+    // gateway defaults to the project root directory name when `gateway_project`
+    // is not explicitly set.
+ { + let relay_gateway_url = startup_root + .as_ref() + .and_then(|r| config::ProjectConfig::load(r).ok()) + .and_then(|c| c.gateway_url) + .or_else(|| std::env::var("HUSKIES_GATEWAY_URL").ok()) + .unwrap_or_default(); + + if !relay_gateway_url.is_empty() { + let relay_project_name = startup_root + .as_ref() + .and_then(|r| config::ProjectConfig::load(r).ok()) + .and_then(|c| c.gateway_project) + .or_else(|| std::env::var("HUSKIES_GATEWAY_PROJECT").ok()) + .or_else(|| { + startup_root + .as_ref() + .and_then(|r| r.file_name()) + .map(|n| n.to_string_lossy().into_owned()) + }) + .unwrap_or_else(|| "project".to_string()); + + gateway_relay::spawn_relay_task( + relay_gateway_url, + relay_project_name, + Arc::clone(&services.status), + reqwest::Client::new(), + ); + } + } + let app = build_routes( ctx, whatsapp_ctx.clone(), diff --git a/server/src/worktree.rs b/server/src/worktree.rs index 7a69639e..40412880 100644 --- a/server/src/worktree.rs +++ b/server/src/worktree.rs @@ -638,6 +638,8 @@ mod tests { crdt_require_token: false, crdt_tokens: Vec::new(), max_mesh_peers: 3, + gateway_url: None, + gateway_project: None, }; // Should complete without panic run_setup_commands(tmp.path(), &config).await; @@ -672,6 +674,8 @@ mod tests { crdt_require_token: false, crdt_tokens: Vec::new(), max_mesh_peers: 3, + gateway_url: None, + gateway_project: None, }; // Should complete without panic run_setup_commands(tmp.path(), &config).await; @@ -706,6 +710,8 @@ mod tests { crdt_require_token: false, crdt_tokens: Vec::new(), max_mesh_peers: 3, + gateway_url: None, + gateway_project: None, }; // Setup command failures are non-fatal — should not panic or propagate run_setup_commands(tmp.path(), &config).await; @@ -740,6 +746,8 @@ mod tests { crdt_require_token: false, crdt_tokens: Vec::new(), max_mesh_peers: 3, + gateway_url: None, + gateway_project: None, }; // Teardown failures are best-effort — should not propagate assert!(run_teardown_commands(tmp.path(), &config).await.is_ok()); 
@@ -773,6 +781,8 @@ mod tests { crdt_require_token: false, crdt_tokens: Vec::new(), max_mesh_peers: 3, + gateway_url: None, + gateway_project: None, }; let info = create_worktree(&project_root, "42_fresh_test", &config, 3001) .await @@ -813,6 +823,8 @@ mod tests { crdt_require_token: false, crdt_tokens: Vec::new(), max_mesh_peers: 3, + gateway_url: None, + gateway_project: None, }; // First creation let _info1 = create_worktree(&project_root, "43_reuse_test", &config, 3001) @@ -894,6 +906,8 @@ mod tests { crdt_require_token: false, crdt_tokens: Vec::new(), max_mesh_peers: 3, + gateway_url: None, + gateway_project: None, }; let result = remove_worktree_by_story_id(tmp.path(), "99_nonexistent", &config).await; @@ -933,6 +947,8 @@ mod tests { crdt_require_token: false, crdt_tokens: Vec::new(), max_mesh_peers: 3, + gateway_url: None, + gateway_project: None, }; create_worktree(&project_root, "88_remove_by_id", &config, 3001) .await @@ -1019,6 +1035,8 @@ mod tests { crdt_require_token: false, crdt_tokens: Vec::new(), max_mesh_peers: 3, + gateway_url: None, + gateway_project: None, }; // Even though setup commands fail, create_worktree must succeed // so the agent can start and fix the problem itself. 
@@ -1061,6 +1079,8 @@ mod tests { crdt_require_token: false, crdt_tokens: Vec::new(), max_mesh_peers: 3, + gateway_url: None, + gateway_project: None, }; // First creation — no setup commands, should succeed create_worktree(&project_root, "173_reuse_fail", &empty_config, 3001) @@ -1093,6 +1113,8 @@ mod tests { crdt_require_token: false, crdt_tokens: Vec::new(), max_mesh_peers: 3, + gateway_url: None, + gateway_project: None, }; // Second call — worktree exists, setup commands fail, must still succeed let result = create_worktree(&project_root, "173_reuse_fail", &failing_config, 3002).await; @@ -1131,6 +1153,8 @@ mod tests { crdt_require_token: false, crdt_tokens: Vec::new(), max_mesh_peers: 3, + gateway_url: None, + gateway_project: None, }; let info = create_worktree(&project_root, "77_remove_async", &config, 3001) .await