// Source: huskies/server/src/gateway.rs
// Snapshot: 2026-04-28 08:59:13 +00:00 (1016 lines, 37 KiB, Rust)

//! Multi-project gateway — entrypoint wiring and route tree.
//!
//! When `huskies --gateway` is used, the server starts in gateway mode.
//! Business logic lives in `service::gateway`, HTTP handlers in `http::gateway`.
//! This file contains only the `run` entrypoint and `build_gateway_route` wiring.
use crate::http::gateway::*;
use crate::service::gateway::{self, GatewayState};
use poem::EndpointExt;
use std::path::Path;
use std::sync::Arc;
// Re-export public types that callers reference as `crate::gateway::*`.
pub use crate::service::gateway::{
GatewayConfig, GatewayState as GatewayStateType, GatewayStatusEvent, ProjectEntry,
broadcast_status_event, fetch_all_project_pipeline_statuses, format_aggregate_status_compact,
spawn_gateway_broadcaster_forwarder, spawn_gateway_notification_poller,
subscribe_status_events,
};
/// Assemble every gateway route into a single endpoint.
///
/// Kept separate from `run` so that tests can construct the whole tree and
/// surface duplicate-route panics before they reach production.
pub fn build_gateway_route(state_arc: Arc<GatewayState>) -> impl poem::Endpoint {
    // Bot-config page and the gateway JSON API.
    let api_routes = poem::Route::new()
        .at("/bot-config", poem::get(gateway_bot_config_page_handler))
        .at("/api/gateway", poem::get(gateway_api_handler))
        .at("/api/gateway/switch", poem::post(gateway_switch_handler))
        .at(
            "/api/gateway/pipeline",
            poem::get(gateway_all_pipeline_handler),
        )
        .at(
            "/api/gateway/projects",
            poem::post(gateway_add_project_handler),
        )
        .at(
            "/api/gateway/projects/:name",
            poem::delete(gateway_remove_project_handler),
        )
        .at(
            "/api/gateway/bot-config",
            poem::get(gateway_bot_config_get_handler).post(gateway_bot_config_save_handler),
        );

    // MCP endpoint and health probe.
    let service_routes = api_routes
        .at(
            "/mcp",
            poem::post(gateway_mcp_post_handler).get(gateway_mcp_get_handler),
        )
        .at("/health", poem::get(gateway_health_handler));

    // Agent join endpoints.
    let agent_routes = service_routes
        .at("/gateway/mode", poem::get(gateway_mode_handler))
        .at(
            "/gateway/tokens",
            poem::post(gateway_generate_token_handler),
        )
        .at(
            "/gateway/events/push",
            poem::get(gateway_event_push_handler),
        );

    // Serve the embedded React frontend so the gateway has a UI.
    let full_tree = agent_routes
        .at(
            "/assets/*path",
            poem::get(crate::http::assets::embedded_asset),
        )
        .at("/*path", poem::get(crate::http::assets::embedded_file))
        .at("/", poem::get(crate::http::assets::embedded_index));

    // Make the shared gateway state available to every handler.
    full_tree.data(state_arc)
}
/// Start the gateway HTTP server. This is the entry point when `--gateway` is used.
///
/// Loads the gateway configuration from `config_path`, builds the shared
/// `GatewayState`, writes `.mcp.json` into the configuration directory
/// (best-effort: a failure is only logged), spawns the gateway bot, and then
/// serves the route tree on `<HUSKIES_HOST>:<port>`. The host falls back to
/// `127.0.0.1` when `HUSKIES_HOST` is not set.
///
/// # Errors
///
/// Returns an `std::io::Error` when the configuration cannot be loaded, when
/// the gateway state cannot be constructed, or when the HTTP server fails.
pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> {
    let config_dir = config_path
        .parent()
        .unwrap_or(std::path::Path::new("."))
        .to_path_buf();
    let config = gateway::io::load_config(config_path).map_err(std::io::Error::other)?;
    let state =
        GatewayState::new(config, config_dir.clone(), port).map_err(std::io::Error::other)?;
    let state_arc = Arc::new(state);

    // Take ONE snapshot of the project map under a single read lock so that the
    // logged names, the name list and the URL map handed to the bot always
    // describe the same set of projects. The guard is dropped at the end of
    // this block, before any further `.await`.
    let (gateway_projects, gateway_project_urls) = {
        let projects = state_arc.projects.read().await;
        let names: Vec<String> = projects.keys().cloned().collect();
        let urls: std::collections::BTreeMap<String, String> = projects
            .iter()
            .map(|(name, entry)| (name.clone(), entry.url.clone()))
            .collect();
        (names, urls)
    };

    let active = state_arc.active_project.read().await.clone();
    crate::slog!("[gateway] Starting gateway on port {port}, active project: {active}");
    crate::slog!(
        "[gateway] Registered projects: {}",
        gateway_projects.join(", ")
    );

    // Write `.mcp.json` so that the gateway's bot connects to this gateway's MCP endpoint.
    if let Err(e) = gateway::io::write_gateway_mcp_json(&config_dir, port) {
        crate::slog!("[gateway] Warning: could not write .mcp.json: {e}");
    }

    // Spawn the Matrix bot if `.huskies/bot.toml` exists in the config directory.
    let bot_abort = gateway::io::spawn_gateway_bot(
        &config_dir,
        Arc::clone(&state_arc.active_project),
        gateway_projects,
        gateway_project_urls,
        port,
        Some(state_arc.event_tx.clone()),
    );
    // Keep the returned handle in the shared state.
    *state_arc.bot_handle.lock().await = bot_abort;

    let route = build_gateway_route(state_arc);
    let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
    let addr = format!("{host}:{port}");
    crate::slog!("[gateway] Listening on {addr}");
    poem::Server::new(poem::listener::TcpListener::bind(&addr))
        .run(route)
        .await
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use super::*;
use crate::service::gateway::{GatewayConfig, GatewayState, ProjectEntry};
use std::collections::BTreeMap;
use std::path::PathBuf;
// Shared fixture: a gateway state with a single project named `test`, an
// empty config directory path and port 3000.
fn make_test_state() -> Arc<GatewayState> {
    let entry = ProjectEntry {
        url: "http://test:3001".into(),
    };
    let config = GatewayConfig {
        projects: BTreeMap::from([("test".into(), entry)]),
    };
    let state = GatewayState::new(config, PathBuf::new(), 3000).unwrap();
    Arc::new(state)
}
// Constructing the full route tree is the whole test: it passes as long as
// building the routes does not panic.
#[test]
fn gateway_route_tree_builds_without_panic() {
    let _route = build_gateway_route(make_test_state());
}
// ── Tests that exercised internal functions have been moved to their
// ── respective service/gateway modules. The integration tests that use
// ── poem::test::TestClient and mock HTTP servers remain here since they
// ── test the combined HTTP + service interaction through real routes.
// POSTing to `/gateway/tokens` must answer 200 with a non-empty `token`, and
// that token must be present in the state's pending-token map afterwards.
#[tokio::test]
async fn generate_token_creates_pending_token() {
    let state = make_test_state();
    let token_endpoint = poem::post(gateway_generate_token_handler);
    let app = poem::Route::new()
        .at("/gateway/tokens", token_endpoint)
        .data(state.clone());
    let client = poem::test::TestClient::new(app);

    let response = client.post("/gateway/tokens").send().await;
    assert_eq!(response.0.status(), poem::http::StatusCode::OK);

    let payload: serde_json::Value = response.0.into_body().into_json().await.unwrap();
    let token = payload["token"].as_str().unwrap();
    assert!(!token.is_empty());

    let pending = state.pending_tokens.read().await;
    assert!(pending.contains_key(token));
}
// ── Notification poller integration tests ────────────────────────────
// Checks that the notification poller still delivers events from a reachable
// project when another configured project cannot be contacted.
#[tokio::test]
async fn gateway_notification_poller_continues_when_one_project_unreachable() {
use crate::chat::{ChatTransport, MessageId};
use crate::service::events::StoredEvent;
use async_trait::async_trait;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
// Transport double that records the plain-text body of every sent message.
type CallLog = Arc<std::sync::Mutex<Vec<String>>>;
struct MockTransport {
calls: CallLog,
}
#[async_trait]
impl ChatTransport for MockTransport {
async fn send_message(
&self,
_room_id: &str,
plain: &str,
_html: &str,
) -> Result<MessageId, String> {
self.calls.lock().unwrap().push(plain.to_string());
Ok("id".to_string())
}
async fn edit_message(
&self,
_room_id: &str,
_id: &str,
_plain: &str,
_html: &str,
) -> Result<(), String> {
Ok(())
}
async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
Ok(())
}
}
let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new()));
let transport = Arc::new(MockTransport {
calls: Arc::clone(&calls),
});
// The single event the reachable project's mock server will report.
let event = vec![StoredEvent::StoryBlocked {
story_id: "10_story_ok".to_string(),
reason: "retry limit".to_string(),
timestamp_ms: 500,
}];
let event_body = serde_json::to_vec(&event).unwrap();
// Bind to port 0 so the OS picks a free port for the mock server.
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let good_port = listener.local_addr().unwrap().port();
let good_url = format!("http://127.0.0.1:{good_port}");
// Minimal hand-rolled HTTP server: answers up to four connections, each with
// the same JSON event list, ignoring whatever request was sent.
tokio::spawn(async move {
for _ in 0..4 {
if let Ok((mut stream, _)) = listener.accept().await {
let mut buf = vec![0u8; 4096];
let _ = stream.read(&mut buf).await;
let header = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
event_body.len()
);
let _ = stream.write_all(header.as_bytes()).await;
let _ = stream.write_all(&event_body).await;
}
}
});
// Brief pause before the poller starts issuing requests.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
// Port 1 on loopback: nothing is expected to be listening there.
let bad_url = "http://127.0.0.1:1".to_string();
let mut project_urls = BTreeMap::new();
project_urls.insert("good-project".to_string(), good_url);
project_urls.insert("unreachable-project".to_string(), bad_url);
// NOTE(review): the trailing `1` is presumably the poll interval in seconds —
// confirm against `spawn_gateway_notification_poller`.
gateway::spawn_gateway_notification_poller(
transport as Arc<dyn crate::chat::ChatTransport>,
vec!["!room:example.org".to_string()],
project_urls,
1,
);
// Wait long enough for the poller to have sent something.
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
let messages = calls.lock().unwrap();
assert!(
!messages.is_empty(),
"Expected notifications from the reachable project; got none"
);
// The reachable project's event must appear, tagged with its project name.
let has_good = messages
.iter()
.any(|m| m.contains("[good-project]") && m.contains("10_story_ok"));
assert!(
has_good,
"Expected a notification from [good-project]; got: {messages:?}"
);
// Nothing may be attributed to the unreachable project.
let has_bad = messages.iter().any(|m| m.contains("[unreachable-project]"));
assert!(
!has_bad,
"Unreachable project must not produce notifications; got: {messages:?}"
);
}
// Checks that notifications produced by the gateway poller are sent only to
// the room list passed to the poller, and never to a per-project room.
#[tokio::test]
async fn gateway_notification_poller_sends_only_to_configured_gateway_rooms() {
    use crate::chat::{ChatTransport, MessageId};
    use crate::service::events::StoredEvent;
    use async_trait::async_trait;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    // Both constants are referenced by the assertions at the end of the test,
    // so no `dead_code` suppression is required.
    const GATEWAY_ROOM: &str = "!gateway:example.org";
    const PER_PROJECT_ROOM: &str = "!project:example.org";

    // Transport double that records the room id of every sent message.
    type RoomLog = Arc<std::sync::Mutex<Vec<String>>>;
    struct RoomCapture {
        rooms: RoomLog,
    }
    #[async_trait]
    impl ChatTransport for RoomCapture {
        async fn send_message(
            &self,
            room_id: &str,
            _plain: &str,
            _html: &str,
        ) -> Result<MessageId, String> {
            self.rooms.lock().unwrap().push(room_id.to_string());
            Ok("id".to_string())
        }
        async fn edit_message(
            &self,
            _room_id: &str,
            _id: &str,
            _plain: &str,
            _html: &str,
        ) -> Result<(), String> {
            Ok(())
        }
        async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
            Ok(())
        }
    }
    let rooms: RoomLog = Arc::new(std::sync::Mutex::new(Vec::new()));
    let transport = Arc::new(RoomCapture {
        rooms: Arc::clone(&rooms),
    });

    // The single event the mock project server will report.
    let event = vec![StoredEvent::MergeFailure {
        story_id: "5_story_x".to_string(),
        reason: "conflict".to_string(),
        timestamp_ms: 300,
    }];
    let event_body = serde_json::to_vec(&event).unwrap();

    // Bind to port 0 so the OS picks a free port for the mock server.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let port = listener.local_addr().unwrap().port();
    let url = format!("http://127.0.0.1:{port}");
    // Minimal hand-rolled HTTP server: answers up to four connections, each
    // with the same JSON event list, ignoring whatever request was sent.
    tokio::spawn(async move {
        for _ in 0..4 {
            if let Ok((mut stream, _)) = listener.accept().await {
                let mut buf = vec![0u8; 4096];
                let _ = stream.read(&mut buf).await;
                let header = format!(
                    "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
                    event_body.len()
                );
                let _ = stream.write_all(header.as_bytes()).await;
                let _ = stream.write_all(&event_body).await;
            }
        }
    });
    // Brief pause before the poller starts issuing requests.
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;

    let mut project_urls = BTreeMap::new();
    project_urls.insert("myproj".to_string(), url);
    // NOTE(review): the trailing `1` is presumably the poll interval in
    // seconds — confirm against `spawn_gateway_notification_poller`.
    gateway::spawn_gateway_notification_poller(
        transport as Arc<dyn crate::chat::ChatTransport>,
        vec![GATEWAY_ROOM.to_string()],
        project_urls,
        1,
    );
    // Wait long enough for the poller to have sent something.
    tokio::time::sleep(std::time::Duration::from_millis(1500)).await;

    let room_calls = rooms.lock().unwrap();
    assert!(
        !room_calls.is_empty(),
        "Expected at least one notification; got none"
    );
    for room in room_calls.iter() {
        assert_eq!(
            room, GATEWAY_ROOM,
            "Notification must only go to the gateway room, not {room}"
        );
    }
    // Spelled out separately so the intent (no per-project delivery) is explicit.
    assert!(
        !room_calls.iter().any(|r| r == PER_PROJECT_ROOM),
        "Per-project room must not receive gateway aggregated notifications"
    );
}
// ── init_project integration tests ──────────────────────────────────
// `init_project` on an empty directory must succeed and create the
// `.huskies` scaffold plus the `script/test` entry.
#[tokio::test]
async fn init_project_scaffolds_huskies_dir() {
    let project_dir = tempfile::tempdir().unwrap();
    let root = project_dir.path();
    let state = make_test_state();

    let outcome = gateway::init_project(&state, root.to_str().unwrap(), None, None).await;
    assert!(
        outcome.is_ok(),
        "init_project should succeed: {:?}",
        outcome.err()
    );

    let expected_paths = [
        ".huskies",
        ".huskies/project.toml",
        ".huskies/agents.toml",
        "script/test",
    ];
    for relative in expected_paths {
        assert!(root.join(relative).exists());
    }
}
// `init_project` must write `.huskies/wizard_state.json` as JSON containing
// both a `steps` and a `completed` key.
#[tokio::test]
async fn init_project_creates_wizard_state() {
    let project_dir = tempfile::tempdir().unwrap();
    let root = project_dir.path();
    let state = make_test_state();

    gateway::init_project(&state, root.to_str().unwrap(), None, None)
        .await
        .unwrap();

    let wizard_state_path = root.join(".huskies/wizard_state.json");
    assert!(wizard_state_path.exists());

    let raw = std::fs::read_to_string(&wizard_state_path).unwrap();
    let parsed: serde_json::Value = serde_json::from_str(&raw).unwrap();
    for key in ["steps", "completed"] {
        assert!(parsed.get(key).is_some());
    }
}
// A directory that already contains `.huskies` must be rejected.
#[tokio::test]
async fn init_project_already_initialised_returns_error() {
    let project_dir = tempfile::tempdir().unwrap();
    let root = project_dir.path();
    std::fs::create_dir_all(root.join(".huskies")).unwrap();
    let state = make_test_state();

    let outcome = gateway::init_project(&state, root.to_str().unwrap(), None, None).await;
    assert!(outcome.is_err());
}
// An empty path string must be rejected.
#[tokio::test]
async fn init_project_missing_path_returns_error() {
    let state = make_test_state();
    let empty_path = "";
    let outcome = gateway::init_project(&state, empty_path, None, None).await;
    assert!(outcome.is_err());
}
// When both a name and a URL are supplied, `init_project` must add the new
// project to the gateway state's project map with that URL.
#[tokio::test]
async fn init_project_registers_in_projects_toml_when_name_and_url_given() {
    let project_dir = tempfile::tempdir().unwrap();
    let config_dir = tempfile::tempdir().unwrap();

    let existing = ProjectEntry {
        url: "http://existing:3001".into(),
    };
    let config = GatewayConfig {
        projects: BTreeMap::from([("existing".into(), existing)]),
    };
    let state =
        Arc::new(GatewayState::new(config, config_dir.path().to_path_buf(), 3000).unwrap());

    let outcome = gateway::init_project(
        &state,
        project_dir.path().to_str().unwrap(),
        Some("new-project"),
        Some("http://new-project:3002"),
    )
    .await;
    assert!(outcome.is_ok());

    let registered = state.projects.read().await;
    assert!(registered.contains_key("new-project"));
    assert_eq!(registered["new-project"].url, "http://new-project:3002");
}
// Registering under a name that is already in the project map must fail.
#[tokio::test]
async fn init_project_duplicate_name_returns_error() {
    let project_dir = tempfile::tempdir().unwrap();

    let taken = ProjectEntry {
        url: "http://taken:3001".into(),
    };
    let config = GatewayConfig {
        projects: BTreeMap::from([("taken".into(), taken)]),
    };
    let state = Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap());

    let outcome = gateway::init_project(
        &state,
        project_dir.path().to_str().unwrap(),
        Some("taken"),
        Some("http://new:3002"),
    )
    .await;
    assert!(outcome.is_err());
}
// End-to-end check: scaffold a project with `init_project`, then proxy a
// `wizard_status` MCP call to a mock project server and parse its reply.
#[tokio::test]
async fn init_project_then_wizard_status_integration() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
// Bind to port 0 so the OS picks a free port for the mock server.
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let mock_port = listener.local_addr().unwrap().port();
let mock_url = format!("http://127.0.0.1:{mock_port}");
// Mock project server: accepts a single connection and answers with a canned
// JSON-RPC result whose text payload is a JSON-encoded wizard status.
tokio::spawn(async move {
if let Ok((mut stream, _)) = listener.accept().await {
let mut buf = vec![0u8; 4096];
let _ = stream.read(&mut buf).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [{
"type": "text",
"text": "{\"steps\":[{\"id\":\"scaffold\",\"title\":\"Scaffold\",\"status\":\"confirmed\"}],\"completed\":false}"
}]
}
});
let body_bytes = serde_json::to_vec(&body).unwrap();
let header = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
body_bytes.len()
);
let _ = stream.write_all(header.as_bytes()).await;
let _ = stream.write_all(&body_bytes).await;
}
});
// Brief pause before the first request is issued.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
// Gateway state whose only registered project points at the mock server.
let mut projects = BTreeMap::new();
projects.insert("mock-project".into(), ProjectEntry { url: mock_url });
let config = GatewayConfig { projects };
let config_dir = tempfile::tempdir().unwrap();
let state =
Arc::new(GatewayState::new(config, config_dir.path().to_path_buf(), 3000).unwrap());
let project_dir = tempfile::tempdir().unwrap();
// No name/URL is passed, so this call only scaffolds the directory.
let result =
gateway::init_project(&state, project_dir.path().to_str().unwrap(), None, None).await;
assert!(result.is_ok());
assert!(project_dir.path().join(".huskies").exists());
let wizard_path = project_dir.path().join(".huskies/wizard_state.json");
assert!(wizard_path.exists());
// Proxy call to the mock server.
// NOTE(review): `active_url` presumably resolves to `mock-project`, the only
// registered project — confirm against `GatewayState::new`.
let active_url = state.active_url().await.unwrap();
let proxy_body = serde_json::to_vec(&serde_json::json!({
"jsonrpc": "2.0",
"id": 2,
"method": "tools/call",
"params": { "name": "wizard_status", "arguments": {} }
}))
.unwrap();
let proxy_resp = gateway::io::proxy_mcp_call(&state.client, &active_url, &proxy_body).await;
assert!(proxy_resp.is_ok());
let resp_json: serde_json::Value = serde_json::from_slice(&proxy_resp.unwrap()).unwrap();
let result = resp_json.get("result");
assert!(result.is_some());
// Dig out `result.content[0].text`; a missing field falls back to "" and then
// fails the JSON parse on the next line.
let text = result
.and_then(|r| r.get("content"))
.and_then(|c| c.get(0))
.and_then(|c| c.get("text"))
.and_then(|t| t.as_str())
.unwrap_or("");
let wizard: serde_json::Value = serde_json::from_str(text).unwrap();
assert!(wizard.get("steps").is_some());
}
// ── Aggregate pipeline status integration tests ─────────────────────
// Checks that aggregating pipeline status across projects yields per-stage
// counts for a healthy project and an `error` entry for an unreachable one.
#[tokio::test]
async fn aggregate_pipeline_status_integration_healthy_and_unreachable() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
// Bind to port 0 so the OS picks a free port for the mock server.
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let mock_port = listener.local_addr().unwrap().port();
let healthy_url = format!("http://127.0.0.1:{mock_port}");
// Mock project server: accepts a single connection and answers with a canned
// JSON-RPC result whose text payload is a JSON-encoded pipeline description.
tokio::spawn(async move {
if let Ok((mut stream, _)) = listener.accept().await {
let mut buf = vec![0u8; 4096];
let _ = stream.read(&mut buf).await;
// Three active stories (two in `current`, one of them blocked; one in `qa`)
// and one backlog story.
let pipeline_json = serde_json::to_string(&serde_json::json!({
"active": [
{ "story_id": "1_story_a", "name": "A", "stage": "current" },
{ "story_id": "2_story_b", "name": "B", "stage": "qa" },
{ "story_id": "3_story_c", "name": "C", "stage": "current", "blocked": true, "retry_count": 5 },
],
"backlog": [{ "story_id": "4_story_d", "name": "D" }],
"backlog_count": 1
}))
.unwrap();
let body = serde_json::to_vec(&serde_json::json!({
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [{ "type": "text", "text": pipeline_json }]
}
}))
.unwrap();
let header = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
body.len()
);
let _ = stream.write_all(header.as_bytes()).await;
let _ = stream.write_all(&body).await;
}
});
// Brief pause before the first request is issued.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
// Port 1 on loopback: nothing is expected to be listening there.
let unreachable_url = "http://127.0.0.1:1".to_string();
let mut project_urls = BTreeMap::new();
project_urls.insert("healthy-project".to_string(), healthy_url);
project_urls.insert("broken-project".to_string(), unreachable_url);
let client = reqwest::Client::new();
let statuses = gateway::fetch_all_project_pipeline_statuses(&project_urls, &client).await;
// Both projects must have an entry, whether or not they could be reached.
assert!(statuses.contains_key("healthy-project"));
assert!(statuses.contains_key("broken-project"));
let healthy = &statuses["healthy-project"];
assert!(healthy.get("error").is_none());
// Counts mirror the canned pipeline served by the mock server above.
assert_eq!(healthy["counts"]["backlog"], 1);
assert_eq!(healthy["counts"]["current"], 2);
assert_eq!(healthy["counts"]["qa"], 1);
let blocked = healthy["blocked"].as_array().unwrap();
assert_eq!(blocked.len(), 1);
assert_eq!(blocked[0]["story_id"], "3_story_c");
// The unreachable project is reported through an `error` field.
let broken = &statuses["broken-project"];
assert!(broken.get("error").is_some());
}
// ── Multi-project notification poller integration ────────────────────
// Checks that the poller delivers events from two different projects, tags
// each notification with its project name, and sends all of them to the
// configured gateway room.
#[tokio::test]
async fn gateway_notification_poller_delivers_events_from_two_projects_with_project_tags() {
use crate::chat::{ChatTransport, MessageId};
use crate::service::events::StoredEvent;
use async_trait::async_trait;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
// Transport double that records (room id, plain text, html) per sent message.
type CallLog = Arc<std::sync::Mutex<Vec<(String, String, String)>>>;
struct MockTransport {
calls: CallLog,
}
#[async_trait]
impl ChatTransport for MockTransport {
async fn send_message(
&self,
room_id: &str,
plain: &str,
html: &str,
) -> Result<MessageId, String> {
self.calls.lock().unwrap().push((
room_id.to_string(),
plain.to_string(),
html.to_string(),
));
Ok("mock-id".to_string())
}
async fn edit_message(
&self,
_room_id: &str,
_id: &str,
_plain: &str,
_html: &str,
) -> Result<(), String> {
Ok(())
}
async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
Ok(())
}
}
let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new()));
let transport = Arc::new(MockTransport {
calls: Arc::clone(&calls),
});
// Project "alpha": reports a single stage transition.
let alpha_events = vec![StoredEvent::StageTransition {
story_id: "1_story_alpha".to_string(),
from_stage: "2_current".to_string(),
to_stage: "3_qa".to_string(),
timestamp_ms: 100,
}];
let alpha_body = serde_json::to_vec(&alpha_events).unwrap();
let alpha_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let alpha_port = alpha_listener.local_addr().unwrap().port();
let alpha_url = format!("http://127.0.0.1:{alpha_port}");
// Mock server for alpha: answers up to four connections with the same body.
tokio::spawn(async move {
for _ in 0..4 {
if let Ok((mut stream, _)) = alpha_listener.accept().await {
let mut buf = vec![0u8; 4096];
let _ = stream.read(&mut buf).await;
let header = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
alpha_body.len()
);
let _ = stream.write_all(header.as_bytes()).await;
let _ = stream.write_all(&alpha_body).await;
}
}
});
// Project "beta": reports a single merge failure.
let beta_events = vec![StoredEvent::MergeFailure {
story_id: "2_story_beta".to_string(),
reason: "merge conflict in lib.rs".to_string(),
timestamp_ms: 200,
}];
let beta_body = serde_json::to_vec(&beta_events).unwrap();
let beta_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let beta_port = beta_listener.local_addr().unwrap().port();
let beta_url = format!("http://127.0.0.1:{beta_port}");
// Mock server for beta: answers up to four connections with the same body.
tokio::spawn(async move {
for _ in 0..4 {
if let Ok((mut stream, _)) = beta_listener.accept().await {
let mut buf = vec![0u8; 4096];
let _ = stream.read(&mut buf).await;
let header = format!(
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
beta_body.len()
);
let _ = stream.write_all(header.as_bytes()).await;
let _ = stream.write_all(&beta_body).await;
}
}
});
// Brief pause before the poller starts issuing requests.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let mut project_urls = BTreeMap::new();
project_urls.insert("alpha".to_string(), alpha_url);
project_urls.insert("beta".to_string(), beta_url);
// NOTE(review): the trailing `1` is presumably the poll interval in seconds —
// confirm against `spawn_gateway_notification_poller`.
gateway::spawn_gateway_notification_poller(
transport as Arc<dyn crate::chat::ChatTransport>,
vec!["!room:example.org".to_string()],
project_urls,
1,
);
// Wait long enough for the poller to have sent something.
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
let calls = calls.lock().unwrap();
assert!(
!calls.is_empty(),
"Expected at least one notification; got none"
);
let plains: Vec<&str> = calls.iter().map(|(_, p, _)| p.as_str()).collect();
// NOTE(review): `contains("1")` is a very loose match (any message containing
// the digit 1 satisfies it) — consider tightening once the exact notification
// format is confirmed.
let alpha_notification = plains
.iter()
.any(|p| p.contains("[alpha]") && p.contains("1"));
let beta_notification = plains
.iter()
.any(|p| p.contains("[beta]") && p.contains("merge conflict"));
assert!(
alpha_notification,
"Expected a notification from [alpha] containing story ID '1'; got: {plains:?}"
);
assert!(
beta_notification,
"Expected a notification from [beta] containing 'merge conflict'; got: {plains:?}"
);
// Every recorded message must target the single configured room.
for (room_id, _, _) in calls.iter() {
assert_eq!(
room_id, "!room:example.org",
"All notifications must go to the gateway room"
);
}
}
// ── Gateway broadcaster forwarder tests ─────────────────────────────
// Checks that one event sent on the broadcast channel is forwarded as exactly
// one chat message, to the configured room, prefixed with the project name.
#[tokio::test]
async fn broadcaster_forwarder_forwards_events_with_project_tag() {
use crate::chat::{ChatTransport, MessageId};
use crate::service::events::StoredEvent;
use async_trait::async_trait;
// Transport double that records (room id, plain text) per sent message.
type CallLog = Arc<std::sync::Mutex<Vec<(String, String)>>>;
struct MockTransport {
calls: CallLog,
}
#[async_trait]
impl ChatTransport for MockTransport {
async fn send_message(
&self,
room_id: &str,
plain: &str,
_html: &str,
) -> Result<MessageId, String> {
self.calls
.lock()
.unwrap()
.push((room_id.to_string(), plain.to_string()));
Ok("id".to_string())
}
async fn edit_message(
&self,
_room_id: &str,
_id: &str,
_plain: &str,
_html: &str,
) -> Result<(), String> {
Ok(())
}
async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
Ok(())
}
}
let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new()));
let transport = Arc::new(MockTransport {
calls: Arc::clone(&calls),
});
// Broadcast channel with room for 16 events; the forwarder owns the receiver.
let (tx, rx) =
tokio::sync::broadcast::channel::<crate::service::gateway::GatewayStatusEvent>(16);
gateway::spawn_gateway_broadcaster_forwarder(
transport as Arc<dyn crate::chat::ChatTransport>,
vec!["!room:example.org".to_string()],
rx,
);
// Give the forwarder task a moment to start.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let event = crate::service::gateway::GatewayStatusEvent {
project: "my-project".to_string(),
event: StoredEvent::StageTransition {
story_id: "7_story_x".to_string(),
from_stage: "2_current".to_string(),
to_stage: "3_qa".to_string(),
timestamp_ms: 100,
},
};
tx.send(event).unwrap();
// Allow time for the forwarder to pick the event up and call the transport.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let messages = calls.lock().unwrap();
assert_eq!(messages.len(), 1, "Expected exactly one notification");
let (room, plain) = &messages[0];
assert_eq!(room, "!room:example.org");
assert!(
plain.starts_with("[my-project]"),
"Expected [my-project] prefix; got: {plain}"
);
assert!(
plain.contains("7_story_x"),
"Expected story ID; got: {plain}"
);
}
#[tokio::test]
async fn broadcaster_forwarder_resubscribes_on_lag() {
use crate::chat::{ChatTransport, MessageId};
use crate::service::events::StoredEvent;
use async_trait::async_trait;
type Counter = Arc<std::sync::Mutex<usize>>;
struct CountTransport {
count: Counter,
}
#[async_trait]
impl ChatTransport for CountTransport {
async fn send_message(
&self,
_room_id: &str,
_plain: &str,
_html: &str,
) -> Result<MessageId, String> {
*self.count.lock().unwrap() += 1;
Ok("id".to_string())
}
async fn edit_message(
&self,
_room_id: &str,
_id: &str,
_plain: &str,
_html: &str,
) -> Result<(), String> {
Ok(())
}
async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
Ok(())
}
}
let count: Counter = Arc::new(std::sync::Mutex::new(0));
let transport = Arc::new(CountTransport {
count: Arc::clone(&count),
});
// Use a tiny channel (capacity 1) so the second send causes a Lagged error.
let (tx, rx) =
tokio::sync::broadcast::channel::<crate::service::gateway::GatewayStatusEvent>(1);
// Flood the channel to trigger Lagged before the forwarder task starts.
let make_event = |n: u64| crate::service::gateway::GatewayStatusEvent {
project: "p".to_string(),
event: StoredEvent::StageTransition {
story_id: format!("{n}_story"),
from_stage: "2_current".to_string(),
to_stage: "3_qa".to_string(),
timestamp_ms: n,
},
};
// Send 3 events to overflow the capacity-1 channel before the task runs.
let _ = tx.send(make_event(1));
let _ = tx.send(make_event(2));
let _ = tx.send(make_event(3));
gateway::spawn_gateway_broadcaster_forwarder(
transport as Arc<dyn crate::chat::ChatTransport>,
vec!["!r:x.org".to_string()],
rx,
);
// Send one more event after the forwarder subscribes; it should arrive.
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
tx.send(make_event(4)).unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// After Lagged + resubscribe, the forwarder must still process event 4.
let received = *count.lock().unwrap();
assert!(
received >= 1,
"Expected at least one event after Lagged resubscribe; got {received}"
);
}
// ── BotConfig tests ─────────────────────────────────────────────────
// An enabled `.huskies/bot.toml` under the config directory must load, and
// its `homeserver` value must be readable from the resulting config.
#[test]
fn bot_config_loads_from_gateway_config_dir() {
    use crate::chat::transport::matrix::BotConfig;

    let bot_toml = r#"
homeserver = "https://matrix.example.com"
username = "@bot:example.com"
password = "secret"
room_ids = ["!abc:example.com"]
enabled = true
"#;

    let config_root = tempfile::tempdir().unwrap();
    let dot_dir = config_root.path().join(".huskies");
    std::fs::create_dir_all(&dot_dir).unwrap();
    std::fs::write(dot_dir.join("bot.toml"), bot_toml).unwrap();

    let loaded = BotConfig::load(config_root.path());
    assert!(loaded.is_some());
    let loaded = loaded.unwrap();
    assert_eq!(
        loaded.homeserver.as_deref(),
        Some("https://matrix.example.com")
    );
}
// With no `.huskies/bot.toml` present, loading must yield `None`.
#[test]
fn bot_config_absent_returns_none_in_gateway_mode() {
    use crate::chat::transport::matrix::BotConfig;

    let empty_root = tempfile::tempdir().unwrap();
    let loaded = BotConfig::load(empty_root.path());
    assert!(loaded.is_none());
}
// A `bot.toml` with `enabled = false` must be treated as absent.
#[test]
fn bot_config_disabled_returns_none_in_gateway_mode() {
    use crate::chat::transport::matrix::BotConfig;

    let bot_toml = r#"
homeserver = "https://matrix.example.com"
username = "@bot:example.com"
password = "secret"
room_ids = ["!abc:example.com"]
enabled = false
"#;

    let config_root = tempfile::tempdir().unwrap();
    let dot_dir = config_root.path().join(".huskies");
    std::fs::create_dir_all(&dot_dir).unwrap();
    std::fs::write(dot_dir.join("bot.toml"), bot_toml).unwrap();

    let loaded = BotConfig::load(config_root.path());
    assert!(loaded.is_none());
}
}