// File-listing metadata captured with this source (not part of the program):
//   huskies/server/src/main.rs — 385 lines, 14 KiB, Rust
//   snapshot: 2026-04-29 10:47:18 +00:00
//! Huskies server — entry point, CLI argument parsing, and server startup.
// matrix-sdk-crypto's deeply nested types require a higher recursion limit
// when the `e2e-encryption` feature is enabled.
#![recursion_limit = "256"]
// NOTE(review): purpose not visible from this file — presumably agent output logging; confirm.
mod agent_log;
/// Agent mode — headless build agent entry point (the `--agent` branch of `main`).
mod agent_mode;
/// Agents — `AgentPool` and reconciliation events used throughout startup.
mod agents;
/// Chat — bot transports; the Matrix transport (`BotConfig`, `spawn_bot`) is wired up in `main`.
mod chat;
/// Config — `ProjectConfig` loader for `.huskies/project.toml`.
mod config;
/// CRDT snapshot — serialisation and restore of the full pipeline CRDT state.
pub mod crdt_snapshot;
/// CRDT state — in-memory pipeline state machine backed by a distributed CRDT.
pub mod crdt_state;
/// CRDT sync — WebSocket-based peer synchronisation for distributed nodes.
pub mod crdt_sync;
/// CRDT wire format — on-wire message types for the crdt-sync protocol.
pub mod crdt_wire;
// NOTE(review): database layer — not referenced directly in this file; confirm purpose.
mod db;
/// Gateway mode — multi-project reverse proxy that fronts multiple project containers.
pub mod gateway;
// NOTE(review): presumably pushes status events to a gateway (see spawn_gateway_relay); confirm.
mod gateway_relay;
/// HTTP — route builder, `AppContext`, port-file helpers, and the events buffer.
mod http;
/// IO — includes `io::watcher`, the filesystem watcher used for config hot-reload.
mod io;
// NOTE(review): LLM integration — not referenced directly in this file; confirm purpose.
mod llm;
/// Log buffer — in-memory ring buffer for recent server-side log lines.
pub mod log_buffer;
/// Mesh — peer discovery and multi-hop CRDT replication over WebSocket.
pub mod mesh;
/// Node identity — Ed25519 keypair generation and stable node ID management.
pub mod node_identity;
// NOTE(review): pipeline state — crate-internal; purpose not visible from this file.
pub(crate) mod pipeline_state;
/// Rebuild — process restart and shutdown coordination.
pub mod rebuild;
/// Service — shared service utilities; `service::timer::TimerStore` is used in `main`.
mod service;
/// Services — shared service bundle injected into HTTP handlers and bot tasks.
pub mod services;
/// Startup — orchestration helpers (`project`, `bots`, `tick_loop`) called from `main`.
mod startup;
/// State — `SessionState`, the process-wide shared session state.
mod state;
/// Store — `JsonFileStore`, the JSON-file-backed persistent store.
mod store;
/// Workflow — `WorkflowState` shared with HTTP handlers.
mod workflow;
// NOTE(review): worktree management — presumably backs the worktree reconciliation task; confirm.
mod worktree;
use crate::agents::AgentPool;
use crate::http::build_routes;
use crate::http::context::AppContext;
use crate::http::{remove_port_file, resolve_port, write_port_file};
use crate::rebuild::ShutdownReason;
use crate::state::SessionState;
use crate::store::JsonFileStore;
use crate::workflow::WorkflowState;
use poem::Server;
use poem::listener::TcpListener;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::broadcast;
/// CLI — argument parsing (`parse_cli_args`) and path resolution (`resolve_path_arg`).
mod cli;
use cli::{parse_cli_args, resolve_path_arg};
/// Process entry point: parses CLI flags, restores persisted state, wires up
/// the shared service bundle and background tasks, then serves HTTP until the
/// server exits — at which point bots are notified, PTY children are killed,
/// and the port file is removed.
///
/// Gateway mode (`--gateway`) and agent mode (`--agent`) each short-circuit
/// this sequence and hand the process over to their own run loop.
#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // Reap zombie grandchildren on Unix (for native deployments without tini/init).
    // Docker containers with `init: true` in docker-compose.yml already have tini
    // as PID 1 for this. For native macOS/Linux, poll waitpid(-1, WNOHANG) in a
    // background thread so orphaned grandchildren don't accumulate as zombies.
    #[cfg(unix)]
    std::thread::spawn(|| {
        loop {
            // SAFETY: waitpid(-1, ...) with WNOHANG is always safe to call.
            unsafe { while libc::waitpid(-1, std::ptr::null_mut(), libc::WNOHANG) > 0 {} }
            std::thread::sleep(std::time::Duration::from_secs(5));
        }
    });
    // Log version and build hash so we can verify what's running.
    // The hash file is written at build time; "unknown" is a benign fallback.
    let build_hash =
        std::fs::read_to_string(".huskies/build_hash").unwrap_or_else(|_| "unknown".to_string());
    slog!(
        "[startup] huskies v{} (build {})",
        env!("CARGO_PKG_VERSION"),
        build_hash.trim()
    );
    let app_state = Arc::new(SessionState::default());
    let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
    // Migrate legacy root-level store.json into .huskies/ if the new path does
    // not yet exist. Both filesystem operations are deliberately best-effort
    // (errors ignored); a failed migration just means the legacy file stays put.
    let legacy_store_path = cwd.join("store.json");
    let store_path = cwd.join(".huskies").join("store.json");
    if legacy_store_path.exists() && !store_path.exists() {
        if let Some(parent) = store_path.parent() {
            let _ = std::fs::create_dir_all(parent);
        }
        let _ = std::fs::rename(&legacy_store_path, &store_path);
    }
    // A broken store file is fatal: surface it as the process's io::Error result.
    let store = Arc::new(JsonFileStore::from_path(store_path).map_err(std::io::Error::other)?);
    // Collect CLI args, skipping the binary name (argv[0]).
    let raw_args: Vec<String> = std::env::args().skip(1).collect();
    let cli = match parse_cli_args(&raw_args) {
        Ok(args) => args,
        Err(msg) => {
            eprintln!("error: {msg}");
            eprintln!("Run 'huskies --help' for usage.");
            std::process::exit(1);
        }
    };
    let is_init = cli.init;
    let is_agent = cli.agent;
    let is_gateway = cli.gateway;
    let agent_rendezvous = cli.rendezvous.clone();
    let explicit_path = resolve_path_arg(cli.path.as_deref(), &cwd);
    // Port resolution: CLI flag > project.toml (loaded later) > default.
    let port = cli.port.unwrap_or_else(resolve_port);
    // When a path is given explicitly on the CLI, it must already exist as a directory.
    if let Some(ref path) = explicit_path {
        if !path.exists() {
            eprintln!("error: path does not exist: {}", path.display());
            std::process::exit(1);
        }
        if !path.is_dir() {
            eprintln!("error: path is not a directory: {}", path.display());
            std::process::exit(1);
        }
    }
    // ── Gateway mode: multi-project proxy ────────────────────────────────────
    // Hands the whole process over to gateway::run; nothing below executes.
    if is_gateway {
        let config_dir = explicit_path.unwrap_or_else(|| cwd.clone());
        let config_path = config_dir.join("projects.toml");
        return gateway::run(&config_path, port).await;
    }
    // Resolve/initialise the project root, then bring up core subsystems.
    startup::project::open_project_root(is_init, explicit_path, &cwd, &app_state, &store, port)
        .await;
    startup::project::init_subsystems(&app_state, &cwd).await;
    // CRDT join token: CLI flag takes precedence over the environment variable.
    let crdt_join_token = cli
        .join_token
        .clone()
        .or_else(|| std::env::var("HUSKIES_JOIN_TOKEN").ok());
    startup::project::configure_crdt_sync(
        &app_state,
        is_agent,
        agent_rendezvous.clone(),
        crdt_join_token,
    );
    // ── Agent mode: headless build agent ─────────────────────────────────────
    // Like gateway mode, this hands the process over to agent_mode::run.
    if is_agent {
        let agent_root = app_state.project_root.lock().unwrap().clone();
        // --rendezvous is mandatory for agents; panicking here is a usage bug.
        let rendezvous = agent_rendezvous.expect("agent mode requires --rendezvous");
        let join_token = cli
            .join_token
            .clone()
            .or_else(|| std::env::var("HUSKIES_JOIN_TOKEN").ok());
        let agent_gateway_url = cli
            .gateway_url
            .clone()
            .or_else(|| std::env::var("HUSKIES_GATEWAY_URL").ok());
        return agent_mode::run(agent_root, rendezvous, port, join_token, agent_gateway_url).await;
    }
    let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default()));
    // Event bus: broadcast channel for pipeline lifecycle events.
    let (watcher_tx, _) = broadcast::channel::<io::watcher::WatcherEvent>(1024);
    let agents = Arc::new(AgentPool::new(port, watcher_tx.clone()));
    // Filesystem watcher: watches config files for hot-reload.
    // Only started when a project root was resolved above.
    if let Some(ref root) = *app_state.project_root.lock().unwrap() {
        io::watcher::start_watcher(root.clone(), watcher_tx.clone());
    }
    // Spawn CRDT→watcher bridge and auto-assign subscriber.
    startup::tick_loop::spawn_event_bridges(
        watcher_tx.clone(),
        app_state.project_root.lock().unwrap().clone(),
        Arc::clone(&agents),
    );
    // Reconciliation progress channel and permission channel.
    let (reconciliation_tx, _) = broadcast::channel::<agents::ReconciliationEvent>(64);
    let (perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel();
    // Each bot transport gets its own broadcast subscription so events are
    // fanned out rather than consumed by a single listener. Subscriptions are
    // taken here, before any task spawns, so no early events are missed.
    let watcher_tx_for_bot = watcher_tx.clone();
    let watcher_rx_for_whatsapp = watcher_tx.subscribe();
    let watcher_rx_for_slack = watcher_tx.subscribe();
    let watcher_rx_for_discord = watcher_tx.subscribe();
    let watcher_rx_for_events = watcher_tx.subscribe();
    // The single permission receiver is shared via the Services bundle; an
    // async Mutex lets whichever task handles it lock across awaits.
    let perm_rx = Arc::new(tokio::sync::Mutex::new(perm_rx));
    let startup_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
    let startup_agents = Arc::clone(&agents);
    let startup_reconciliation_tx = reconciliation_tx.clone();
    let agents_for_shutdown = Arc::clone(&agents);
    // ── Construct the shared Services bundle ──────────────────────────────────
    // Bot configuration is optional; all fields below fall back to defaults
    // when no config (or no project root) exists.
    let bot_cfg = startup_root
        .as_ref()
        .and_then(|root| chat::transport::matrix::BotConfig::load(root));
    let services = Arc::new(services::Services {
        project_root: startup_root.clone().unwrap_or_default(),
        agents: Arc::clone(&agents),
        bot_name: bot_cfg
            .as_ref()
            .and_then(|c| c.display_name.clone())
            .unwrap_or_else(|| "Assistant".to_string()),
        bot_user_id: String::new(),
        ambient_rooms: Arc::new(std::sync::Mutex::new(
            bot_cfg
                .as_ref()
                .map(|c| c.ambient_rooms.iter().cloned().collect())
                .unwrap_or_default(),
        )),
        perm_rx: Arc::clone(&perm_rx),
        pending_perm_replies: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
        // Default permission-reply timeout: 120 seconds.
        permission_timeout_secs: bot_cfg
            .as_ref()
            .map(|c| c.permission_timeout_secs)
            .unwrap_or(120),
        status: agents.status_broadcaster(),
    });
    // ── Build bot contexts (WhatsApp / Slack / Discord) ───────────────────────
    let (bot_ctxs, matrix_shutdown_rx) =
        startup::bots::build_bot_contexts(&startup_root, &services);
    startup::bots::spawn_startup_announcements(&bot_ctxs);
    // Kept aside so the post-shutdown code below can signal the Matrix bot.
    let matrix_shutdown_tx_for_rebuild = Arc::clone(&bot_ctxs.matrix_shutdown_tx);
    // Shared rate-limit retry timer store.
    // Falls back to a /tmp path when no project root is available.
    let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load(
        startup_root
            .as_ref()
            .map(|r| r.join(".huskies").join("timers.json"))
            .unwrap_or_else(|| std::path::PathBuf::from("/tmp/huskies-timers.json")),
    ));
    let timer_store_for_tick = Arc::clone(&timer_store);
    let timer_store_for_bot = Arc::clone(&timer_store);
    // AppContext: everything the HTTP handlers need, bundled for build_routes.
    let ctx = AppContext {
        state: app_state,
        store,
        workflow,
        services: Arc::clone(&services),
        watcher_tx,
        reconciliation_tx,
        perm_tx,
        qa_app_process: Arc::new(std::sync::Mutex::new(None)),
        bot_shutdown: bot_ctxs.shutdown_notifier.clone(),
        matrix_shutdown_tx: Some(Arc::clone(&bot_ctxs.matrix_shutdown_tx)),
        timer_store,
    };
    // Per-project event buffer for the gateway's `/api/events` poller.
    let event_buffer = crate::http::events::EventBuffer::new();
    crate::http::events::subscribe_to_watcher(event_buffer.clone(), watcher_rx_for_events);
    // Gateway relay task (pushes StatusEvents to a configured gateway).
    startup::tick_loop::spawn_gateway_relay(&startup_root, Arc::clone(&services.status));
    let app = build_routes(
        ctx,
        bot_ctxs.whatsapp_ctx.clone(),
        bot_ctxs.slack_ctx.clone(),
        port,
        Some(event_buffer),
    );
    // Unified 1-second background tick loop.
    startup::tick_loop::spawn_tick_loop(
        Arc::clone(&startup_agents),
        timer_store_for_tick,
        startup_root.clone(),
    );
    // Optional Matrix bot. Spawn failure is deliberately ignored (best-effort).
    if let Some(ref root) = startup_root {
        let _ = chat::transport::matrix::spawn_bot(
            root,
            watcher_tx_for_bot,
            Arc::clone(&services),
            matrix_shutdown_rx,
            None,
            vec![],
            std::collections::BTreeMap::new(),
            timer_store_for_bot,
            None,
        );
    } else {
        // No project root means no Matrix bot; discard the unused shutdown receiver.
        drop(matrix_shutdown_rx);
    }
    // Notification listeners for WhatsApp, Slack, Discord.
    startup::bots::spawn_notification_listeners(
        &bot_ctxs,
        &startup_root,
        watcher_rx_for_whatsapp,
        watcher_rx_for_slack,
        watcher_rx_for_discord,
    );
    // Reconcile completed worktrees and auto-assign free agents.
    startup::tick_loop::spawn_startup_reconciliation(
        startup_root.clone(),
        startup_agents,
        startup_reconciliation_tx,
    );
    // Bind address: HUSKIES_HOST overrides the loopback default.
    let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
    let addr = format!("{host}:{port}");
    // Startup banner (ANSI-coloured ASCII art) plus the URLs users need.
    println!("\x1b[97;1m");
    println!(" /\\_/\\ \x1b[96;1m _ _ _ _ \x1b[97;1m");
    println!(" / o o \\ \x1b[96;1m| | | |_ _ ___| | _(_) ___ ___\x1b[97;1m");
    println!(" ( Y ) \x1b[96;1m| |_| | | | / __| |/ / |/ _ \\/ __|\x1b[97;1m");
    println!(" \\ ^ / \x1b[96;1m| _ | |_| \\__ \\ <| | __/\\__ \\\x1b[97;1m");
    println!(" )===( \\ \x1b[96;1m|_| |_|\\__,_|___/_|\\_\\_|\\___||___/\x1b[97;1m");
    println!(" / \\ \\ \x1b[90mStory-driven development, powered by AI\x1b[97;1m");
    println!(" | | | |");
    println!(" /| | |\\|");
    println!(" \\|__|__|/\x1b[0m");
    println!();
    println!("HUSKIES_PORT={port}");
    println!("\x1b[96;1mFrontend:\x1b[0m \x1b[94mhttp://{addr}\x1b[0m");
    println!("\x1b[92;1mOpenAPI Docs:\x1b[0m \x1b[94mhttp://{addr}/docs\x1b[0m");
    // Advertise the bound port on disk; removed again after the server exits.
    let port_file = write_port_file(&cwd, port);
    // Blocks here until the server stops; everything below is shutdown cleanup.
    let result = Server::new(TcpListener::bind(&addr)).run(app).await;
    // ── Shutdown notifications (best-effort) ──────────────────────────────────
    startup::bots::notify_shutdown(&bot_ctxs).await;
    // Matrix: signal the bot task and give it a short window to send its message.
    let _ = matrix_shutdown_tx_for_rebuild.send(Some(ShutdownReason::Manual));
    tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
    // Kill all active PTY child processes before exiting.
    agents_for_shutdown.kill_all_children();
    if let Some(ref path) = port_file {
        remove_port_file(path);
    }
    result
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A project.toml declaring two agents with the same name must be
    /// rejected by `ProjectConfig::load`; the wrapped error message is what
    /// `should_panic` matches on.
    #[test]
    #[should_panic(expected = "Invalid project.toml: Duplicate agent name")]
    fn panics_on_duplicate_agent_names() {
        let dir = tempfile::tempdir().unwrap();
        let huskies_dir = dir.path().join(".huskies");
        std::fs::create_dir_all(&huskies_dir).unwrap();
        let duplicate_agents = r#"
[[agent]]
name = "coder"
[[agent]]
name = "coder"
"#;
        std::fs::write(huskies_dir.join("project.toml"), duplicate_agents).unwrap();
        if let Err(e) = config::ProjectConfig::load(dir.path()) {
            panic!("Invalid project.toml: {e}");
        }
    }
}