huskies: merge 832
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,143 @@
|
||||
//! Multi-project gateway — entrypoint wiring and route tree.
|
||||
//!
|
||||
//! When `huskies --gateway` is used, the server starts in gateway mode.
|
||||
//! Business logic lives in `service::gateway`, HTTP handlers in `http::gateway`.
|
||||
//! This file contains only the `run` entrypoint and `build_gateway_route` wiring.
|
||||
|
||||
use crate::http::gateway::*;
|
||||
use crate::service::gateway::{self, GatewayState};
|
||||
use poem::EndpointExt;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
// Re-export public types that callers reference as `crate::gateway::*`.
|
||||
pub use crate::service::gateway::{
|
||||
GatewayConfig, GatewayState as GatewayStateType, GatewayStatusEvent, ProjectEntry,
|
||||
broadcast_status_event, fetch_all_project_pipeline_statuses, format_aggregate_status_compact,
|
||||
spawn_gateway_broadcaster_forwarder, spawn_gateway_notification_poller,
|
||||
subscribe_status_events,
|
||||
};
|
||||
|
||||
/// Build the complete gateway route tree.
|
||||
///
|
||||
/// Extracted from `run` so that tests can construct the full route tree and
|
||||
/// catch duplicate-route panics before they reach production.
|
||||
pub fn build_gateway_route(state_arc: Arc<GatewayState>) -> impl poem::Endpoint {
|
||||
poem::Route::new()
|
||||
.at("/bot-config", poem::get(gateway_bot_config_page_handler))
|
||||
.at("/api/gateway", poem::get(gateway_api_handler))
|
||||
.at(
|
||||
"/api/gateway/projects",
|
||||
poem::post(gateway_add_project_handler),
|
||||
)
|
||||
.at(
|
||||
"/api/gateway/projects/:name",
|
||||
poem::delete(gateway_remove_project_handler),
|
||||
)
|
||||
.at(
|
||||
"/api/gateway/bot-config",
|
||||
poem::get(gateway_bot_config_get_handler).post(gateway_bot_config_save_handler),
|
||||
)
|
||||
.at(
|
||||
"/mcp",
|
||||
poem::post(gateway_mcp_post_handler).get(gateway_mcp_get_handler),
|
||||
)
|
||||
// Agent join endpoints.
|
||||
.at("/gateway/mode", poem::get(gateway_mode_handler))
|
||||
.at(
|
||||
"/gateway/tokens",
|
||||
poem::post(gateway_generate_token_handler),
|
||||
)
|
||||
.at(
|
||||
"/gateway/events/push",
|
||||
poem::get(gateway_event_push_handler),
|
||||
)
|
||||
// Agent registration via CRDT-sync WebSocket.
|
||||
.at("/crdt-sync", poem::get(gateway_crdt_sync_handler))
|
||||
// Agent management REST endpoints.
|
||||
.at(
|
||||
"/gateway/agents/:id/assign",
|
||||
poem::post(gateway_assign_agent_handler),
|
||||
)
|
||||
// Serve the embedded React frontend so the gateway has a UI.
|
||||
.at(
|
||||
"/assets/*path",
|
||||
poem::get(crate::http::assets::embedded_asset),
|
||||
)
|
||||
.at("/*path", poem::get(crate::http::assets::embedded_file))
|
||||
.at("/", poem::get(crate::http::assets::embedded_index))
|
||||
.data(state_arc)
|
||||
}
|
||||
|
||||
/// Start the gateway HTTP server. This is the entry point when `--gateway` is used.
|
||||
pub async fn run(config_path: &Path, port: u16) -> Result<(), std::io::Error> {
|
||||
let config_dir = config_path
|
||||
.parent()
|
||||
.unwrap_or(std::path::Path::new("."))
|
||||
.to_path_buf();
|
||||
|
||||
let config = gateway::io::load_config(config_path).map_err(std::io::Error::other)?;
|
||||
|
||||
// Initialise the CRDT so gateway_config.active_project is persisted across restarts.
|
||||
let crdt_db = config_dir.join("gateway.db");
|
||||
if let Err(e) = crate::crdt_state::init(&crdt_db).await {
|
||||
crate::slog!(
|
||||
"[gateway] Warning: CRDT init failed ({e}); active-project selection will not persist"
|
||||
);
|
||||
}
|
||||
|
||||
let state =
|
||||
GatewayState::new(config, config_dir.clone(), port).map_err(std::io::Error::other)?;
|
||||
let state_arc = Arc::new(state);
|
||||
|
||||
let active = state_arc.active_project.read().await.clone();
|
||||
crate::slog!("[gateway] Starting gateway on port {port}, active project: {active}");
|
||||
crate::slog!(
|
||||
"[gateway] Registered projects: {}",
|
||||
state_arc
|
||||
.projects
|
||||
.read()
|
||||
.await
|
||||
.keys()
|
||||
.cloned()
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
);
|
||||
|
||||
// Write `.mcp.json` so that the gateway's bot connects to this gateway's MCP endpoint.
|
||||
if let Err(e) = gateway::io::write_gateway_mcp_json(&config_dir, port) {
|
||||
crate::slog!("[gateway] Warning: could not write .mcp.json: {e}");
|
||||
}
|
||||
|
||||
// Spawn the Matrix bot if `.huskies/bot.toml` exists in the config directory.
|
||||
let gateway_projects: Vec<String> = state_arc.projects.read().await.keys().cloned().collect();
|
||||
let gateway_project_urls: std::collections::BTreeMap<String, String> = state_arc
|
||||
.projects
|
||||
.read()
|
||||
.await
|
||||
.iter()
|
||||
.map(|(name, entry)| (name.clone(), entry.url.clone()))
|
||||
.collect();
|
||||
let bot_abort = gateway::io::spawn_gateway_bot(
|
||||
&config_dir,
|
||||
Arc::clone(&state_arc.active_project),
|
||||
gateway_projects,
|
||||
gateway_project_urls,
|
||||
port,
|
||||
Some(state_arc.event_tx.clone()),
|
||||
);
|
||||
*state_arc.bot_handle.lock().await = bot_abort;
|
||||
|
||||
let route = build_gateway_route(state_arc);
|
||||
|
||||
let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||
let addr = format!("{host}:{port}");
|
||||
crate::slog!("[gateway] Listening on {addr}");
|
||||
|
||||
poem::Server::new(poem::listener::TcpListener::bind(&addr))
|
||||
.run(route)
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
@@ -0,0 +1,929 @@
|
||||
//! Integration tests for the gateway route tree and service interactions.
|
||||
|
||||
use super::*;
|
||||
use crate::service::gateway::{GatewayConfig, GatewayState, ProjectEntry};
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::PathBuf;
|
||||
|
||||
fn make_test_state() -> Arc<GatewayState> {
|
||||
let mut projects = BTreeMap::new();
|
||||
projects.insert(
|
||||
"test".into(),
|
||||
ProjectEntry {
|
||||
url: "http://test:3001".into(),
|
||||
},
|
||||
);
|
||||
let config = GatewayConfig { projects };
|
||||
Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gateway_route_tree_builds_without_panic() {
|
||||
let state = make_test_state();
|
||||
let _route = build_gateway_route(state);
|
||||
}
|
||||
|
||||
// ── Tests that exercised internal functions have been moved to their
|
||||
// ── respective service/gateway modules. The integration tests that use
|
||||
// ── poem::test::TestClient and mock HTTP servers remain here since they
|
||||
// ── test the combined HTTP + service interaction through real routes.
|
||||
|
||||
#[tokio::test]
|
||||
async fn crdt_sync_handshake_then_register_writes_crdt_node() {
|
||||
crate::crdt_state::init_for_test();
|
||||
let state = make_test_state();
|
||||
|
||||
// Generate a valid join token via the tokens endpoint.
|
||||
let token_app = poem::Route::new()
|
||||
.at(
|
||||
"/gateway/tokens",
|
||||
poem::post(gateway_generate_token_handler),
|
||||
)
|
||||
.data(state.clone());
|
||||
let cli = poem::test::TestClient::new(token_app);
|
||||
let resp = cli.post("/gateway/tokens").send().await;
|
||||
assert_eq!(resp.0.status(), poem::http::StatusCode::OK);
|
||||
let body: serde_json::Value = resp.0.into_body().into_json().await.unwrap();
|
||||
let token = body["token"].as_str().unwrap().to_string();
|
||||
|
||||
// Token must be pending before the upgrade.
|
||||
assert!(state.pending_tokens.read().await.contains_key(&token));
|
||||
|
||||
// Call register_agent directly (the service function exercised by the handler).
|
||||
let node = crate::service::gateway::register_agent(
|
||||
&state,
|
||||
&token,
|
||||
"test-label".into(),
|
||||
"ws://test:9000".into(),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Token is consumed after registration.
|
||||
assert!(state.pending_tokens.read().await.is_empty());
|
||||
|
||||
// CRDT node was written and is visible via list_agents.
|
||||
let agents = crate::service::gateway::list_agents();
|
||||
assert!(
|
||||
agents.iter().any(|n| n.node_id == node.node_id),
|
||||
"Registered node must appear in list_agents"
|
||||
);
|
||||
|
||||
// remove_agent tombstones the node.
|
||||
assert!(crate::service::gateway::remove_agent(&node.node_id));
|
||||
let alive = crate::service::gateway::list_agents();
|
||||
assert!(
|
||||
!alive.iter().any(|n| n.node_id == node.node_id),
|
||||
"Tombstoned node must not appear in list_agents"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn generate_token_creates_pending_token() {
|
||||
let state = make_test_state();
|
||||
let app = poem::Route::new()
|
||||
.at(
|
||||
"/gateway/tokens",
|
||||
poem::post(gateway_generate_token_handler),
|
||||
)
|
||||
.data(state.clone());
|
||||
let cli = poem::test::TestClient::new(app);
|
||||
let resp = cli.post("/gateway/tokens").send().await;
|
||||
assert_eq!(resp.0.status(), poem::http::StatusCode::OK);
|
||||
let body: serde_json::Value = resp.0.into_body().into_json().await.unwrap();
|
||||
let token = body["token"].as_str().unwrap();
|
||||
assert!(!token.is_empty());
|
||||
let tokens = state.pending_tokens.read().await;
|
||||
assert!(tokens.contains_key(token));
|
||||
}
|
||||
|
||||
// ── Notification poller integration tests ────────────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn gateway_notification_poller_continues_when_one_project_unreachable() {
|
||||
use crate::chat::{ChatTransport, MessageId};
|
||||
use crate::service::events::StoredEvent;
|
||||
use async_trait::async_trait;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
type CallLog = Arc<std::sync::Mutex<Vec<String>>>;
|
||||
|
||||
struct MockTransport {
|
||||
calls: CallLog,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ChatTransport for MockTransport {
|
||||
async fn send_message(
|
||||
&self,
|
||||
_room_id: &str,
|
||||
plain: &str,
|
||||
_html: &str,
|
||||
) -> Result<MessageId, String> {
|
||||
self.calls.lock().unwrap().push(plain.to_string());
|
||||
Ok("id".to_string())
|
||||
}
|
||||
|
||||
async fn edit_message(
|
||||
&self,
|
||||
_room_id: &str,
|
||||
_id: &str,
|
||||
_plain: &str,
|
||||
_html: &str,
|
||||
) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new()));
|
||||
let transport = Arc::new(MockTransport {
|
||||
calls: Arc::clone(&calls),
|
||||
});
|
||||
|
||||
let event = vec![StoredEvent::StoryBlocked {
|
||||
story_id: "10_story_ok".to_string(),
|
||||
reason: "retry limit".to_string(),
|
||||
timestamp_ms: 500,
|
||||
}];
|
||||
let event_body = serde_json::to_vec(&event).unwrap();
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let good_port = listener.local_addr().unwrap().port();
|
||||
let good_url = format!("http://127.0.0.1:{good_port}");
|
||||
tokio::spawn(async move {
|
||||
for _ in 0..4 {
|
||||
if let Ok((mut stream, _)) = listener.accept().await {
|
||||
let mut buf = vec![0u8; 4096];
|
||||
let _ = stream.read(&mut buf).await;
|
||||
let header = format!(
|
||||
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
|
||||
event_body.len()
|
||||
);
|
||||
let _ = stream.write_all(header.as_bytes()).await;
|
||||
let _ = stream.write_all(&event_body).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
|
||||
let bad_url = "http://127.0.0.1:1".to_string();
|
||||
|
||||
let mut project_urls = BTreeMap::new();
|
||||
project_urls.insert("good-project".to_string(), good_url);
|
||||
project_urls.insert("unreachable-project".to_string(), bad_url);
|
||||
|
||||
gateway::spawn_gateway_notification_poller(
|
||||
transport as Arc<dyn crate::chat::ChatTransport>,
|
||||
vec!["!room:example.org".to_string()],
|
||||
project_urls,
|
||||
1,
|
||||
);
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
|
||||
|
||||
let messages = calls.lock().unwrap();
|
||||
assert!(
|
||||
!messages.is_empty(),
|
||||
"Expected notifications from the reachable project; got none"
|
||||
);
|
||||
let has_good = messages
|
||||
.iter()
|
||||
.any(|m| m.contains("[good-project]") && m.contains("10_story_ok"));
|
||||
assert!(
|
||||
has_good,
|
||||
"Expected a notification from [good-project]; got: {messages:?}"
|
||||
);
|
||||
let has_bad = messages.iter().any(|m| m.contains("[unreachable-project]"));
|
||||
assert!(
|
||||
!has_bad,
|
||||
"Unreachable project must not produce notifications; got: {messages:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn gateway_notification_poller_sends_only_to_configured_gateway_rooms() {
|
||||
use crate::chat::{ChatTransport, MessageId};
|
||||
use crate::service::events::StoredEvent;
|
||||
use async_trait::async_trait;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
type RoomLog = Arc<std::sync::Mutex<Vec<String>>>;
|
||||
|
||||
struct RoomCapture {
|
||||
rooms: RoomLog,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ChatTransport for RoomCapture {
|
||||
async fn send_message(
|
||||
&self,
|
||||
room_id: &str,
|
||||
_plain: &str,
|
||||
_html: &str,
|
||||
) -> Result<MessageId, String> {
|
||||
self.rooms.lock().unwrap().push(room_id.to_string());
|
||||
Ok("id".to_string())
|
||||
}
|
||||
|
||||
async fn edit_message(
|
||||
&self,
|
||||
_room_id: &str,
|
||||
_id: &str,
|
||||
_plain: &str,
|
||||
_html: &str,
|
||||
) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let rooms: RoomLog = Arc::new(std::sync::Mutex::new(Vec::new()));
|
||||
let transport = Arc::new(RoomCapture {
|
||||
rooms: Arc::clone(&rooms),
|
||||
});
|
||||
|
||||
let event = vec![StoredEvent::MergeFailure {
|
||||
story_id: "5_story_x".to_string(),
|
||||
reason: "conflict".to_string(),
|
||||
timestamp_ms: 300,
|
||||
}];
|
||||
let event_body = serde_json::to_vec(&event).unwrap();
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let port = listener.local_addr().unwrap().port();
|
||||
let url = format!("http://127.0.0.1:{port}");
|
||||
tokio::spawn(async move {
|
||||
for _ in 0..4 {
|
||||
if let Ok((mut stream, _)) = listener.accept().await {
|
||||
let mut buf = vec![0u8; 4096];
|
||||
let _ = stream.read(&mut buf).await;
|
||||
let header = format!(
|
||||
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
|
||||
event_body.len()
|
||||
);
|
||||
let _ = stream.write_all(header.as_bytes()).await;
|
||||
let _ = stream.write_all(&event_body).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
|
||||
const GATEWAY_ROOM: &str = "!gateway:example.org";
|
||||
#[allow(dead_code)]
|
||||
const PER_PROJECT_ROOM: &str = "!project:example.org";
|
||||
|
||||
let mut project_urls = BTreeMap::new();
|
||||
project_urls.insert("myproj".to_string(), url);
|
||||
|
||||
gateway::spawn_gateway_notification_poller(
|
||||
transport as Arc<dyn crate::chat::ChatTransport>,
|
||||
vec![GATEWAY_ROOM.to_string()],
|
||||
project_urls,
|
||||
1,
|
||||
);
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
|
||||
|
||||
let room_calls = rooms.lock().unwrap();
|
||||
assert!(
|
||||
!room_calls.is_empty(),
|
||||
"Expected at least one notification; got none"
|
||||
);
|
||||
for room in room_calls.iter() {
|
||||
assert_eq!(
|
||||
room, GATEWAY_ROOM,
|
||||
"Notification must only go to the gateway room, not {room}"
|
||||
);
|
||||
}
|
||||
assert!(
|
||||
!room_calls.iter().any(|r| r == PER_PROJECT_ROOM),
|
||||
"Per-project room must not receive gateway aggregated notifications"
|
||||
);
|
||||
}
|
||||
|
||||
// ── init_project integration tests ──────────────────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_project_scaffolds_huskies_dir() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let state = make_test_state();
|
||||
let result = gateway::init_project(&state, dir.path().to_str().unwrap(), None, None).await;
|
||||
assert!(
|
||||
result.is_ok(),
|
||||
"init_project should succeed: {:?}",
|
||||
result.err()
|
||||
);
|
||||
assert!(dir.path().join(".huskies").exists());
|
||||
assert!(dir.path().join(".huskies/project.toml").exists());
|
||||
assert!(dir.path().join(".huskies/agents.toml").exists());
|
||||
assert!(dir.path().join("script/test").exists());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_project_creates_wizard_state() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let state = make_test_state();
|
||||
gateway::init_project(&state, dir.path().to_str().unwrap(), None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
let wizard_state_path = dir.path().join(".huskies/wizard_state.json");
|
||||
assert!(wizard_state_path.exists());
|
||||
let content = std::fs::read_to_string(&wizard_state_path).unwrap();
|
||||
let v: serde_json::Value = serde_json::from_str(&content).unwrap();
|
||||
assert!(v.get("steps").is_some());
|
||||
assert!(v.get("completed").is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_project_already_initialised_returns_error() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
std::fs::create_dir_all(dir.path().join(".huskies")).unwrap();
|
||||
let state = make_test_state();
|
||||
let result = gateway::init_project(&state, dir.path().to_str().unwrap(), None, None).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_project_missing_path_returns_error() {
|
||||
let state = make_test_state();
|
||||
let result = gateway::init_project(&state, "", None, None).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_project_registers_in_projects_toml_when_name_and_url_given() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let config_dir = tempfile::tempdir().unwrap();
|
||||
let mut projects = BTreeMap::new();
|
||||
projects.insert(
|
||||
"existing".into(),
|
||||
ProjectEntry {
|
||||
url: "http://existing:3001".into(),
|
||||
},
|
||||
);
|
||||
let config = GatewayConfig { projects };
|
||||
let state = Arc::new(GatewayState::new(config, config_dir.path().to_path_buf(), 3000).unwrap());
|
||||
|
||||
let result = gateway::init_project(
|
||||
&state,
|
||||
dir.path().to_str().unwrap(),
|
||||
Some("new-project"),
|
||||
Some("http://new-project:3002"),
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_ok());
|
||||
|
||||
let projects = state.projects.read().await;
|
||||
assert!(projects.contains_key("new-project"));
|
||||
assert_eq!(projects["new-project"].url, "http://new-project:3002");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_project_duplicate_name_returns_error() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let mut projects = BTreeMap::new();
|
||||
projects.insert(
|
||||
"taken".into(),
|
||||
ProjectEntry {
|
||||
url: "http://taken:3001".into(),
|
||||
},
|
||||
);
|
||||
let config = GatewayConfig { projects };
|
||||
let state = Arc::new(GatewayState::new(config, PathBuf::new(), 3000).unwrap());
|
||||
|
||||
let result = gateway::init_project(
|
||||
&state,
|
||||
dir.path().to_str().unwrap(),
|
||||
Some("taken"),
|
||||
Some("http://new:3002"),
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_project_then_wizard_status_integration() {
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let mock_port = listener.local_addr().unwrap().port();
|
||||
let mock_url = format!("http://127.0.0.1:{mock_port}");
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Ok((mut stream, _)) = listener.accept().await {
|
||||
let mut buf = vec![0u8; 4096];
|
||||
let _ = stream.read(&mut buf).await;
|
||||
let body = serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"result": {
|
||||
"content": [{
|
||||
"type": "text",
|
||||
"text": "{\"steps\":[{\"id\":\"scaffold\",\"title\":\"Scaffold\",\"status\":\"confirmed\"}],\"completed\":false}"
|
||||
}]
|
||||
}
|
||||
});
|
||||
let body_bytes = serde_json::to_vec(&body).unwrap();
|
||||
let header = format!(
|
||||
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
|
||||
body_bytes.len()
|
||||
);
|
||||
let _ = stream.write_all(header.as_bytes()).await;
|
||||
let _ = stream.write_all(&body_bytes).await;
|
||||
}
|
||||
});
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
|
||||
let mut projects = BTreeMap::new();
|
||||
projects.insert("mock-project".into(), ProjectEntry { url: mock_url });
|
||||
let config = GatewayConfig { projects };
|
||||
let config_dir = tempfile::tempdir().unwrap();
|
||||
let state = Arc::new(GatewayState::new(config, config_dir.path().to_path_buf(), 3000).unwrap());
|
||||
|
||||
let project_dir = tempfile::tempdir().unwrap();
|
||||
let result =
|
||||
gateway::init_project(&state, project_dir.path().to_str().unwrap(), None, None).await;
|
||||
assert!(result.is_ok());
|
||||
assert!(project_dir.path().join(".huskies").exists());
|
||||
|
||||
let wizard_path = project_dir.path().join(".huskies/wizard_state.json");
|
||||
assert!(wizard_path.exists());
|
||||
|
||||
// Proxy call to the mock server.
|
||||
let active_url = state.active_url().await.unwrap();
|
||||
let proxy_body = serde_json::to_vec(&serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 2,
|
||||
"method": "tools/call",
|
||||
"params": { "name": "wizard_status", "arguments": {} }
|
||||
}))
|
||||
.unwrap();
|
||||
let proxy_resp = gateway::io::proxy_mcp_call(&state.client, &active_url, &proxy_body).await;
|
||||
assert!(proxy_resp.is_ok());
|
||||
|
||||
let resp_json: serde_json::Value = serde_json::from_slice(&proxy_resp.unwrap()).unwrap();
|
||||
let result = resp_json.get("result");
|
||||
assert!(result.is_some());
|
||||
let text = result
|
||||
.and_then(|r| r.get("content"))
|
||||
.and_then(|c| c.get(0))
|
||||
.and_then(|c| c.get("text"))
|
||||
.and_then(|t| t.as_str())
|
||||
.unwrap_or("");
|
||||
let wizard: serde_json::Value = serde_json::from_str(text).unwrap();
|
||||
assert!(wizard.get("steps").is_some());
|
||||
}
|
||||
|
||||
// ── Aggregate pipeline status integration tests ─────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn aggregate_pipeline_status_integration_healthy_and_unreachable() {
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let mock_port = listener.local_addr().unwrap().port();
|
||||
let healthy_url = format!("http://127.0.0.1:{mock_port}");
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Ok((mut stream, _)) = listener.accept().await {
|
||||
let mut buf = vec![0u8; 4096];
|
||||
let _ = stream.read(&mut buf).await;
|
||||
let pipeline_json = serde_json::to_string(&serde_json::json!({
|
||||
"active": [
|
||||
{ "story_id": "1_story_a", "name": "A", "stage": "current" },
|
||||
{ "story_id": "2_story_b", "name": "B", "stage": "qa" },
|
||||
{ "story_id": "3_story_c", "name": "C", "stage": "current", "blocked": true, "retry_count": 5 },
|
||||
],
|
||||
"backlog": [{ "story_id": "4_story_d", "name": "D" }],
|
||||
"backlog_count": 1
|
||||
}))
|
||||
.unwrap();
|
||||
let body = serde_json::to_vec(&serde_json::json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"result": {
|
||||
"content": [{ "type": "text", "text": pipeline_json }]
|
||||
}
|
||||
}))
|
||||
.unwrap();
|
||||
let header = format!(
|
||||
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
|
||||
body.len()
|
||||
);
|
||||
let _ = stream.write_all(header.as_bytes()).await;
|
||||
let _ = stream.write_all(&body).await;
|
||||
}
|
||||
});
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
|
||||
let unreachable_url = "http://127.0.0.1:1".to_string();
|
||||
|
||||
let mut project_urls = BTreeMap::new();
|
||||
project_urls.insert("healthy-project".to_string(), healthy_url);
|
||||
project_urls.insert("broken-project".to_string(), unreachable_url);
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let statuses = gateway::fetch_all_project_pipeline_statuses(&project_urls, &client).await;
|
||||
|
||||
assert!(statuses.contains_key("healthy-project"));
|
||||
assert!(statuses.contains_key("broken-project"));
|
||||
|
||||
let healthy = &statuses["healthy-project"];
|
||||
assert!(healthy.get("error").is_none());
|
||||
assert_eq!(healthy["counts"]["backlog"], 1);
|
||||
assert_eq!(healthy["counts"]["current"], 2);
|
||||
assert_eq!(healthy["counts"]["qa"], 1);
|
||||
|
||||
let blocked = healthy["blocked"].as_array().unwrap();
|
||||
assert_eq!(blocked.len(), 1);
|
||||
assert_eq!(blocked[0]["story_id"], "3_story_c");
|
||||
|
||||
let broken = &statuses["broken-project"];
|
||||
assert!(broken.get("error").is_some());
|
||||
}
|
||||
|
||||
// ── Multi-project notification poller integration ────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn gateway_notification_poller_delivers_events_from_two_projects_with_project_tags() {
|
||||
use crate::chat::{ChatTransport, MessageId};
|
||||
use crate::service::events::StoredEvent;
|
||||
use async_trait::async_trait;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
type CallLog = Arc<std::sync::Mutex<Vec<(String, String, String)>>>;
|
||||
|
||||
struct MockTransport {
|
||||
calls: CallLog,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ChatTransport for MockTransport {
|
||||
async fn send_message(
|
||||
&self,
|
||||
room_id: &str,
|
||||
plain: &str,
|
||||
html: &str,
|
||||
) -> Result<MessageId, String> {
|
||||
self.calls.lock().unwrap().push((
|
||||
room_id.to_string(),
|
||||
plain.to_string(),
|
||||
html.to_string(),
|
||||
));
|
||||
Ok("mock-id".to_string())
|
||||
}
|
||||
|
||||
async fn edit_message(
|
||||
&self,
|
||||
_room_id: &str,
|
||||
_id: &str,
|
||||
_plain: &str,
|
||||
_html: &str,
|
||||
) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new()));
|
||||
let transport = Arc::new(MockTransport {
|
||||
calls: Arc::clone(&calls),
|
||||
});
|
||||
|
||||
let alpha_events = vec![StoredEvent::StageTransition {
|
||||
story_id: "1_story_alpha".to_string(),
|
||||
from_stage: "2_current".to_string(),
|
||||
to_stage: "3_qa".to_string(),
|
||||
timestamp_ms: 100,
|
||||
}];
|
||||
let alpha_body = serde_json::to_vec(&alpha_events).unwrap();
|
||||
let alpha_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let alpha_port = alpha_listener.local_addr().unwrap().port();
|
||||
let alpha_url = format!("http://127.0.0.1:{alpha_port}");
|
||||
tokio::spawn(async move {
|
||||
for _ in 0..4 {
|
||||
if let Ok((mut stream, _)) = alpha_listener.accept().await {
|
||||
let mut buf = vec![0u8; 4096];
|
||||
let _ = stream.read(&mut buf).await;
|
||||
let header = format!(
|
||||
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
|
||||
alpha_body.len()
|
||||
);
|
||||
let _ = stream.write_all(header.as_bytes()).await;
|
||||
let _ = stream.write_all(&alpha_body).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let beta_events = vec![StoredEvent::MergeFailure {
|
||||
story_id: "2_story_beta".to_string(),
|
||||
reason: "merge conflict in lib.rs".to_string(),
|
||||
timestamp_ms: 200,
|
||||
}];
|
||||
let beta_body = serde_json::to_vec(&beta_events).unwrap();
|
||||
let beta_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let beta_port = beta_listener.local_addr().unwrap().port();
|
||||
let beta_url = format!("http://127.0.0.1:{beta_port}");
|
||||
tokio::spawn(async move {
|
||||
for _ in 0..4 {
|
||||
if let Ok((mut stream, _)) = beta_listener.accept().await {
|
||||
let mut buf = vec![0u8; 4096];
|
||||
let _ = stream.read(&mut buf).await;
|
||||
let header = format!(
|
||||
"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
|
||||
beta_body.len()
|
||||
);
|
||||
let _ = stream.write_all(header.as_bytes()).await;
|
||||
let _ = stream.write_all(&beta_body).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
|
||||
let mut project_urls = BTreeMap::new();
|
||||
project_urls.insert("alpha".to_string(), alpha_url);
|
||||
project_urls.insert("beta".to_string(), beta_url);
|
||||
|
||||
gateway::spawn_gateway_notification_poller(
|
||||
transport as Arc<dyn crate::chat::ChatTransport>,
|
||||
vec!["!room:example.org".to_string()],
|
||||
project_urls,
|
||||
1,
|
||||
);
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
|
||||
|
||||
let calls = calls.lock().unwrap();
|
||||
assert!(
|
||||
!calls.is_empty(),
|
||||
"Expected at least one notification; got none"
|
||||
);
|
||||
|
||||
let plains: Vec<&str> = calls.iter().map(|(_, p, _)| p.as_str()).collect();
|
||||
|
||||
let alpha_notification = plains
|
||||
.iter()
|
||||
.any(|p| p.contains("[alpha]") && p.contains("1"));
|
||||
let beta_notification = plains
|
||||
.iter()
|
||||
.any(|p| p.contains("[beta]") && p.contains("merge conflict"));
|
||||
|
||||
assert!(
|
||||
alpha_notification,
|
||||
"Expected a notification from [alpha] containing story ID '1'; got: {plains:?}"
|
||||
);
|
||||
assert!(
|
||||
beta_notification,
|
||||
"Expected a notification from [beta] containing 'merge conflict'; got: {plains:?}"
|
||||
);
|
||||
|
||||
for (room_id, _, _) in calls.iter() {
|
||||
assert_eq!(
|
||||
room_id, "!room:example.org",
|
||||
"All notifications must go to the gateway room"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// ── Gateway broadcaster forwarder tests ─────────────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn broadcaster_forwarder_forwards_events_with_project_tag() {
|
||||
use crate::chat::{ChatTransport, MessageId};
|
||||
use crate::service::events::StoredEvent;
|
||||
use async_trait::async_trait;
|
||||
|
||||
type CallLog = Arc<std::sync::Mutex<Vec<(String, String)>>>;
|
||||
|
||||
struct MockTransport {
|
||||
calls: CallLog,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ChatTransport for MockTransport {
|
||||
async fn send_message(
|
||||
&self,
|
||||
room_id: &str,
|
||||
plain: &str,
|
||||
_html: &str,
|
||||
) -> Result<MessageId, String> {
|
||||
self.calls
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push((room_id.to_string(), plain.to_string()));
|
||||
Ok("id".to_string())
|
||||
}
|
||||
|
||||
async fn edit_message(
|
||||
&self,
|
||||
_room_id: &str,
|
||||
_id: &str,
|
||||
_plain: &str,
|
||||
_html: &str,
|
||||
) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let calls: CallLog = Arc::new(std::sync::Mutex::new(Vec::new()));
|
||||
let transport = Arc::new(MockTransport {
|
||||
calls: Arc::clone(&calls),
|
||||
});
|
||||
|
||||
let (tx, rx) =
|
||||
tokio::sync::broadcast::channel::<crate::service::gateway::GatewayStatusEvent>(16);
|
||||
gateway::spawn_gateway_broadcaster_forwarder(
|
||||
transport as Arc<dyn crate::chat::ChatTransport>,
|
||||
vec!["!room:example.org".to_string()],
|
||||
rx,
|
||||
);
|
||||
|
||||
// Give the forwarder task a moment to start.
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
|
||||
let event = crate::service::gateway::GatewayStatusEvent {
|
||||
project: "my-project".to_string(),
|
||||
event: StoredEvent::StageTransition {
|
||||
story_id: "7_story_x".to_string(),
|
||||
from_stage: "2_current".to_string(),
|
||||
to_stage: "3_qa".to_string(),
|
||||
timestamp_ms: 100,
|
||||
},
|
||||
};
|
||||
tx.send(event).unwrap();
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
|
||||
let messages = calls.lock().unwrap();
|
||||
assert_eq!(messages.len(), 1, "Expected exactly one notification");
|
||||
let (room, plain) = &messages[0];
|
||||
assert_eq!(room, "!room:example.org");
|
||||
assert!(
|
||||
plain.starts_with("[my-project]"),
|
||||
"Expected [my-project] prefix; got: {plain}"
|
||||
);
|
||||
assert!(
|
||||
plain.contains("7_story_x"),
|
||||
"Expected story ID; got: {plain}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn broadcaster_forwarder_resubscribes_on_lag() {
|
||||
use crate::chat::{ChatTransport, MessageId};
|
||||
use crate::service::events::StoredEvent;
|
||||
use async_trait::async_trait;
|
||||
|
||||
type Counter = Arc<std::sync::Mutex<usize>>;
|
||||
|
||||
struct CountTransport {
|
||||
count: Counter,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ChatTransport for CountTransport {
|
||||
async fn send_message(
|
||||
&self,
|
||||
_room_id: &str,
|
||||
_plain: &str,
|
||||
_html: &str,
|
||||
) -> Result<MessageId, String> {
|
||||
*self.count.lock().unwrap() += 1;
|
||||
Ok("id".to_string())
|
||||
}
|
||||
|
||||
async fn edit_message(
|
||||
&self,
|
||||
_room_id: &str,
|
||||
_id: &str,
|
||||
_plain: &str,
|
||||
_html: &str,
|
||||
) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_typing(&self, _room_id: &str, _typing: bool) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let count: Counter = Arc::new(std::sync::Mutex::new(0));
|
||||
let transport = Arc::new(CountTransport {
|
||||
count: Arc::clone(&count),
|
||||
});
|
||||
|
||||
// Use a tiny channel (capacity 1) so the second send causes a Lagged error.
|
||||
let (tx, rx) =
|
||||
tokio::sync::broadcast::channel::<crate::service::gateway::GatewayStatusEvent>(1);
|
||||
|
||||
// Flood the channel to trigger Lagged before the forwarder task starts.
|
||||
let make_event = |n: u64| crate::service::gateway::GatewayStatusEvent {
|
||||
project: "p".to_string(),
|
||||
event: StoredEvent::StageTransition {
|
||||
story_id: format!("{n}_story"),
|
||||
from_stage: "2_current".to_string(),
|
||||
to_stage: "3_qa".to_string(),
|
||||
timestamp_ms: n,
|
||||
},
|
||||
};
|
||||
// Send 3 events to overflow the capacity-1 channel before the task runs.
|
||||
let _ = tx.send(make_event(1));
|
||||
let _ = tx.send(make_event(2));
|
||||
let _ = tx.send(make_event(3));
|
||||
|
||||
gateway::spawn_gateway_broadcaster_forwarder(
|
||||
transport as Arc<dyn crate::chat::ChatTransport>,
|
||||
vec!["!r:x.org".to_string()],
|
||||
rx,
|
||||
);
|
||||
|
||||
// Send one more event after the forwarder subscribes; it should arrive.
|
||||
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
|
||||
tx.send(make_event(4)).unwrap();
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
|
||||
// After Lagged + resubscribe, the forwarder must still process event 4.
|
||||
let received = *count.lock().unwrap();
|
||||
assert!(
|
||||
received >= 1,
|
||||
"Expected at least one event after Lagged resubscribe; got {received}"
|
||||
);
|
||||
}
|
||||
|
||||
// ── BotConfig tests ─────────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn bot_config_loads_from_gateway_config_dir() {
|
||||
use crate::chat::transport::matrix::BotConfig;
|
||||
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let huskies_dir = tmp.path().join(".huskies");
|
||||
std::fs::create_dir_all(&huskies_dir).unwrap();
|
||||
std::fs::write(
|
||||
huskies_dir.join("bot.toml"),
|
||||
r#"
|
||||
homeserver = "https://matrix.example.com"
|
||||
username = "@bot:example.com"
|
||||
password = "secret"
|
||||
room_ids = ["!abc:example.com"]
|
||||
enabled = true
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let config = BotConfig::load(tmp.path());
|
||||
assert!(config.is_some());
|
||||
let config = config.unwrap();
|
||||
assert_eq!(
|
||||
config.homeserver.as_deref(),
|
||||
Some("https://matrix.example.com")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bot_config_absent_returns_none_in_gateway_mode() {
|
||||
use crate::chat::transport::matrix::BotConfig;
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let config = BotConfig::load(tmp.path());
|
||||
assert!(config.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bot_config_disabled_returns_none_in_gateway_mode() {
|
||||
use crate::chat::transport::matrix::BotConfig;
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let huskies_dir = tmp.path().join(".huskies");
|
||||
std::fs::create_dir_all(&huskies_dir).unwrap();
|
||||
std::fs::write(
|
||||
huskies_dir.join("bot.toml"),
|
||||
r#"
|
||||
homeserver = "https://matrix.example.com"
|
||||
username = "@bot:example.com"
|
||||
password = "secret"
|
||||
room_ids = ["!abc:example.com"]
|
||||
enabled = false
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let config = BotConfig::load(tmp.path());
|
||||
assert!(config.is_none());
|
||||
}
|
||||
Reference in New Issue
Block a user