//! Integration tests for the gateway route tree and service interactions. use super::*; use crate::service::gateway::{GatewayConfig, GatewayState, ProjectEntry}; use std::collections::BTreeMap; use std::path::PathBuf; fn make_test_state() -> Arc { let mut projects = BTreeMap::new(); projects.insert("test".into(), ProjectEntry::with_url("http://test:3001")); let config = GatewayConfig { projects, sled_tokens: BTreeMap::new(), }; Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap()) } #[test] fn gateway_route_tree_builds_without_panic() { let state = make_test_state(); let _route = build_gateway_route(state); } // ── Tests that exercised internal functions have been moved to their // ── respective service/gateway modules. The integration tests that use // ── poem::test::TestClient and mock HTTP servers remain here since they // ── test the combined HTTP + service interaction through real routes. #[tokio::test] async fn crdt_sync_handshake_then_register_writes_crdt_node() { crate::crdt_state::init_for_test(); let state = make_test_state(); // Generate a valid join token via the tokens endpoint. let token_app = poem::Route::new() .at( "/gateway/tokens", poem::post(gateway_generate_token_handler), ) .data(state.clone()); let cli = poem::test::TestClient::new(token_app); let resp = cli.post("/gateway/tokens").send().await; assert_eq!(resp.0.status(), poem::http::StatusCode::OK); let body: serde_json::Value = resp.0.into_body().into_json().await.unwrap(); let token = body["token"].as_str().unwrap().to_string(); // Token must be pending before the upgrade. assert!(state.pending_tokens.read().await.contains_key(&token)); // Call register_agent directly (the service function exercised by the handler). let node = crate::service::gateway::register_agent( &state, &token, "test-label".into(), "ws://test:9000".into(), ) .await .unwrap(); // Token is consumed after registration. assert!(state.pending_tokens.read().await.is_empty()); // CRDT node was written and is visible via list_agents. let agents = crate::service::gateway::list_agents(); assert!( agents.iter().any(|n| n.node_id == node.node_id), "Registered node must appear in list_agents" ); // remove_agent tombstones the node. assert!(crate::service::gateway::remove_agent(&node.node_id)); let alive = crate::service::gateway::list_agents(); assert!( !alive.iter().any(|n| n.node_id == node.node_id), "Tombstoned node must not appear in list_agents" ); } #[tokio::test] async fn generate_token_creates_pending_token() { let state = make_test_state(); let app = poem::Route::new() .at( "/gateway/tokens", poem::post(gateway_generate_token_handler), ) .data(state.clone()); let cli = poem::test::TestClient::new(app); let resp = cli.post("/gateway/tokens").send().await; assert_eq!(resp.0.status(), poem::http::StatusCode::OK); let body: serde_json::Value = resp.0.into_body().into_json().await.unwrap(); let token = body["token"].as_str().unwrap(); assert!(!token.is_empty()); let tokens = state.pending_tokens.read().await; assert!(tokens.contains_key(token)); } // ── Notification poller integration tests ──────────────────────────── #[tokio::test] async fn gateway_notification_poller_continues_when_one_project_unreachable() { use crate::chat::{ChatTransport, MessageId}; use crate::service::events::StoredEvent; use async_trait::async_trait; use tokio::io::{AsyncReadExt, AsyncWriteExt}; type CallLog = Arc>>; struct MockTransport { calls: CallLog, } #[async_trait] impl ChatTransport for MockTransport { async fn send_message( &self, _room_id: &str, plain: &str, _html: &str, ) -> Result { self.calls.lock().unwrap().push(plain.to_string()); Ok("id".to_string()) } async fn edit_message( &self, _room_id: &str, _id: &str, _plain: &str, _html: &str, ) -> Result<(), String> { Ok(()) } async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { Ok(()) } } let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); let transport = Arc::new(MockTransport { calls: Arc::clone(&calls), }); let event = vec![StoredEvent::StoryBlocked { story_id: "10_story_ok".to_string(), reason: "retry limit".to_string(), timestamp_ms: 500, }]; let event_body = serde_json::to_vec(&event).unwrap(); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let good_port = listener.local_addr().unwrap().port(); let good_url = format!("http://127.0.0.1:{good_port}"); tokio::spawn(async move { for _ in 0..4 { if let Ok((mut stream, _)) = listener.accept().await { let mut buf = vec![0u8; 4096]; let _ = stream.read(&mut buf).await; let header = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", event_body.len() ); let _ = stream.write_all(header.as_bytes()).await; let _ = stream.write_all(&event_body).await; } } }); tokio::time::sleep(std::time::Duration::from_millis(10)).await; let bad_url = "http://127.0.0.1:1".to_string(); let mut project_urls = BTreeMap::new(); project_urls.insert("good-project".to_string(), good_url); project_urls.insert("unreachable-project".to_string(), bad_url); gateway::spawn_gateway_notification_poller( transport as Arc, vec!["!room:example.org".to_string()], project_urls, 1, ); tokio::time::sleep(std::time::Duration::from_millis(1500)).await; let messages = calls.lock().unwrap(); assert!( !messages.is_empty(), "Expected notifications from the reachable project; got none" ); let has_good = messages .iter() .any(|m| m.contains("[good-project]") && m.contains("#10")); assert!( has_good, "Expected a notification from [good-project]; got: {messages:?}" ); let has_bad = messages.iter().any(|m| m.contains("[unreachable-project]")); assert!( !has_bad, "Unreachable project must not produce notifications; got: {messages:?}" ); } #[tokio::test] async fn gateway_notification_poller_sends_only_to_configured_gateway_rooms() { use crate::chat::{ChatTransport, MessageId}; use crate::service::events::StoredEvent; use async_trait::async_trait; use tokio::io::{AsyncReadExt, AsyncWriteExt}; type RoomLog = Arc>>; struct RoomCapture { rooms: RoomLog, } #[async_trait] impl ChatTransport for RoomCapture { async fn send_message( &self, room_id: &str, _plain: &str, _html: &str, ) -> Result { self.rooms.lock().unwrap().push(room_id.to_string()); Ok("id".to_string()) } async fn edit_message( &self, _room_id: &str, _id: &str, _plain: &str, _html: &str, ) -> Result<(), String> { Ok(()) } async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { Ok(()) } } let rooms: RoomLog = Arc::new(std::sync::Mutex::new(Vec::new())); let transport = Arc::new(RoomCapture { rooms: Arc::clone(&rooms), }); let event = vec![StoredEvent::MergeFailure { story_id: "5_story_x".to_string(), reason: "conflict".to_string(), timestamp_ms: 300, }]; let event_body = serde_json::to_vec(&event).unwrap(); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let port = listener.local_addr().unwrap().port(); let url = format!("http://127.0.0.1:{port}"); tokio::spawn(async move { for _ in 0..4 { if let Ok((mut stream, _)) = listener.accept().await { let mut buf = vec![0u8; 4096]; let _ = stream.read(&mut buf).await; let header = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", event_body.len() ); let _ = stream.write_all(header.as_bytes()).await; let _ = stream.write_all(&event_body).await; } } }); tokio::time::sleep(std::time::Duration::from_millis(10)).await; const GATEWAY_ROOM: &str = "!gateway:example.org"; #[allow(dead_code)] const PER_PROJECT_ROOM: &str = "!project:example.org"; let mut project_urls = BTreeMap::new(); project_urls.insert("myproj".to_string(), url); gateway::spawn_gateway_notification_poller( transport as Arc, vec![GATEWAY_ROOM.to_string()], project_urls, 1, ); tokio::time::sleep(std::time::Duration::from_millis(1500)).await; let room_calls = rooms.lock().unwrap(); assert!( !room_calls.is_empty(), "Expected at least one notification; got none" ); for room in room_calls.iter() { assert_eq!( room, GATEWAY_ROOM, "Notification must only go to the gateway room, not {room}" ); } assert!( !room_calls.iter().any(|r| r == PER_PROJECT_ROOM), "Per-project room must not receive gateway aggregated notifications" ); } // ── init_project integration tests ────────────────────────────────── #[tokio::test] async fn init_project_scaffolds_huskies_dir() { let dir = tempfile::tempdir().unwrap(); let state = make_test_state(); let result = gateway::init_project(&state, dir.path().to_str().unwrap(), None, None).await; assert!( result.is_ok(), "init_project should succeed: {:?}", result.err() ); assert!(dir.path().join(".huskies").exists()); assert!(dir.path().join(".huskies/project.toml").exists()); assert!(dir.path().join(".huskies/agents.toml").exists()); assert!(dir.path().join("script/test").exists()); } #[tokio::test] async fn init_project_creates_wizard_state() { let dir = tempfile::tempdir().unwrap(); let state = make_test_state(); gateway::init_project(&state, dir.path().to_str().unwrap(), None, None) .await .unwrap(); let wizard_state_path = dir.path().join(".huskies/wizard_state.json"); assert!(wizard_state_path.exists()); let content = std::fs::read_to_string(&wizard_state_path).unwrap(); let v: serde_json::Value = serde_json::from_str(&content).unwrap(); assert!(v.get("steps").is_some()); assert!(v.get("completed").is_some()); } #[tokio::test] async fn init_project_already_initialised_returns_error() { let dir = tempfile::tempdir().unwrap(); std::fs::create_dir_all(dir.path().join(".huskies")).unwrap(); let state = make_test_state(); let result = gateway::init_project(&state, dir.path().to_str().unwrap(), None, None).await; assert!(result.is_err()); } #[tokio::test] async fn init_project_missing_path_returns_error() { let state = make_test_state(); let result = gateway::init_project(&state, "", None, None).await; assert!(result.is_err()); } #[tokio::test] async fn init_project_registers_in_projects_toml_when_name_and_url_given() { let dir = tempfile::tempdir().unwrap(); let config_dir = tempfile::tempdir().unwrap(); let mut projects = BTreeMap::new(); projects.insert( "existing".into(), ProjectEntry::with_url("http://existing:3001"), ); let config = GatewayConfig { projects, sled_tokens: BTreeMap::new(), }; let state = Arc::new(GatewayState::new(config, config_dir.path().to_path_buf(), 3000).unwrap()); let result = gateway::init_project( &state, dir.path().to_str().unwrap(), Some("new-project"), Some("http://new-project:3002"), ) .await; assert!(result.is_ok()); let projects = state.projects.read().await; assert!(projects.contains_key("new-project")); assert_eq!( projects["new-project"].url.as_deref(), Some("http://new-project:3002") ); } #[tokio::test] async fn init_project_duplicate_name_returns_error() { let dir = tempfile::tempdir().unwrap(); let mut projects = BTreeMap::new(); projects.insert("taken".into(), ProjectEntry::with_url("http://taken:3001")); let config = GatewayConfig { projects, sled_tokens: BTreeMap::new(), }; let state = Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap()); let result = gateway::init_project( &state, dir.path().to_str().unwrap(), Some("taken"), Some("http://new:3002"), ) .await; assert!(result.is_err()); } #[tokio::test] async fn init_project_then_wizard_status_integration() { use tokio::io::{AsyncReadExt, AsyncWriteExt}; let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let mock_port = listener.local_addr().unwrap().port(); let mock_url = format!("http://127.0.0.1:{mock_port}"); tokio::spawn(async move { if let Ok((mut stream, _)) = listener.accept().await { let mut buf = vec![0u8; 4096]; let _ = stream.read(&mut buf).await; let body = serde_json::json!({ "jsonrpc": "2.0", "id": 1, "result": { "content": [{ "type": "text", "text": "{\"steps\":[{\"id\":\"scaffold\",\"title\":\"Scaffold\",\"status\":\"confirmed\"}],\"completed\":false}" }] } }); let body_bytes = serde_json::to_vec(&body).unwrap(); let header = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", body_bytes.len() ); let _ = stream.write_all(header.as_bytes()).await; let _ = stream.write_all(&body_bytes).await; } }); tokio::time::sleep(std::time::Duration::from_millis(10)).await; let mut projects = BTreeMap::new(); projects.insert("mock-project".into(), ProjectEntry::with_url(mock_url)); let config = GatewayConfig { projects, sled_tokens: BTreeMap::new(), }; let config_dir = tempfile::tempdir().unwrap(); let state = Arc::new(GatewayState::new(config, config_dir.path().to_path_buf(), 3000).unwrap()); let project_dir = tempfile::tempdir().unwrap(); let result = gateway::init_project(&state, project_dir.path().to_str().unwrap(), None, None).await; assert!(result.is_ok()); assert!(project_dir.path().join(".huskies").exists()); let wizard_path = project_dir.path().join(".huskies/wizard_state.json"); assert!(wizard_path.exists()); // Proxy call to the mock server. let active_url = state.active_url().await.unwrap(); let proxy_body = serde_json::to_vec(&serde_json::json!({ "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": { "name": "wizard_status", "arguments": {} } })) .unwrap(); let proxy_resp = gateway::io::proxy_mcp_call(&state.client, &active_url, &proxy_body).await; assert!(proxy_resp.is_ok()); let resp_json: serde_json::Value = serde_json::from_slice(&proxy_resp.unwrap()).unwrap(); let result = resp_json.get("result"); assert!(result.is_some()); let text = result .and_then(|r| r.get("content")) .and_then(|c| c.get(0)) .and_then(|c| c.get("text")) .and_then(|t| t.as_str()) .unwrap_or(""); let wizard: serde_json::Value = serde_json::from_str(text).unwrap(); assert!(wizard.get("steps").is_some()); } // ── Aggregate pipeline status integration tests ───────────────────── #[tokio::test] async fn aggregate_pipeline_status_integration_healthy_and_unreachable() { use tokio::io::{AsyncReadExt, AsyncWriteExt}; let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let mock_port = listener.local_addr().unwrap().port(); let healthy_url = format!("http://127.0.0.1:{mock_port}"); tokio::spawn(async move { if let Ok((mut stream, _)) = listener.accept().await { let mut buf = vec![0u8; 4096]; let _ = stream.read(&mut buf).await; let pipeline_json = serde_json::to_string(&serde_json::json!({ "active": [ { "story_id": "1_story_a", "name": "A", "stage": "current" }, { "story_id": "2_story_b", "name": "B", "stage": "qa" }, { "story_id": "3_story_c", "name": "C", "stage": "current", "blocked": true, "retry_count": 5 }, ], "backlog": [{ "story_id": "4_story_d", "name": "D" }], "backlog_count": 1 })) .unwrap(); let body = serde_json::to_vec(&serde_json::json!({ "jsonrpc": "2.0", "id": 1, "result": { "content": [{ "type": "text", "text": pipeline_json }] } })) .unwrap(); let header = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", body.len() ); let _ = stream.write_all(header.as_bytes()).await; let _ = stream.write_all(&body).await; } }); tokio::time::sleep(std::time::Duration::from_millis(10)).await; let unreachable_url = "http://127.0.0.1:1".to_string(); let mut project_urls = BTreeMap::new(); project_urls.insert("healthy-project".to_string(), healthy_url); project_urls.insert("broken-project".to_string(), unreachable_url); let client = reqwest::Client::new(); let statuses = gateway::fetch_all_project_pipeline_statuses(&project_urls, &client).await; assert!(statuses.contains_key("healthy-project")); assert!(statuses.contains_key("broken-project")); let healthy = &statuses["healthy-project"]; assert!(healthy.get("error").is_none()); assert_eq!(healthy["counts"]["backlog"], 1); assert_eq!(healthy["counts"]["current"], 2); assert_eq!(healthy["counts"]["qa"], 1); let blocked = healthy["blocked"].as_array().unwrap(); assert_eq!(blocked.len(), 1); assert_eq!(blocked[0]["story_id"], "3_story_c"); let broken = &statuses["broken-project"]; assert!(broken.get("error").is_some()); } // ── Multi-project notification poller integration ──────────────────── #[tokio::test] async fn gateway_notification_poller_delivers_events_from_two_projects_with_project_tags() { use crate::chat::{ChatTransport, MessageId}; use crate::service::events::StoredEvent; use async_trait::async_trait; use tokio::io::{AsyncReadExt, AsyncWriteExt}; type CallLog = Arc>>; struct MockTransport { calls: CallLog, } #[async_trait] impl ChatTransport for MockTransport { async fn send_message( &self, room_id: &str, plain: &str, html: &str, ) -> Result { self.calls.lock().unwrap().push(( room_id.to_string(), plain.to_string(), html.to_string(), )); Ok("mock-id".to_string()) } async fn edit_message( &self, _room_id: &str, _id: &str, _plain: &str, _html: &str, ) -> Result<(), String> { Ok(()) } async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { Ok(()) } } let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); let transport = Arc::new(MockTransport { calls: Arc::clone(&calls), }); let alpha_events = vec![StoredEvent::StageTransition { story_id: "1_story_alpha".to_string(), from_stage: "2_current".to_string(), to_stage: "3_qa".to_string(), timestamp_ms: 100, }]; let alpha_body = serde_json::to_vec(&alpha_events).unwrap(); let alpha_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let alpha_port = alpha_listener.local_addr().unwrap().port(); let alpha_url = format!("http://127.0.0.1:{alpha_port}"); tokio::spawn(async move { for _ in 0..4 { if let Ok((mut stream, _)) = alpha_listener.accept().await { let mut buf = vec![0u8; 4096]; let _ = stream.read(&mut buf).await; let header = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", alpha_body.len() ); let _ = stream.write_all(header.as_bytes()).await; let _ = stream.write_all(&alpha_body).await; } } }); let beta_events = vec![StoredEvent::MergeFailure { story_id: "2_story_beta".to_string(), reason: "merge conflict in lib.rs".to_string(), timestamp_ms: 200, }]; let beta_body = serde_json::to_vec(&beta_events).unwrap(); let beta_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let beta_port = beta_listener.local_addr().unwrap().port(); let beta_url = format!("http://127.0.0.1:{beta_port}"); tokio::spawn(async move { for _ in 0..4 { if let Ok((mut stream, _)) = beta_listener.accept().await { let mut buf = vec![0u8; 4096]; let _ = stream.read(&mut buf).await; let header = format!( "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", beta_body.len() ); let _ = stream.write_all(header.as_bytes()).await; let _ = stream.write_all(&beta_body).await; } } }); tokio::time::sleep(std::time::Duration::from_millis(10)).await; let mut project_urls = BTreeMap::new(); project_urls.insert("alpha".to_string(), alpha_url); project_urls.insert("beta".to_string(), beta_url); gateway::spawn_gateway_notification_poller( transport as Arc, vec!["!room:example.org".to_string()], project_urls, 1, ); tokio::time::sleep(std::time::Duration::from_millis(1500)).await; let calls = calls.lock().unwrap(); assert!( !calls.is_empty(), "Expected at least one notification; got none" ); let plains: Vec<&str> = calls.iter().map(|(_, p, _)| p.as_str()).collect(); let alpha_notification = plains .iter() .any(|p| p.contains("[alpha]") && p.contains("1")); let beta_notification = plains .iter() .any(|p| p.contains("[beta]") && p.contains("merge conflict")); assert!( alpha_notification, "Expected a notification from [alpha] containing story ID '1'; got: {plains:?}" ); assert!( beta_notification, "Expected a notification from [beta] containing 'merge conflict'; got: {plains:?}" ); for (room_id, _, _) in calls.iter() { assert_eq!( room_id, "!room:example.org", "All notifications must go to the gateway room" ); } } // ── Gateway broadcaster forwarder tests ───────────────────────────── #[tokio::test] async fn broadcaster_forwarder_forwards_events_with_project_tag() { use crate::chat::{ChatTransport, MessageId}; use crate::service::events::StoredEvent; use async_trait::async_trait; type CallLog = Arc>>; struct MockTransport { calls: CallLog, } #[async_trait] impl ChatTransport for MockTransport { async fn send_message( &self, room_id: &str, plain: &str, _html: &str, ) -> Result { self.calls .lock() .unwrap() .push((room_id.to_string(), plain.to_string())); Ok("id".to_string()) } async fn edit_message( &self, _room_id: &str, _id: &str, _plain: &str, _html: &str, ) -> Result<(), String> { Ok(()) } async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { Ok(()) } } let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new())); let transport = Arc::new(MockTransport { calls: Arc::clone(&calls), }); let (tx, rx) = tokio::sync::broadcast::channel::(16); gateway::spawn_gateway_broadcaster_forwarder( transport as Arc, vec!["!room:example.org".to_string()], rx, ); // Give the forwarder task a moment to start. tokio::time::sleep(std::time::Duration::from_millis(10)).await; let event = crate::service::gateway::GatewayStatusEvent { project: "my-project".to_string(), event: StoredEvent::StageTransition { story_id: "7_story_x".to_string(), from_stage: "2_current".to_string(), to_stage: "3_qa".to_string(), timestamp_ms: 100, }, }; tx.send(event).unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; let messages = calls.lock().unwrap(); assert_eq!(messages.len(), 1, "Expected exactly one notification"); let (room, plain) = &messages[0]; assert_eq!(room, "!room:example.org"); assert!( plain.starts_with("[my-project]"), "Expected [my-project] prefix; got: {plain}" ); assert!( plain.contains("#7"), "Expected story number #7; got: {plain}" ); } #[tokio::test] async fn broadcaster_forwarder_resubscribes_on_lag() { use crate::chat::{ChatTransport, MessageId}; use crate::service::events::StoredEvent; use async_trait::async_trait; type Counter = Arc>; struct CountTransport { count: Counter, } #[async_trait] impl ChatTransport for CountTransport { async fn send_message( &self, _room_id: &str, _plain: &str, _html: &str, ) -> Result { *self.count.lock().unwrap() += 1; Ok("id".to_string()) } async fn edit_message( &self, _room_id: &str, _id: &str, _plain: &str, _html: &str, ) -> Result<(), String> { Ok(()) } async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> { Ok(()) } } let count: Counter = Arc::new(std::sync::Mutex::new(0)); let transport = Arc::new(CountTransport { count: Arc::clone(&count), }); // Use a tiny channel (capacity 1) so the second send causes a Lagged error. let (tx, rx) = tokio::sync::broadcast::channel::(1); // Flood the channel to trigger Lagged before the forwarder task starts. let make_event = |n: u64| crate::service::gateway::GatewayStatusEvent { project: "p".to_string(), event: StoredEvent::StageTransition { story_id: format!("{n}_story"), from_stage: "2_current".to_string(), to_stage: "3_qa".to_string(), timestamp_ms: n, }, }; // Send 3 events to overflow the capacity-1 channel before the task runs. let _ = tx.send(make_event(1)); let _ = tx.send(make_event(2)); let _ = tx.send(make_event(3)); gateway::spawn_gateway_broadcaster_forwarder( transport as Arc, vec!["!r:x.org".to_string()], rx, ); // Send one more event after the forwarder subscribes; it should arrive. tokio::time::sleep(std::time::Duration::from_millis(20)).await; tx.send(make_event(4)).unwrap(); tokio::time::sleep(std::time::Duration::from_millis(100)).await; // After Lagged + resubscribe, the forwarder must still process event 4. let received = *count.lock().unwrap(); assert!( received >= 1, "Expected at least one event after Lagged resubscribe; got {received}" ); } // ── BotConfig tests ───────────────────────────────────────────────── #[test] fn bot_config_loads_from_gateway_config_dir() { use crate::chat::transport::matrix::BotConfig; let tmp = tempfile::tempdir().unwrap(); let huskies_dir = tmp.path().join(".huskies"); std::fs::create_dir_all(&huskies_dir).unwrap(); std::fs::write( huskies_dir.join("bot.toml"), r#" homeserver = "https://matrix.example.com" username = "@bot:example.com" password = "secret" room_ids = ["!abc:example.com"] enabled = true "#, ) .unwrap(); let config = BotConfig::load(tmp.path()); assert!(config.is_some()); let config = config.unwrap(); assert_eq!( config.homeserver.as_deref(), Some("https://matrix.example.com") ); } #[test] fn bot_config_absent_returns_none_in_gateway_mode() { use crate::chat::transport::matrix::BotConfig; let tmp = tempfile::tempdir().unwrap(); let config = BotConfig::load(tmp.path()); assert!(config.is_none()); } #[test] fn bot_config_disabled_returns_none_in_gateway_mode() { use crate::chat::transport::matrix::BotConfig; let tmp = tempfile::tempdir().unwrap(); let huskies_dir = tmp.path().join(".huskies"); std::fs::create_dir_all(&huskies_dir).unwrap(); std::fs::write( huskies_dir.join("bot.toml"), r#" homeserver = "https://matrix.example.com" username = "@bot:example.com" password = "secret" room_ids = ["!abc:example.com"] enabled = false "#, ) .unwrap(); let config = BotConfig::load(tmp.path()); assert!(config.is_none()); } // ── Gateway MCP SSE proxy integration tests ────────────────────────── #[tokio::test] async fn gateway_mcp_sse_proxy_streams_progress_and_final_response() { let mut mock_sled = mockito::Server::new_async().await; let prog1 = serde_json::json!({ "jsonrpc": "2.0", "method": "notifications/progress", "params": { "progressToken": "tok1", "progress": 1.0 } }); let prog2 = serde_json::json!({ "jsonrpc": "2.0", "method": "notifications/progress", "params": { "progressToken": "tok1", "progress": 2.0 } }); let final_resp = serde_json::json!({ "jsonrpc": "2.0", "id": 1, "result": { "content": [{ "type": "text", "text": "tests passed" }] } }); let sse_body = format!("data: {prog1}\n\ndata: {prog2}\n\ndata: {final_resp}\n\n"); let _mock = mock_sled .mock("POST", "/mcp") .with_status(200) .with_header("content-type", "text/event-stream") .with_body(&sse_body) .create_async() .await; let mut projects = BTreeMap::new(); projects.insert("sled".to_string(), ProjectEntry::with_url(mock_sled.url())); let config = GatewayConfig { projects, sled_tokens: BTreeMap::new(), }; let state = Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap()); let app = poem::Route::new() .at("/mcp", poem::post(gateway_mcp_post_handler)) .data(state.clone()); let cli = poem::test::TestClient::new(app); let rpc_body = serde_json::to_vec(&serde_json::json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "run_tests", "arguments": {}, "_meta": { "progressToken": "tok1" } } })) .unwrap(); let resp = cli .post("/mcp") .header("content-type", "application/json") .header("accept", "text/event-stream") .body(rpc_body) .send() .await; let body = resp.0.into_body().into_string().await.unwrap(); let data_lines: Vec<&str> = body .lines() .filter_map(|l| l.strip_prefix("data: ")) .collect(); assert_eq!( data_lines.len(), 3, "Expected 3 SSE events (2 progress + 1 final); got {}: {:?}", data_lines.len(), body ); let ev1: serde_json::Value = serde_json::from_str(data_lines[0]).expect("event 1 is valid JSON"); assert_eq!( ev1["method"], "notifications/progress", "event 1 must be a progress notification" ); assert_eq!(ev1["params"]["progress"], 1.0); let ev2: serde_json::Value = serde_json::from_str(data_lines[1]).expect("event 2 is valid JSON"); assert_eq!( ev2["method"], "notifications/progress", "event 2 must be a progress notification" ); assert_eq!(ev2["params"]["progress"], 2.0); let ev3: serde_json::Value = serde_json::from_str(data_lines[2]).expect("event 3 is valid JSON"); assert_eq!(ev3["id"], 1, "event 3 must be the final JSON-RPC response"); assert!( ev3.get("result").is_some(), "event 3 must carry a result field" ); } #[tokio::test] async fn gateway_mcp_post_without_sse_returns_plain_json() { let mut mock_sled = mockito::Server::new_async().await; let json_resp = serde_json::json!({ "jsonrpc": "2.0", "id": 2, "result": { "content": [{ "type": "text", "text": "done" }] } }); let _mock = mock_sled .mock("POST", "/mcp") .with_status(200) .with_header("content-type", "application/json") .with_body(serde_json::to_string(&json_resp).unwrap()) .create_async() .await; let mut projects = BTreeMap::new(); projects.insert("sled".to_string(), ProjectEntry::with_url(mock_sled.url())); let config = GatewayConfig { projects, sled_tokens: BTreeMap::new(), }; let state = Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap()); let app = poem::Route::new() .at("/mcp", poem::post(gateway_mcp_post_handler)) .data(state.clone()); let cli = poem::test::TestClient::new(app); let rpc_body = serde_json::to_vec(&serde_json::json!({ "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": { "name": "run_tests", "arguments": {} } })) .unwrap(); let resp = cli .post("/mcp") .header("content-type", "application/json") .body(rpc_body) .send() .await; let ct = resp .0 .headers() .get("content-type") .and_then(|v| v.to_str().ok()) .unwrap_or(""); assert!( ct.contains("application/json"), "Non-SSE path must return application/json; got: {ct}" ); let body: serde_json::Value = resp.0.into_body().into_json().await.unwrap(); assert_eq!(body["id"], 2); assert!( body.get("result").is_some(), "Expected result in plain JSON response" ); } // ── Story 899: MCP-over-WS uplink integration ──────────────────────────────── /// Build a `SledConnection` plus a spawned "mock sled" task that: /// /// * Reads outbound `mcp_request` envelopes off the connection's channel. /// * Invokes the supplied closure to build a `result` value for each request. /// * Resolves the matching in-flight oneshot directly (the same effect the WS /// handler has when it receives an `mcp_response` from a real sled). /// /// Returns the registered `SledConnection`. fn spawn_mock_sled(handler: F) -> crate::service::gateway::SledConnection where F: Fn(&serde_json::Value) -> serde_json::Value + Send + Sync + 'static, { use crate::service::gateway::SledConnection; use std::sync::atomic::AtomicI64; let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); let last_heartbeat_ms = Arc::new(AtomicI64::new(chrono::Utc::now().timestamp_millis())); let in_flight = Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())); let conn = SledConnection { tx, last_heartbeat_ms, in_flight: Arc::clone(&in_flight), }; let handler = Arc::new(handler); tokio::spawn(async move { while let Some(env) = rx.recv().await { if env.msg_type != "mcp_request" { continue; } let body_str = env .payload .get("body") .and_then(|v| v.as_str()) .unwrap_or("") .to_string(); let body_json: serde_json::Value = match serde_json::from_str(&body_str) { Ok(v) => v, Err(_) => serde_json::Value::Null, }; let result = handler(&body_json); let response = serde_json::json!({ "jsonrpc": "2.0", "id": body_json.get("id").cloned().unwrap_or(serde_json::Value::Null), "result": result, }); if let Some(oneshot_tx) = in_flight.lock().await.remove(&env.req_id) { let _ = oneshot_tx.send(response); } } }); conn } /// AC 9: gateway switches active project, calls tools/list and tools/call /// against a sled connected only via WS uplink, gets correct responses. #[tokio::test] async fn ws_only_sled_handles_tools_list_and_tools_call() { use crate::service::gateway::ProjectEntry; // Project entry with NO url — WS-only. let mut projects = BTreeMap::new(); projects.insert( "ws-only".into(), ProjectEntry { url: None, auth_token: Some("secret".into()), }, ); let config = GatewayConfig { projects, sled_tokens: BTreeMap::new(), }; let state = Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap()); let conn = spawn_mock_sled(|body| { let method = body.get("method").and_then(|m| m.as_str()).unwrap_or(""); match method { "tools/list" => serde_json::json!({ "tools": [ { "name": "my_tool", "description": "test" } ] }), "tools/call" => serde_json::json!({ "content": [{ "type": "text", "text": "called ok" }] }), _ => serde_json::json!({ "echo": method }), } }); state .register_sled_connection("ws-only".to_string(), conn) .await; // tools/list via proxy_active_mcp. let body = serde_json::to_vec(&serde_json::json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {} })) .unwrap(); let resp = state.proxy_active_mcp(&body).await.expect("ws proxy works"); let resp_json: serde_json::Value = serde_json::from_slice(&resp).unwrap(); assert_eq!( resp_json["result"]["tools"][0]["name"], "my_tool", "tools/list response must come from the WS-connected sled, not HTTP" ); // tools/call via proxy_active_mcp. let body = serde_json::to_vec(&serde_json::json!({ "jsonrpc": "2.0", "id": 2, "method": "tools/call", "params": { "name": "my_tool", "arguments": {} } })) .unwrap(); let resp = state.proxy_active_mcp(&body).await.expect("ws proxy works"); let resp_json: serde_json::Value = serde_json::from_slice(&resp).unwrap(); assert_eq!( resp_json["result"]["content"][0]["text"], "called ok", "tools/call response must come from the WS-connected sled, not HTTP" ); } /// AC 10: two sleds connected to one gateway concurrently; gateway routes /// calls to the right sled based on active project. #[tokio::test] async fn two_concurrent_sleds_are_routed_by_active_project() { use crate::service::gateway::ProjectEntry; let mut projects = BTreeMap::new(); projects.insert( "alpha".into(), ProjectEntry { url: None, auth_token: Some("alpha-tok".into()), }, ); projects.insert( "beta".into(), ProjectEntry { url: None, auth_token: Some("beta-tok".into()), }, ); let config = GatewayConfig { projects, sled_tokens: BTreeMap::new(), }; let state = Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap()); let alpha_conn = spawn_mock_sled(|_body| { serde_json::json!({ "content": [{ "type": "text", "text": "from-alpha" }] }) }); let beta_conn = spawn_mock_sled(|_body| { serde_json::json!({ "content": [{ "type": "text", "text": "from-beta" }] }) }); state .register_sled_connection("alpha".to_string(), alpha_conn) .await; state .register_sled_connection("beta".to_string(), beta_conn) .await; // Switch to alpha. gateway::switch_project(&state, "alpha").await.unwrap(); let body = serde_json::to_vec(&serde_json::json!({ "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "noop", "arguments": {} } })) .unwrap(); let resp = state.proxy_active_mcp(&body).await.expect("ws proxy works"); let resp_json: serde_json::Value = serde_json::from_slice(&resp).unwrap(); assert_eq!( resp_json["result"]["content"][0]["text"], "from-alpha", "When active project is alpha, calls must route to the alpha sled" ); // Switch to beta. gateway::switch_project(&state, "beta").await.unwrap(); let resp = state.proxy_active_mcp(&body).await.expect("ws proxy works"); let resp_json: serde_json::Value = serde_json::from_slice(&resp).unwrap(); assert_eq!( resp_json["result"]["content"][0]["text"], "from-beta", "When active project is beta, calls must route to the beta sled" ); }