Files
huskies/server/src/main.rs
T
dave 06035f20ad fix: restore #[tokio::main] on main(), #[cfg(unix)] on platform tests, #[allow] on run_pty_session/AuthListenerResult
The biggest miss is #[tokio::main] — without it, async fn main() doesn't compile,
and the binary in every worktree fails 'cargo check'. Agents in those worktrees
burn their turn budgets trying to fix the build before they can do real work, then
get killed by the watchdog. That's why all three in-flight stories failed.

Other restored attributes:
- #[cfg(unix)] on 4 tests in merge/squash and scaffold (skip on non-Unix)
- #[allow(dead_code)] on AuthListenerResult test enum
- #[allow(clippy::too_many_arguments)] on run_pty_session

Same root cause as the earlier #[test] attribute losses: my line ranges started
at the fn line, missing the leading attribute on the previous line.
2026-04-26 23:38:17 +00:00

898 lines
38 KiB
Rust

//! Huskies server — entry point, CLI argument parsing, and server startup.
// matrix-sdk-crypto's deeply nested types require a higher recursion limit
// when the `e2e-encryption` feature is enabled.
#![recursion_limit = "256"]
mod agent_log;
mod agent_mode;
mod agents;
mod chat;
mod config;
pub mod crdt_snapshot;
pub mod crdt_state;
pub mod crdt_sync;
pub mod crdt_wire;
mod db;
pub mod gateway;
mod http;
mod io;
mod llm;
pub mod log_buffer;
pub mod mesh;
pub mod node_identity;
pub(crate) mod pipeline_state;
pub mod rebuild;
mod service;
pub mod services;
mod state;
mod store;
mod workflow;
mod worktree;
use crate::agents::AgentPool;
use crate::chat::transport::whatsapp::WhatsAppConversationHistory;
use crate::http::build_routes;
use crate::http::context::AppContext;
use crate::http::{remove_port_file, resolve_port, write_port_file};
use crate::io::fs::find_story_kit_root;
use crate::rebuild::{BotShutdownNotifier, ShutdownReason};
use crate::state::SessionState;
use crate::store::JsonFileStore;
use crate::workflow::WorkflowState;
use poem::Server;
use poem::listener::TcpListener;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::broadcast;
mod cli;
use cli::{CliArgs, parse_cli_args, print_help, resolve_path_arg};
/// Server entry point.
///
/// Startup sequence (order matters):
///   1. Spawn a Unix zombie-reaper thread (native deployments without an init process).
///   2. Parse CLI args; dispatch early to gateway (`--gateway`) or `init` modes.
///   3. Open or scaffold the project, then initialise file logging, the SQLite
///      pipeline DB, and the CRDT state/sync layer.
///   4. Dispatch to headless agent mode (`huskies agent`), or build the full server:
///      agent pool, watcher/event plumbing, chat transports (Matrix/WhatsApp/Slack/
///      Discord), the unified background tick loop, and the HTTP routes.
///   5. Run the HTTP server until shutdown, then notify bot channels, kill PTY
///      children, and remove the port file.
#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // Reap zombie grandchildren on Unix (for native deployments without tini/init).
    // Docker containers with `init: true` in docker-compose.yml already have tini
    // as PID 1 for this. For native macOS/Linux, poll waitpid(-1, WNOHANG) in a
    // background thread so orphaned grandchildren don't accumulate as zombies.
    #[cfg(unix)]
    std::thread::spawn(|| {
        loop {
            // SAFETY: waitpid(-1, ...) with WNOHANG is always safe to call.
            unsafe { while libc::waitpid(-1, std::ptr::null_mut(), libc::WNOHANG) > 0 {} }
            std::thread::sleep(std::time::Duration::from_secs(5));
        }
    });
    // Log version and build hash so we can verify what's running.
    // The hash file is best-effort: missing/unreadable falls back to "unknown".
    let build_hash =
        std::fs::read_to_string(".huskies/build_hash").unwrap_or_else(|_| "unknown".to_string());
    slog!(
        "[startup] huskies v{} (build {})",
        env!("CARGO_PKG_VERSION"),
        build_hash.trim()
    );
    let app_state = Arc::new(SessionState::default());
    let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
    // Migrate legacy root-level store.json into .huskies/ if the new path does
    // not yet exist. This keeps existing deployments working after upgrade.
    let legacy_store_path = cwd.join("store.json");
    let store_path = cwd.join(".huskies").join("store.json");
    if legacy_store_path.exists() && !store_path.exists() {
        if let Some(parent) = store_path.parent() {
            let _ = std::fs::create_dir_all(parent);
        }
        let _ = std::fs::rename(&legacy_store_path, &store_path);
    }
    let store = Arc::new(JsonFileStore::from_path(store_path).map_err(std::io::Error::other)?);
    // Collect CLI args, skipping the binary name (argv[0]).
    let raw_args: Vec<String> = std::env::args().skip(1).collect();
    let cli = match parse_cli_args(&raw_args) {
        Ok(args) => args,
        Err(msg) => {
            eprintln!("error: {msg}");
            eprintln!("Run 'huskies --help' for usage.");
            std::process::exit(1);
        }
    };
    let is_init = cli.init;
    let is_agent = cli.agent;
    let is_gateway = cli.gateway;
    let agent_rendezvous = cli.rendezvous.clone();
    let explicit_path = resolve_path_arg(cli.path.as_deref(), &cwd);
    // Port resolution: CLI flag > project.toml (loaded later) > default.
    // Use the CLI port for scaffolding .mcp.json; final port is resolved
    // after the project root is known.
    let port = cli.port.unwrap_or_else(resolve_port);
    // When a path is given explicitly on the CLI, it must already exist as a
    // directory. We do not create directories from the command line.
    if let Some(ref path) = explicit_path {
        if !path.exists() {
            eprintln!("error: path does not exist: {}", path.display());
            std::process::exit(1);
        }
        if !path.is_dir() {
            eprintln!("error: path is not a directory: {}", path.display());
            std::process::exit(1);
        }
    }
    // ── Gateway mode: multi-project proxy ──────────────────────────────
    //
    // When `huskies --gateway` is invoked, skip the normal single-project
    // server and instead start a lightweight proxy that routes MCP calls
    // to per-project Docker containers based on a projects.toml config.
    if is_gateway {
        let config_dir = explicit_path.unwrap_or_else(|| cwd.clone());
        let config_path = config_dir.join("projects.toml");
        return gateway::run(&config_path, port).await;
    }
    if is_init {
        // `huskies init [PATH]` — always scaffold, never search parents.
        // Unlike an explicit `--path`, init IS allowed to create the directory.
        let init_root = explicit_path.unwrap_or_else(|| cwd.clone());
        if !init_root.exists() {
            std::fs::create_dir_all(&init_root).unwrap_or_else(|e| {
                eprintln!(
                    "error: cannot create directory {}: {e}",
                    init_root.display()
                );
                std::process::exit(1);
            });
        }
        match io::fs::open_project(
            init_root.to_string_lossy().to_string(),
            &app_state,
            store.as_ref(),
            port,
        )
        .await
        {
            Ok(_) => {
                if let Some(root) = app_state.project_root.lock().unwrap().as_ref() {
                    // Fail fast on a broken config rather than running misconfigured.
                    config::ProjectConfig::load(root)
                        .unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
                    // Initialize wizard state for the setup flow.
                    io::wizard::WizardState::init_if_missing(root);
                }
            }
            Err(e) => {
                eprintln!("error: {e}");
                std::process::exit(1);
            }
        }
    } else if let Some(explicit_root) = explicit_path {
        // An explicit path was given on the command line.
        // Open it directly — scaffold .huskies/ if it is missing — and
        // exit with a clear error message if the path is invalid.
        match io::fs::open_project(
            explicit_root.to_string_lossy().to_string(),
            &app_state,
            store.as_ref(),
            port,
        )
        .await
        {
            Ok(_) => {
                if let Some(root) = app_state.project_root.lock().unwrap().as_ref() {
                    config::ProjectConfig::load(root)
                        .unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
                }
            }
            Err(e) => {
                eprintln!("error: {e}");
                std::process::exit(1);
            }
        }
    } else {
        // No path argument — auto-detect a .huskies/ project in cwd or
        // parent directories (preserves existing behaviour).
        if let Some(project_root) = find_story_kit_root(&cwd) {
            io::fs::open_project(
                project_root.to_string_lossy().to_string(),
                &app_state,
                store.as_ref(),
                port,
            )
            .await
            .unwrap_or_else(|e| {
                // Best-effort: log and continue with the detected root string.
                slog!("Warning: failed to auto-open project at {project_root:?}: {e}");
                project_root.to_string_lossy().to_string()
            });
            // Validate agent config for the detected project root.
            config::ProjectConfig::load(&project_root)
                .unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
        } else {
            // No .huskies/ found in cwd or parents — scaffold cwd as a new
            // project, exactly like `huskies .` does.
            io::fs::open_project(
                cwd.to_string_lossy().to_string(),
                &app_state,
                store.as_ref(),
                port,
            )
            .await
            .unwrap_or_else(|e| {
                slog!("Warning: failed to scaffold project at {cwd:?}: {e}");
                cwd.to_string_lossy().to_string()
            });
        }
    }
    // Enable persistent server log file now that the project root is known.
    if let Some(ref root) = *app_state.project_root.lock().unwrap() {
        let log_dir = root.join(".huskies").join("logs");
        let _ = std::fs::create_dir_all(&log_dir);
        log_buffer::global().set_log_file(log_dir.join("server.log"));
    }
    // Initialise the SQLite pipeline shadow-write database and CRDT state layer.
    // Clone the path out before the await so we don't hold the MutexGuard across
    // an await point.
    let pipeline_db_path = app_state
        .project_root
        .lock()
        .unwrap()
        .as_ref()
        .map(|root| root.join(".huskies").join("pipeline.db"));
    if let Some(ref db_path) = pipeline_db_path {
        // Both inits are best-effort: a failure is logged, not fatal.
        if let Err(e) = db::init(db_path).await {
            slog!("[db] Failed to initialise pipeline.db: {e}");
        }
        if let Err(e) = crdt_state::init(db_path).await {
            slog!("[crdt] Failed to initialise CRDT state layer: {e}");
        }
    }
    // (CRDT state layer is initialised above alongside the legacy pipeline.db.)
    // Load trusted keys, token auth config, and start the CRDT sync rendezvous
    // client if configured. In agent mode, the --rendezvous flag overrides
    // project.toml. The --join-token / HUSKIES_JOIN_TOKEN is appended to the
    // rendezvous URL as ?token=... so the server's bearer-token check passes.
    let crdt_join_token = cli
        .join_token
        .clone()
        .or_else(|| std::env::var("HUSKIES_JOIN_TOKEN").ok());
    // sync_config tuple: (rendezvous URL, trusted keys, require-token flag, tokens).
    let sync_config = if is_agent {
        // Agent mode carries no key/token config of its own.
        agent_rendezvous
            .clone()
            .map(|url| (url, Vec::new(), false, Vec::new()))
    } else {
        app_state
            .project_root
            .lock()
            .unwrap()
            .as_ref()
            .and_then(|root| config::ProjectConfig::load(root).ok())
            .and_then(|cfg| {
                cfg.rendezvous.map(|url| {
                    (
                        url,
                        cfg.trusted_keys,
                        cfg.crdt_require_token,
                        cfg.crdt_tokens,
                    )
                })
            })
    };
    if let Some((rendezvous_url, trusted_keys, require_token, crdt_tokens)) = sync_config {
        crdt_sync::init_trusted_keys(trusted_keys);
        crdt_sync::init_token_auth(require_token, crdt_tokens);
        crdt_sync::spawn_rendezvous_client(rendezvous_url, crdt_join_token);
    } else {
        // Even without rendezvous, initialise trusted keys and token auth for
        // incoming connections.
        let (keys, require_token, crdt_tokens) = app_state
            .project_root
            .lock()
            .unwrap()
            .as_ref()
            .and_then(|root| config::ProjectConfig::load(root).ok())
            .map(|cfg| (cfg.trusted_keys, cfg.crdt_require_token, cfg.crdt_tokens))
            .unwrap_or_default();
        crdt_sync::init_trusted_keys(keys);
        crdt_sync::init_token_auth(require_token, crdt_tokens);
    }
    // ── Agent mode: headless build agent ────────────────────────────────
    //
    // When `huskies agent --rendezvous <URL>` is invoked, skip the web UI,
    // chat bots, and HTTP server entirely. Instead, run a headless loop that:
    // 1. Syncs CRDT state with the rendezvous peer.
    // 2. Scans for unclaimed work and claims it via CRDT.
    // 3. Runs Claude Code locally for claimed stories.
    // 4. Pushes feature branches and reports completion via CRDT.
    if is_agent {
        let agent_root = app_state.project_root.lock().unwrap().clone();
        let rendezvous = agent_rendezvous.expect("agent mode requires --rendezvous");
        // Join token / gateway URL can come from CLI flags or environment variables.
        let join_token = cli
            .join_token
            .clone()
            .or_else(|| std::env::var("HUSKIES_JOIN_TOKEN").ok());
        let agent_gateway_url = cli
            .gateway_url
            .clone()
            .or_else(|| std::env::var("HUSKIES_GATEWAY_URL").ok());
        return agent_mode::run(agent_root, rendezvous, port, join_token, agent_gateway_url).await;
    }
    let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default()));
    // Event bus: broadcast channel for pipeline lifecycle events.
    // Created before AgentPool so the pool can emit AgentStateChanged events.
    let (watcher_tx, _) = broadcast::channel::<io::watcher::WatcherEvent>(1024);
    let agents = Arc::new(AgentPool::new(port, watcher_tx.clone()));
    // Filesystem watcher: watches config files (project.toml, agents.toml) for
    // hot-reload. Work-item pipeline events are driven by CRDT state transitions
    // via crdt_state::subscribe(). Sweep (done→archived) is handled by the unified
    // background tick loop below.
    if let Some(ref root) = *app_state.project_root.lock().unwrap() {
        io::watcher::start_watcher(root.clone(), watcher_tx.clone());
    }
    // Bridge CRDT state-transition events to the watcher broadcast channel.
    // This replaces the filesystem watcher as the source of WorkItem events.
    // Also prunes worktrees when stories transition to 6_archived.
    {
        let crdt_watcher_tx = watcher_tx.clone();
        let crdt_prune_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
        if let Some(mut crdt_rx) = crdt_state::subscribe() {
            tokio::spawn(async move {
                while let Ok(evt) = crdt_rx.recv().await {
                    // Prune the worktree when a story is archived.
                    if evt.to_stage == "6_archived"
                        && let Some(root) = crdt_prune_root.as_ref().cloned()
                    {
                        let story_id = evt.story_id.clone();
                        // Worktree pruning shells out to git — keep it off the
                        // async runtime threads.
                        tokio::task::spawn_blocking(move || {
                            if let Err(e) = crate::worktree::prune_worktree_sync(&root, &story_id) {
                                crate::slog!("[crdt] worktree prune failed for {story_id}: {e}");
                            }
                        });
                    }
                    // Map the stage transition to a (action, commit message) pair,
                    // falling back to a generic "update" when no mapping exists.
                    let (action, commit_msg) =
                        io::watcher::stage_metadata(&evt.to_stage, &evt.story_id)
                            .unwrap_or(("update", format!("huskies: update {}", evt.story_id)));
                    let watcher_evt = io::watcher::WatcherEvent::WorkItem {
                        stage: evt.to_stage,
                        item_id: evt.story_id,
                        action: action.to_string(),
                        commit_msg,
                        from_stage: evt.from_stage,
                    };
                    let _ = crdt_watcher_tx.send(watcher_evt);
                }
            });
        }
    }
    // Subscribe to watcher events so that auto-assign triggers when a work item
    // enters an active pipeline stage (2_current/, 3_qa/, 4_merge/).
    {
        let watcher_auto_rx = watcher_tx.subscribe();
        let watcher_auto_agents = Arc::clone(&agents);
        let watcher_auto_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
        if let Some(root) = watcher_auto_root {
            tokio::spawn(async move {
                let mut rx = watcher_auto_rx;
                while let Ok(event) = rx.recv().await {
                    if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event
                        && matches!(stage.as_str(), "2_current" | "3_qa" | "4_merge")
                    {
                        slog!(
                            "[auto-assign] CRDT transition detected in {stage}/; \
                             triggering auto-assign."
                        );
                        watcher_auto_agents.auto_assign_available_work(&root).await;
                    }
                }
            });
        }
    }
    // Reconciliation progress channel: startup reconciliation → WebSocket clients.
    let (reconciliation_tx, _) = broadcast::channel::<agents::ReconciliationEvent>(64);
    // Permission channel: MCP prompt_permission → WebSocket handler.
    let (perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel();
    // Clone watcher_tx for the Matrix bot before it is moved into AppContext.
    let watcher_tx_for_bot = watcher_tx.clone();
    // Subscribe to watcher events for WhatsApp/Slack notification listeners
    // before watcher_tx is moved into AppContext.
    let watcher_rx_for_whatsapp = watcher_tx.subscribe();
    let watcher_rx_for_slack = watcher_tx.subscribe();
    let watcher_rx_for_discord = watcher_tx.subscribe();
    // Subscribe to watcher events for the per-project event buffer (gateway polling).
    let watcher_rx_for_events = watcher_tx.subscribe();
    // Wrap perm_rx in Arc<Mutex> so it can be shared across the Services
    // bundle (AppContext + Matrix bot) and the webhook-based transports.
    let perm_rx = Arc::new(tokio::sync::Mutex::new(perm_rx));
    // Capture project root, agents Arc, and reconciliation sender before ctx
    // is consumed by build_routes.
    let startup_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
    let startup_agents = Arc::clone(&agents);
    let startup_reconciliation_tx = reconciliation_tx.clone();
    // Clone for shutdown cleanup — kill orphaned PTY children before exiting.
    let agents_for_shutdown = Arc::clone(&agents);
    // ── Construct the shared Services bundle ────────────────────────────
    //
    // A single `Arc<Services>` is built here and cloned into `AppContext`
    // and the Matrix `BotContext`. Bot-level fields (name, user-id, etc.)
    // come from `bot.toml` when present; otherwise sensible defaults apply.
    let bot_cfg_for_services = startup_root
        .as_ref()
        .and_then(|root| chat::transport::matrix::BotConfig::load(root));
    let services = Arc::new(services::Services {
        project_root: startup_root.clone().unwrap_or_default(),
        agents: Arc::clone(&agents),
        bot_name: bot_cfg_for_services
            .as_ref()
            .and_then(|c| c.display_name.clone())
            .unwrap_or_else(|| "Assistant".to_string()),
        bot_user_id: String::new(),
        ambient_rooms: Arc::new(std::sync::Mutex::new(
            bot_cfg_for_services
                .as_ref()
                .map(|c| c.ambient_rooms.iter().cloned().collect())
                .unwrap_or_default(),
        )),
        perm_rx: Arc::clone(&perm_rx),
        pending_perm_replies: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
        permission_timeout_secs: bot_cfg_for_services
            .as_ref()
            .map(|c| c.permission_timeout_secs)
            .unwrap_or(120),
        status: Arc::new(service::status::StatusBroadcaster::new()),
    });
    // Build WhatsApp webhook context if bot.toml configures transport = "whatsapp".
    let whatsapp_ctx: Option<Arc<chat::transport::whatsapp::WhatsAppWebhookContext>> = startup_root
        .as_ref()
        .and_then(|root| chat::transport::matrix::BotConfig::load(root))
        .filter(|cfg| cfg.transport == "whatsapp")
        .map(|cfg| {
            // Two providers share one trait object: Twilio or the Meta Cloud API.
            let provider = cfg.whatsapp_provider.clone();
            let transport: Arc<dyn crate::chat::ChatTransport> = if provider == "twilio" {
                Arc::new(chat::transport::whatsapp::TwilioWhatsAppTransport::new(
                    cfg.twilio_account_sid.clone().unwrap_or_default(),
                    cfg.twilio_auth_token.clone().unwrap_or_default(),
                    cfg.twilio_whatsapp_number.clone().unwrap_or_default(),
                ))
            } else {
                let template_name = cfg
                    .whatsapp_notification_template
                    .clone()
                    .unwrap_or_else(|| "pipeline_notification".to_string());
                Arc::new(chat::transport::whatsapp::WhatsAppTransport::new(
                    cfg.whatsapp_phone_number_id.clone().unwrap_or_default(),
                    cfg.whatsapp_access_token.clone().unwrap_or_default(),
                    template_name,
                ))
            };
            let root = startup_root.clone().unwrap();
            let history = chat::transport::whatsapp::load_whatsapp_history(&root);
            Arc::new(chat::transport::whatsapp::WhatsAppWebhookContext {
                services: Arc::clone(&services),
                verify_token: cfg.whatsapp_verify_token.clone().unwrap_or_default(),
                provider,
                transport,
                history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
                history_size: cfg.history_size,
                window_tracker: Arc::new(chat::transport::whatsapp::MessagingWindowTracker::new()),
                allowed_phones: cfg.whatsapp_allowed_phones.clone(),
            })
        });
    // Build Slack webhook context if bot.toml configures transport = "slack".
    let slack_ctx: Option<Arc<chat::transport::slack::SlackWebhookContext>> = startup_root
        .as_ref()
        .and_then(|root| chat::transport::matrix::BotConfig::load(root))
        .filter(|cfg| cfg.transport == "slack")
        .map(|cfg| {
            let transport = Arc::new(chat::transport::slack::SlackTransport::new(
                cfg.slack_bot_token.clone().unwrap_or_default(),
            ));
            let root = startup_root.clone().unwrap();
            let history = chat::transport::slack::load_slack_history(&root);
            let channel_ids: std::collections::HashSet<String> =
                cfg.slack_channel_ids.iter().cloned().collect();
            Arc::new(chat::transport::slack::SlackWebhookContext {
                services: Arc::clone(&services),
                signing_secret: cfg.slack_signing_secret.clone().unwrap_or_default(),
                transport,
                history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
                history_size: cfg.history_size,
                channel_ids,
            })
        });
    // Build Discord context if bot.toml configures transport = "discord".
    let discord_ctx: Option<Arc<chat::transport::discord::DiscordContext>> = startup_root
        .as_ref()
        .and_then(|root| chat::transport::matrix::BotConfig::load(root))
        .filter(|cfg| cfg.transport == "discord")
        .map(|cfg| {
            let transport = Arc::new(chat::transport::discord::DiscordTransport::new(
                cfg.discord_bot_token.clone().unwrap_or_default(),
            ));
            let root = startup_root.clone().unwrap();
            let history = chat::transport::discord::load_discord_history(&root);
            let channel_ids: std::collections::HashSet<String> =
                cfg.discord_channel_ids.iter().cloned().collect();
            let allowed_users: std::collections::HashSet<String> =
                cfg.discord_allowed_users.iter().cloned().collect();
            Arc::new(chat::transport::discord::DiscordContext {
                services: Arc::clone(&services),
                bot_token: cfg.discord_bot_token.clone().unwrap_or_default(),
                transport,
                history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
                history_size: cfg.history_size,
                channel_ids,
                allowed_users,
            })
        });
    // Build a best-effort shutdown notifier for webhook-based transports.
    //
    // • Slack: channels are fixed at startup (channel_ids from bot.toml).
    // • Discord: channels are fixed at startup (channel_ids from bot.toml).
    // • WhatsApp: active senders are tracked at runtime in ambient_rooms.
    //   We keep the WhatsApp context Arc so we can read the rooms at shutdown.
    // • Matrix: the bot task manages its own announcement via matrix_shutdown_tx.
    let bot_shutdown_notifier: Option<Arc<BotShutdownNotifier>> = if let Some(ref ctx) = slack_ctx {
        let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
        Some(Arc::new(BotShutdownNotifier::new(
            Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>,
            channels,
            ctx.services.bot_name.clone(),
        )))
    } else if let Some(ref ctx) = discord_ctx {
        let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
        Some(Arc::new(BotShutdownNotifier::new(
            Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>,
            channels,
            ctx.services.bot_name.clone(),
        )))
    } else {
        None
    };
    // Retain a reference to the WhatsApp context for shutdown notifications.
    // At shutdown time we read ambient_rooms to get the current set of active senders.
    let whatsapp_ctx_for_shutdown: Option<Arc<chat::transport::whatsapp::WhatsAppWebhookContext>> =
        whatsapp_ctx.clone();
    // ── Startup announcements (WhatsApp & Slack) ──────────────────────────
    //
    // Send "{bot_name} is online." to all known contacts so users know the bot
    // is ready. This mirrors the Matrix bot's startup announcement and fires
    // on every fresh process start — including after a rebuild/re-exec.
    //
    // • WhatsApp: send to all phone numbers present in persisted history.
    // • Slack: send to all configured channel IDs (channel_ids from bot.toml).
    // • Matrix: handled by spawn_bot() below; no action needed here.
    if let Some(ref ctx) = whatsapp_ctx {
        let transport = Arc::clone(&ctx.transport);
        let bot_name = ctx.services.bot_name.clone();
        let history: WhatsAppConversationHistory = Arc::clone(&ctx.history);
        tokio::spawn(async move {
            let senders: Vec<String> = history.lock().await.keys().cloned().collect();
            if senders.is_empty() {
                return;
            }
            let notifier = crate::rebuild::BotShutdownNotifier::new(transport, senders, bot_name);
            notifier.notify_startup().await;
        });
    }
    if let Some(ref ctx) = slack_ctx {
        let transport = Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>;
        let bot_name = ctx.services.bot_name.clone();
        let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
        tokio::spawn(async move {
            if channels.is_empty() {
                return;
            }
            let notifier = crate::rebuild::BotShutdownNotifier::new(transport, channels, bot_name);
            notifier.notify_startup().await;
        });
    }
    if let Some(ref ctx) = discord_ctx {
        let transport = Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>;
        let bot_name = ctx.services.bot_name.clone();
        let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
        tokio::spawn(async move {
            if channels.is_empty() {
                return;
            }
            let notifier = crate::rebuild::BotShutdownNotifier::new(transport, channels, bot_name);
            notifier.notify_startup().await;
        });
    }
    // Watch channel: signals the Matrix bot task to send a shutdown announcement.
    // `None` initial value means "server is running".
    let (matrix_shutdown_tx, matrix_shutdown_rx) =
        tokio::sync::watch::channel::<Option<ShutdownReason>>(None);
    let matrix_shutdown_tx = Arc::new(matrix_shutdown_tx);
    let matrix_shutdown_tx_for_rebuild = Arc::clone(&matrix_shutdown_tx);
    // Bug 501: shared rate-limit retry timer store, accessible from MCP tools
    // via AppContext so manual interventions (move_story → backlog, stop_agent)
    // can cancel pending timers in-memory rather than only on disk.
    //
    // TODO(bug 501): the matrix bot currently spawns its own TimerStore instance
    // in `chat::transport::matrix::bot::run::spawn_bot`. Refactor to consume this
    // shared instance via `AppContext.timer_store` so cancellations from MCP
    // tools and the bot's tick loop see the same in-memory state.
    let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load(
        startup_root
            .as_ref()
            .map(|r| r.join(".huskies").join("timers.json"))
            .unwrap_or_else(|| std::path::PathBuf::from("/tmp/huskies-timers.json")),
    ));
    let timer_store_for_tick = Arc::clone(&timer_store);
    let ctx = AppContext {
        state: app_state,
        store,
        workflow,
        services: Arc::clone(&services),
        watcher_tx,
        reconciliation_tx,
        perm_tx,
        qa_app_process: Arc::new(std::sync::Mutex::new(None)),
        bot_shutdown: bot_shutdown_notifier.clone(),
        matrix_shutdown_tx: Some(Arc::clone(&matrix_shutdown_tx)),
        timer_store,
        test_jobs: std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashMap::new())),
    };
    // Create the per-project event buffer and subscribe it to the watcher channel
    // so that pipeline events are buffered for the gateway's `/api/events` poller.
    let event_buffer = crate::http::events::EventBuffer::new();
    crate::http::events::subscribe_to_watcher(event_buffer.clone(), watcher_rx_for_events);
    let app = build_routes(
        ctx,
        whatsapp_ctx.clone(),
        slack_ctx.clone(),
        port,
        Some(event_buffer),
    );
    // Unified 1-second background tick loop: fires due timers, detects orphaned
    // agents (watchdog), and promotes done→archived items (sweep). Replaces the
    // three separate background loops that previously ran independently.
    {
        let tick_agents = Arc::clone(&startup_agents);
        let tick_timer = timer_store_for_tick;
        let tick_root = startup_root.clone();
        let sweep_cfg = tick_root
            .as_ref()
            .and_then(|r| config::ProjectConfig::load(r).ok())
            .map(|c| c.watcher)
            .unwrap_or_default();
        // Clamp to at least 1 so `is_multiple_of` below never sees zero.
        let sweep_every = sweep_cfg.sweep_interval_secs.max(1);
        let done_retention = std::time::Duration::from_secs(sweep_cfg.done_retention_secs);
        let pending_count = tick_timer.list().len();
        crate::slog!("[tick] Unified tick loop started; {pending_count} pending timer(s)");
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
            let mut tick_count: u64 = 0;
            loop {
                interval.tick().await;
                tick_count = tick_count.wrapping_add(1);
                // Timer: fire due timers every second.
                if let Some(ref root) = tick_root {
                    let result =
                        crate::service::timer::tick_once(&tick_timer, &tick_agents, root).await;
                    if let Err(msg) = result {
                        crate::slog_error!("[tick] Timer tick panicked: {msg}");
                    }
                }
                // Watchdog: detect orphaned Running agents every 30 ticks.
                if tick_count.is_multiple_of(30) {
                    let found = tick_agents.run_watchdog_pass(tick_root.as_deref());
                    if found > 0 {
                        crate::slog!(
                            "[tick] {found} orphaned agent(s) detected; triggering auto-assign."
                        );
                        if let Some(ref root) = tick_root {
                            tick_agents.auto_assign_available_work(root).await;
                        }
                    }
                }
                // Sweep: promote done→archived every sweep_interval_secs ticks.
                if tick_count.is_multiple_of(sweep_every) {
                    crate::io::watcher::sweep_done_to_archived(done_retention);
                }
            }
        });
    }
    // Optional Matrix bot: connect to the homeserver and start listening for
    // messages if `.huskies/bot.toml` is present and enabled.
    if let Some(ref root) = startup_root {
        let _ = chat::transport::matrix::spawn_bot(
            root,
            watcher_tx_for_bot,
            Arc::clone(&services),
            matrix_shutdown_rx,
            None,
            vec![],
            std::collections::BTreeMap::new(),
        );
    } else {
        // No project root → no Matrix bot. Drop the unused receiver; the
        // shutdown `send` at exit will then return Err, which is ignored
        // via `let _ =` below.
        drop(matrix_shutdown_rx);
    }
    // Spawn stage-transition notification listeners for WhatsApp and Slack.
    // These mirror the listener that the Matrix bot spawns internally.
    if let (Some(ctx), Some(root)) = (&whatsapp_ctx, &startup_root) {
        let ambient_rooms = Arc::clone(&ctx.services.ambient_rooms);
        crate::service::notifications::spawn_notification_listener(
            Arc::clone(&ctx.transport),
            move || ambient_rooms.lock().unwrap().iter().cloned().collect(),
            watcher_rx_for_whatsapp,
            root.clone(),
        );
    } else {
        drop(watcher_rx_for_whatsapp);
    }
    if let (Some(ctx), Some(root)) = (&slack_ctx, &startup_root) {
        let channel_ids: Vec<String> = ctx.channel_ids.iter().cloned().collect();
        crate::service::notifications::spawn_notification_listener(
            Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>,
            move || channel_ids.clone(),
            watcher_rx_for_slack,
            root.clone(),
        );
    } else {
        drop(watcher_rx_for_slack);
    }
    if let (Some(ctx), Some(root)) = (&discord_ctx, &startup_root) {
        // Spawn the Discord Gateway WebSocket listener.
        chat::transport::discord::gateway::spawn_gateway(Arc::clone(ctx));
        // Spawn stage-transition notification listener for Discord.
        let channel_ids: Vec<String> = ctx.channel_ids.iter().cloned().collect();
        crate::service::notifications::spawn_notification_listener(
            Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>,
            move || channel_ids.clone(),
            watcher_rx_for_discord,
            root.clone(),
        );
    } else {
        drop(watcher_rx_for_discord);
    }
    // On startup:
    // 1. Reconcile any stories whose agent work was committed while the server was
    //    offline (worktree has commits ahead of master but pipeline didn't advance).
    // 2. Auto-assign free agents to remaining unassigned work in the pipeline.
    if let Some(root) = startup_root {
        tokio::spawn(async move {
            slog!("[startup] Reconciling completed worktrees from previous session.");
            startup_agents
                .reconcile_on_startup(&root, &startup_reconciliation_tx)
                .await;
            slog!("[auto-assign] Scanning pipeline stages for unassigned work.");
            startup_agents.auto_assign_available_work(&root).await;
        });
    }
    let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
    let addr = format!("{host}:{port}");
    // Startup banner (ANSI-colored ASCII art) plus the URLs users need.
    println!("\x1b[97;1m");
    println!(" /\\_/\\ \x1b[96;1m _ _ _ _ \x1b[97;1m");
    println!(" / o o \\ \x1b[96;1m| | | |_ _ ___| | _(_) ___ ___\x1b[97;1m");
    println!(" ( Y ) \x1b[96;1m| |_| | | | / __| |/ / |/ _ \\/ __|\x1b[97;1m");
    println!(" \\ ^ / \x1b[96;1m| _ | |_| \\__ \\ <| | __/\\__ \\\x1b[97;1m");
    println!(" )===( \\ \x1b[96;1m|_| |_|\\__,_|___/_|\\_\\_|\\___||___/\x1b[97;1m");
    println!(" / \\ \\ \x1b[90mStory-driven development, powered by AI\x1b[97;1m");
    println!(" | | | |");
    println!(" /| | |\\|");
    println!(" \\|__|__|/\x1b[0m");
    println!();
    println!("HUSKIES_PORT={port}");
    println!("\x1b[96;1mFrontend:\x1b[0m \x1b[94mhttp://{addr}\x1b[0m");
    println!("\x1b[92;1mOpenAPI Docs:\x1b[0m \x1b[94mhttp://{addr}/docs\x1b[0m");
    // Persist the bound port for discovery by other tools; removed at shutdown.
    let port_file = write_port_file(&cwd, port);
    // Run the HTTP server to completion — everything below is shutdown handling.
    let result = Server::new(TcpListener::bind(&addr)).run(app).await;
    // ── Shutdown notifications (best-effort) ─────────────────────────────
    //
    // The server is stopping (SIGINT / SIGTERM). Notify active bot channels
    // so participants know the bot is going offline. We do this before killing
    // PTY children so network I/O can still complete.
    // Slack: notifier holds the fixed channel list.
    if let Some(ref notifier) = bot_shutdown_notifier {
        notifier.notify(ShutdownReason::Manual).await;
    }
    // WhatsApp: read the current set of ambient rooms and notify each sender.
    if let Some(ref ctx) = whatsapp_ctx_for_shutdown {
        let rooms: Vec<String> = ctx
            .services
            .ambient_rooms
            .lock()
            .unwrap()
            .iter()
            .cloned()
            .collect();
        if !rooms.is_empty() {
            let wa_notifier = BotShutdownNotifier::new(
                Arc::clone(&ctx.transport) as Arc<dyn crate::chat::ChatTransport>,
                rooms,
                ctx.services.bot_name.clone(),
            );
            wa_notifier.notify(ShutdownReason::Manual).await;
        }
    }
    // Matrix: signal the bot task and give it a short window to send its message.
    let _ = matrix_shutdown_tx_for_rebuild.send(Some(ShutdownReason::Manual));
    tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
    // ── Cleanup ──────────────────────────────────────────────────────────
    // Kill all active PTY child processes before exiting to prevent orphaned
    // Claude Code processes from running after the server restarts.
    agents_for_shutdown.kill_all_children();
    if let Some(ref path) = port_file {
        remove_port_file(path);
    }
    result
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Startup validation wraps `ProjectConfig::load` errors in a panic with a
    /// fixed "Invalid project.toml:" prefix; a manifest declaring two agents
    /// with the same name must trigger exactly that panic.
    #[test]
    #[should_panic(expected = "Invalid project.toml: Duplicate agent name")]
    fn panics_on_duplicate_agent_names() {
        // Scaffold a throwaway project: <tmp>/.huskies/project.toml with a
        // duplicate [[agent]] name.
        let dir = tempfile::tempdir().unwrap();
        let huskies_dir = dir.path().join(".huskies");
        std::fs::create_dir_all(&huskies_dir).unwrap();
        let manifest = r#"
[[agent]]
name = "coder"
[[agent]]
name = "coder"
"#;
        std::fs::write(huskies_dir.join("project.toml"), manifest).unwrap();
        // Mirrors main()'s validation call: load + panic on error.
        config::ProjectConfig::load(dir.path())
            .unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
    }
}