//! Multi-project gateway — entrypoint wiring and route tree. //! //! When `huskies --gateway` is used, the server starts in gateway mode. //! Business logic lives in `service::gateway`, HTTP handlers in `http::gateway`. //! This file contains only the `run` entrypoint and `build_gateway_route` wiring. use crate::http::gateway::*; use crate::service::gateway::{self, GatewayState}; use poem::EndpointExt; use std::path::Path; use std::sync::Arc; // Re-export public types that callers reference as `crate::gateway::*`. pub use crate::service::gateway::{ GatewayConfig, GatewayState as GatewayStateType, GatewayStatusEvent, ProjectEntry, broadcast_status_event, fetch_all_project_pipeline_statuses, format_aggregate_status_compact, spawn_gateway_broadcaster_forwarder, spawn_gateway_notification_poller, subscribe_status_events, }; /// Build the complete gateway route tree. /// /// Extracted from `run` so that tests can construct the full route tree and /// catch duplicate-route panics before they reach production. pub fn build_gateway_route(state_arc: Arc) -> impl poem::Endpoint { poem::Route::new() .at("/bot-config", poem::get(gateway_bot_config_page_handler)) .at("/api/gateway", poem::get(gateway_api_handler)) .at("/api/gateway/switch", poem::post(gateway_switch_handler)) .at( "/api/gateway/pipeline", poem::get(gateway_all_pipeline_handler), ) .at( "/api/gateway/projects", poem::post(gateway_add_project_handler), ) .at( "/api/gateway/projects/:name", poem::delete(gateway_remove_project_handler), ) .at( "/api/gateway/bot-config", poem::get(gateway_bot_config_get_handler).post(gateway_bot_config_save_handler), ) .at( "/mcp", poem::post(gateway_mcp_post_handler).get(gateway_mcp_get_handler), ) // Agent join endpoints. .at("/gateway/mode", poem::get(gateway_mode_handler)) .at( "/gateway/tokens", poem::post(gateway_generate_token_handler), ) .at( "/gateway/events/push", poem::get(gateway_event_push_handler), ) // Agent registration via CRDT-sync WebSocket. .at("/crdt-sync", poem::get(gateway_crdt_sync_handler)) // Agent management REST endpoints. .at("/gateway/agents", poem::get(gateway_list_agents_handler)) .at( "/gateway/agents/:id/assign", poem::post(gateway_assign_agent_handler), ) // Serve the embedded React frontend so the gateway has a UI. .at( "/assets/*path", poem::get(crate::http::assets::embedded_asset), ) .at("/*path", poem::get(crate::http::assets::embedded_file)) .at("/", poem::get(crate::http::assets::embedded_index)) .data(state_arc) } /// Start the gateway HTTP server. This is the entry point when `--gateway` is used. pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> { let config_dir = config_path .parent() .unwrap_or(std::path::Path::new(".")) .to_path_buf(); let config = gateway::io::load_config(config_path).map_err(std::io::Error::other)?; let state = GatewayState::new(config, config_dir.clone(), port).map_err(std::io::Error::other)?; let state_arc = Arc::new(state); let active = state_arc.active_project.read().await.clone(); crate::slog!("[gateway] Starting gateway on port {port}, active project: {active}"); crate::slog!( "[gateway] Registered projects: {}", state_arc .projects .read() .await .keys() .cloned() .collect::>() .join(", ") ); // Write `.mcp.json` so that the gateway's bot connects to this gateway's MCP endpoint. if let Err(e) = gateway::io::write_gateway_mcp_json(&config_dir, port) { crate::slog!("[gateway] Warning: could not write .mcp.json: {e}"); } // Spawn the Matrix bot if `.huskies/bot.toml` exists in the config directory. let gateway_projects: Vec = state_arc.projects.read().await.keys().cloned().collect(); let gateway_project_urls: std::collections::BTreeMap = state_arc .projects .read() .await .iter() .map(|(name, entry)| (name.clone(), entry.url.clone())) .collect(); let bot_abort = gateway::io::spawn_gateway_bot( &config_dir, Arc::clone(&state_arc.active_project), gateway_projects, gateway_project_urls, port, Some(state_arc.event_tx.clone()), ); *state_arc.bot_handle.lock().await = bot_abort; let route = build_gateway_route(state_arc); let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); let addr = format!("{host}:{port}"); crate::slog!("[gateway] Listening on {addr}"); poem::Server::new(poem::listener::TcpListener::bind(&addr)) .run(route) .await } // ── Tests ──────────────────────────────────────────────────────────────────── #[cfg(test)] mod tests { use super::*; use crate::service::gateway::{GatewayConfig, GatewayState, ProjectEntry}; use std::collections::BTreeMap; use std::path::PathBuf; fn make_test_state() -> Arc { let mut projects = BTreeMap::new(); projects.insert( "test".into(), ProjectEntry { url: "http://test:3001".into(), }, ); let config = GatewayConfig { projects }; Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap()) } #[test] fn gateway_route_tree_builds_without_panic() { let state = make_test_state(); let _route = build_gateway_route(state); } // ── Tests that exercised internal functions have been moved to their // ── respective service/gateway modules. The integration tests that use // ── poem::test::TestClient and mock HTTP servers remain here since they // ── test the combined HTTP + service interaction through real routes. #[tokio::test] async fn crdt_sync_handshake_then_register_writes_crdt_node() { crate::crdt_state::init_for_test(); let state = make_test_state(); // Generate a valid join token via the tokens endpoint. let token_app = poem::Route::new() .at( "/gateway/tokens", poem::post(gateway_generate_token_handler), ) .data(state.clone()); let cli = poem::test::TestClient::new(token_app); let resp = cli.post("/gateway/tokens").send().await; assert_eq!(resp.0.status(), poem::http::StatusCode::OK); let body: serde_json::Value = resp.0.into_body().into_json().await.unwrap(); let token = body["token"].as_str().unwrap().to_string(); // Token must be pending before the upgrade. assert!(state.pending_tokens.read().await.contains_key(&token)); // Call register_agent directly (the service function exercised by the handler). let node = crate::service::gateway::register_agent( &state, &token, "test-label".into(), "ws://test:9000".into(), ) .await .unwrap(); // Token is consumed after registration. assert!(state.pending_tokens.read().await.is_empty()); // CRDT node was written and is visible via list_agents. let agents = crate::service::gateway::list_agents(); assert!( agents.iter().any(|n| n.node_id == node.node_id), "Registered node must appear in list_agents" ); // remove_agent tombstones the node. assert!(crate::service::gateway::remove_agent(&node.node_id)); let alive = crate::service::gateway::list_agents(); assert!( !alive.iter().any(|n| n.node_id == node.node_id), "Tombstoned node must not appear in list_agents" ); } #[tokio::test] async fn generate_token_creates_pending_token() { let state = make_test_state(); let app = poem::Route::new() .at( "/gateway/tokens", poem::post(gateway_generate_token_handler), ) .data(state.clone()); let cli = poem::test::TestClient::new(app); let resp = cli.post("/gateway/tokens").send().await; assert_eq!(resp.0.status(), poem::http::StatusCode::OK); let body: serde_json::Value = resp.0.into_body().into_json().await.unwrap(); let token = body["token"].as_str().unwrap(); assert!(!token.is_empty()); let tokens = state.pending_tokens.read().await; assert!(tokens.contains_key(token)); } // ── Notification poller integration tests ──────────────────────────── #[tokio::test] async fn gateway_notification_poller_continues_when_one_project_unreachable() { use crate::chat::{ChatTransport, MessageId}; use crate::service::events::StoredEvent; use async_trait::async_trait; use tokio::io::{AsyncReadExt, AsyncWriteExt}; type CallLog = Arc>>; struct MockTransport { calls: CallLog, } #[async_trait] impl ChatTransport for MockTransport { async fn send_message( &self, _room_id: &str, plain: &str, _html: &str, ) -> Result { self.calls.lock().unwrap().push(plain.to_string()); Ok("id".to_string()) } async fn edit_message( &self, _room_id: &str, _id: &str, _plain: &str, _html: &str, ) -> Result<(), String> { Ok(()) } async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { Ok(()) } } let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); let transport = Arc::new(MockTransport { calls: Arc::clone(&calls), }); let event = vec![StoredEvent::StoryBlocked { story_id: "10_story_ok".to_string(), reason: "retry limit".to_string(), timestamp_ms: 500, }]; let event_body = serde_json::to_vec(&event).unwrap(); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let good_port = listener.local_addr().unwrap().port(); let good_url = format!("http://127.0.0.1:{good_port}"); tokio::spawn(async move { for _ in 0..4 { if let Ok((mut stream, _)) = listener.accept().await { let mut buf = vec![0u8; 4096]; let _ = stream.read(&mut buf).await; let header = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", event_body.len() ); let _ = stream.write_all(header.as_bytes()).await; let _ = stream.write_all(&event_body).await; } } }); tokio::time::sleep(std::time::Duration::from_millis(10)).await; let bad_url = "http://127.0.0.1:1".to_string(); let mut project_urls = BTreeMap::new(); project_urls.insert("good-project".to_string(), good_url); project_urls.insert("unreachable-project".to_string(), bad_url); gateway::spawn_gateway_notification_poller( transport as Arc, vec!["!room:example.org".to_string()], project_urls, 1, ); tokio::time::sleep(std::time::Duration::from_millis(1500)).await; let messages = calls.lock().unwrap(); assert!( !messages.is_empty(), "Expected notifications from the reachable project; got none" ); let has_good = messages .iter() .any(|m| m.contains("[good-project]") && m.contains("10_story_ok")); assert!( has_good, "Expected a notification from [good-project]; got: {messages:?}" ); let has_bad = messages.iter().any(|m| m.contains("[unreachable-project]")); assert!( !has_bad, "Unreachable project must not produce notifications; got: {messages:?}" ); } #[tokio::test] async fn gateway_notification_poller_sends_only_to_configured_gateway_rooms() { use crate::chat::{ChatTransport, MessageId}; use crate::service::events::StoredEvent; use async_trait::async_trait; use tokio::io::{AsyncReadExt, AsyncWriteExt}; type RoomLog = Arc>>; struct RoomCapture { rooms: RoomLog, } #[async_trait] impl ChatTransport for RoomCapture { async fn send_message( &self, room_id: &str, _plain: &str, _html: &str, ) -> Result { self.rooms.lock().unwrap().push(room_id.to_string()); Ok("id".to_string()) } async fn edit_message( &self, _room_id: &str, _id: &str, _plain: &str, _html: &str, ) -> Result<(), String> { Ok(()) } async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { Ok(()) } } let rooms: RoomLog = Arc::new(std::sync::Mutex::new(Vec::new())); let transport = Arc::new(RoomCapture { rooms: Arc::clone(&rooms), }); let event = vec![StoredEvent::MergeFailure { story_id: "5_story_x".to_string(), reason: "conflict".to_string(), timestamp_ms: 300, }]; let event_body = serde_json::to_vec(&event).unwrap(); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let port = listener.local_addr().unwrap().port(); let url = format!("http://127.0.0.1:{port}"); tokio::spawn(async move { for _ in 0..4 { if let Ok((mut stream, _)) = listener.accept().await { let mut buf = vec![0u8; 4096]; let _ = stream.read(&mut buf).await; let header = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", event_body.len() ); let _ = stream.write_all(header.as_bytes()).await; let _ = stream.write_all(&event_body).await; } } }); tokio::time::sleep(std::time::Duration::from_millis(10)).await; const GATEWAY_ROOM: &str = "!gateway:example.org"; #[allow(dead_code)] const PER_PROJECT_ROOM: &str = "!project:example.org"; let mut project_urls = BTreeMap::new(); project_urls.insert("myproj".to_string(), url); gateway::spawn_gateway_notification_poller( transport as Arc, vec![GATEWAY_ROOM.to_string()], project_urls, 1, ); tokio::time::sleep(std::time::Duration::from_millis(1500)).await; let room_calls = rooms.lock().unwrap(); assert!( !room_calls.is_empty(), "Expected at least one notification; got none" ); for room in room_calls.iter() { assert_eq!( room, GATEWAY_ROOM, "Notification must only go to the gateway room, not {room}" ); } assert!( !room_calls.iter().any(|r| r == PER_PROJECT_ROOM), "Per-project room must not receive gateway aggregated notifications" ); } // ── init_project integration tests ────────────────────────────────── #[tokio::test] async fn init_project_scaffolds_huskies_dir() { let dir = tempfile::tempdir().unwrap(); let state = make_test_state(); let result = gateway::init_project(&state, dir.path().to_str().unwrap(), None, None).await; assert!( result.is_ok(), "init_project should succeed: {:?}", result.err() ); assert!(dir.path().join(".huskies").exists()); assert!(dir.path().join(".huskies/project.toml").exists()); assert!(dir.path().join(".huskies/agents.toml").exists()); assert!(dir.path().join("script/test").exists()); } #[tokio::test] async fn init_project_creates_wizard_state() { let dir = tempfile::tempdir().unwrap(); let state = make_test_state(); gateway::init_project(&state, dir.path().to_str().unwrap(), None, None) .await .unwrap(); let wizard_state_path = dir.path().join(".huskies/wizard_state.json"); assert!(wizard_state_path.exists()); let content = std::fs::read_to_string(&wizard_state_path).unwrap(); let v: serde_json::Value = serde_json::from_str(&content).unwrap(); assert!(v.get("steps").is_some()); assert!(v.get("completed").is_some()); } #[tokio::test] async fn init_project_already_initialised_returns_error() { let dir = tempfile::tempdir().unwrap(); std::fs::create_dir_all(dir.path().join(".huskies")).unwrap(); let state = make_test_state(); let result = gateway::init_project(&state, dir.path().to_str().unwrap(), None, None).await; assert!(result.is_err()); } #[tokio::test] async fn init_project_missing_path_returns_error() { let state = make_test_state(); let result = gateway::init_project(&state, "", None, None).await; assert!(result.is_err()); } #[tokio::test] async fn init_project_registers_in_projects_toml_when_name_and_url_given() { let dir = tempfile::tempdir().unwrap(); let config_dir = tempfile::tempdir().unwrap(); let mut projects = BTreeMap::new(); projects.insert( "existing".into(), ProjectEntry { url: "http://existing:3001".into(), }, ); let config = GatewayConfig { projects }; let state = Arc::new(GatewayState::new(config, config_dir.path().to_path_buf(), 3000).unwrap()); let result = gateway::init_project( &state, dir.path().to_str().unwrap(), Some("new-project"), Some("http://new-project:3002"), ) .await; assert!(result.is_ok()); let projects = state.projects.read().await; assert!(projects.contains_key("new-project")); assert_eq!(projects["new-project"].url, "http://new-project:3002"); } #[tokio::test] async fn init_project_duplicate_name_returns_error() { let dir = tempfile::tempdir().unwrap(); let mut projects = BTreeMap::new(); projects.insert( "taken".into(), ProjectEntry { url: "http://taken:3001".into(), }, ); let config = GatewayConfig { projects }; let state = Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap()); let result = gateway::init_project( &state, dir.path().to_str().unwrap(), Some("taken"), Some("http://new:3002"), ) .await; assert!(result.is_err()); } #[tokio::test] async fn init_project_then_wizard_status_integration() { use tokio::io::{AsyncReadExt, AsyncWriteExt}; let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let mock_port = listener.local_addr().unwrap().port(); let mock_url = format!("http://127.0.0.1:{mock_port}"); tokio::spawn(async move { if let Ok((mut stream, _)) = listener.accept().await { let mut buf = vec![0u8; 4096]; let _ = stream.read(&mut buf).await; let body = serde_json::json!({ "jsonrpc": "2.0", "id": 1, "result": { "content": [{ "type": "text", "text": "{\"steps\":[{\"id\":\"scaffold\",\"title\":\"Scaffold\",\"status\":\"confirmed\"}],\"completed\":false}" }] } }); let body_bytes = serde_json::to_vec(&body).unwrap(); let header = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", body_bytes.len() ); let _ = stream.write_all(header.as_bytes()).await; let _ = stream.write_all(&body_bytes).await; } }); tokio::time::sleep(std::time::Duration::from_millis(10)).await; let mut projects = BTreeMap::new(); projects.insert("mock-project".into(), ProjectEntry { url: mock_url }); let config = GatewayConfig { projects }; let config_dir = tempfile::tempdir().unwrap(); let state = Arc::new(GatewayState::new(config, config_dir.path().to_path_buf(), 3000).unwrap()); let project_dir = tempfile::tempdir().unwrap(); let result = gateway::init_project(&state, project_dir.path().to_str().unwrap(), None, None).await; assert!(result.is_ok()); assert!(project_dir.path().join(".huskies").exists()); let wizard_path = project_dir.path().join(".huskies/wizard_state.json"); assert!(wizard_path.exists()); // Proxy call to the mock server. let active_url = state.active_url().await.unwrap(); let proxy_body = serde_json::to_vec(&serde_json::json!({ "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": { "name": "wizard_status", "arguments": {} } })) .unwrap(); let proxy_resp = gateway::io::proxy_mcp_call(&state.client, &active_url, &proxy_body).await; assert!(proxy_resp.is_ok()); let resp_json: serde_json::Value = serde_json::from_slice(&proxy_resp.unwrap()).unwrap(); let result = resp_json.get("result"); assert!(result.is_some()); let text = result .and_then(|r| r.get("content")) .and_then(|c| c.get(0)) .and_then(|c| c.get("text")) .and_then(|t| t.as_str()) .unwrap_or(""); let wizard: serde_json::Value = serde_json::from_str(text).unwrap(); assert!(wizard.get("steps").is_some()); } // ── Aggregate pipeline status integration tests ───────────────────── #[tokio::test] async fn aggregate_pipeline_status_integration_healthy_and_unreachable() { use tokio::io::{AsyncReadExt, AsyncWriteExt}; let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let mock_port = listener.local_addr().unwrap().port(); let healthy_url = format!("http://127.0.0.1:{mock_port}"); tokio::spawn(async move { if let Ok((mut stream, _)) = listener.accept().await { let mut buf = vec![0u8; 4096]; let _ = stream.read(&mut buf).await; let pipeline_json = serde_json::to_string(&serde_json::json!({ "active": [ { "story_id": "1_story_a", "name": "A", "stage": "current" }, { "story_id": "2_story_b", "name": "B", "stage": "qa" }, { "story_id": "3_story_c", "name": "C", "stage": "current", "blocked": true, "retry_count": 5 }, ], "backlog": [{ "story_id": "4_story_d", "name": "D" }], "backlog_count": 1 })) .unwrap(); let body = serde_json::to_vec(&serde_json::json!({ "jsonrpc": "2.0", "id": 1, "result": { "content": [{ "type": "text", "text": pipeline_json }] } })) .unwrap(); let header = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", body.len() ); let _ = stream.write_all(header.as_bytes()).await; let _ = stream.write_all(&body).await; } }); tokio::time::sleep(std::time::Duration::from_millis(10)).await; let unreachable_url = "http://127.0.0.1:1".to_string(); let mut project_urls = BTreeMap::new(); project_urls.insert("healthy-project".to_string(), healthy_url); project_urls.insert("broken-project".to_string(), unreachable_url); let client = reqwest::Client::new(); let statuses = gateway::fetch_all_project_pipeline_statuses(&project_urls, &client).await; assert!(statuses.contains_key("healthy-project")); assert!(statuses.contains_key("broken-project")); let healthy = &statuses["healthy-project"]; assert!(healthy.get("error").is_none()); assert_eq!(healthy["counts"]["backlog"], 1); assert_eq!(healthy["counts"]["current"], 2); assert_eq!(healthy["counts"]["qa"], 1); let blocked = healthy["blocked"].as_array().unwrap(); assert_eq!(blocked.len(), 1); assert_eq!(blocked[0]["story_id"], "3_story_c"); let broken = &statuses["broken-project"]; assert!(broken.get("error").is_some()); } // ── Multi-project notification poller integration ──────────────────── #[tokio::test] async fn gateway_notification_poller_delivers_events_from_two_projects_with_project_tags() { use crate::chat::{ChatTransport, MessageId}; use crate::service::events::StoredEvent; use async_trait::async_trait; use tokio::io::{AsyncReadExt, AsyncWriteExt}; type CallLog = Arc>>; struct MockTransport { calls: CallLog, } #[async_trait] impl ChatTransport for MockTransport { async fn send_message( &self, room_id: &str, plain: &str, html: &str, ) -> Result { self.calls.lock().unwrap().push(( room_id.to_string(), plain.to_string(), html.to_string(), )); Ok("mock-id".to_string()) } async fn edit_message( &self, _room_id: &str, _id: &str, _plain: &str, _html: &str, ) -> Result<(), String> { Ok(()) } async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { Ok(()) } } let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); let transport = Arc::new(MockTransport { calls: Arc::clone(&calls), }); let alpha_events = vec![StoredEvent::StageTransition { story_id: "1_story_alpha".to_string(), from_stage: "2_current".to_string(), to_stage: "3_qa".to_string(), timestamp_ms: 100, }]; let alpha_body = serde_json::to_vec(&alpha_events).unwrap(); let alpha_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let alpha_port = alpha_listener.local_addr().unwrap().port(); let alpha_url = format!("http://127.0.0.1:{alpha_port}"); tokio::spawn(async move { for _ in 0..4 { if let Ok((mut stream, _)) = alpha_listener.accept().await { let mut buf = vec![0u8; 4096]; let _ = stream.read(&mut buf).await; let header = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", alpha_body.len() ); let _ = stream.write_all(header.as_bytes()).await; let _ = stream.write_all(&alpha_body).await; } } }); let beta_events = vec![StoredEvent::MergeFailure { story_id: "2_story_beta".to_string(), reason: "merge conflict in lib.rs".to_string(), timestamp_ms: 200, }]; let beta_body = serde_json::to_vec(&beta_events).unwrap(); let beta_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let beta_port = beta_listener.local_addr().unwrap().port(); let beta_url = format!("http://127.0.0.1:{beta_port}"); tokio::spawn(async move { for _ in 0..4 { if let Ok((mut stream, _)) = beta_listener.accept().await { let mut buf = vec![0u8; 4096]; let _ = stream.read(&mut buf).await; let header = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", beta_body.len() ); let _ = stream.write_all(header.as_bytes()).await; let _ = stream.write_all(&beta_body).await; } } }); tokio::time::sleep(std::time::Duration::from_millis(10)).await; let mut project_urls = BTreeMap::new(); project_urls.insert("alpha".to_string(), alpha_url); project_urls.insert("beta".to_string(), beta_url); gateway::spawn_gateway_notification_poller( transport as Arc, vec!["!room:example.org".to_string()], project_urls, 1, ); tokio::time::sleep(std::time::Duration::from_millis(1500)).await; let calls = calls.lock().unwrap(); assert!( !calls.is_empty(), "Expected at least one notification; got none" ); let plains: Vec<&str> = calls.iter().map(|(_, p, _)| p.as_str()).collect(); let alpha_notification = plains .iter() .any(|p| p.contains("[alpha]") && p.contains("1")); let beta_notification = plains .iter() .any(|p| p.contains("[beta]") && p.contains("merge conflict")); assert!( alpha_notification, "Expected a notification from [alpha] containing story ID '1'; got: {plains:?}" ); assert!( beta_notification, "Expected a notification from [beta] containing 'merge conflict'; got: {plains:?}" ); for (room_id, _, _) in calls.iter() { assert_eq!( room_id, "!room:example.org", "All notifications must go to the gateway room" ); } } // ── Gateway broadcaster forwarder tests ───────────────────────────── #[tokio::test] async fn broadcaster_forwarder_forwards_events_with_project_tag() { use crate::chat::{ChatTransport, MessageId}; use crate::service::events::StoredEvent; use async_trait::async_trait; type CallLog = Arc>>; struct MockTransport { calls: CallLog, } #[async_trait] impl ChatTransport for MockTransport { async fn send_message( &self, room_id: &str, plain: &str, _html: &str, ) -> Result { self.calls .lock() .unwrap() .push((room_id.to_string(), plain.to_string())); Ok("id".to_string()) } async fn edit_message( &self, _room_id: &str, _id: &str, _plain: &str, _html: &str, ) -> Result<(), String> { Ok(()) } async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { Ok(()) } } let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); let transport = Arc::new(MockTransport { calls: Arc::clone(&calls), }); let (tx, rx) = tokio::sync::broadcast::channel::(16); gateway::spawn_gateway_broadcaster_forwarder( transport as Arc, vec!["!room:example.org".to_string()], rx, ); // Give the forwarder task a moment to start. tokio::time::sleep(std::time::Duration::from_millis(10)).await; let event = crate::service::gateway::GatewayStatusEvent { project: "my-project".to_string(), event: StoredEvent::StageTransition { story_id: "7_story_x".to_string(), from_stage: "2_current".to_string(), to_stage: "3_qa".to_string(), timestamp_ms: 100, }, }; tx.send(event).unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; let messages = calls.lock().unwrap(); assert_eq!(messages.len(), 1, "Expected exactly one notification"); let (room, plain) = &messages[0]; assert_eq!(room, "!room:example.org"); assert!( plain.starts_with("[my-project]"), "Expected [my-project] prefix; got: {plain}" ); assert!( plain.contains("7_story_x"), "Expected story ID; got: {plain}" ); } #[tokio::test] async fn broadcaster_forwarder_resubscribes_on_lag() { use crate::chat::{ChatTransport, MessageId}; use crate::service::events::StoredEvent; use async_trait::async_trait; type Counter = Arc>; struct CountTransport { count: Counter, } #[async_trait] impl ChatTransport for CountTransport { async fn send_message( &self, _room_id: &str, _plain: &str, _html: &str, ) -> Result { *self.count.lock().unwrap() += 1; Ok("id".to_string()) } async fn edit_message( &self, _room_id: &str, _id: &str, _plain: &str, _html: &str, ) -> Result<(), String> { Ok(()) } async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { Ok(()) } } let count: Counter = Arc::new(std::sync::Mutex::new(0)); let transport = Arc::new(CountTransport { count: Arc::clone(&count), }); // Use a tiny channel (capacity 1) so the second send causes a Lagged error. let (tx, rx) = tokio::sync::broadcast::channel::(1); // Flood the channel to trigger Lagged before the forwarder task starts. let make_event = |n: u64| crate::service::gateway::GatewayStatusEvent { project: "p".to_string(), event: StoredEvent::StageTransition { story_id: format!("{n}_story"), from_stage: "2_current".to_string(), to_stage: "3_qa".to_string(), timestamp_ms: n, }, }; // Send 3 events to overflow the capacity-1 channel before the task runs. let _ = tx.send(make_event(1)); let _ = tx.send(make_event(2)); let _ = tx.send(make_event(3)); gateway::spawn_gateway_broadcaster_forwarder( transport as Arc, vec!["!r:x.org".to_string()], rx, ); // Send one more event after the forwarder subscribes; it should arrive. tokio::time::sleep(std::time::Duration::from_millis(20)).await; tx.send(make_event(4)).unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; // After Lagged + resubscribe, the forwarder must still process event 4. let received = *count.lock().unwrap(); assert!( received >= 1, "Expected at least one event after Lagged resubscribe; got {received}" ); } // ── BotConfig tests ───────────────────────────────────────────────── #[test] fn bot_config_loads_from_gateway_config_dir() { use crate::chat::transport::matrix::BotConfig; let tmp = tempfile::tempdir().unwrap(); let huskies_dir = tmp.path().join(".huskies"); std::fs::create_dir_all(&huskies_dir).unwrap(); std::fs::write( huskies_dir.join("bot.toml"), r#" homeserver = "https://matrix.example.com" username = "@bot:example.com" password = "secret" room_ids = ["!abc:example.com"] enabled = true "#, ) .unwrap(); let config = BotConfig::load(tmp.path()); assert!(config.is_some()); let config = config.unwrap(); assert_eq!( config.homeserver.as_deref(), Some("https://matrix.example.com") ); } #[test] fn bot_config_absent_returns_none_in_gateway_mode() { use crate::chat::transport::matrix::BotConfig; let tmp = tempfile::tempdir().unwrap(); let config = BotConfig::load(tmp.path()); assert!(config.is_none()); } #[test] fn bot_config_disabled_returns_none_in_gateway_mode() { use crate::chat::transport::matrix::BotConfig; let tmp = tempfile::tempdir().unwrap(); let huskies_dir = tmp.path().join(".huskies"); std::fs::create_dir_all(&huskies_dir).unwrap(); std::fs::write( huskies_dir.join("bot.toml"), r#" homeserver = "https://matrix.example.com" username = "@bot:example.com" password = "secret" room_ids = ["!abc:example.com"] enabled = false "#, ) .unwrap(); let config = BotConfig::load(tmp.path()); assert!(config.is_none()); } }