//! Huskies server — entry point, CLI argument parsing, and server startup.
// matrix-sdk-crypto's deeply nested types require a higher recursion limit
// when the `e2e-encryption` feature is enabled.
#![recursion_limit = "256"]
mod agent_log;
mod agent_mode;
mod agents;
mod chat;
mod config;
pub mod crdt_snapshot;
pub mod crdt_state;
pub mod crdt_sync;
pub mod crdt_wire;
mod db;
pub mod gateway;
mod http;
mod io;
mod llm;
pub mod log_buffer;
pub mod mesh;
pub mod node_identity;
pub(crate) mod pipeline_state;
pub mod rebuild;
mod service;
pub mod services;
mod state;
mod store;
mod workflow;
mod worktree;
use crate::agents::AgentPool;
use crate::chat::transport::whatsapp::WhatsAppConversationHistory;
use crate::http::build_routes;
use crate::http::context::AppContext;
use crate::http::{remove_port_file, resolve_port, write_port_file};
use crate::io::fs::find_story_kit_root;
use crate::rebuild::{BotShutdownNotifier, ShutdownReason};
use crate::state::SessionState;
use crate::store::JsonFileStore;
use crate::workflow::WorkflowState;
use poem::Server;
use poem::listener::TcpListener;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::broadcast;
mod cli;
use cli::{parse_cli_args, resolve_path_arg};
#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
// Reap zombie grandchildren on Unix (for native deployments without tini/init).
// Docker containers with `init: true` in docker-compose.yml already have tini
// as PID 1 for this. For native macOS/Linux, poll waitpid(-1, WNOHANG) in a
// background thread so orphaned grandchildren don't accumulate as zombies.
#[cfg(unix)]
2026-04-09 17:58:29 +01:00
std::thread::spawn(|| {
loop {
// SAFETY: waitpid(-1, ...) with WNOHANG is always safe to call.
unsafe { while libc::waitpid(-1, std::ptr::null_mut(), libc::WNOHANG) > 0 {} }
std::thread::sleep(std::time::Duration::from_secs(5));
}
});
// Log version and build hash so we can verify what's running.
let build_hash =
std::fs::read_to_string(".huskies/build_hash").unwrap_or_else(|_| "unknown".to_string());
slog!(
"[startup] huskies v{} (build {})",
env!("CARGO_PKG_VERSION"),
build_hash.trim()
);
let app_state = Arc::new(SessionState::default());
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
// Migrate legacy root-level store.json into .huskies/ if the new path does
// not yet exist. This keeps existing deployments working after upgrade.
let legacy_store_path = cwd.join("store.json");
let store_path = cwd.join(".huskies").join("store.json");
if legacy_store_path.exists() && !store_path.exists() {
if let Some(parent) = store_path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let _ = std::fs::rename(&legacy_store_path, &store_path);
}
2026-04-09 17:58:29 +01:00
let store = Arc::new(JsonFileStore::from_path(store_path).map_err(std::io::Error::other)?);
// Collect CLI args, skipping the binary name (argv[0]).
let raw_args: Vec<String> = std::env::args().skip(1).collect();
let cli = match parse_cli_args(&raw_args) {
Ok(args) => args,
Err(msg) => {
eprintln!("error: {msg}");
eprintln!("Run 'huskies --help' for usage.");
std::process::exit(1);
}
};
let is_init = cli.init;
let is_agent = cli.agent;
let is_gateway = cli.gateway;
let agent_rendezvous = cli.rendezvous.clone();
let explicit_path = resolve_path_arg(cli.path.as_deref(), &cwd);
// Port resolution: CLI flag > project.toml (loaded later) > default.
// Use the CLI port for scaffolding .mcp.json; final port is resolved
// after the project root is known.
let port = cli.port.unwrap_or_else(resolve_port);
// When a path is given explicitly on the CLI, it must already exist as a
// directory. We do not create directories from the command line.
if let Some(ref path) = explicit_path {
if !path.exists() {
2026-03-27 12:31:08 +00:00
eprintln!("error: path does not exist: {}", path.display());
std::process::exit(1);
}
if !path.is_dir() {
2026-03-27 12:31:08 +00:00
eprintln!("error: path is not a directory: {}", path.display());
std::process::exit(1);
}
}
// ── Gateway mode: multi-project proxy ──────────────────────────────
//
// When `huskies --gateway` is invoked, skip the normal single-project
// server and instead start a lightweight proxy that routes MCP calls
// to per-project Docker containers based on a projects.toml config.
if is_gateway {
let config_dir = explicit_path.unwrap_or_else(|| cwd.clone());
let config_path = config_dir.join("projects.toml");
return gateway::run(&config_path, port).await;
}
if is_init {
// `huskies init [PATH]` — always scaffold, never search parents.
let init_root = explicit_path.unwrap_or_else(|| cwd.clone());
if !init_root.exists() {
std::fs::create_dir_all(&init_root).unwrap_or_else(|e| {
2026-04-09 17:58:29 +01:00
eprintln!(
"error: cannot create directory {}: {e}",
init_root.display()
);
std::process::exit(1);
});
}
match io::fs::open_project(
init_root.to_string_lossy().to_string(),
&app_state,
store.as_ref(),
port,
)
.await
{
Ok(_) => {
if let Some(root) = app_state.project_root.lock().unwrap().as_ref() {
config::ProjectConfig::load(root)
.unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
// Initialize wizard state for the setup flow.
io::wizard::WizardState::init_if_missing(root);
}
}
Err(e) => {
eprintln!("error: {e}");
std::process::exit(1);
}
}
} else if let Some(explicit_root) = explicit_path {
// An explicit path was given on the command line.
// Open it directly — scaffold .huskies/ if it is missing — and
// exit with a clear error message if the path is invalid.
match io::fs::open_project(
explicit_root.to_string_lossy().to_string(),
&app_state,
store.as_ref(),
port,
)
.await
{
Ok(_) => {
if let Some(root) = app_state.project_root.lock().unwrap().as_ref() {
config::ProjectConfig::load(root)
.unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
}
}
Err(e) => {
eprintln!("error: {e}");
std::process::exit(1);
}
}
} else {
// No path argument — auto-detect a .huskies/ project in cwd or
// parent directories (preserves existing behaviour).
if let Some(project_root) = find_story_kit_root(&cwd) {
io::fs::open_project(
project_root.to_string_lossy().to_string(),
&app_state,
store.as_ref(),
port,
)
.await
.unwrap_or_else(|e| {
slog!("Warning: failed to auto-open project at {project_root:?}: {e}");
project_root.to_string_lossy().to_string()
});
// Validate agent config for the detected project root.
config::ProjectConfig::load(&project_root)
.unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
} else {
// No .huskies/ found in cwd or parents — scaffold cwd as a new
// project, exactly like `huskies .` does.
io::fs::open_project(
cwd.to_string_lossy().to_string(),
&app_state,
store.as_ref(),
port,
)
.await
.unwrap_or_else(|e| {
slog!("Warning: failed to scaffold project at {cwd:?}: {e}");
cwd.to_string_lossy().to_string()
});
}
}
// Enable persistent server log file now that the project root is known.
if let Some(ref root) = *app_state.project_root.lock().unwrap() {
let log_dir = root.join(".huskies").join("logs");
let _ = std::fs::create_dir_all(&log_dir);
log_buffer::global().set_log_file(log_dir.join("server.log"));
}
// Initialise the SQLite pipeline shadow-write database and CRDT state layer.
// Clone the path out before the await so we don't hold the MutexGuard across
// an await point.
let pipeline_db_path = app_state
.project_root
.lock()
.unwrap()
.as_ref()
.map(|root| root.join(".huskies").join("pipeline.db"));
if let Some(ref db_path) = pipeline_db_path {
if let Err(e) = db::init(db_path).await {
slog!("[db] Failed to initialise pipeline.db: {e}");
}
if let Err(e) = crdt_state::init(db_path).await {
slog!("[crdt] Failed to initialise CRDT state layer: {e}");
}
}
// (CRDT state layer is initialised above alongside the legacy pipeline.db.)
// Load trusted keys, token auth config, and start the CRDT sync rendezvous
// client if configured. In agent mode, the --rendezvous flag overrides
// project.toml. The --join-token / HUSKIES_JOIN_TOKEN is appended to the
// rendezvous URL as ?token=... so the server's bearer-token check passes.
let crdt_join_token = cli
.join_token
.clone()
.or_else(|| std::env::var("HUSKIES_JOIN_TOKEN").ok());
let sync_config = if is_agent {
agent_rendezvous
.clone()
.map(|url| (url, Vec::new(), false, Vec::new()))
} else {
app_state
.project_root
.lock()
.unwrap()
.as_ref()
.and_then(|root| config::ProjectConfig::load(root).ok())
.and_then(|cfg| {
cfg.rendezvous.map(|url| {
(
url,
cfg.trusted_keys,
cfg.crdt_require_token,
cfg.crdt_tokens,
)
})
})
};
if let Some((rendezvous_url, trusted_keys, require_token, crdt_tokens)) = sync_config {
crdt_sync::init_trusted_keys(trusted_keys);
crdt_sync::init_token_auth(require_token, crdt_tokens);
crdt_sync::spawn_rendezvous_client(rendezvous_url, crdt_join_token);
} else {
// Even without rendezvous, initialise trusted keys and token auth for
// incoming connections.
let (keys, require_token, crdt_tokens) = app_state
.project_root
.lock()
.unwrap()
.as_ref()
.and_then(|root| config::ProjectConfig::load(root).ok())
.map(|cfg| (cfg.trusted_keys, cfg.crdt_require_token, cfg.crdt_tokens))
.unwrap_or_default();
crdt_sync::init_trusted_keys(keys);
crdt_sync::init_token_auth(require_token, crdt_tokens);
}
// ── Agent mode: headless build agent ────────────────────────────────
//
// When `huskies agent --rendezvous <URL>` is invoked, skip the web UI,
// chat bots, and HTTP server entirely. Instead, run a headless loop that:
// 1. Syncs CRDT state with the rendezvous peer.
// 2. Scans for unclaimed work and claims it via CRDT.
// 3. Runs Claude Code locally for claimed stories.
// 4. Pushes feature branches and reports completion via CRDT.
if is_agent {
let agent_root = app_state.project_root.lock().unwrap().clone();
let rendezvous = agent_rendezvous.expect("agent mode requires --rendezvous");
// Join token / gateway URL can come from CLI flags or environment variables.
let join_token = cli
.join_token
.clone()
.or_else(|| std::env::var("HUSKIES_JOIN_TOKEN").ok());
let agent_gateway_url = cli
.gateway_url
.clone()
.or_else(|| std::env::var("HUSKIES_GATEWAY_URL").ok());
return agent_mode::run(agent_root, rendezvous, port, join_token, agent_gateway_url).await;
}
let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default()));
// Event bus: broadcast channel for pipeline lifecycle events.
// Created before AgentPool so the pool can emit AgentStateChanged events.
let (watcher_tx, _) = broadcast::channel::<io::watcher::WatcherEvent>(1024);
let agents = Arc::new(AgentPool::new(port, watcher_tx.clone()));
// Filesystem watcher: watches config files (project.toml, agents.toml) for
// hot-reload. Work-item pipeline events are driven by CRDT state transitions
// via crdt_state::subscribe(). Sweep (done→archived) is handled by the unified
// background tick loop below.
if let Some(ref root) = *app_state.project_root.lock().unwrap() {
io::watcher::start_watcher(root.clone(), watcher_tx.clone());
}
// Bridge CRDT state-transition events to the watcher broadcast channel.
// This replaces the filesystem watcher as the source of WorkItem events.
// Also prunes worktrees when stories transition to 6_archived.
{
let crdt_watcher_tx = watcher_tx.clone();
let crdt_prune_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
if let Some(mut crdt_rx) = crdt_state::subscribe() {
tokio::spawn(async move {
while let Ok(evt) = crdt_rx.recv().await {
// Prune the worktree when a story is archived.
if evt.to_stage == "6_archived"
&& let Some(root) = crdt_prune_root.as_ref().cloned()
{
let story_id = evt.story_id.clone();
tokio::task::spawn_blocking(move || {
if let Err(e) = crate::worktree::prune_worktree_sync(&root, &story_id) {
crate::slog!("[crdt] worktree prune failed for {story_id}: {e}");
}
});
}
let (action, commit_msg) =
io::watcher::stage_metadata(&evt.to_stage, &evt.story_id)
.unwrap_or(("update", format!("huskies: update {}", evt.story_id)));
let watcher_evt = io::watcher::WatcherEvent::WorkItem {
stage: evt.to_stage,
item_id: evt.story_id,
action: action.to_string(),
commit_msg,
from_stage: evt.from_stage,
};
let _ = crdt_watcher_tx.send(watcher_evt);
}
});
}
}
// Subscribe to watcher events so that auto-assign triggers when a work item
// enters an active pipeline stage (2_current/, 3_qa/, 4_merge/).
{
let watcher_auto_rx = watcher_tx.subscribe();
let watcher_auto_agents = Arc::clone(&agents);
let watcher_auto_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
if let Some(root) = watcher_auto_root {
tokio::spawn(async move {
let mut rx = watcher_auto_rx;
while let Ok(event) = rx.recv().await {
if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event
&& matches!(stage.as_str(), "2_current" | "3_qa" | "4_merge")
{
slog!(
"[auto-assign] CRDT transition detected in {stage}/; \
triggering auto-assign."
);
watcher_auto_agents.auto_assign_available_work(&root).await;
}
}
});
}
}
// Reconciliation progress channel: startup reconciliation → WebSocket clients.
let (reconciliation_tx, _) = broadcast::channel::<agents::ReconciliationEvent>(64);
// Permission channel: MCP prompt_permission → WebSocket handler.
let (perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel();
// Clone watcher_tx for the Matrix bot before it is moved into AppContext.
let watcher_tx_for_bot = watcher_tx.clone();
// Subscribe to watcher events for WhatsApp/Slack notification listeners
// before watcher_tx is moved into AppContext.
let watcher_rx_for_whatsapp = watcher_tx.subscribe();
let watcher_rx_for_slack = watcher_tx.subscribe();
let watcher_rx_for_discord = watcher_tx.subscribe();
// Subscribe to watcher events for the per-project event buffer (gateway polling).
let watcher_rx_for_events = watcher_tx.subscribe();
// Wrap perm_rx in Arc<Mutex> so it can be shared across the Services
// bundle (AppContext + Matrix bot) and the webhook-based transports.
let perm_rx = Arc::new(tokio::sync::Mutex::new(perm_rx));
// Capture project root, agents Arc, and reconciliation sender before ctx
// is consumed by build_routes.
let startup_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
let startup_agents = Arc::clone(&agents);
let startup_reconciliation_tx = reconciliation_tx.clone();
// Clone for shutdown cleanup — kill orphaned PTY children before exiting.
let agents_for_shutdown = Arc::clone(&agents);
// ── Construct the shared Services bundle ────────────────────────────
//
// A single `Arc<Services>` is built here and cloned into `AppContext`
// and the Matrix `BotContext`. Bot-level fields (name, user-id, etc.)
// come from `bot.toml` when present; otherwise sensible defaults apply.
let bot_cfg_for_services = startup_root
.as_ref()
.and_then(|root| chat::transport::matrix::BotConfig::load(root));
let services = Arc::new(services::Services {
project_root: startup_root.clone().unwrap_or_default(),
agents: Arc::clone(&agents),
bot_name: bot_cfg_for_services
.as_ref()
.and_then(|c| c.display_name.clone())
.unwrap_or_else(|| "Assistant".to_string()),
bot_user_id: String::new(),
ambient_rooms: Arc::new(std::sync::Mutex::new(
bot_cfg_for_services
.as_ref()
.map(|c| c.ambient_rooms.iter().cloned().collect())
.unwrap_or_default(),
)),
perm_rx: Arc::clone(&perm_rx),
pending_perm_replies: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
permission_timeout_secs: bot_cfg_for_services
.as_ref()
.map(|c| c.permission_timeout_secs)
.unwrap_or(120),
status: Arc::new(service::status::StatusBroadcaster::new()),
});
// Build WhatsApp webhook context if bot.toml configures transport = "whatsapp".
let whatsapp_ctx: Option<Arc<chat::transport::whatsapp::WhatsAppWebhookContext>> = startup_root
.as_ref()
.and_then(|root| chat::transport::matrix::BotConfig::load(root))
.filter(|cfg| cfg.transport == "whatsapp")
.map(|cfg| {
let provider = cfg.whatsapp_provider.clone();
2026-03-27 12:31:08 +00:00
let transport: Arc<dyn crate::chat::ChatTransport> = if provider == "twilio" {
Arc::new(chat::transport::whatsapp::TwilioWhatsAppTransport::new(
cfg.twilio_account_sid.clone().unwrap_or_default(),
cfg.twilio_auth_token.clone().unwrap_or_default(),
cfg.twilio_whatsapp_number.clone().unwrap_or_default(),
))
} else {
let template_name = cfg
.whatsapp_notification_template
.clone()
.unwrap_or_else(|| "pipeline_notification".to_string());
Arc::new(chat::transport::whatsapp::WhatsAppTransport::new(
cfg.whatsapp_phone_number_id.clone().unwrap_or_default(),
cfg.whatsapp_access_token.clone().unwrap_or_default(),
template_name,
))
};
let root = startup_root.clone().unwrap();
let history = chat::transport::whatsapp::load_whatsapp_history(&root);
Arc::new(chat::transport::whatsapp::WhatsAppWebhookContext {
services: Arc::clone(&services),
verify_token: cfg.whatsapp_verify_token.clone().unwrap_or_default(),
provider,
transport,
history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
history_size: cfg.history_size,
window_tracker: Arc::new(chat::transport::whatsapp::MessagingWindowTracker::new()),
allowed_phones: cfg.whatsapp_allowed_phones.clone(),
})
});
// Build Slack webhook context if bot.toml configures transport = "slack".
let slack_ctx: Option<Arc<chat::transport::slack::SlackWebhookContext>> = startup_root
.as_ref()
.and_then(|root| chat::transport::matrix::BotConfig::load(root))
.filter(|cfg| cfg.transport == "slack")
.map(|cfg| {
let transport = Arc::new(chat::transport::slack::SlackTransport::new(
cfg.slack_bot_token.clone().unwrap_or_default(),
));
let root = startup_root.clone().unwrap();
let history = chat::transport::slack::load_slack_history(&root);
let channel_ids: std::collections::HashSet<String> =
cfg.slack_channel_ids.iter().cloned().collect();
Arc::new(chat::transport::slack::SlackWebhookContext {
services: Arc::clone(&services),
signing_secret: cfg.slack_signing_secret.clone().unwrap_or_default(),
transport,
history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
history_size: cfg.history_size,
channel_ids,
})
});
// Build Discord context if bot.toml configures transport = "discord".
let discord_ctx: Option<Arc<chat::transport::discord::DiscordContext>> = startup_root
.as_ref()
.and_then(|root| chat::transport::matrix::BotConfig::load(root))
.filter(|cfg| cfg.transport == "discord")
.map(|cfg| {
let transport = Arc::new(chat::transport::discord::DiscordTransport::new(
cfg.discord_bot_token.clone().unwrap_or_default(),
));
let root = startup_root.clone().unwrap();
let history = chat::transport::discord::load_discord_history(&root);
let channel_ids: std::collections::HashSet<String> =
cfg.discord_channel_ids.iter().cloned().collect();
let allowed_users: std::collections::HashSet<String> =
cfg.discord_allowed_users.iter().cloned().collect();
Arc::new(chat::transport::discord::DiscordContext {
services: Arc::clone(&services),
bot_token: cfg.discord_bot_token.clone().unwrap_or_default(),
transport,
history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
history_size: cfg.history_size,
channel_ids,
allowed_users,
})
});
// Build a best-effort shutdown notifier for webhook-based transports.
//
// • Slack: channels are fixed at startup (channel_ids from bot.toml).
// • Discord: channels are fixed at startup (channel_ids from bot.toml).
// • WhatsApp: active senders are tracked at runtime in ambient_rooms.
// We keep the WhatsApp context Arc so we can read the rooms at shutdown.
// • Matrix: the bot task manages its own announcement via matrix_shutdown_tx.
2026-03-27 12:31:08 +00:00
let bot_shutdown_notifier: Option<Arc<BotShutdownNotifier>> = if let Some(ref ctx) = slack_ctx {
let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
Some(Arc::new(BotShutdownNotifier::new(
Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>,
channels,
ctx.services.bot_name.clone(),
2026-03-27 12:31:08 +00:00
)))
} else if let Some(ref ctx) = discord_ctx {
let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
Some(Arc::new(BotShutdownNotifier::new(
Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>,
channels,
ctx.services.bot_name.clone(),
)))
2026-03-27 12:31:08 +00:00
} else {
None
};
// Retain a reference to the WhatsApp context for shutdown notifications.
// At shutdown time we read ambient_rooms to get the current set of active senders.
let whatsapp_ctx_for_shutdown: Option<Arc<chat::transport::whatsapp::WhatsAppWebhookContext>> =
whatsapp_ctx.clone();
// ── Startup announcements (WhatsApp & Slack) ──────────────────────────
//
// Send "{bot_name} is online." to all known contacts so users know the bot
// is ready. This mirrors the Matrix bot's startup announcement and fires
// on every fresh process start — including after a rebuild/re-exec.
//
// • WhatsApp: send to all phone numbers present in persisted history.
// • Slack: send to all configured channel IDs (channel_ids from bot.toml).
// • Matrix: handled by spawn_bot() below; no action needed here.
if let Some(ref ctx) = whatsapp_ctx {
let transport = Arc::clone(&ctx.transport);
let bot_name = ctx.services.bot_name.clone();
2026-03-27 12:31:08 +00:00
let history: WhatsAppConversationHistory = Arc::clone(&ctx.history);
tokio::spawn(async move {
let senders: Vec<String> = history.lock().await.keys().cloned().collect();
if senders.is_empty() {
return;
}
2026-03-27 12:31:08 +00:00
let notifier = crate::rebuild::BotShutdownNotifier::new(transport, senders, bot_name);
notifier.notify_startup().await;
});
}
if let Some(ref ctx) = slack_ctx {
let transport = Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>;
let bot_name = ctx.services.bot_name.clone();
let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
tokio::spawn(async move {
if channels.is_empty() {
return;
}
2026-03-27 12:31:08 +00:00
let notifier = crate::rebuild::BotShutdownNotifier::new(transport, channels, bot_name);
notifier.notify_startup().await;
});
}
if let Some(ref ctx) = discord_ctx {
let transport = Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>;
let bot_name = ctx.services.bot_name.clone();
let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
tokio::spawn(async move {
if channels.is_empty() {
return;
}
let notifier = crate::rebuild::BotShutdownNotifier::new(transport, channels, bot_name);
notifier.notify_startup().await;
});
}
// Watch channel: signals the Matrix bot task to send a shutdown announcement.
// `None` initial value means "server is running".
let (matrix_shutdown_tx, matrix_shutdown_rx) =
tokio::sync::watch::channel::<Option<ShutdownReason>>(None);
let matrix_shutdown_tx = Arc::new(matrix_shutdown_tx);
let matrix_shutdown_tx_for_rebuild = Arc::clone(&matrix_shutdown_tx);
// Shared rate-limit retry timer store, accessible from MCP tools via
// AppContext so manual interventions (move_story → backlog, stop_agent)
// can cancel pending timers in-memory rather than only on disk.
// Also shared with the Matrix bot tick loop (bug 655).
let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load(
startup_root
.as_ref()
.map(|r| r.join(".huskies").join("timers.json"))
.unwrap_or_else(|| std::path::PathBuf::from("/tmp/huskies-timers.json")),
));
let timer_store_for_tick = Arc::clone(&timer_store);
let timer_store_for_bot = Arc::clone(&timer_store);
let ctx = AppContext {
state: app_state,
store,
workflow,
services: Arc::clone(&services),
watcher_tx,
reconciliation_tx,
perm_tx,
qa_app_process: Arc::new(std::sync::Mutex::new(None)),
bot_shutdown: bot_shutdown_notifier.clone(),
matrix_shutdown_tx: Some(Arc::clone(&matrix_shutdown_tx)),
timer_store,
test_jobs: std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
};
// Create the per-project event buffer and subscribe it to the watcher channel
// so that pipeline events are buffered for the gateway's `/api/events` poller.
let event_buffer = crate::http::events::EventBuffer::new();
crate::http::events::subscribe_to_watcher(event_buffer.clone(), watcher_rx_for_events);
let app = build_routes(
ctx,
whatsapp_ctx.clone(),
slack_ctx.clone(),
port,
Some(event_buffer),
);
// Unified 1-second background tick loop: fires due timers, detects orphaned
// agents (watchdog), and promotes done→archived items (sweep). Replaces the
// three separate background loops that previously ran independently.
{
let tick_agents = Arc::clone(&startup_agents);
let tick_timer = timer_store_for_tick;
let tick_root = startup_root.clone();
let sweep_cfg = tick_root
.as_ref()
.and_then(|r| config::ProjectConfig::load(r).ok())
.map(|c| c.watcher)
.unwrap_or_default();
let sweep_every = sweep_cfg.sweep_interval_secs.max(1);
let done_retention = std::time::Duration::from_secs(sweep_cfg.done_retention_secs);
let pending_count = tick_timer.list().len();
crate::slog!("[tick] Unified tick loop started; {pending_count} pending timer(s)");
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
let mut tick_count: u64 = 0;
loop {
interval.tick().await;
tick_count = tick_count.wrapping_add(1);
// Timer: fire due timers every second.
if let Some(ref root) = tick_root {
let result =
crate::service::timer::tick_once(&tick_timer, &tick_agents, root).await;
if let Err(msg) = result {
crate::slog_error!("[tick] Timer tick panicked: {msg}");
}
}
// Watchdog: detect orphaned Running agents every 30 ticks.
if tick_count.is_multiple_of(30) {
let found = tick_agents.run_watchdog_pass(tick_root.as_deref());
if found > 0 {
crate::slog!(
"[tick] {found} orphaned agent(s) detected; triggering auto-assign."
);
if let Some(ref root) = tick_root {
tick_agents.auto_assign_available_work(root).await;
}
}
}
// Sweep: promote done→archived every sweep_interval_secs ticks.
if tick_count.is_multiple_of(sweep_every) {
crate::io::watcher::sweep_done_to_archived(done_retention);
}
}
});
}
// Optional Matrix bot: connect to the homeserver and start listening for
// messages if `.huskies/bot.toml` is present and enabled.
if let Some(ref root) = startup_root {
let _ = chat::transport::matrix::spawn_bot(
root,
watcher_tx_for_bot,
Arc::clone(&services),
matrix_shutdown_rx,
None,
vec![],
std::collections::BTreeMap::new(),
timer_store_for_bot,
);
} else {
// Keep the receiver alive (drop it) so the sender never errors.
drop(matrix_shutdown_rx);
}
// Spawn stage-transition notification listeners for WhatsApp and Slack.
// These mirror the listener that the Matrix bot spawns internally.
if let (Some(ctx), Some(root)) = (&whatsapp_ctx, &startup_root) {
let ambient_rooms = Arc::clone(&ctx.services.ambient_rooms);
crate::service::notifications::spawn_notification_listener(
Arc::clone(&ctx.transport),
move || ambient_rooms.lock().unwrap().iter().cloned().collect(),
watcher_rx_for_whatsapp,
root.clone(),
);
} else {
drop(watcher_rx_for_whatsapp);
}
if let (Some(ctx), Some(root)) = (&slack_ctx, &startup_root) {
let channel_ids: Vec<String> = ctx.channel_ids.iter().cloned().collect();
crate::service::notifications::spawn_notification_listener(
Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>,
move || channel_ids.clone(),
watcher_rx_for_slack,
root.clone(),
);
} else {
drop(watcher_rx_for_slack);
}
if let (Some(ctx), Some(root)) = (&discord_ctx, &startup_root) {
// Spawn the Discord Gateway WebSocket listener.
chat::transport::discord::gateway::spawn_gateway(Arc::clone(ctx));
// Spawn stage-transition notification listener for Discord.
let channel_ids: Vec<String> = ctx.channel_ids.iter().cloned().collect();
crate::service::notifications::spawn_notification_listener(
Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>,
move || channel_ids.clone(),
watcher_rx_for_discord,
root.clone(),
);
} else {
drop(watcher_rx_for_discord);
}
// On startup:
// 1. Reconcile any stories whose agent work was committed while the server was
// offline (worktree has commits ahead of master but pipeline didn't advance).
// 2. Auto-assign free agents to remaining unassigned work in the pipeline.
if let Some(root) = startup_root {
tokio::spawn(async move {
slog!("[startup] Reconciling completed worktrees from previous session.");
startup_agents
.reconcile_on_startup(&root, &startup_reconciliation_tx)
.await;
slog!("[auto-assign] Scanning pipeline stages for unassigned work.");
startup_agents.auto_assign_available_work(&root).await;
});
}
let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
2026-03-24 21:26:48 +00:00
let addr = format!("{host}:{port}");
2026-04-03 21:38:58 +01:00
println!("\x1b[97;1m");
println!(" /\\_/\\ \x1b[96;1m _ _ _ _ \x1b[97;1m");
println!(" / o o \\ \x1b[96;1m| | | |_ _ ___| | _(_) ___ ___\x1b[97;1m");
println!(" ( Y ) \x1b[96;1m| |_| | | | / __| |/ / |/ _ \\/ __|\x1b[97;1m");
println!(" \\ ^ / \x1b[96;1m| _ | |_| \\__ \\ <| | __/\\__ \\\x1b[97;1m");
println!(" )===( \\ \x1b[96;1m|_| |_|\\__,_|___/_|\\_\\_|\\___||___/\x1b[97;1m");
println!(" / \\ \\ \x1b[90mStory-driven development, powered by AI\x1b[97;1m");
println!(" | | | |");
println!(" /| | |\\|");
println!(" \\|__|__|/\x1b[0m");
2026-04-03 21:03:54 +01:00
println!();
println!("HUSKIES_PORT={port}");
println!("\x1b[96;1mFrontend:\x1b[0m \x1b[94mhttp://{addr}\x1b[0m");
println!("\x1b[92;1mOpenAPI Docs:\x1b[0m \x1b[94mhttp://{addr}/docs\x1b[0m");
let port_file = write_port_file(&cwd, port);
let result = Server::new(TcpListener::bind(&addr)).run(app).await;
// ── Shutdown notifications (best-effort) ─────────────────────────────
//
// The server is stopping (SIGINT / SIGTERM). Notify active bot channels
// so participants know the bot is going offline. We do this before killing
// PTY children so network I/O can still complete.
// Slack: notifier holds the fixed channel list.
if let Some(ref notifier) = bot_shutdown_notifier {
notifier.notify(ShutdownReason::Manual).await;
}
// WhatsApp: read the current set of ambient rooms and notify each sender.
if let Some(ref ctx) = whatsapp_ctx_for_shutdown {
let rooms: Vec<String> = ctx
.services
.ambient_rooms
.lock()
.unwrap()
.iter()
.cloned()
.collect();
if !rooms.is_empty() {
let wa_notifier = BotShutdownNotifier::new(
Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>,
rooms,
ctx.services.bot_name.clone(),
);
wa_notifier.notify(ShutdownReason::Manual).await;
}
}
// Matrix: signal the bot task and give it a short window to send its message.
let _ = matrix_shutdown_tx_for_rebuild.send(Some(ShutdownReason::Manual));
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
// ── Cleanup ──────────────────────────────────────────────────────────
// Kill all active PTY child processes before exiting to prevent orphaned
// Claude Code processes from running after the server restarts.
agents_for_shutdown.kill_all_children();
if let Some(ref path) = port_file {
remove_port_file(path);
}
result
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Startup converts a `ProjectConfig::load` failure into a panic with the
    /// "Invalid project.toml" prefix; this test pins that behaviour for the
    /// duplicate-agent-name validation case.
    #[test]
    #[should_panic(expected = "Invalid project.toml: Duplicate agent name")]
    fn panics_on_duplicate_agent_names() {
        let dir = tempfile::tempdir().unwrap();
        let huskies_dir = dir.path().join(".huskies");
        std::fs::create_dir_all(&huskies_dir).unwrap();
        let project_toml = r#"
[[agent]]
name = "coder"
[[agent]]
name = "coder"
"#;
        std::fs::write(huskies_dir.join("project.toml"), project_toml).unwrap();
        config::ProjectConfig::load(dir.path())
            .unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
    }
}