Files
huskies/server/src/service/gateway/io.rs
T

421 lines
15 KiB
Rust

//! Gateway I/O — the ONLY place in `service/gateway/` that may perform side effects.
//!
//! Side effects here include: reading/writing config and agent state files,
//! HTTP requests to project containers (proxying, health checks, polling),
//! spawning the Matrix bot task, and the notification poller background task.
use super::config::{GatewayConfig, ProjectEntry};
use super::registration::JoinedAgent;
pub use reqwest::Client;
use serde_json::{Value, json};
use std::collections::{BTreeMap, HashMap};
use std::path::Path;
// ── Config I/O ───────────────────────────────────────────────────────────────
/// Read the gateway configuration stored at `path` and parse it as TOML.
///
/// # Errors
///
/// Returns a human-readable message when the file cannot be read, or when its
/// contents do not deserialize into a [`GatewayConfig`].
pub fn load_config(path: &Path) -> Result<GatewayConfig, String> {
    let raw = match std::fs::read_to_string(path) {
        Ok(text) => text,
        Err(e) => return Err(format!("cannot read {}: {e}", path.display())),
    };
    match toml::from_str(&raw) {
        Ok(config) => Ok(config),
        Err(e) => Err(format!("invalid projects.toml: {e}")),
    }
}
/// Read the persisted agent list from `<config_dir>/gateway_agents.json`.
///
/// This is best-effort: a file that is missing, unreadable, or not valid JSON
/// for `Vec<JoinedAgent>` produces an empty list rather than an error.
pub fn load_agents(config_dir: &Path) -> Vec<JoinedAgent> {
    std::fs::read(config_dir.join("gateway_agents.json"))
        .ok()
        .and_then(|bytes| serde_json::from_slice(&bytes).ok())
        .unwrap_or_default()
}
/// Write the given projects map to `<config_dir>/projects.toml`.
///
/// Best-effort persistence: nothing is written when `config_dir` is the empty
/// path, and both serialization and write failures are discarded silently.
pub async fn save_config(projects: &BTreeMap<String, ProjectEntry>, config_dir: &Path) {
    let persistence_disabled = config_dir.as_os_str().is_empty();
    if persistence_disabled {
        return;
    }
    let snapshot = GatewayConfig {
        projects: projects.clone(),
    };
    match toml::to_string_pretty(&snapshot) {
        Ok(serialized) => {
            let target = config_dir.join("projects.toml");
            // Write errors are deliberately ignored.
            let _ = tokio::fs::write(&target, serialized).await;
        }
        Err(_) => {}
    }
}
/// Write the given agent list to `<config_dir>/gateway_agents.json` as
/// pretty-printed JSON.
///
/// Best-effort persistence: nothing is written when `config_dir` equals the
/// empty path, and both serialization and write failures are discarded
/// silently.
pub async fn save_agents(agents: &[JoinedAgent], config_dir: &Path) {
    let persistence_disabled = Path::new("") == config_dir;
    if persistence_disabled {
        return;
    }
    let encoded = match serde_json::to_vec_pretty(agents) {
        Ok(bytes) => bytes,
        Err(_) => return,
    };
    let target = config_dir.join("gateway_agents.json");
    // Write errors are deliberately ignored.
    let _ = tokio::fs::write(&target, encoded).await;
}
// ── Bot config I/O ──────────────────────────────────────────────────────────
/// Read `<config_dir>/.huskies/bot.toml` and extract the top-level string
/// fields shown by the configuration UI.
///
/// When the file cannot be read or is not valid TOML, the result is
/// `BotConfigFields::default()`: an empty `transport` and `None` for every
/// optional field. When the file parses, a missing (or non-string)
/// `transport` key falls back to `"matrix"`, and any other missing or
/// non-string key becomes `None`.
pub fn read_bot_config_raw(config_dir: &Path) -> BotConfigFields {
    let bot_toml = config_dir.join(".huskies").join("bot.toml");
    let parsed: Option<toml::Value> = std::fs::read_to_string(&bot_toml)
        .ok()
        .and_then(|text| toml::from_str(&text).ok());
    let doc = match parsed {
        Some(doc) => doc,
        None => return BotConfigFields::default(),
    };
    // Look up a top-level key and keep it only if it holds a string.
    let string_field = |key: &str| -> Option<String> {
        let value = doc.get(key)?;
        value.as_str().map(String::from)
    };
    BotConfigFields {
        transport: string_field("transport").unwrap_or_else(|| "matrix".to_string()),
        homeserver: string_field("homeserver"),
        username: string_field("username"),
        password: string_field("password"),
        slack_bot_token: string_field("slack_bot_token"),
        slack_signing_secret: string_field("slack_signing_secret"),
    }
}
/// Raw bot.toml fields for the configuration UI.
///
/// Each field mirrors a top-level string key of the same name in `bot.toml`.
/// The derived `Default` yields an empty `transport` and `None` everywhere
/// else.
// NOTE(review): `Debug` is not derived; `password` and the Slack fields look
// like secrets, so keep them out of logs if a `Debug` impl is ever added.
#[derive(Default)]
pub struct BotConfigFields {
/// Value of the `transport` key.
pub transport: String,
/// Value of the `homeserver` key, if present.
pub homeserver: Option<String>,
/// Value of the `username` key, if present.
pub username: Option<String>,
/// Value of the `password` key, if present.
pub password: Option<String>,
/// Value of the `slack_bot_token` key, if present.
pub slack_bot_token: Option<String>,
/// Value of the `slack_signing_secret` key, if present.
pub slack_signing_secret: Option<String>,
}
/// Store `content` verbatim as `<config_dir>/.huskies/bot.toml`, creating the
/// `.huskies` directory first if needed and overwriting any existing file.
///
/// # Errors
///
/// Returns a message when the `.huskies` directory cannot be created or the
/// file cannot be written.
pub fn write_bot_config(config_dir: &Path, content: &str) -> Result<(), String> {
    let dot_dir = config_dir.join(".huskies");
    if let Err(e) = std::fs::create_dir_all(&dot_dir) {
        return Err(format!("cannot create .huskies dir: {e}"));
    }
    let target = dot_dir.join("bot.toml");
    match std::fs::write(&target, content) {
        Ok(()) => Ok(()),
        Err(e) => Err(format!("cannot write bot.toml: {e}")),
    }
}
// ── MCP proxy I/O ───────────────────────────────────────────────────────────
/// Forward a raw MCP request body to `<base_url>/mcp` and hand back the raw
/// response body.
///
/// Trailing slashes on `base_url` are stripped before `/mcp` is appended. The
/// body is sent as `application/json`. The upstream HTTP status is not
/// inspected: whatever body comes back is returned as-is.
///
/// # Errors
///
/// Returns a message when the request cannot be sent or the response body
/// cannot be read.
pub async fn proxy_mcp_call(
    client: &Client,
    base_url: &str,
    request_bytes: &[u8],
) -> Result<Vec<u8>, String> {
    let endpoint = format!("{}/mcp", base_url.trim_end_matches('/'));
    let request = client
        .post(&endpoint)
        .header("Content-Type", "application/json")
        .body(request_bytes.to_vec());
    let upstream = match request.send().await {
        Ok(response) => response,
        Err(e) => return Err(format!("failed to reach {endpoint}: {e}")),
    };
    match upstream.bytes().await {
        Ok(payload) => Ok(payload.to_vec()),
        Err(e) => Err(format!("failed to read response from {endpoint}: {e}")),
    }
}
/// Send a JSON-RPC `tools/list` request to `<base_url>/mcp` and return the
/// decoded JSON response.
///
/// Trailing slashes on `base_url` are stripped before `/mcp` is appended. The
/// upstream HTTP status is not inspected.
///
/// # Errors
///
/// Returns a message when the request cannot be sent or the response body is
/// not valid JSON.
pub async fn fetch_tools_list(client: &Client, base_url: &str) -> Result<Value, String> {
    let endpoint = format!("{}/mcp", base_url.trim_end_matches('/'));
    let request = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/list",
        "params": {}
    });
    let upstream = match client.post(&endpoint).json(&request).send().await {
        Ok(response) => response,
        Err(e) => return Err(format!("failed to reach {endpoint}: {e}")),
    };
    match upstream.json::<Value>().await {
        Ok(listing) => Ok(listing),
        Err(e) => Err(format!("invalid JSON from upstream: {e}")),
    }
}
/// Fetch and aggregate pipeline status for a single project URL.
pub async fn fetch_one_project_pipeline_status(url: &str, client: &Client) -> Value {
let mcp_url = format!("{}/mcp", url.trim_end_matches('/'));
let rpc_body = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "get_pipeline_status",
"arguments": {}
}
});
match client.post(&mcp_url).json(&rpc_body).send().await {
Ok(resp) => match resp.json::<Value>().await {
Ok(upstream) => {
if let Some(text) = upstream
.get("result")
.and_then(|r| r.get("content"))
.and_then(|c| c.get(0))
.and_then(|c| c.get("text"))
.and_then(|t| t.as_str())
{
match serde_json::from_str::<Value>(text) {
Ok(pipeline) => {
crate::service::pipeline::aggregate_pipeline_counts(&pipeline)
}
Err(_) => json!({ "error": "invalid pipeline JSON" }),
}
} else {
json!({ "error": "unexpected response shape" })
}
}
Err(e) => json!({ "error": format!("invalid response: {e}") }),
},
Err(e) => json!({ "error": format!("unreachable: {e}") }),
}
}
/// Query the pipeline status of every project in `project_urls` concurrently.
///
/// `project_urls` maps project name to base URL. The result maps each project
/// name to whatever `fetch_one_project_pipeline_status` produced for it,
/// which is either aggregated counts or an `{ "error": ... }` object.
pub async fn fetch_all_project_pipeline_statuses(
    project_urls: &BTreeMap<String, String>,
    client: &Client,
) -> BTreeMap<String, Value> {
    // One future per project; all of them are driven together by `join_all`.
    let lookups = project_urls.iter().map(|(name, url)| async move {
        let status = fetch_one_project_pipeline_status(url, client).await;
        (name.clone(), status)
    });
    futures::future::join_all(lookups)
        .await
        .into_iter()
        .collect()
}
/// Call the `get_pipeline_status` MCP tool on one project and return the raw
/// JSON-RPC response, without unwrapping or aggregating it (used by the
/// `gateway_status` tool).
///
/// Trailing slashes on `base_url` are stripped before `/mcp` is appended. The
/// upstream HTTP status is not inspected.
///
/// # Errors
///
/// Returns a message when the request cannot be sent or the response body is
/// not valid JSON.
pub async fn fetch_pipeline_status_for_project(
    client: &Client,
    base_url: &str,
) -> Result<Value, String> {
    let endpoint = format!("{}/mcp", base_url.trim_end_matches('/'));
    let request = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {
            "name": "get_pipeline_status",
            "arguments": {}
        }
    });
    let upstream = match client.post(&endpoint).json(&request).send().await {
        Ok(response) => response,
        Err(e) => return Err(format!("failed to reach {endpoint}: {e}")),
    };
    match upstream.json::<Value>().await {
        Ok(status) => Ok(status),
        Err(e) => Err(format!("invalid upstream response: {e}")),
    }
}
/// Probe `<base_url>/health` with a GET request.
///
/// Returns `Ok(true)` for a success (2xx) status and `Ok(false)` for any
/// other status.
///
/// # Errors
///
/// Returns an `unreachable: ...` message when the request itself fails.
pub async fn check_project_health(client: &Client, base_url: &str) -> Result<bool, String> {
    let probe_url = format!("{}/health", base_url.trim_end_matches('/'));
    client
        .get(&probe_url)
        .send()
        .await
        .map(|response| response.status().is_success())
        .map_err(|e| format!("unreachable: {e}"))
}
// ── Gateway MCP JSON ────────────────────────────────────────────────────────
/// Write (or overwrite) a `.mcp.json` in `config_dir` that points Claude Code
/// CLI at the gateway's own `/mcp` endpoint.
///
/// The URL is `http://<host>:<port>/mcp`, where `<host>` comes from the
/// `HUSKIES_HOST` environment variable and falls back to `127.0.0.1` when
/// that variable is unset or not valid Unicode. A log line is emitted after a
/// successful write.
///
/// # Errors
///
/// Returns the underlying I/O error when the file cannot be written. A
/// serialization failure is also returned as an `io::Error` rather than
/// panicking.
pub fn write_gateway_mcp_json(config_dir: &Path, port: u16) -> Result<(), std::io::Error> {
    let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
    let url = format!("http://{host}:{port}/mcp");
    let content = json!({
        "mcpServers": {
            "huskies": {
                "type": "http",
                "url": url
            }
        }
    });
    // `serde_json::Error` converts into `io::Error`, so `?` replaces the
    // former `unwrap()` and keeps this function free of panic paths.
    let rendered = serde_json::to_string_pretty(&content)?;
    let path = config_dir.join(".mcp.json");
    std::fs::write(&path, rendered)?;
    crate::slog!("[gateway] Wrote {} pointing to {}", path.display(), url);
    Ok(())
}
// ── Init project I/O ────────────────────────────────────────────────────────
/// Report whether `path` already contains an entry named `.huskies`.
///
/// Only existence is checked; the entry is not verified to be a directory.
pub fn has_huskies_dir(path: &Path) -> bool {
    let marker = path.join(".huskies");
    marker.exists()
}
/// Make sure `path` exists as a directory, creating it and any missing
/// parents.
///
/// Calling this on a directory that already exists is a no-op.
///
/// # Errors
///
/// Returns a message when the directory cannot be created. This includes the
/// case where `path` already exists but is not a directory (for example a
/// regular file), which was previously reported as success.
pub fn ensure_directory(path: &Path) -> Result<(), String> {
    // `create_dir_all` already succeeds for an existing directory, so no
    // separate `exists()` pre-check is needed. Dropping it also removes the
    // check-then-create race and stops a non-directory from passing.
    std::fs::create_dir_all(path)
        .map_err(|e| format!("failed to create directory '{}': {e}", path.display()))
}
/// Scaffold a huskies project at the given path.
///
/// Thin wrapper that delegates to `crate::io::fs::scaffold::scaffold_story_kit`
/// and returns its result unchanged.
pub fn scaffold_project(path: &Path) -> Result<(), String> {
// NOTE(review): `3001` is hard-coded here; presumably the default project
// server port expected by the scaffolder — confirm against `scaffold_story_kit`.
crate::io::fs::scaffold::scaffold_story_kit(path, 3001)
}
/// Initialise wizard state at the given path.
///
/// Thin wrapper around `WizardState::init_if_missing`; whatever that call
/// returns is discarded, so callers get no success/failure signal from here.
pub fn init_wizard_state(path: &Path) {
crate::io::wizard::WizardState::init_if_missing(path);
}
// ── Notification poller ─────────────────────────────────────────────────────
/// Spawn a detached background task that polls events from all project
/// servers and forwards them to chat.
///
/// On every cycle, each project in `project_urls` (name → base URL) is
/// queried in turn at `<base_url>/api/events?since=<ts>`, where `<ts>` is the
/// largest event timestamp seen so far for that project (initially 0).
/// Trailing slashes on the base URL are stripped first, as the other URL
/// builders in this module do. Each returned event is formatted with
/// `format_gateway_event` and sent to every room in `room_ids` through
/// `transport`.
///
/// Failures never stop the task. An unreachable project or an unparseable
/// response is logged and skipped for that cycle, and a failed send is logged
/// per room. Requests use a 10-second timeout, and the task sleeps
/// `poll_interval_secs` (clamped to at least 1 second) between cycles. No
/// handle is returned, so the task cannot be cancelled by the caller.
pub fn spawn_gateway_notification_poller(
    transport: std::sync::Arc<dyn crate::chat::ChatTransport>,
    room_ids: Vec<String>,
    project_urls: BTreeMap<String, String>,
    poll_interval_secs: u64,
) {
    tokio::spawn(async move {
        let client = Client::builder()
            .timeout(std::time::Duration::from_secs(10))
            .build()
            .unwrap_or_else(|_| Client::new());
        let interval = std::time::Duration::from_secs(poll_interval_secs.max(1));
        // Per-project high-water mark of event timestamps already forwarded.
        let mut last_ts: HashMap<String, u64> = project_urls
            .keys()
            .map(|name| (name.clone(), 0u64))
            .collect();

        loop {
            for (project_name, base_url) in &project_urls {
                let since = last_ts.get(project_name).copied().unwrap_or(0);
                // Trim trailing slashes so "http://host/" does not become
                // "http://host//api/events".
                let url = format!(
                    "{}/api/events?since={since}",
                    base_url.trim_end_matches('/')
                );

                let response = match client.get(&url).send().await {
                    Ok(r) => r,
                    Err(e) => {
                        crate::slog!(
                            "[gateway-poller] {project_name}: unreachable ({e}); skipping"
                        );
                        continue;
                    }
                };
                let events: Vec<crate::service::events::StoredEvent> = match response.json().await {
                    Ok(v) => v,
                    Err(e) => {
                        crate::slog!(
                            "[gateway-poller] {project_name}: failed to parse events: {e}"
                        );
                        continue;
                    }
                };

                // Track the newest timestamp locally and write it back once,
                // instead of a map lookup and key clone per event.
                let mut newest = since;
                for event in &events {
                    newest = newest.max(event.timestamp_ms());
                    let (plain, html) = super::polling::format_gateway_event(project_name, event);
                    for room_id in &room_ids {
                        if let Err(e) = transport.send_message(room_id, &plain, &html).await {
                            crate::slog!(
                                "[gateway-poller] Failed to send notification to {room_id}: {e}"
                            );
                        }
                    }
                }
                if newest > since {
                    last_ts.insert(project_name.clone(), newest);
                }
            }
            tokio::time::sleep(interval).await;
        }
    });
}
// ── Gateway bot spawn ───────────────────────────────────────────────────────
/// Type alias for the active-project handle: a `String` behind a Tokio
/// `RwLock`, shared through an `Arc`.
///
/// NOTE(review): the string is presumably the name of the currently active
/// project — confirm against the code that writes to this lock.
pub type ActiveProject = std::sync::Arc<tokio::sync::RwLock<String>>;
/// Attempt to spawn the Matrix bot against the gateway config directory.
pub fn spawn_gateway_bot(
config_dir: &Path,
active_project: ActiveProject,
gateway_projects: Vec<String>,
gateway_project_urls: BTreeMap<String, String>,
port: u16,
) -> Option<tokio::task::AbortHandle> {
use crate::agents::AgentPool;
use crate::services::Services;
use tokio::sync::{broadcast, mpsc};
let (watcher_tx, _) = broadcast::channel(16);
let (_perm_tx, perm_rx) = mpsc::unbounded_channel();
let perm_rx = std::sync::Arc::new(tokio::sync::Mutex::new(perm_rx));
let (shutdown_tx, shutdown_rx) =
tokio::sync::watch::channel::<Option<crate::rebuild::ShutdownReason>>(None);
std::mem::forget(shutdown_tx);
let agents = std::sync::Arc::new(AgentPool::new(port, watcher_tx.clone()));
let services = std::sync::Arc::new(Services {
project_root: config_dir.to_path_buf(),
agents,
bot_name: "Assistant".to_string(),
bot_user_id: String::new(),
ambient_rooms: std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())),
perm_rx,
pending_perm_replies: std::sync::Arc::new(tokio::sync::Mutex::new(
std::collections::HashMap::new(),
)),
permission_timeout_secs: 120,
});
crate::chat::transport::matrix::spawn_bot(
config_dir,
watcher_tx,
services,
shutdown_rx,
Some(active_project),
gateway_projects,
gateway_project_urls,
)
}