From 283bbc8658bfee77558683fdbb2f3f6b783c193a Mon Sep 17 00:00:00 2001 From: dave Date: Wed, 29 Apr 2026 00:29:54 +0000 Subject: [PATCH] huskies: merge 832 --- server/src/gateway.rs | 1075 ----------------------------------- server/src/gateway/mod.rs | 143 +++++ server/src/gateway/tests.rs | 929 ++++++++++++++++++++++++++++++ 3 files changed, 1072 insertions(+), 1075 deletions(-) delete mode 100644 server/src/gateway.rs create mode 100644 server/src/gateway/mod.rs create mode 100644 server/src/gateway/tests.rs diff --git a/server/src/gateway.rs b/server/src/gateway.rs deleted file mode 100644 index 17f4ef98..00000000 --- a/server/src/gateway.rs +++ /dev/null @@ -1,1075 +0,0 @@ -//! Multi-project gateway — entrypoint wiring and route tree. -//! -//! When `huskies --gateway` is used, the server starts in gateway mode. -//! Business logic lives in `service::gateway`, HTTP handlers in `http::gateway`. -//! This file contains only the `run` entrypoint and `build_gateway_route` wiring. - -use crate::http::gateway::*; -use crate::service::gateway::{self, GatewayState}; -use poem::EndpointExt; -use std::path::Path; -use std::sync::Arc; - -// Re-export public types that callers reference as `crate::gateway::*`. -pub use crate::service::gateway::{ - GatewayConfig, GatewayState as GatewayStateType, GatewayStatusEvent, ProjectEntry, - broadcast_status_event, fetch_all_project_pipeline_statuses, format_aggregate_status_compact, - spawn_gateway_broadcaster_forwarder, spawn_gateway_notification_poller, - subscribe_status_events, -}; - -/// Build the complete gateway route tree. -/// -/// Extracted from `run` so that tests can construct the full route tree and -/// catch duplicate-route panics before they reach production. -pub fn build_gateway_route(state_arc: Arc) -> impl poem::Endpoint { - poem::Route::new() - .at("/bot-config", poem::get(gateway_bot_config_page_handler)) - .at("/api/gateway", poem::get(gateway_api_handler)) - .at( - "/api/gateway/projects", - poem::post(gateway_add_project_handler), - ) - .at( - "/api/gateway/projects/:name", - poem::delete(gateway_remove_project_handler), - ) - .at( - "/api/gateway/bot-config", - poem::get(gateway_bot_config_get_handler).post(gateway_bot_config_save_handler), - ) - .at( - "/mcp", - poem::post(gateway_mcp_post_handler).get(gateway_mcp_get_handler), - ) - // Agent join endpoints. - .at("/gateway/mode", poem::get(gateway_mode_handler)) - .at( - "/gateway/tokens", - poem::post(gateway_generate_token_handler), - ) - .at( - "/gateway/events/push", - poem::get(gateway_event_push_handler), - ) - // Agent registration via CRDT-sync WebSocket. - .at("/crdt-sync", poem::get(gateway_crdt_sync_handler)) - // Agent management REST endpoints. - .at( - "/gateway/agents/:id/assign", - poem::post(gateway_assign_agent_handler), - ) - // Serve the embedded React frontend so the gateway has a UI. - .at( - "/assets/*path", - poem::get(crate::http::assets::embedded_asset), - ) - .at("/*path", poem::get(crate::http::assets::embedded_file)) - .at("/", poem::get(crate::http::assets::embedded_index)) - .data(state_arc) -} - -/// Start the gateway HTTP server. This is the entry point when `--gateway` is used. -pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> { - let config_dir = config_path - .parent() - .unwrap_or(std::path::Path::new(".")) - .to_path_buf(); - - let config = gateway::io::load_config(config_path).map_err(std::io::Error::other)?; - - // Initialise the CRDT so gateway_config.active_project is persisted across restarts. - let crdt_db = config_dir.join("gateway.db"); - if let Err(e) = crate::crdt_state::init(&crdt_db).await { - crate::slog!( - "[gateway] Warning: CRDT init failed ({e}); active-project selection will not persist" - ); - } - - let state = - GatewayState::new(config, config_dir.clone(), port).map_err(std::io::Error::other)?; - let state_arc = Arc::new(state); - - let active = state_arc.active_project.read().await.clone(); - crate::slog!("[gateway] Starting gateway on port {port}, active project: {active}"); - crate::slog!( - "[gateway] Registered projects: {}", - state_arc - .projects - .read() - .await - .keys() - .cloned() - .collect::>() - .join(", ") - ); - - // Write `.mcp.json` so that the gateway's bot connects to this gateway's MCP endpoint. - if let Err(e) = gateway::io::write_gateway_mcp_json(&config_dir, port) { - crate::slog!("[gateway] Warning: could not write .mcp.json: {e}"); - } - - // Spawn the Matrix bot if `.huskies/bot.toml` exists in the config directory. - let gateway_projects: Vec = state_arc.projects.read().await.keys().cloned().collect(); - let gateway_project_urls: std::collections::BTreeMap = state_arc - .projects - .read() - .await - .iter() - .map(|(name, entry)| (name.clone(), entry.url.clone())) - .collect(); - let bot_abort = gateway::io::spawn_gateway_bot( - &config_dir, - Arc::clone(&state_arc.active_project), - gateway_projects, - gateway_project_urls, - port, - Some(state_arc.event_tx.clone()), - ); - *state_arc.bot_handle.lock().await = bot_abort; - - let route = build_gateway_route(state_arc); - - let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); - let addr = format!("{host}:{port}"); - crate::slog!("[gateway] Listening on {addr}"); - - poem::Server::new(poem::listener::TcpListener::bind(&addr)) - .run(route) - .await -} - -// ── Tests ──────────────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - use crate::service::gateway::{GatewayConfig, GatewayState, ProjectEntry}; - use std::collections::BTreeMap; - use std::path::PathBuf; - - fn make_test_state() -> Arc { - let mut projects = BTreeMap::new(); - projects.insert( - "test".into(), - ProjectEntry { - url: "http://test:3001".into(), - }, - ); - let config = GatewayConfig { projects }; - Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap()) - } - - #[test] - fn gateway_route_tree_builds_without_panic() { - let state = make_test_state(); - let _route = build_gateway_route(state); - } - - // ── Tests that exercised internal functions have been moved to their - // ── respective service/gateway modules. The integration tests that use - // ── poem::test::TestClient and mock HTTP servers remain here since they - // ── test the combined HTTP + service interaction through real routes. - - #[tokio::test] - async fn crdt_sync_handshake_then_register_writes_crdt_node() { - crate::crdt_state::init_for_test(); - let state = make_test_state(); - - // Generate a valid join token via the tokens endpoint. - let token_app = poem::Route::new() - .at( - "/gateway/tokens", - poem::post(gateway_generate_token_handler), - ) - .data(state.clone()); - let cli = poem::test::TestClient::new(token_app); - let resp = cli.post("/gateway/tokens").send().await; - assert_eq!(resp.0.status(), poem::http::StatusCode::OK); - let body: serde_json::Value = resp.0.into_body().into_json().await.unwrap(); - let token = body["token"].as_str().unwrap().to_string(); - - // Token must be pending before the upgrade. - assert!(state.pending_tokens.read().await.contains_key(&token)); - - // Call register_agent directly (the service function exercised by the handler). - let node = crate::service::gateway::register_agent( - &state, - &token, - "test-label".into(), - "ws://test:9000".into(), - ) - .await - .unwrap(); - - // Token is consumed after registration. - assert!(state.pending_tokens.read().await.is_empty()); - - // CRDT node was written and is visible via list_agents. - let agents = crate::service::gateway::list_agents(); - assert!( - agents.iter().any(|n| n.node_id == node.node_id), - "Registered node must appear in list_agents" - ); - - // remove_agent tombstones the node. - assert!(crate::service::gateway::remove_agent(&node.node_id)); - let alive = crate::service::gateway::list_agents(); - assert!( - !alive.iter().any(|n| n.node_id == node.node_id), - "Tombstoned node must not appear in list_agents" - ); - } - - #[tokio::test] - async fn generate_token_creates_pending_token() { - let state = make_test_state(); - let app = poem::Route::new() - .at( - "/gateway/tokens", - poem::post(gateway_generate_token_handler), - ) - .data(state.clone()); - let cli = poem::test::TestClient::new(app); - let resp = cli.post("/gateway/tokens").send().await; - assert_eq!(resp.0.status(), poem::http::StatusCode::OK); - let body: serde_json::Value = resp.0.into_body().into_json().await.unwrap(); - let token = body["token"].as_str().unwrap(); - assert!(!token.is_empty()); - let tokens = state.pending_tokens.read().await; - assert!(tokens.contains_key(token)); - } - - // ── Notification poller integration tests ──────────────────────────── - - #[tokio::test] - async fn gateway_notification_poller_continues_when_one_project_unreachable() { - use crate::chat::{ChatTransport, MessageId}; - use crate::service::events::StoredEvent; - use async_trait::async_trait; - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - - type CallLog = Arc>>; - - struct MockTransport { - calls: CallLog, - } - - #[async_trait] - impl ChatTransport for MockTransport { - async fn send_message( - &self, - _room_id: &str, - plain: &str, - _html: &str, - ) -> Result { - self.calls.lock().unwrap().push(plain.to_string()); - Ok("id".to_string()) - } - - async fn edit_message( - &self, - _room_id: &str, - _id: &str, - _plain: &str, - _html: &str, - ) -> Result<(), String> { - Ok(()) - } - - async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { - Ok(()) - } - } - - let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); - let transport = Arc::new(MockTransport { - calls: Arc::clone(&calls), - }); - - let event = vec![StoredEvent::StoryBlocked { - story_id: "10_story_ok".to_string(), - reason: "retry limit".to_string(), - timestamp_ms: 500, - }]; - let event_body = serde_json::to_vec(&event).unwrap(); - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let good_port = listener.local_addr().unwrap().port(); - let good_url = format!("http://127.0.0.1:{good_port}"); - tokio::spawn(async move { - for _ in 0..4 { - if let Ok((mut stream, _)) = listener.accept().await { - let mut buf = vec![0u8; 4096]; - let _ = stream.read(&mut buf).await; - let header = format!( - "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", - event_body.len() - ); - let _ = stream.write_all(header.as_bytes()).await; - let _ = stream.write_all(&event_body).await; - } - } - }); - - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - - let bad_url = "http://127.0.0.1:1".to_string(); - - let mut project_urls = BTreeMap::new(); - project_urls.insert("good-project".to_string(), good_url); - project_urls.insert("unreachable-project".to_string(), bad_url); - - gateway::spawn_gateway_notification_poller( - transport as Arc, - vec!["!room:example.org".to_string()], - project_urls, - 1, - ); - - tokio::time::sleep(std::time::Duration::from_millis(1500)).await; - - let messages = calls.lock().unwrap(); - assert!( - !messages.is_empty(), - "Expected notifications from the reachable project; got none" - ); - let has_good = messages - .iter() - .any(|m| m.contains("[good-project]") && m.contains("10_story_ok")); - assert!( - has_good, - "Expected a notification from [good-project]; got: {messages:?}" - ); - let has_bad = messages.iter().any(|m| m.contains("[unreachable-project]")); - assert!( - !has_bad, - "Unreachable project must not produce notifications; got: {messages:?}" - ); - } - - #[tokio::test] - async fn gateway_notification_poller_sends_only_to_configured_gateway_rooms() { - use crate::chat::{ChatTransport, MessageId}; - use crate::service::events::StoredEvent; - use async_trait::async_trait; - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - - type RoomLog = Arc>>; - - struct RoomCapture { - rooms: RoomLog, - } - - #[async_trait] - impl ChatTransport for RoomCapture { - async fn send_message( - &self, - room_id: &str, - _plain: &str, - _html: &str, - ) -> Result { - self.rooms.lock().unwrap().push(room_id.to_string()); - Ok("id".to_string()) - } - - async fn edit_message( - &self, - _room_id: &str, - _id: &str, - _plain: &str, - _html: &str, - ) -> Result<(), String> { - Ok(()) - } - - async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { - Ok(()) - } - } - - let rooms: RoomLog = Arc::new(std::sync::Mutex::new(Vec::new())); - let transport = Arc::new(RoomCapture { - rooms: Arc::clone(&rooms), - }); - - let event = vec![StoredEvent::MergeFailure { - story_id: "5_story_x".to_string(), - reason: "conflict".to_string(), - timestamp_ms: 300, - }]; - let event_body = serde_json::to_vec(&event).unwrap(); - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let port = listener.local_addr().unwrap().port(); - let url = format!("http://127.0.0.1:{port}"); - tokio::spawn(async move { - for _ in 0..4 { - if let Ok((mut stream, _)) = listener.accept().await { - let mut buf = vec![0u8; 4096]; - let _ = stream.read(&mut buf).await; - let header = format!( - "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", - event_body.len() - ); - let _ = stream.write_all(header.as_bytes()).await; - let _ = stream.write_all(&event_body).await; - } - } - }); - - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - - const GATEWAY_ROOM: &str = "!gateway:example.org"; - #[allow(dead_code)] - const PER_PROJECT_ROOM: &str = "!project:example.org"; - - let mut project_urls = BTreeMap::new(); - project_urls.insert("myproj".to_string(), url); - - gateway::spawn_gateway_notification_poller( - transport as Arc, - vec![GATEWAY_ROOM.to_string()], - project_urls, - 1, - ); - - tokio::time::sleep(std::time::Duration::from_millis(1500)).await; - - let room_calls = rooms.lock().unwrap(); - assert!( - !room_calls.is_empty(), - "Expected at least one notification; got none" - ); - for room in room_calls.iter() { - assert_eq!( - room, GATEWAY_ROOM, - "Notification must only go to the gateway room, not {room}" - ); - } - assert!( - !room_calls.iter().any(|r| r == PER_PROJECT_ROOM), - "Per-project room must not receive gateway aggregated notifications" - ); - } - - // ── init_project integration tests ────────────────────────────────── - - #[tokio::test] - async fn init_project_scaffolds_huskies_dir() { - let dir = tempfile::tempdir().unwrap(); - let state = make_test_state(); - let result = gateway::init_project(&state, dir.path().to_str().unwrap(), None, None).await; - assert!( - result.is_ok(), - "init_project should succeed: {:?}", - result.err() - ); - assert!(dir.path().join(".huskies").exists()); - assert!(dir.path().join(".huskies/project.toml").exists()); - assert!(dir.path().join(".huskies/agents.toml").exists()); - assert!(dir.path().join("script/test").exists()); - } - - #[tokio::test] - async fn init_project_creates_wizard_state() { - let dir = tempfile::tempdir().unwrap(); - let state = make_test_state(); - gateway::init_project(&state, dir.path().to_str().unwrap(), None, None) - .await - .unwrap(); - let wizard_state_path = dir.path().join(".huskies/wizard_state.json"); - assert!(wizard_state_path.exists()); - let content = std::fs::read_to_string(&wizard_state_path).unwrap(); - let v: serde_json::Value = serde_json::from_str(&content).unwrap(); - assert!(v.get("steps").is_some()); - assert!(v.get("completed").is_some()); - } - - #[tokio::test] - async fn init_project_already_initialised_returns_error() { - let dir = tempfile::tempdir().unwrap(); - std::fs::create_dir_all(dir.path().join(".huskies")).unwrap(); - let state = make_test_state(); - let result = gateway::init_project(&state, dir.path().to_str().unwrap(), None, None).await; - assert!(result.is_err()); - } - - #[tokio::test] - async fn init_project_missing_path_returns_error() { - let state = make_test_state(); - let result = gateway::init_project(&state, "", None, None).await; - assert!(result.is_err()); - } - - #[tokio::test] - async fn init_project_registers_in_projects_toml_when_name_and_url_given() { - let dir = tempfile::tempdir().unwrap(); - let config_dir = tempfile::tempdir().unwrap(); - let mut projects = BTreeMap::new(); - projects.insert( - "existing".into(), - ProjectEntry { - url: "http://existing:3001".into(), - }, - ); - let config = GatewayConfig { projects }; - let state = - Arc::new(GatewayState::new(config, config_dir.path().to_path_buf(), 3000).unwrap()); - - let result = gateway::init_project( - &state, - dir.path().to_str().unwrap(), - Some("new-project"), - Some("http://new-project:3002"), - ) - .await; - assert!(result.is_ok()); - - let projects = state.projects.read().await; - assert!(projects.contains_key("new-project")); - assert_eq!(projects["new-project"].url, "http://new-project:3002"); - } - - #[tokio::test] - async fn init_project_duplicate_name_returns_error() { - let dir = tempfile::tempdir().unwrap(); - let mut projects = BTreeMap::new(); - projects.insert( - "taken".into(), - ProjectEntry { - url: "http://taken:3001".into(), - }, - ); - let config = GatewayConfig { projects }; - let state = Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap()); - - let result = gateway::init_project( - &state, - dir.path().to_str().unwrap(), - Some("taken"), - Some("http://new:3002"), - ) - .await; - assert!(result.is_err()); - } - - #[tokio::test] - async fn init_project_then_wizard_status_integration() { - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let mock_port = listener.local_addr().unwrap().port(); - let mock_url = format!("http://127.0.0.1:{mock_port}"); - - tokio::spawn(async move { - if let Ok((mut stream, _)) = listener.accept().await { - let mut buf = vec![0u8; 4096]; - let _ = stream.read(&mut buf).await; - let body = serde_json::json!({ - "jsonrpc": "2.0", - "id": 1, - "result": { - "content": [{ - "type": "text", - "text": "{\"steps\":[{\"id\":\"scaffold\",\"title\":\"Scaffold\",\"status\":\"confirmed\"}],\"completed\":false}" - }] - } - }); - let body_bytes = serde_json::to_vec(&body).unwrap(); - let header = format!( - "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", - body_bytes.len() - ); - let _ = stream.write_all(header.as_bytes()).await; - let _ = stream.write_all(&body_bytes).await; - } - }); - - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - - let mut projects = BTreeMap::new(); - projects.insert("mock-project".into(), ProjectEntry { url: mock_url }); - let config = GatewayConfig { projects }; - let config_dir = tempfile::tempdir().unwrap(); - let state = - Arc::new(GatewayState::new(config, config_dir.path().to_path_buf(), 3000).unwrap()); - - let project_dir = tempfile::tempdir().unwrap(); - let result = - gateway::init_project(&state, project_dir.path().to_str().unwrap(), None, None).await; - assert!(result.is_ok()); - assert!(project_dir.path().join(".huskies").exists()); - - let wizard_path = project_dir.path().join(".huskies/wizard_state.json"); - assert!(wizard_path.exists()); - - // Proxy call to the mock server. - let active_url = state.active_url().await.unwrap(); - let proxy_body = serde_json::to_vec(&serde_json::json!({ - "jsonrpc": "2.0", - "id": 2, - "method": "tools/call", - "params": { "name": "wizard_status", "arguments": {} } - })) - .unwrap(); - let proxy_resp = gateway::io::proxy_mcp_call(&state.client, &active_url, &proxy_body).await; - assert!(proxy_resp.is_ok()); - - let resp_json: serde_json::Value = serde_json::from_slice(&proxy_resp.unwrap()).unwrap(); - let result = resp_json.get("result"); - assert!(result.is_some()); - let text = result - .and_then(|r| r.get("content")) - .and_then(|c| c.get(0)) - .and_then(|c| c.get("text")) - .and_then(|t| t.as_str()) - .unwrap_or(""); - let wizard: serde_json::Value = serde_json::from_str(text).unwrap(); - assert!(wizard.get("steps").is_some()); - } - - // ── Aggregate pipeline status integration tests ───────────────────── - - #[tokio::test] - async fn aggregate_pipeline_status_integration_healthy_and_unreachable() { - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let mock_port = listener.local_addr().unwrap().port(); - let healthy_url = format!("http://127.0.0.1:{mock_port}"); - - tokio::spawn(async move { - if let Ok((mut stream, _)) = listener.accept().await { - let mut buf = vec![0u8; 4096]; - let _ = stream.read(&mut buf).await; - let pipeline_json = serde_json::to_string(&serde_json::json!({ - "active": [ - { "story_id": "1_story_a", "name": "A", "stage": "current" }, - { "story_id": "2_story_b", "name": "B", "stage": "qa" }, - { "story_id": "3_story_c", "name": "C", "stage": "current", "blocked": true, "retry_count": 5 }, - ], - "backlog": [{ "story_id": "4_story_d", "name": "D" }], - "backlog_count": 1 - })) - .unwrap(); - let body = serde_json::to_vec(&serde_json::json!({ - "jsonrpc": "2.0", - "id": 1, - "result": { - "content": [{ "type": "text", "text": pipeline_json }] - } - })) - .unwrap(); - let header = format!( - "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", - body.len() - ); - let _ = stream.write_all(header.as_bytes()).await; - let _ = stream.write_all(&body).await; - } - }); - - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - - let unreachable_url = "http://127.0.0.1:1".to_string(); - - let mut project_urls = BTreeMap::new(); - project_urls.insert("healthy-project".to_string(), healthy_url); - project_urls.insert("broken-project".to_string(), unreachable_url); - - let client = reqwest::Client::new(); - let statuses = gateway::fetch_all_project_pipeline_statuses(&project_urls, &client).await; - - assert!(statuses.contains_key("healthy-project")); - assert!(statuses.contains_key("broken-project")); - - let healthy = &statuses["healthy-project"]; - assert!(healthy.get("error").is_none()); - assert_eq!(healthy["counts"]["backlog"], 1); - assert_eq!(healthy["counts"]["current"], 2); - assert_eq!(healthy["counts"]["qa"], 1); - - let blocked = healthy["blocked"].as_array().unwrap(); - assert_eq!(blocked.len(), 1); - assert_eq!(blocked[0]["story_id"], "3_story_c"); - - let broken = &statuses["broken-project"]; - assert!(broken.get("error").is_some()); - } - - // ── Multi-project notification poller integration ──────────────────── - - #[tokio::test] - async fn gateway_notification_poller_delivers_events_from_two_projects_with_project_tags() { - use crate::chat::{ChatTransport, MessageId}; - use crate::service::events::StoredEvent; - use async_trait::async_trait; - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - - type CallLog = Arc>>; - - struct MockTransport { - calls: CallLog, - } - - #[async_trait] - impl ChatTransport for MockTransport { - async fn send_message( - &self, - room_id: &str, - plain: &str, - html: &str, - ) -> Result { - self.calls.lock().unwrap().push(( - room_id.to_string(), - plain.to_string(), - html.to_string(), - )); - Ok("mock-id".to_string()) - } - - async fn edit_message( - &self, - _room_id: &str, - _id: &str, - _plain: &str, - _html: &str, - ) -> Result<(), String> { - Ok(()) - } - - async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { - Ok(()) - } - } - - let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); - let transport = Arc::new(MockTransport { - calls: Arc::clone(&calls), - }); - - let alpha_events = vec![StoredEvent::StageTransition { - story_id: "1_story_alpha".to_string(), - from_stage: "2_current".to_string(), - to_stage: "3_qa".to_string(), - timestamp_ms: 100, - }]; - let alpha_body = serde_json::to_vec(&alpha_events).unwrap(); - let alpha_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let alpha_port = alpha_listener.local_addr().unwrap().port(); - let alpha_url = format!("http://127.0.0.1:{alpha_port}"); - tokio::spawn(async move { - for _ in 0..4 { - if let Ok((mut stream, _)) = alpha_listener.accept().await { - let mut buf = vec![0u8; 4096]; - let _ = stream.read(&mut buf).await; - let header = format!( - "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", - alpha_body.len() - ); - let _ = stream.write_all(header.as_bytes()).await; - let _ = stream.write_all(&alpha_body).await; - } - } - }); - - let beta_events = vec![StoredEvent::MergeFailure { - story_id: "2_story_beta".to_string(), - reason: "merge conflict in lib.rs".to_string(), - timestamp_ms: 200, - }]; - let beta_body = serde_json::to_vec(&beta_events).unwrap(); - let beta_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let beta_port = beta_listener.local_addr().unwrap().port(); - let beta_url = format!("http://127.0.0.1:{beta_port}"); - tokio::spawn(async move { - for _ in 0..4 { - if let Ok((mut stream, _)) = beta_listener.accept().await { - let mut buf = vec![0u8; 4096]; - let _ = stream.read(&mut buf).await; - let header = format!( - "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", - beta_body.len() - ); - let _ = stream.write_all(header.as_bytes()).await; - let _ = stream.write_all(&beta_body).await; - } - } - }); - - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - - let mut project_urls = BTreeMap::new(); - project_urls.insert("alpha".to_string(), alpha_url); - project_urls.insert("beta".to_string(), beta_url); - - gateway::spawn_gateway_notification_poller( - transport as Arc, - vec!["!room:example.org".to_string()], - project_urls, - 1, - ); - - tokio::time::sleep(std::time::Duration::from_millis(1500)).await; - - let calls = calls.lock().unwrap(); - assert!( - !calls.is_empty(), - "Expected at least one notification; got none" - ); - - let plains: Vec<&str> = calls.iter().map(|(_, p, _)| p.as_str()).collect(); - - let alpha_notification = plains - .iter() - .any(|p| p.contains("[alpha]") && p.contains("1")); - let beta_notification = plains - .iter() - .any(|p| p.contains("[beta]") && p.contains("merge conflict")); - - assert!( - alpha_notification, - "Expected a notification from [alpha] containing story ID '1'; got: {plains:?}" - ); - assert!( - beta_notification, - "Expected a notification from [beta] containing 'merge conflict'; got: {plains:?}" - ); - - for (room_id, _, _) in calls.iter() { - assert_eq!( - room_id, "!room:example.org", - "All notifications must go to the gateway room" - ); - } - } - - // ── Gateway broadcaster forwarder tests ───────────────────────────── - - #[tokio::test] - async fn broadcaster_forwarder_forwards_events_with_project_tag() { - use crate::chat::{ChatTransport, MessageId}; - use crate::service::events::StoredEvent; - use async_trait::async_trait; - - type CallLog = Arc>>; - - struct MockTransport { - calls: CallLog, - } - - #[async_trait] - impl ChatTransport for MockTransport { - async fn send_message( - &self, - room_id: &str, - plain: &str, - _html: &str, - ) -> Result { - self.calls - .lock() - .unwrap() - .push((room_id.to_string(), plain.to_string())); - Ok("id".to_string()) - } - - async fn edit_message( - &self, - _room_id: &str, - _id: &str, - _plain: &str, - _html: &str, - ) -> Result<(), String> { - Ok(()) - } - - async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { - Ok(()) - } - } - - let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); - let transport = Arc::new(MockTransport { - calls: Arc::clone(&calls), - }); - - let (tx, rx) = - tokio::sync::broadcast::channel::(16); - gateway::spawn_gateway_broadcaster_forwarder( - transport as Arc, - vec!["!room:example.org".to_string()], - rx, - ); - - // Give the forwarder task a moment to start. - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - - let event = crate::service::gateway::GatewayStatusEvent { - project: "my-project".to_string(), - event: StoredEvent::StageTransition { - story_id: "7_story_x".to_string(), - from_stage: "2_current".to_string(), - to_stage: "3_qa".to_string(), - timestamp_ms: 100, - }, - }; - tx.send(event).unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let messages = calls.lock().unwrap(); - assert_eq!(messages.len(), 1, "Expected exactly one notification"); - let (room, plain) = &messages[0]; - assert_eq!(room, "!room:example.org"); - assert!( - plain.starts_with("[my-project]"), - "Expected [my-project] prefix; got: {plain}" - ); - assert!( - plain.contains("7_story_x"), - "Expected story ID; got: {plain}" - ); - } - - #[tokio::test] - async fn broadcaster_forwarder_resubscribes_on_lag() { - use crate::chat::{ChatTransport, MessageId}; - use crate::service::events::StoredEvent; - use async_trait::async_trait; - - type Counter = Arc>; - - struct CountTransport { - count: Counter, - } - - #[async_trait] - impl ChatTransport for CountTransport { - async fn send_message( - &self, - _room_id: &str, - _plain: &str, - _html: &str, - ) -> Result { - *self.count.lock().unwrap() += 1; - Ok("id".to_string()) - } - - async fn edit_message( - &self, - _room_id: &str, - _id: &str, - _plain: &str, - _html: &str, - ) -> Result<(), String> { - Ok(()) - } - - async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { - Ok(()) - } - } - - let count: Counter = Arc::new(std::sync::Mutex::new(0)); - let transport = Arc::new(CountTransport { - count: Arc::clone(&count), - }); - - // Use a tiny channel (capacity 1) so the second send causes a Lagged error. - let (tx, rx) = - tokio::sync::broadcast::channel::(1); - - // Flood the channel to trigger Lagged before the forwarder task starts. - let make_event = |n: u64| crate::service::gateway::GatewayStatusEvent { - project: "p".to_string(), - event: StoredEvent::StageTransition { - story_id: format!("{n}_story"), - from_stage: "2_current".to_string(), - to_stage: "3_qa".to_string(), - timestamp_ms: n, - }, - }; - // Send 3 events to overflow the capacity-1 channel before the task runs. - let _ = tx.send(make_event(1)); - let _ = tx.send(make_event(2)); - let _ = tx.send(make_event(3)); - - gateway::spawn_gateway_broadcaster_forwarder( - transport as Arc, - vec!["!r:x.org".to_string()], - rx, - ); - - // Send one more event after the forwarder subscribes; it should arrive. - tokio::time::sleep(std::time::Duration::from_millis(20)).await; - tx.send(make_event(4)).unwrap(); - - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - // After Lagged + resubscribe, the forwarder must still process event 4. - let received = *count.lock().unwrap(); - assert!( - received >= 1, - "Expected at least one event after Lagged resubscribe; got {received}" - ); - } - - // ── BotConfig tests ───────────────────────────────────────────────── - - #[test] - fn bot_config_loads_from_gateway_config_dir() { - use crate::chat::transport::matrix::BotConfig; - - let tmp = tempfile::tempdir().unwrap(); - let huskies_dir = tmp.path().join(".huskies"); - std::fs::create_dir_all(&huskies_dir).unwrap(); - std::fs::write( - huskies_dir.join("bot.toml"), - r#" -homeserver = "https://matrix.example.com" -username = "@bot:example.com" -password = "secret" -room_ids = ["!abc:example.com"] -enabled = true -"#, - ) - .unwrap(); - - let config = BotConfig::load(tmp.path()); - assert!(config.is_some()); - let config = config.unwrap(); - assert_eq!( - config.homeserver.as_deref(), - Some("https://matrix.example.com") - ); - } - - #[test] - fn bot_config_absent_returns_none_in_gateway_mode() { - use crate::chat::transport::matrix::BotConfig; - let tmp = tempfile::tempdir().unwrap(); - let config = BotConfig::load(tmp.path()); - assert!(config.is_none()); - } - - #[test] - fn bot_config_disabled_returns_none_in_gateway_mode() { - use crate::chat::transport::matrix::BotConfig; - let tmp = tempfile::tempdir().unwrap(); - let huskies_dir = tmp.path().join(".huskies"); - std::fs::create_dir_all(&huskies_dir).unwrap(); - std::fs::write( - huskies_dir.join("bot.toml"), - r#" -homeserver = "https://matrix.example.com" -username = "@bot:example.com" -password = "secret" -room_ids = ["!abc:example.com"] -enabled = false -"#, - ) - .unwrap(); - - let config = BotConfig::load(tmp.path()); - assert!(config.is_none()); - } -} diff --git a/server/src/gateway/mod.rs b/server/src/gateway/mod.rs new file mode 100644 index 00000000..be00f6ef --- /dev/null +++ b/server/src/gateway/mod.rs @@ -0,0 +1,143 @@ +//! Multi-project gateway — entrypoint wiring and route tree. +//! +//! When `huskies --gateway` is used, the server starts in gateway mode. +//! Business logic lives in `service::gateway`, HTTP handlers in `http::gateway`. +//! This file contains only the `run` entrypoint and `build_gateway_route` wiring. + +use crate::http::gateway::*; +use crate::service::gateway::{self, GatewayState}; +use poem::EndpointExt; +use std::path::Path; +use std::sync::Arc; + +// Re-export public types that callers reference as `crate::gateway::*`. +pub use crate::service::gateway::{ + GatewayConfig, GatewayState as GatewayStateType, GatewayStatusEvent, ProjectEntry, + broadcast_status_event, fetch_all_project_pipeline_statuses, format_aggregate_status_compact, + spawn_gateway_broadcaster_forwarder, spawn_gateway_notification_poller, + subscribe_status_events, +}; + +/// Build the complete gateway route tree. +/// +/// Extracted from `run` so that tests can construct the full route tree and +/// catch duplicate-route panics before they reach production. +pub fn build_gateway_route(state_arc: Arc) -> impl poem::Endpoint { + poem::Route::new() + .at("/bot-config", poem::get(gateway_bot_config_page_handler)) + .at("/api/gateway", poem::get(gateway_api_handler)) + .at( + "/api/gateway/projects", + poem::post(gateway_add_project_handler), + ) + .at( + "/api/gateway/projects/:name", + poem::delete(gateway_remove_project_handler), + ) + .at( + "/api/gateway/bot-config", + poem::get(gateway_bot_config_get_handler).post(gateway_bot_config_save_handler), + ) + .at( + "/mcp", + poem::post(gateway_mcp_post_handler).get(gateway_mcp_get_handler), + ) + // Agent join endpoints. + .at("/gateway/mode", poem::get(gateway_mode_handler)) + .at( + "/gateway/tokens", + poem::post(gateway_generate_token_handler), + ) + .at( + "/gateway/events/push", + poem::get(gateway_event_push_handler), + ) + // Agent registration via CRDT-sync WebSocket. + .at("/crdt-sync", poem::get(gateway_crdt_sync_handler)) + // Agent management REST endpoints. + .at( + "/gateway/agents/:id/assign", + poem::post(gateway_assign_agent_handler), + ) + // Serve the embedded React frontend so the gateway has a UI. + .at( + "/assets/*path", + poem::get(crate::http::assets::embedded_asset), + ) + .at("/*path", poem::get(crate::http::assets::embedded_file)) + .at("/", poem::get(crate::http::assets::embedded_index)) + .data(state_arc) +} + +/// Start the gateway HTTP server. This is the entry point when `--gateway` is used. +pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> { + let config_dir = config_path + .parent() + .unwrap_or(std::path::Path::new(".")) + .to_path_buf(); + + let config = gateway::io::load_config(config_path).map_err(std::io::Error::other)?; + + // Initialise the CRDT so gateway_config.active_project is persisted across restarts. + let crdt_db = config_dir.join("gateway.db"); + if let Err(e) = crate::crdt_state::init(&crdt_db).await { + crate::slog!( + "[gateway] Warning: CRDT init failed ({e}); active-project selection will not persist" + ); + } + + let state = + GatewayState::new(config, config_dir.clone(), port).map_err(std::io::Error::other)?; + let state_arc = Arc::new(state); + + let active = state_arc.active_project.read().await.clone(); + crate::slog!("[gateway] Starting gateway on port {port}, active project: {active}"); + crate::slog!( + "[gateway] Registered projects: {}", + state_arc + .projects + .read() + .await + .keys() + .cloned() + .collect::>() + .join(", ") + ); + + // Write `.mcp.json` so that the gateway's bot connects to this gateway's MCP endpoint. + if let Err(e) = gateway::io::write_gateway_mcp_json(&config_dir, port) { + crate::slog!("[gateway] Warning: could not write .mcp.json: {e}"); + } + + // Spawn the Matrix bot if `.huskies/bot.toml` exists in the config directory. + let gateway_projects: Vec = state_arc.projects.read().await.keys().cloned().collect(); + let gateway_project_urls: std::collections::BTreeMap = state_arc + .projects + .read() + .await + .iter() + .map(|(name, entry)| (name.clone(), entry.url.clone())) + .collect(); + let bot_abort = gateway::io::spawn_gateway_bot( + &config_dir, + Arc::clone(&state_arc.active_project), + gateway_projects, + gateway_project_urls, + port, + Some(state_arc.event_tx.clone()), + ); + *state_arc.bot_handle.lock().await = bot_abort; + + let route = build_gateway_route(state_arc); + + let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); + let addr = format!("{host}:{port}"); + crate::slog!("[gateway] Listening on {addr}"); + + poem::Server::new(poem::listener::TcpListener::bind(&addr)) + .run(route) + .await +} + +#[cfg(test)] +mod tests; diff --git a/server/src/gateway/tests.rs b/server/src/gateway/tests.rs new file mode 100644 index 00000000..5fd87ae1 --- /dev/null +++ b/server/src/gateway/tests.rs @@ -0,0 +1,929 @@ +//! Integration tests for the gateway route tree and service interactions. + +use super::*; +use crate::service::gateway::{GatewayConfig, GatewayState, ProjectEntry}; +use std::collections::BTreeMap; +use std::path::PathBuf; + +fn make_test_state() -> Arc { + let mut projects = BTreeMap::new(); + projects.insert( + "test".into(), + ProjectEntry { + url: "http://test:3001".into(), + }, + ); + let config = GatewayConfig { projects }; + Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap()) +} + +#[test] +fn gateway_route_tree_builds_without_panic() { + let state = make_test_state(); + let _route = build_gateway_route(state); +} + +// ── Tests that exercised internal functions have been moved to their +// ── respective service/gateway modules. The integration tests that use +// ── poem::test::TestClient and mock HTTP servers remain here since they +// ── test the combined HTTP + service interaction through real routes. + +#[tokio::test] +async fn crdt_sync_handshake_then_register_writes_crdt_node() { + crate::crdt_state::init_for_test(); + let state = make_test_state(); + + // Generate a valid join token via the tokens endpoint. + let token_app = poem::Route::new() + .at( + "/gateway/tokens", + poem::post(gateway_generate_token_handler), + ) + .data(state.clone()); + let cli = poem::test::TestClient::new(token_app); + let resp = cli.post("/gateway/tokens").send().await; + assert_eq!(resp.0.status(), poem::http::StatusCode::OK); + let body: serde_json::Value = resp.0.into_body().into_json().await.unwrap(); + let token = body["token"].as_str().unwrap().to_string(); + + // Token must be pending before the upgrade. + assert!(state.pending_tokens.read().await.contains_key(&token)); + + // Call register_agent directly (the service function exercised by the handler). + let node = crate::service::gateway::register_agent( + &state, + &token, + "test-label".into(), + "ws://test:9000".into(), + ) + .await + .unwrap(); + + // Token is consumed after registration. + assert!(state.pending_tokens.read().await.is_empty()); + + // CRDT node was written and is visible via list_agents. + let agents = crate::service::gateway::list_agents(); + assert!( + agents.iter().any(|n| n.node_id == node.node_id), + "Registered node must appear in list_agents" + ); + + // remove_agent tombstones the node. + assert!(crate::service::gateway::remove_agent(&node.node_id)); + let alive = crate::service::gateway::list_agents(); + assert!( + !alive.iter().any(|n| n.node_id == node.node_id), + "Tombstoned node must not appear in list_agents" + ); +} + +#[tokio::test] +async fn generate_token_creates_pending_token() { + let state = make_test_state(); + let app = poem::Route::new() + .at( + "/gateway/tokens", + poem::post(gateway_generate_token_handler), + ) + .data(state.clone()); + let cli = poem::test::TestClient::new(app); + let resp = cli.post("/gateway/tokens").send().await; + assert_eq!(resp.0.status(), poem::http::StatusCode::OK); + let body: serde_json::Value = resp.0.into_body().into_json().await.unwrap(); + let token = body["token"].as_str().unwrap(); + assert!(!token.is_empty()); + let tokens = state.pending_tokens.read().await; + assert!(tokens.contains_key(token)); +} + +// ── Notification poller integration tests ──────────────────────────── + +#[tokio::test] +async fn gateway_notification_poller_continues_when_one_project_unreachable() { + use crate::chat::{ChatTransport, MessageId}; + use crate::service::events::StoredEvent; + use async_trait::async_trait; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + type CallLog = Arc>>; + + struct MockTransport { + calls: CallLog, + } + + #[async_trait] + impl ChatTransport for MockTransport { + async fn send_message( + &self, + _room_id: &str, + plain: &str, + _html: &str, + ) -> Result { + self.calls.lock().unwrap().push(plain.to_string()); + Ok("id".to_string()) + } + + async fn edit_message( + &self, + _room_id: &str, + _id: &str, + _plain: &str, + _html: &str, + ) -> Result<(), String> { + Ok(()) + } + + async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { + Ok(()) + } + } + + let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); + let transport = Arc::new(MockTransport { + calls: Arc::clone(&calls), + }); + + let event = vec![StoredEvent::StoryBlocked { + story_id: "10_story_ok".to_string(), + reason: "retry limit".to_string(), + timestamp_ms: 500, + }]; + let event_body = serde_json::to_vec(&event).unwrap(); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let good_port = listener.local_addr().unwrap().port(); + let good_url = format!("http://127.0.0.1:{good_port}"); + tokio::spawn(async move { + for _ in 0..4 { + if let Ok((mut stream, _)) = listener.accept().await { + let mut buf = vec![0u8; 4096]; + let _ = stream.read(&mut buf).await; + let header = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + event_body.len() + ); + let _ = stream.write_all(header.as_bytes()).await; + let _ = stream.write_all(&event_body).await; + } + } + }); + + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + let bad_url = "http://127.0.0.1:1".to_string(); + + let mut project_urls = BTreeMap::new(); + project_urls.insert("good-project".to_string(), good_url); + project_urls.insert("unreachable-project".to_string(), bad_url); + + gateway::spawn_gateway_notification_poller( + transport as Arc, + vec!["!room:example.org".to_string()], + project_urls, + 1, + ); + + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + + let messages = calls.lock().unwrap(); + assert!( + !messages.is_empty(), + "Expected notifications from the reachable project; got none" + ); + let has_good = messages + .iter() + .any(|m| m.contains("[good-project]") && m.contains("10_story_ok")); + assert!( + has_good, + "Expected a notification from [good-project]; got: {messages:?}" + ); + let has_bad = messages.iter().any(|m| m.contains("[unreachable-project]")); + assert!( + !has_bad, + "Unreachable project must not produce notifications; got: {messages:?}" + ); +} + +#[tokio::test] +async fn gateway_notification_poller_sends_only_to_configured_gateway_rooms() { + use crate::chat::{ChatTransport, MessageId}; + use crate::service::events::StoredEvent; + use async_trait::async_trait; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + type RoomLog = Arc>>; + + struct RoomCapture { + rooms: RoomLog, + } + + #[async_trait] + impl ChatTransport for RoomCapture { + async fn send_message( + &self, + room_id: &str, + _plain: &str, + _html: &str, + ) -> Result { + self.rooms.lock().unwrap().push(room_id.to_string()); + Ok("id".to_string()) + } + + async fn edit_message( + &self, + _room_id: &str, + _id: &str, + _plain: &str, + _html: &str, + ) -> Result<(), String> { + Ok(()) + } + + async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { + Ok(()) + } + } + + let rooms: RoomLog = Arc::new(std::sync::Mutex::new(Vec::new())); + let transport = Arc::new(RoomCapture { + rooms: Arc::clone(&rooms), + }); + + let event = vec![StoredEvent::MergeFailure { + story_id: "5_story_x".to_string(), + reason: "conflict".to_string(), + timestamp_ms: 300, + }]; + let event_body = serde_json::to_vec(&event).unwrap(); + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let url = format!("http://127.0.0.1:{port}"); + tokio::spawn(async move { + for _ in 0..4 { + if let Ok((mut stream, _)) = listener.accept().await { + let mut buf = vec![0u8; 4096]; + let _ = stream.read(&mut buf).await; + let header = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + event_body.len() + ); + let _ = stream.write_all(header.as_bytes()).await; + let _ = stream.write_all(&event_body).await; + } + } + }); + + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + const GATEWAY_ROOM: &str = "!gateway:example.org"; + #[allow(dead_code)] + const PER_PROJECT_ROOM: &str = "!project:example.org"; + + let mut project_urls = BTreeMap::new(); + project_urls.insert("myproj".to_string(), url); + + gateway::spawn_gateway_notification_poller( + transport as Arc, + vec![GATEWAY_ROOM.to_string()], + project_urls, + 1, + ); + + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + + let room_calls = rooms.lock().unwrap(); + assert!( + !room_calls.is_empty(), + "Expected at least one notification; got none" + ); + for room in room_calls.iter() { + assert_eq!( + room, GATEWAY_ROOM, + "Notification must only go to the gateway room, not {room}" + ); + } + assert!( + !room_calls.iter().any(|r| r == PER_PROJECT_ROOM), + "Per-project room must not receive gateway aggregated notifications" + ); +} + +// ── init_project integration tests ────────────────────────────────── + +#[tokio::test] +async fn init_project_scaffolds_huskies_dir() { + let dir = tempfile::tempdir().unwrap(); + let state = make_test_state(); + let result = gateway::init_project(&state, dir.path().to_str().unwrap(), None, None).await; + assert!( + result.is_ok(), + "init_project should succeed: {:?}", + result.err() + ); + assert!(dir.path().join(".huskies").exists()); + assert!(dir.path().join(".huskies/project.toml").exists()); + assert!(dir.path().join(".huskies/agents.toml").exists()); + assert!(dir.path().join("script/test").exists()); +} + +#[tokio::test] +async fn init_project_creates_wizard_state() { + let dir = tempfile::tempdir().unwrap(); + let state = make_test_state(); + gateway::init_project(&state, dir.path().to_str().unwrap(), None, None) + .await + .unwrap(); + let wizard_state_path = dir.path().join(".huskies/wizard_state.json"); + assert!(wizard_state_path.exists()); + let content = std::fs::read_to_string(&wizard_state_path).unwrap(); + let v: serde_json::Value = serde_json::from_str(&content).unwrap(); + assert!(v.get("steps").is_some()); + assert!(v.get("completed").is_some()); +} + +#[tokio::test] +async fn init_project_already_initialised_returns_error() { + let dir = tempfile::tempdir().unwrap(); + std::fs::create_dir_all(dir.path().join(".huskies")).unwrap(); + let state = make_test_state(); + let result = gateway::init_project(&state, dir.path().to_str().unwrap(), None, None).await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn init_project_missing_path_returns_error() { + let state = make_test_state(); + let result = gateway::init_project(&state, "", None, None).await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn init_project_registers_in_projects_toml_when_name_and_url_given() { + let dir = tempfile::tempdir().unwrap(); + let config_dir = tempfile::tempdir().unwrap(); + let mut projects = BTreeMap::new(); + projects.insert( + "existing".into(), + ProjectEntry { + url: "http://existing:3001".into(), + }, + ); + let config = GatewayConfig { projects }; + let state = Arc::new(GatewayState::new(config, config_dir.path().to_path_buf(), 3000).unwrap()); + + let result = gateway::init_project( + &state, + dir.path().to_str().unwrap(), + Some("new-project"), + Some("http://new-project:3002"), + ) + .await; + assert!(result.is_ok()); + + let projects = state.projects.read().await; + assert!(projects.contains_key("new-project")); + assert_eq!(projects["new-project"].url, "http://new-project:3002"); +} + +#[tokio::test] +async fn init_project_duplicate_name_returns_error() { + let dir = tempfile::tempdir().unwrap(); + let mut projects = BTreeMap::new(); + projects.insert( + "taken".into(), + ProjectEntry { + url: "http://taken:3001".into(), + }, + ); + let config = GatewayConfig { projects }; + let state = Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap()); + + let result = gateway::init_project( + &state, + dir.path().to_str().unwrap(), + Some("taken"), + Some("http://new:3002"), + ) + .await; + assert!(result.is_err()); +} + +#[tokio::test] +async fn init_project_then_wizard_status_integration() { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let mock_port = listener.local_addr().unwrap().port(); + let mock_url = format!("http://127.0.0.1:{mock_port}"); + + tokio::spawn(async move { + if let Ok((mut stream, _)) = listener.accept().await { + let mut buf = vec![0u8; 4096]; + let _ = stream.read(&mut buf).await; + let body = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "result": { + "content": [{ + "type": "text", + "text": "{\"steps\":[{\"id\":\"scaffold\",\"title\":\"Scaffold\",\"status\":\"confirmed\"}],\"completed\":false}" + }] + } + }); + let body_bytes = serde_json::to_vec(&body).unwrap(); + let header = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + body_bytes.len() + ); + let _ = stream.write_all(header.as_bytes()).await; + let _ = stream.write_all(&body_bytes).await; + } + }); + + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + let mut projects = BTreeMap::new(); + projects.insert("mock-project".into(), ProjectEntry { url: mock_url }); + let config = GatewayConfig { projects }; + let config_dir = tempfile::tempdir().unwrap(); + let state = Arc::new(GatewayState::new(config, config_dir.path().to_path_buf(), 3000).unwrap()); + + let project_dir = tempfile::tempdir().unwrap(); + let result = + gateway::init_project(&state, project_dir.path().to_str().unwrap(), None, None).await; + assert!(result.is_ok()); + assert!(project_dir.path().join(".huskies").exists()); + + let wizard_path = project_dir.path().join(".huskies/wizard_state.json"); + assert!(wizard_path.exists()); + + // Proxy call to the mock server. + let active_url = state.active_url().await.unwrap(); + let proxy_body = serde_json::to_vec(&serde_json::json!({ + "jsonrpc": "2.0", + "id": 2, + "method": "tools/call", + "params": { "name": "wizard_status", "arguments": {} } + })) + .unwrap(); + let proxy_resp = gateway::io::proxy_mcp_call(&state.client, &active_url, &proxy_body).await; + assert!(proxy_resp.is_ok()); + + let resp_json: serde_json::Value = serde_json::from_slice(&proxy_resp.unwrap()).unwrap(); + let result = resp_json.get("result"); + assert!(result.is_some()); + let text = result + .and_then(|r| r.get("content")) + .and_then(|c| c.get(0)) + .and_then(|c| c.get("text")) + .and_then(|t| t.as_str()) + .unwrap_or(""); + let wizard: serde_json::Value = serde_json::from_str(text).unwrap(); + assert!(wizard.get("steps").is_some()); +} + +// ── Aggregate pipeline status integration tests ───────────────────── + +#[tokio::test] +async fn aggregate_pipeline_status_integration_healthy_and_unreachable() { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let mock_port = listener.local_addr().unwrap().port(); + let healthy_url = format!("http://127.0.0.1:{mock_port}"); + + tokio::spawn(async move { + if let Ok((mut stream, _)) = listener.accept().await { + let mut buf = vec![0u8; 4096]; + let _ = stream.read(&mut buf).await; + let pipeline_json = serde_json::to_string(&serde_json::json!({ + "active": [ + { "story_id": "1_story_a", "name": "A", "stage": "current" }, + { "story_id": "2_story_b", "name": "B", "stage": "qa" }, + { "story_id": "3_story_c", "name": "C", "stage": "current", "blocked": true, "retry_count": 5 }, + ], + "backlog": [{ "story_id": "4_story_d", "name": "D" }], + "backlog_count": 1 + })) + .unwrap(); + let body = serde_json::to_vec(&serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "result": { + "content": [{ "type": "text", "text": pipeline_json }] + } + })) + .unwrap(); + let header = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + body.len() + ); + let _ = stream.write_all(header.as_bytes()).await; + let _ = stream.write_all(&body).await; + } + }); + + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + let unreachable_url = "http://127.0.0.1:1".to_string(); + + let mut project_urls = BTreeMap::new(); + project_urls.insert("healthy-project".to_string(), healthy_url); + project_urls.insert("broken-project".to_string(), unreachable_url); + + let client = reqwest::Client::new(); + let statuses = gateway::fetch_all_project_pipeline_statuses(&project_urls, &client).await; + + assert!(statuses.contains_key("healthy-project")); + assert!(statuses.contains_key("broken-project")); + + let healthy = &statuses["healthy-project"]; + assert!(healthy.get("error").is_none()); + assert_eq!(healthy["counts"]["backlog"], 1); + assert_eq!(healthy["counts"]["current"], 2); + assert_eq!(healthy["counts"]["qa"], 1); + + let blocked = healthy["blocked"].as_array().unwrap(); + assert_eq!(blocked.len(), 1); + assert_eq!(blocked[0]["story_id"], "3_story_c"); + + let broken = &statuses["broken-project"]; + assert!(broken.get("error").is_some()); +} + +// ── Multi-project notification poller integration ──────────────────── + +#[tokio::test] +async fn gateway_notification_poller_delivers_events_from_two_projects_with_project_tags() { + use crate::chat::{ChatTransport, MessageId}; + use crate::service::events::StoredEvent; + use async_trait::async_trait; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + type CallLog = Arc>>; + + struct MockTransport { + calls: CallLog, + } + + #[async_trait] + impl ChatTransport for MockTransport { + async fn send_message( + &self, + room_id: &str, + plain: &str, + html: &str, + ) -> Result { + self.calls.lock().unwrap().push(( + room_id.to_string(), + plain.to_string(), + html.to_string(), + )); + Ok("mock-id".to_string()) + } + + async fn edit_message( + &self, + _room_id: &str, + _id: &str, + _plain: &str, + _html: &str, + ) -> Result<(), String> { + Ok(()) + } + + async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { + Ok(()) + } + } + + let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); + let transport = Arc::new(MockTransport { + calls: Arc::clone(&calls), + }); + + let alpha_events = vec![StoredEvent::StageTransition { + story_id: "1_story_alpha".to_string(), + from_stage: "2_current".to_string(), + to_stage: "3_qa".to_string(), + timestamp_ms: 100, + }]; + let alpha_body = serde_json::to_vec(&alpha_events).unwrap(); + let alpha_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let alpha_port = alpha_listener.local_addr().unwrap().port(); + let alpha_url = format!("http://127.0.0.1:{alpha_port}"); + tokio::spawn(async move { + for _ in 0..4 { + if let Ok((mut stream, _)) = alpha_listener.accept().await { + let mut buf = vec![0u8; 4096]; + let _ = stream.read(&mut buf).await; + let header = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + alpha_body.len() + ); + let _ = stream.write_all(header.as_bytes()).await; + let _ = stream.write_all(&alpha_body).await; + } + } + }); + + let beta_events = vec![StoredEvent::MergeFailure { + story_id: "2_story_beta".to_string(), + reason: "merge conflict in lib.rs".to_string(), + timestamp_ms: 200, + }]; + let beta_body = serde_json::to_vec(&beta_events).unwrap(); + let beta_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let beta_port = beta_listener.local_addr().unwrap().port(); + let beta_url = format!("http://127.0.0.1:{beta_port}"); + tokio::spawn(async move { + for _ in 0..4 { + if let Ok((mut stream, _)) = beta_listener.accept().await { + let mut buf = vec![0u8; 4096]; + let _ = stream.read(&mut buf).await; + let header = format!( + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + beta_body.len() + ); + let _ = stream.write_all(header.as_bytes()).await; + let _ = stream.write_all(&beta_body).await; + } + } + }); + + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + let mut project_urls = BTreeMap::new(); + project_urls.insert("alpha".to_string(), alpha_url); + project_urls.insert("beta".to_string(), beta_url); + + gateway::spawn_gateway_notification_poller( + transport as Arc, + vec!["!room:example.org".to_string()], + project_urls, + 1, + ); + + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + + let calls = calls.lock().unwrap(); + assert!( + !calls.is_empty(), + "Expected at least one notification; got none" + ); + + let plains: Vec<&str> = calls.iter().map(|(_, p, _)| p.as_str()).collect(); + + let alpha_notification = plains + .iter() + .any(|p| p.contains("[alpha]") && p.contains("1")); + let beta_notification = plains + .iter() + .any(|p| p.contains("[beta]") && p.contains("merge conflict")); + + assert!( + alpha_notification, + "Expected a notification from [alpha] containing story ID '1'; got: {plains:?}" + ); + assert!( + beta_notification, + "Expected a notification from [beta] containing 'merge conflict'; got: {plains:?}" + ); + + for (room_id, _, _) in calls.iter() { + assert_eq!( + room_id, "!room:example.org", + "All notifications must go to the gateway room" + ); + } +} + +// ── Gateway broadcaster forwarder tests ───────────────────────────── + +#[tokio::test] +async fn broadcaster_forwarder_forwards_events_with_project_tag() { + use crate::chat::{ChatTransport, MessageId}; + use crate::service::events::StoredEvent; + use async_trait::async_trait; + + type CallLog = Arc>>; + + struct MockTransport { + calls: CallLog, + } + + #[async_trait] + impl ChatTransport for MockTransport { + async fn send_message( + &self, + room_id: &str, + plain: &str, + _html: &str, + ) -> Result { + self.calls + .lock() + .unwrap() + .push((room_id.to_string(), plain.to_string())); + Ok("id".to_string()) + } + + async fn edit_message( + &self, + _room_id: &str, + _id: &str, + _plain: &str, + _html: &str, + ) -> Result<(), String> { + Ok(()) + } + + async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { + Ok(()) + } + } + + let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); + let transport = Arc::new(MockTransport { + calls: Arc::clone(&calls), + }); + + let (tx, rx) = + tokio::sync::broadcast::channel::(16); + gateway::spawn_gateway_broadcaster_forwarder( + transport as Arc, + vec!["!room:example.org".to_string()], + rx, + ); + + // Give the forwarder task a moment to start. + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + let event = crate::service::gateway::GatewayStatusEvent { + project: "my-project".to_string(), + event: StoredEvent::StageTransition { + story_id: "7_story_x".to_string(), + from_stage: "2_current".to_string(), + to_stage: "3_qa".to_string(), + timestamp_ms: 100, + }, + }; + tx.send(event).unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let messages = calls.lock().unwrap(); + assert_eq!(messages.len(), 1, "Expected exactly one notification"); + let (room, plain) = &messages[0]; + assert_eq!(room, "!room:example.org"); + assert!( + plain.starts_with("[my-project]"), + "Expected [my-project] prefix; got: {plain}" + ); + assert!( + plain.contains("7_story_x"), + "Expected story ID; got: {plain}" + ); +} + +#[tokio::test] +async fn broadcaster_forwarder_resubscribes_on_lag() { + use crate::chat::{ChatTransport, MessageId}; + use crate::service::events::StoredEvent; + use async_trait::async_trait; + + type Counter = Arc>; + + struct CountTransport { + count: Counter, + } + + #[async_trait] + impl ChatTransport for CountTransport { + async fn send_message( + &self, + _room_id: &str, + _plain: &str, + _html: &str, + ) -> Result { + *self.count.lock().unwrap() += 1; + Ok("id".to_string()) + } + + async fn edit_message( + &self, + _room_id: &str, + _id: &str, + _plain: &str, + _html: &str, + ) -> Result<(), String> { + Ok(()) + } + + async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { + Ok(()) + } + } + + let count: Counter = Arc::new(std::sync::Mutex::new(0)); + let transport = Arc::new(CountTransport { + count: Arc::clone(&count), + }); + + // Use a tiny channel (capacity 1) so the second send causes a Lagged error. + let (tx, rx) = + tokio::sync::broadcast::channel::(1); + + // Flood the channel to trigger Lagged before the forwarder task starts. + let make_event = |n: u64| crate::service::gateway::GatewayStatusEvent { + project: "p".to_string(), + event: StoredEvent::StageTransition { + story_id: format!("{n}_story"), + from_stage: "2_current".to_string(), + to_stage: "3_qa".to_string(), + timestamp_ms: n, + }, + }; + // Send 3 events to overflow the capacity-1 channel before the task runs. + let _ = tx.send(make_event(1)); + let _ = tx.send(make_event(2)); + let _ = tx.send(make_event(3)); + + gateway::spawn_gateway_broadcaster_forwarder( + transport as Arc, + vec!["!r:x.org".to_string()], + rx, + ); + + // Send one more event after the forwarder subscribes; it should arrive. + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + tx.send(make_event(4)).unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // After Lagged + resubscribe, the forwarder must still process event 4. + let received = *count.lock().unwrap(); + assert!( + received >= 1, + "Expected at least one event after Lagged resubscribe; got {received}" + ); +} + +// ── BotConfig tests ───────────────────────────────────────────────── + +#[test] +fn bot_config_loads_from_gateway_config_dir() { + use crate::chat::transport::matrix::BotConfig; + + let tmp = tempfile::tempdir().unwrap(); + let huskies_dir = tmp.path().join(".huskies"); + std::fs::create_dir_all(&huskies_dir).unwrap(); + std::fs::write( + huskies_dir.join("bot.toml"), + r#" +homeserver = "https://matrix.example.com" +username = "@bot:example.com" +password = "secret" +room_ids = ["!abc:example.com"] +enabled = true +"#, + ) + .unwrap(); + + let config = BotConfig::load(tmp.path()); + assert!(config.is_some()); + let config = config.unwrap(); + assert_eq!( + config.homeserver.as_deref(), + Some("https://matrix.example.com") + ); +} + +#[test] +fn bot_config_absent_returns_none_in_gateway_mode() { + use crate::chat::transport::matrix::BotConfig; + let tmp = tempfile::tempdir().unwrap(); + let config = BotConfig::load(tmp.path()); + assert!(config.is_none()); +} + +#[test] +fn bot_config_disabled_returns_none_in_gateway_mode() { + use crate::chat::transport::matrix::BotConfig; + let tmp = tempfile::tempdir().unwrap(); + let huskies_dir = tmp.path().join(".huskies"); + std::fs::create_dir_all(&huskies_dir).unwrap(); + std::fs::write( + huskies_dir.join("bot.toml"), + r#" +homeserver = "https://matrix.example.com" +username = "@bot:example.com" +password = "secret" +room_ids = ["!abc:example.com"] +enabled = false +"#, + ) + .unwrap(); + + let config = BotConfig::load(tmp.path()); + assert!(config.is_none()); +}