From 46b1e84629592eef6a9a7dd6d7ad438826d9ba25 Mon Sep 17 00:00:00 2001 From: dave Date: Tue, 28 Apr 2026 19:12:55 +0000 Subject: [PATCH] huskies: merge 791 --- server/src/main.rs | 822 +++----------------------------- server/src/startup/bots.rs | 370 ++++++++++++++ server/src/startup/mod.rs | 5 + server/src/startup/project.rs | 213 +++++++++ server/src/startup/tick_loop.rs | 187 ++++++++ 5 files changed, 841 insertions(+), 756 deletions(-) create mode 100644 server/src/startup/bots.rs create mode 100644 server/src/startup/mod.rs create mode 100644 server/src/startup/project.rs create mode 100644 server/src/startup/tick_loop.rs diff --git a/server/src/main.rs b/server/src/main.rs index 2e1b588a..4b385e02 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -26,18 +26,17 @@ pub(crate) mod pipeline_state; pub mod rebuild; mod service; pub mod services; +mod startup; mod state; mod store; mod workflow; mod worktree; use crate::agents::AgentPool; -use crate::chat::transport::whatsapp::WhatsAppConversationHistory; use crate::http::build_routes; use crate::http::context::AppContext; use crate::http::{remove_port_file, resolve_port, write_port_file}; -use crate::io::fs::find_story_kit_root; -use crate::rebuild::{BotShutdownNotifier, ShutdownReason}; +use crate::rebuild::ShutdownReason; use crate::state::SessionState; use crate::store::JsonFileStore; use crate::workflow::WorkflowState; @@ -78,7 +77,7 @@ async fn main() -> Result<(), std::io::Error> { let app_state = Arc::new(SessionState::default()); let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")); // Migrate legacy root-level store.json into .huskies/ if the new path does - // not yet exist. This keeps existing deployments working after upgrade. + // not yet exist. let legacy_store_path = cwd.join("store.json"); let store_path = cwd.join(".huskies").join("store.json"); if legacy_store_path.exists() && !store_path.exists() { @@ -108,12 +107,9 @@ async fn main() -> Result<(), std::io::Error> { let explicit_path = resolve_path_arg(cli.path.as_deref(), &cwd); // Port resolution: CLI flag > project.toml (loaded later) > default. - // Use the CLI port for scaffolding .mcp.json; final port is resolved - // after the project root is known. let port = cli.port.unwrap_or_else(resolve_port); - // When a path is given explicitly on the CLI, it must already exist as a - // directory. We do not create directories from the command line. + // When a path is given explicitly on the CLI, it must already exist as a directory. if let Some(ref path) = explicit_path { if !path.exists() { eprintln!("error: path does not exist: {}", path.display()); @@ -125,232 +121,34 @@ async fn main() -> Result<(), std::io::Error> { } } - // ── Gateway mode: multi-project proxy ────────────────────────────── - // - // When `huskies --gateway` is invoked, skip the normal single-project - // server and instead start a lightweight proxy that routes MCP calls - // to per-project Docker containers based on a projects.toml config. + // ── Gateway mode: multi-project proxy ──────────────────────────────────── if is_gateway { let config_dir = explicit_path.unwrap_or_else(|| cwd.clone()); let config_path = config_dir.join("projects.toml"); return gateway::run(&config_path, port).await; } - if is_init { - // `huskies init [PATH]` — always scaffold, never search parents. 
- let init_root = explicit_path.unwrap_or_else(|| cwd.clone()); - if !init_root.exists() { - std::fs::create_dir_all(&init_root).unwrap_or_else(|e| { - eprintln!( - "error: cannot create directory {}: {e}", - init_root.display() - ); - std::process::exit(1); - }); - } - match io::fs::open_project( - init_root.to_string_lossy().to_string(), - &app_state, - store.as_ref(), - port, - ) - .await - { - Ok(_) => { - if let Some(root) = app_state.project_root.lock().unwrap().as_ref() { - config::ProjectConfig::load(root) - .unwrap_or_else(|e| panic!("Invalid project.toml: {e}")); - // Initialize wizard state for the setup flow. - io::wizard::WizardState::init_if_missing(root); - } - } - Err(e) => { - eprintln!("error: {e}"); - std::process::exit(1); - } - } - } else if let Some(explicit_root) = explicit_path { - // An explicit path was given on the command line. - // Open it directly — scaffold .huskies/ if it is missing — and - // exit with a clear error message if the path is invalid. - match io::fs::open_project( - explicit_root.to_string_lossy().to_string(), - &app_state, - store.as_ref(), - port, - ) - .await - { - Ok(_) => { - if let Some(root) = app_state.project_root.lock().unwrap().as_ref() { - config::ProjectConfig::load(root) - .unwrap_or_else(|e| panic!("Invalid project.toml: {e}")); - } - } - Err(e) => { - eprintln!("error: {e}"); - std::process::exit(1); - } - } - } else { - // No path argument — auto-detect a .huskies/ project in cwd or - // parent directories (preserves existing behaviour). - if let Some(project_root) = find_story_kit_root(&cwd) { - io::fs::open_project( - project_root.to_string_lossy().to_string(), - &app_state, - store.as_ref(), - port, - ) - .await - .unwrap_or_else(|e| { - slog!("Warning: failed to auto-open project at {project_root:?}: {e}"); - project_root.to_string_lossy().to_string() - }); + startup::project::open_project_root(is_init, explicit_path, &cwd, &app_state, &store, port) + .await; - // Validate agent config for the detected project root. - config::ProjectConfig::load(&project_root) - .unwrap_or_else(|e| panic!("Invalid project.toml: {e}")); - } else { - // No .huskies/ found in cwd or parents — scaffold cwd as a new - // project, exactly like `huskies .` does. - io::fs::open_project( - cwd.to_string_lossy().to_string(), - &app_state, - store.as_ref(), - port, - ) - .await - .unwrap_or_else(|e| { - slog!("Warning: failed to scaffold project at {cwd:?}: {e}"); - cwd.to_string_lossy().to_string() - }); - } - } + startup::project::init_subsystems(&app_state, &cwd).await; - // Enable persistent server log file now that the project root is known. - if let Some(ref root) = *app_state.project_root.lock().unwrap() { - let log_dir = root.join(".huskies").join("logs"); - let _ = std::fs::create_dir_all(&log_dir); - log_buffer::global().set_log_file(log_dir.join("server.log")); - } - - // Initialise the node's Ed25519 identity keypair (file-based, mode 0600). - // The key is stored at .huskies/node_identity.key and persisted across - // restarts. The public key is exposed via GET /identity. 
- { - let key_path = app_state - .project_root - .lock() - .unwrap() - .as_ref() - .map(|root| root.join(".huskies").join("node_identity.key")) - .unwrap_or_else(|| cwd.join(".huskies").join("node_identity.key")); - if let Err(e) = node_identity::init_identity(&key_path) { - slog!("[identity] Failed to initialise node identity keypair: {e}"); - } else if let Some(id) = node_identity::get_identity() { - slog!("[identity] Node ID: {}", id.node_id); - } - } - - // Initialise the SQLite pipeline shadow-write database and CRDT state layer. - // Clone the path out before the await so we don't hold the MutexGuard across - // an await point. - let pipeline_db_path = app_state - .project_root - .lock() - .unwrap() - .as_ref() - .map(|root| root.join(".huskies").join("pipeline.db")); - if let Some(ref db_path) = pipeline_db_path { - if let Err(e) = db::init(db_path).await { - slog!("[db] Failed to initialise pipeline.db: {e}"); - } - if let Err(e) = crdt_state::init(db_path).await { - slog!("[crdt] Failed to initialise CRDT state layer: {e}"); - } else { - // Migrate items that have an empty name field: derive the name - // from the story ID slug. No-op for items that already have a name. - crdt_state::migrate_names_from_slugs(); - - // Migrate story IDs from slug form ("664_story_...") to numeric-only - // ("664"). Returns migrated pairs so we can rename filesystem artifacts. - // No-op when all IDs are already numeric. - let id_migrations = crdt_state::migrate_story_ids_to_numeric(); - if !id_migrations.is_empty() { - // Derive the project root from the db_path: .huskies/pipeline.db - // lives two levels below the project root. - if let Some(project_root) = db_path.parent().and_then(|p| p.parent()) { - worktree::migrate_slug_paths(project_root, &id_migrations); - } - } - } - } - - // (CRDT state layer is initialised above alongside the legacy pipeline.db.) - - // Load trusted keys, token auth config, and start the CRDT sync rendezvous - // client if configured. In agent mode, the --rendezvous flag overrides - // project.toml. The --join-token / HUSKIES_JOIN_TOKEN is appended to the - // rendezvous URL as ?token=... so the server's bearer-token check passes. let crdt_join_token = cli .join_token .clone() .or_else(|| std::env::var("HUSKIES_JOIN_TOKEN").ok()); - let sync_config = if is_agent { - agent_rendezvous - .clone() - .map(|url| (url, Vec::new(), false, Vec::new())) - } else { - app_state - .project_root - .lock() - .unwrap() - .as_ref() - .and_then(|root| config::ProjectConfig::load(root).ok()) - .and_then(|cfg| { - cfg.rendezvous.map(|url| { - ( - url, - cfg.trusted_keys, - cfg.crdt_require_token, - cfg.crdt_tokens, - ) - }) - }) - }; - if let Some((rendezvous_url, trusted_keys, require_token, crdt_tokens)) = sync_config { - crdt_sync::init_trusted_keys(trusted_keys); - crdt_sync::init_token_auth(require_token, crdt_tokens); - crdt_sync::spawn_rendezvous_client(rendezvous_url, crdt_join_token); - } else { - // Even without rendezvous, initialise trusted keys and token auth for - // incoming connections. 
-        let (keys, require_token, crdt_tokens) = app_state
-            .project_root
-            .lock()
-            .unwrap()
-            .as_ref()
-            .and_then(|root| config::ProjectConfig::load(root).ok())
-            .map(|cfg| (cfg.trusted_keys, cfg.crdt_require_token, cfg.crdt_tokens))
-            .unwrap_or_default();
-        crdt_sync::init_trusted_keys(keys);
-        crdt_sync::init_token_auth(require_token, crdt_tokens);
-    }
+    startup::project::configure_crdt_sync(
+        &app_state,
+        is_agent,
+        agent_rendezvous.clone(),
+        crdt_join_token,
+    );
 
-    // ── Agent mode: headless build agent ────────────────────────────────
-    //
-    // When `huskies agent --rendezvous <URL>` is invoked, skip the web UI,
-    // chat bots, and HTTP server entirely. Instead, run a headless loop that:
-    //   1. Syncs CRDT state with the rendezvous peer.
-    //   2. Scans for unclaimed work and claims it via CRDT.
-    //   3. Runs Claude Code locally for claimed stories.
-    //   4. Pushes feature branches and reports completion via CRDT.
+    // ── Agent mode: headless build agent ───────────────────────────────────
     if is_agent {
         let agent_root = app_state.project_root.lock().unwrap().clone();
         let rendezvous = agent_rendezvous.expect("agent mode requires --rendezvous");
 
-        // Join token / gateway URL can come from CLI flags or environment variables.
         let join_token = cli
             .join_token
             .clone()
@@ -365,318 +163,78 @@ async fn main() -> Result<(), std::io::Error> {
 
     let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default()));
 
     // Event bus: broadcast channel for pipeline lifecycle events.
-    // Created before AgentPool so the pool can emit AgentStateChanged events.
     let (watcher_tx, _) = broadcast::channel::<io::watcher::WatcherEvent>(1024);
     let agents = Arc::new(AgentPool::new(port, watcher_tx.clone()));
 
-    // Filesystem watcher: watches config files (project.toml, agents.toml) for
-    // hot-reload. Work-item pipeline events are driven by CRDT state transitions
-    // via crdt_state::subscribe(). Sweep (done→archived) is handled by the unified
-    // background tick loop below.
+    // Filesystem watcher: watches config files for hot-reload.
     if let Some(ref root) = *app_state.project_root.lock().unwrap() {
         io::watcher::start_watcher(root.clone(), watcher_tx.clone());
     }
 
-    // Bridge CRDT state-transition events to the watcher broadcast channel.
-    // This replaces the filesystem watcher as the source of WorkItem events.
-    // Also prunes worktrees when stories transition to 6_archived.
-    {
-        let crdt_watcher_tx = watcher_tx.clone();
-        let crdt_prune_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
-        if let Some(mut crdt_rx) = crdt_state::subscribe() {
-            tokio::spawn(async move {
-                while let Ok(evt) = crdt_rx.recv().await {
-                    // Prune the worktree when a story is archived.
-                    if crate::pipeline_state::Stage::from_dir(&evt.to_stage)
-                        .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Archived { .. }))
-                        && let Some(root) = crdt_prune_root.as_ref().cloned()
-                    {
-                        let story_id = evt.story_id.clone();
-                        tokio::task::spawn_blocking(move || {
-                            if let Err(e) = crate::worktree::prune_worktree_sync(&root, &story_id) {
-                                crate::slog!("[crdt] worktree prune failed for {story_id}: {e}");
-                            }
-                        });
-                    }
-                    let (action, commit_msg) =
-                        io::watcher::stage_metadata(&evt.to_stage, &evt.story_id)
-                            .unwrap_or(("update", format!("huskies: update {}", evt.story_id)));
-                    let watcher_evt = io::watcher::WatcherEvent::WorkItem {
-                        stage: evt.to_stage,
-                        item_id: evt.story_id,
-                        action: action.to_string(),
-                        commit_msg,
-                        from_stage: evt.from_stage,
-                    };
-                    let _ = crdt_watcher_tx.send(watcher_evt);
-                }
-            });
-        }
-    }
+    // Spawn CRDT→watcher bridge and auto-assign subscriber.
+    startup::tick_loop::spawn_event_bridges(
+        watcher_tx.clone(),
+        app_state.project_root.lock().unwrap().clone(),
+        Arc::clone(&agents),
+    );
 
-    // Subscribe to watcher events so that auto-assign triggers when a work item
-    // enters an active pipeline stage (2_current/, 3_qa/, 4_merge/).
-    {
-        let watcher_auto_rx = watcher_tx.subscribe();
-        let watcher_auto_agents = Arc::clone(&agents);
-        let watcher_auto_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
-        if let Some(root) = watcher_auto_root {
-            tokio::spawn(async move {
-                let mut rx = watcher_auto_rx;
-                while let Ok(event) = rx.recv().await {
-                    if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event
-                        && crate::pipeline_state::Stage::from_dir(stage.as_str())
-                            .is_some_and(|s| s.is_active())
-                    {
-                        slog!(
-                            "[auto-assign] CRDT transition detected in {stage}/; \
-                             triggering auto-assign."
-                        );
-                        watcher_auto_agents.auto_assign_available_work(&root).await;
-                    }
-                }
-            });
-        }
-    }
-
-    // Reconciliation progress channel: startup reconciliation → WebSocket clients.
+    // Reconciliation progress channel and permission channel.
     let (reconciliation_tx, _) = broadcast::channel::<crate::agents::ReconciliationEvent>(64);
-
-    // Permission channel: MCP prompt_permission → WebSocket handler.
     let (perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel();
 
-    // Clone watcher_tx for the Matrix bot before it is moved into AppContext.
     let watcher_tx_for_bot = watcher_tx.clone();
 
-    // Subscribe to watcher events for WhatsApp/Slack notification listeners
-    // before watcher_tx is moved into AppContext.
     let watcher_rx_for_whatsapp = watcher_tx.subscribe();
     let watcher_rx_for_slack = watcher_tx.subscribe();
     let watcher_rx_for_discord = watcher_tx.subscribe();
 
-    // Subscribe to watcher events for the per-project event buffer (gateway polling).
     let watcher_rx_for_events = watcher_tx.subscribe();
 
-    // Wrap perm_rx in Arc so it can be shared across the Services
-    // bundle (AppContext + Matrix bot) and the webhook-based transports.
+
     let perm_rx = Arc::new(tokio::sync::Mutex::new(perm_rx));
 
-    // Capture project root, agents Arc, and reconciliation sender before ctx
-    // is consumed by build_routes.
     let startup_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
     let startup_agents = Arc::clone(&agents);
     let startup_reconciliation_tx = reconciliation_tx.clone();
 
-    // Clone for shutdown cleanup — kill orphaned PTY children before exiting.
     let agents_for_shutdown = Arc::clone(&agents);
 
-    // ── Construct the shared Services bundle ────────────────────────
-    //
-    // A single `Arc<Services>` is built here and cloned into `AppContext`
-    // and the Matrix `BotContext`. Bot-level fields (name, user-id, etc.)
-    // come from `bot.toml` when present; otherwise sensible defaults apply.
-    let bot_cfg_for_services = startup_root
+    // ── Construct the shared Services bundle ──────────────────────────────
+    let bot_cfg = startup_root
         .as_ref()
         .and_then(|root| chat::transport::matrix::BotConfig::load(root));
     let services = Arc::new(services::Services {
         project_root: startup_root.clone().unwrap_or_default(),
         agents: Arc::clone(&agents),
-        bot_name: bot_cfg_for_services
+        bot_name: bot_cfg
             .as_ref()
             .and_then(|c| c.display_name.clone())
             .unwrap_or_else(|| "Assistant".to_string()),
         bot_user_id: String::new(),
         ambient_rooms: Arc::new(std::sync::Mutex::new(
-            bot_cfg_for_services
+            bot_cfg
                 .as_ref()
                 .map(|c| c.ambient_rooms.iter().cloned().collect())
                 .unwrap_or_default(),
         )),
         perm_rx: Arc::clone(&perm_rx),
         pending_perm_replies: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
-        permission_timeout_secs: bot_cfg_for_services
+        permission_timeout_secs: bot_cfg
             .as_ref()
             .map(|c| c.permission_timeout_secs)
             .unwrap_or(120),
         status: agents.status_broadcaster(),
     });
 
-    // Build WhatsApp webhook context if bot.toml configures transport = "whatsapp".
-    let whatsapp_ctx: Option<Arc<chat::transport::whatsapp::WhatsAppWebhookContext>> = startup_root
-        .as_ref()
-        .and_then(|root| chat::transport::matrix::BotConfig::load(root))
-        .filter(|cfg| cfg.transport == "whatsapp")
-        .map(|cfg| {
-            let provider = cfg.whatsapp_provider.clone();
-            let transport: Arc<dyn chat::ChatTransport> = if provider == "twilio" {
-                Arc::new(chat::transport::whatsapp::TwilioWhatsAppTransport::new(
-                    cfg.twilio_account_sid.clone().unwrap_or_default(),
-                    cfg.twilio_auth_token.clone().unwrap_or_default(),
-                    cfg.twilio_whatsapp_number.clone().unwrap_or_default(),
-                ))
-            } else {
-                let template_name = cfg
-                    .whatsapp_notification_template
-                    .clone()
-                    .unwrap_or_else(|| "pipeline_notification".to_string());
-                Arc::new(chat::transport::whatsapp::WhatsAppTransport::new(
-                    cfg.whatsapp_phone_number_id.clone().unwrap_or_default(),
-                    cfg.whatsapp_access_token.clone().unwrap_or_default(),
-                    template_name,
-                ))
-            };
-            let root = startup_root.clone().unwrap();
-            let history = chat::transport::whatsapp::load_whatsapp_history(&root);
-            Arc::new(chat::transport::whatsapp::WhatsAppWebhookContext {
-                services: Arc::clone(&services),
-                verify_token: cfg.whatsapp_verify_token.clone().unwrap_or_default(),
-                provider,
-                transport,
-                history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
-                history_size: cfg.history_size,
-                window_tracker: Arc::new(chat::transport::whatsapp::MessagingWindowTracker::new()),
-                allowed_phones: cfg.whatsapp_allowed_phones.clone(),
-            })
-        });
+    // ── Build bot contexts (WhatsApp / Slack / Discord) ─────────────────────
+    let (bot_ctxs, matrix_shutdown_rx) =
+        startup::bots::build_bot_contexts(&startup_root, &services);
+    startup::bots::spawn_startup_announcements(&bot_ctxs);
 
-    // Build Slack webhook context if bot.toml configures transport = "slack".
-    let slack_ctx: Option<Arc<chat::transport::slack::SlackWebhookContext>> = startup_root
-        .as_ref()
-        .and_then(|root| chat::transport::matrix::BotConfig::load(root))
-        .filter(|cfg| cfg.transport == "slack")
-        .map(|cfg| {
-            let transport = Arc::new(chat::transport::slack::SlackTransport::new(
-                cfg.slack_bot_token.clone().unwrap_or_default(),
-            ));
-            let root = startup_root.clone().unwrap();
-            let history = chat::transport::slack::load_slack_history(&root);
-            let channel_ids: std::collections::HashSet<String> =
-                cfg.slack_channel_ids.iter().cloned().collect();
-            Arc::new(chat::transport::slack::SlackWebhookContext {
-                services: Arc::clone(&services),
-                signing_secret: cfg.slack_signing_secret.clone().unwrap_or_default(),
-                transport,
-                history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
-                history_size: cfg.history_size,
-                channel_ids,
-            })
-        });
-
-    // Build Discord context if bot.toml configures transport = "discord".
-    let discord_ctx: Option<Arc<chat::transport::discord::DiscordContext>> = startup_root
-        .as_ref()
-        .and_then(|root| chat::transport::matrix::BotConfig::load(root))
-        .filter(|cfg| cfg.transport == "discord")
-        .map(|cfg| {
-            let transport = Arc::new(chat::transport::discord::DiscordTransport::new(
-                cfg.discord_bot_token.clone().unwrap_or_default(),
-            ));
-            let root = startup_root.clone().unwrap();
-            let history = chat::transport::discord::load_discord_history(&root);
-            let channel_ids: std::collections::HashSet<String> =
-                cfg.discord_channel_ids.iter().cloned().collect();
-            let allowed_users: std::collections::HashSet<String> =
-                cfg.discord_allowed_users.iter().cloned().collect();
-            Arc::new(chat::transport::discord::DiscordContext {
-                services: Arc::clone(&services),
-                bot_token: cfg.discord_bot_token.clone().unwrap_or_default(),
-                transport,
-                history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
-                history_size: cfg.history_size,
-                channel_ids,
-                allowed_users,
-            })
-        });
-
-    // Build a best-effort shutdown notifier for webhook-based transports.
-    //
-    // • Slack: channels are fixed at startup (channel_ids from bot.toml).
-    // • Discord: channels are fixed at startup (channel_ids from bot.toml).
-    // • WhatsApp: active senders are tracked at runtime in ambient_rooms.
-    //   We keep the WhatsApp context Arc so we can read the rooms at shutdown.
-    // • Matrix: the bot task manages its own announcement via matrix_shutdown_tx.
-    let bot_shutdown_notifier: Option<Arc<BotShutdownNotifier>> = if let Some(ref ctx) = slack_ctx {
-        let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
-        Some(Arc::new(BotShutdownNotifier::new(
-            Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>,
-            channels,
-            ctx.services.bot_name.clone(),
-        )))
-    } else if let Some(ref ctx) = discord_ctx {
-        let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
-        Some(Arc::new(BotShutdownNotifier::new(
-            Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>,
-            channels,
-            ctx.services.bot_name.clone(),
-        )))
-    } else {
-        None
-    };
-    // Retain a reference to the WhatsApp context for shutdown notifications.
-    // At shutdown time we read ambient_rooms to get the current set of active senders.
-    let whatsapp_ctx_for_shutdown: Option<Arc<chat::transport::whatsapp::WhatsAppWebhookContext>> =
-        whatsapp_ctx.clone();
-
-    // ── Startup announcements (WhatsApp & Slack) ──────────────────────────
-    //
-    // Send "{bot_name} is online." to all known contacts so users know the bot
-    // is ready. This mirrors the Matrix bot's startup announcement and fires
-    // on every fresh process start — including after a rebuild/re-exec.
-    //
-    // • WhatsApp: send to all phone numbers present in persisted history.
-    // • Slack: send to all configured channel IDs (channel_ids from bot.toml).
-    // • Matrix: handled by spawn_bot() below; no action needed here.
-    if let Some(ref ctx) = whatsapp_ctx {
-        let transport = Arc::clone(&ctx.transport);
-        let bot_name = ctx.services.bot_name.clone();
-        let history: WhatsAppConversationHistory = Arc::clone(&ctx.history);
-        tokio::spawn(async move {
-            let senders: Vec<String> = history.lock().await.keys().cloned().collect();
-            if senders.is_empty() {
-                return;
-            }
-            let notifier = crate::rebuild::BotShutdownNotifier::new(transport, senders, bot_name);
-            notifier.notify_startup().await;
-        });
-    }
-    if let Some(ref ctx) = slack_ctx {
-        let transport = Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>;
-        let bot_name = ctx.services.bot_name.clone();
-        let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
-        tokio::spawn(async move {
-            if channels.is_empty() {
-                return;
-            }
-            let notifier = crate::rebuild::BotShutdownNotifier::new(transport, channels, bot_name);
-            notifier.notify_startup().await;
-        });
-    }
-    if let Some(ref ctx) = discord_ctx {
-        let transport = Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>;
-        let bot_name = ctx.services.bot_name.clone();
-        let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
-        tokio::spawn(async move {
-            if channels.is_empty() {
-                return;
-            }
-            let notifier = crate::rebuild::BotShutdownNotifier::new(transport, channels, bot_name);
-            notifier.notify_startup().await;
-        });
-    }
-
-    // Watch channel: signals the Matrix bot task to send a shutdown announcement.
-    // `None` initial value means "server is running".
-    let (matrix_shutdown_tx, matrix_shutdown_rx) =
-        tokio::sync::watch::channel::<Option<ShutdownReason>>(None);
-    let matrix_shutdown_tx = Arc::new(matrix_shutdown_tx);
-    let matrix_shutdown_tx_for_rebuild = Arc::clone(&matrix_shutdown_tx);
+    let matrix_shutdown_tx_for_rebuild = Arc::clone(&bot_ctxs.matrix_shutdown_tx);
 
-    // Shared rate-limit retry timer store, accessible from MCP tools via
-    // AppContext so manual interventions (move_story → backlog, stop_agent)
-    // can cancel pending timers in-memory rather than only on disk.
-    // Also shared with the Matrix bot tick loop (bug 655).
+    // Shared rate-limit retry timer store.
     let timer_store = std::sync::Arc::new(crate::service::timer::TimerStore::load(
         startup_root
             .as_ref()
             .map(|r| r.join(".huskies").join("timers.json"))
             .unwrap_or_else(|| std::path::PathBuf::from("/tmp/huskies-timers.json")),
     ));
     let timer_store_for_tick = Arc::clone(&timer_store);
     let timer_store_for_bot = Arc::clone(&timer_store);
 
@@ -689,117 +247,34 @@ async fn main() -> Result<(), std::io::Error> {
         reconciliation_tx,
         perm_tx,
         qa_app_process: Arc::new(std::sync::Mutex::new(None)),
-        bot_shutdown: bot_shutdown_notifier.clone(),
-        matrix_shutdown_tx: Some(Arc::clone(&matrix_shutdown_tx)),
+        bot_shutdown: bot_ctxs.shutdown_notifier.clone(),
+        matrix_shutdown_tx: Some(Arc::clone(&bot_ctxs.matrix_shutdown_tx)),
         timer_store,
     };
 
-    // Create the per-project event buffer and subscribe it to the watcher channel
-    // so that pipeline events are buffered for the gateway's `/api/events` poller.
+    // Per-project event buffer for the gateway's `/api/events` poller.
    let event_buffer = crate::http::events::EventBuffer::new();
    crate::http::events::subscribe_to_watcher(event_buffer.clone(), watcher_rx_for_events);
 
-    // ── Gateway relay task ─────────────────────────────────────────────────
-    //
-    // When `gateway_url` is configured (via project.toml or HUSKIES_GATEWAY_URL)
-    // start a background task that pushes StatusEvents to the gateway's
-    // /gateway/events/push WebSocket endpoint. The project name sent to the
-    // gateway defaults to the project root directory name when `gateway_project`
-    // is not explicitly set.
-    {
-        let relay_gateway_url = startup_root
-            .as_ref()
-            .and_then(|r| config::ProjectConfig::load(r).ok())
-            .and_then(|c| c.gateway_url)
-            .or_else(|| std::env::var("HUSKIES_GATEWAY_URL").ok())
-            .unwrap_or_default();
-
-        if !relay_gateway_url.is_empty() {
-            let relay_project_name = startup_root
-                .as_ref()
-                .and_then(|r| config::ProjectConfig::load(r).ok())
-                .and_then(|c| c.gateway_project)
-                .or_else(|| std::env::var("HUSKIES_GATEWAY_PROJECT").ok())
-                .or_else(|| {
-                    startup_root
-                        .as_ref()
-                        .and_then(|r| r.file_name())
-                        .map(|n| n.to_string_lossy().into_owned())
-                })
-                .unwrap_or_else(|| "project".to_string());
-
-            gateway_relay::spawn_relay_task(
-                relay_gateway_url,
-                relay_project_name,
-                Arc::clone(&services.status),
-                reqwest::Client::new(),
-            );
-        }
-    }
+    // Gateway relay task (pushes StatusEvents to a configured gateway).
+    startup::tick_loop::spawn_gateway_relay(&startup_root, Arc::clone(&services.status));
 
     let app = build_routes(
         ctx,
-        whatsapp_ctx.clone(),
-        slack_ctx.clone(),
+        bot_ctxs.whatsapp_ctx.clone(),
+        bot_ctxs.slack_ctx.clone(),
         port,
         Some(event_buffer),
     );
 
-    // Unified 1-second background tick loop: fires due timers, detects orphaned
-    // agents (watchdog), and promotes done→archived items (sweep). Replaces the
-    // three separate background loops that previously ran independently.
-    {
-        let tick_agents = Arc::clone(&startup_agents);
-        let tick_timer = timer_store_for_tick;
-        let tick_root = startup_root.clone();
-        let sweep_cfg = tick_root
-            .as_ref()
-            .and_then(|r| config::ProjectConfig::load(r).ok())
-            .map(|c| c.watcher)
-            .unwrap_or_default();
-        let sweep_every = sweep_cfg.sweep_interval_secs.max(1);
-        let done_retention = std::time::Duration::from_secs(sweep_cfg.done_retention_secs);
-        let pending_count = tick_timer.list().len();
-        crate::slog!("[tick] Unified tick loop started; {pending_count} pending timer(s)");
-        tokio::spawn(async move {
-            let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
-            let mut tick_count: u64 = 0;
-            loop {
-                interval.tick().await;
-                tick_count = tick_count.wrapping_add(1);
-
-                // Timer: fire due timers every second.
-                if let Some(ref root) = tick_root {
-                    let result =
-                        crate::service::timer::tick_once(&tick_timer, &tick_agents, root).await;
-                    if let Err(msg) = result {
-                        crate::slog_error!("[tick] Timer tick panicked: {msg}");
-                    }
-                }
-
-                // Watchdog: detect orphaned Running agents every 30 ticks.
-                if tick_count.is_multiple_of(30) {
-                    let found = tick_agents.run_watchdog_pass(tick_root.as_deref());
-                    if found > 0 {
-                        crate::slog!(
-                            "[tick] {found} orphaned agent(s) detected; triggering auto-assign."
-                        );
-                        if let Some(ref root) = tick_root {
-                            tick_agents.auto_assign_available_work(root).await;
-                        }
-                    }
-                }
-
-                // Sweep: promote done→archived every sweep_interval_secs ticks.
-                if tick_count.is_multiple_of(sweep_every) {
-                    crate::io::watcher::sweep_done_to_archived(done_retention);
-                }
-            }
-        });
-    }
+    // Unified 1-second background tick loop.
+    startup::tick_loop::spawn_tick_loop(
+        Arc::clone(&startup_agents),
+        timer_store_for_tick,
+        startup_root.clone(),
+    );
 
-    // Optional Matrix bot: connect to the homeserver and start listening for
-    // messages if `.huskies/bot.toml` is present and enabled.
+    // Optional Matrix bot.
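+    //
+    // The bot task holds `matrix_shutdown_rx` and announces shutdown when the
+    // watch value flips from None to Some(reason). A minimal sketch of that
+    // handshake (illustrative only; `announce_shutdown` is a hypothetical
+    // helper, not the real spawn_bot internals):
+    //
+    //     let mut rx = matrix_shutdown_rx;
+    //     tokio::spawn(async move {
+    //         while rx.changed().await.is_ok() {
+    //             if let Some(reason) = *rx.borrow() {
+    //                 announce_shutdown(reason).await;
+    //                 break;
+    //             }
+    //         }
+    //     });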
     if let Some(ref root) = startup_root {
         let _ = chat::transport::matrix::spawn_bot(
             root,
@@ -813,159 +288,25 @@ async fn main() -> Result<(), std::io::Error> {
             None,
         );
     } else {
-        // Keep the receiver alive (drop it) so the sender never errors.
         drop(matrix_shutdown_rx);
     }
 
-    // Spawn stage-transition notification listeners for WhatsApp and Slack.
-    // These mirror the listener that the Matrix bot spawns internally.
-    if let (Some(ctx), Some(root)) = (&whatsapp_ctx, &startup_root) {
-        let ambient_rooms = Arc::clone(&ctx.services.ambient_rooms);
-        crate::service::notifications::spawn_notification_listener(
-            Arc::clone(&ctx.transport),
-            move || ambient_rooms.lock().unwrap().iter().cloned().collect(),
-            watcher_rx_for_whatsapp,
-            root.clone(),
-        );
+    // Notification listeners for WhatsApp, Slack, Discord.
+    startup::bots::spawn_notification_listeners(
+        &bot_ctxs,
+        &startup_root,
+        watcher_rx_for_whatsapp,
+        watcher_rx_for_slack,
+        watcher_rx_for_discord,
+    );
 
-        // Subscribe to the status broadcaster if the whatsapp_status_consumer toggle
-        // is enabled (default: true). Formats each StatusEvent via the common
-        // formatter and sends the resulting text to all active WhatsApp senders.
-        // The task exits automatically when the broadcaster is dropped on shutdown.
-        {
+    // Reconcile completed worktrees and auto-assign free agents.
+    startup::tick_loop::spawn_startup_reconciliation(
+        startup_root.clone(),
+        startup_agents,
+        startup_reconciliation_tx,
+    );
 
-            use crate::service::status::format::format_status_event;
-
-            let status_enabled = config::ProjectConfig::load(root)
-                .map(|c| c.whatsapp_status_consumer)
-                .unwrap_or(true);
-
-            if status_enabled {
-                let mut sub = ctx.services.status.subscribe();
-                let transport = Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>;
-                let ambient_rooms = Arc::clone(&ctx.services.ambient_rooms);
-                tokio::spawn(async move {
-                    while let Some(event) = sub.recv().await {
-                        let plain = format_status_event(&event);
-                        let rooms: Vec<String> =
-                            ambient_rooms.lock().unwrap().iter().cloned().collect();
-                        for room in &rooms {
-                            if let Err(e) = transport.send_message(room, &plain, "").await {
-                                crate::slog!(
-                                    "[whatsapp] Failed to send status event to {room}: {e}"
-                                );
-                            }
-                        }
-                    }
-                    crate::slog!("[whatsapp] Status subscriber task exiting — broadcaster dropped");
-                });
-            }
-        }
-    } else {
-        drop(watcher_rx_for_whatsapp);
-    }
-    if let (Some(ctx), Some(root)) = (&slack_ctx, &startup_root) {
-        let channel_ids: Vec<String> = ctx.channel_ids.iter().cloned().collect();
-        crate::service::notifications::spawn_notification_listener(
-            Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>,
-            move || channel_ids.clone(),
-            watcher_rx_for_slack,
-            root.clone(),
-        );
-
-        // Subscribe to the status broadcaster if the slack_status_consumer toggle
-        // is enabled (default: true). Formats each StatusEvent via the common
-        // formatter and sends the resulting text to all configured Slack channels.
-        // The task exits automatically when the broadcaster is dropped on shutdown.
-        {
-            use crate::service::status::format::format_status_event;
-
-            let status_enabled = config::ProjectConfig::load(root)
-                .map(|c| c.slack_status_consumer)
-                .unwrap_or(true);
-
-            if status_enabled {
-                let mut sub = ctx.services.status.subscribe();
-                let transport = Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>;
-                let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
-                tokio::spawn(async move {
-                    while let Some(event) = sub.recv().await {
-                        let plain = format_status_event(&event);
-                        for channel in &channels {
-                            if let Err(e) = transport.send_message(channel, &plain, "").await {
-                                crate::slog!(
-                                    "[slack] Failed to send status event to {channel}: {e}"
-                                );
-                            }
-                        }
-                    }
-                    crate::slog!("[slack] Status subscriber task exiting — broadcaster dropped");
-                });
-            }
-        }
-    } else {
-        drop(watcher_rx_for_slack);
-    }
-    if let (Some(ctx), Some(root)) = (&discord_ctx, &startup_root) {
-        // Spawn the Discord Gateway WebSocket listener.
-        chat::transport::discord::gateway::spawn_gateway(Arc::clone(ctx));
-
-        // Spawn stage-transition notification listener for Discord.
-        let channel_ids: Vec<String> = ctx.channel_ids.iter().cloned().collect();
-        crate::service::notifications::spawn_notification_listener(
-            Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>,
-            move || channel_ids.clone(),
-            watcher_rx_for_discord,
-            root.clone(),
-        );
-
-        // Subscribe to the status broadcaster if the discord_status_consumer toggle
-        // is enabled (default: true). Formats each StatusEvent via the common
-        // formatter and sends the resulting text to all configured Discord channels.
-        // The task exits automatically when the broadcaster is dropped on shutdown.
-        {
-            use crate::service::status::format::format_status_event;
-
-            let status_enabled = config::ProjectConfig::load(root)
-                .map(|c| c.discord_status_consumer)
-                .unwrap_or(true);
-
-            if status_enabled {
-                let mut sub = ctx.services.status.subscribe();
-                let transport = Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>;
-                let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
-                tokio::spawn(async move {
-                    while let Some(event) = sub.recv().await {
-                        let plain = format_status_event(&event);
-                        for channel in &channels {
-                            if let Err(e) = transport.send_message(channel, &plain, "").await {
-                                crate::slog!(
-                                    "[discord] Failed to send status event to {channel}: {e}"
-                                );
-                            }
-                        }
-                    }
-                    crate::slog!("[discord] Status subscriber task exiting — broadcaster dropped");
-                });
-            }
-        }
-    } else {
-        drop(watcher_rx_for_discord);
-    }
-
-    // On startup:
-    //   1. Reconcile any stories whose agent work was committed while the server was
-    //      offline (worktree has commits ahead of master but pipeline didn't advance).
-    //   2. Auto-assign free agents to remaining unassigned work in the pipeline.
-    if let Some(root) = startup_root {
-        tokio::spawn(async move {
-            slog!("[startup] Reconciling completed worktrees from previous session.");
-            startup_agents
-                .reconcile_on_startup(&root, &startup_reconciliation_tx)
-                .await;
-            slog!("[auto-assign] Scanning pipeline stages for unassigned work.");
-            startup_agents.auto_assign_available_work(&root).await;
-        });
-    }
 
     let host = std::env::var("HUSKIES_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
     let addr = format!("{host}:{port}");
@@ -988,45 +329,14 @@ async fn main() -> Result<(), std::io::Error> {
 
     let result = Server::new(TcpListener::bind(&addr)).run(app).await;
 
-    // ── Shutdown notifications (best-effort) ─────────────────────────
-    //
-    // The server is stopping (SIGINT / SIGTERM). Notify active bot channels
-    // so participants know the bot is going offline. We do this before killing
-    // PTY children so network I/O can still complete.
-
-    // Slack: notifier holds the fixed channel list.
-    if let Some(ref notifier) = bot_shutdown_notifier {
-        notifier.notify(ShutdownReason::Manual).await;
-    }
-
-    // WhatsApp: read the current set of ambient rooms and notify each sender.
-    if let Some(ref ctx) = whatsapp_ctx_for_shutdown {
-        let rooms: Vec<String> = ctx
-            .services
-            .ambient_rooms
-            .lock()
-            .unwrap()
-            .iter()
-            .cloned()
-            .collect();
-        if !rooms.is_empty() {
-            let wa_notifier = BotShutdownNotifier::new(
-                Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>,
-                rooms,
-                ctx.services.bot_name.clone(),
-            );
-            wa_notifier.notify(ShutdownReason::Manual).await;
-        }
-    }
+    // ── Shutdown notifications (best-effort) ────────────────────────────────
+    startup::bots::notify_shutdown(&bot_ctxs).await;
 
     // Matrix: signal the bot task and give it a short window to send its message.
     let _ = matrix_shutdown_tx_for_rebuild.send(Some(ShutdownReason::Manual));
     tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
 
-    // ── Cleanup ────────────────────────────────────────────────────
-
-    // Kill all active PTY child processes before exiting to prevent orphaned
-    // Claude Code processes from running after the server restarts.
+    // Kill all active PTY child processes before exiting.
     agents_for_shutdown.kill_all_children();
 
     if let Some(ref path) = port_file {
diff --git a/server/src/startup/bots.rs b/server/src/startup/bots.rs
new file mode 100644
index 00000000..1391d3a8
--- /dev/null
+++ b/server/src/startup/bots.rs
@@ -0,0 +1,370 @@
+//! Chat-transport context construction, startup announcements, stage-transition
+//! notification listeners, and shutdown notifications.
+
+use crate::chat;
+use crate::config;
+use crate::rebuild::{BotShutdownNotifier, ShutdownReason};
+use crate::service;
+use crate::services::Services;
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio::sync::broadcast;
+
+/// All chat-transport contexts built at startup, plus the Matrix shutdown channel.
+pub(crate) struct BotContexts {
+    pub(crate) whatsapp_ctx: Option<Arc<chat::transport::whatsapp::WhatsAppWebhookContext>>,
+    pub(crate) slack_ctx: Option<Arc<chat::transport::slack::SlackWebhookContext>>,
+    pub(crate) discord_ctx: Option<Arc<chat::transport::discord::DiscordContext>>,
+    /// Best-effort shutdown notifier for Slack / Discord (fixed channel list).
+    pub(crate) shutdown_notifier: Option<Arc<BotShutdownNotifier>>,
+    /// Retained for shutdown-time WhatsApp notifications (reads ambient_rooms).
+    pub(crate) whatsapp_ctx_for_shutdown:
+        Option<Arc<chat::transport::whatsapp::WhatsAppWebhookContext>>,
+    /// Sender used to signal the Matrix bot task at shutdown.
+    pub(crate) matrix_shutdown_tx: Arc<tokio::sync::watch::Sender<Option<ShutdownReason>>>,
+}
+
+/// Build WhatsApp, Slack, and Discord contexts from `bot.toml`, along with
+/// the shutdown notifier and Matrix watch channel.
+///
+/// Returns the `BotContexts` struct and the Matrix shutdown receiver separately
+/// so the receiver can be moved into `spawn_bot` without partially moving the struct.
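+///
+/// Call shape at startup (a sketch of how `main` consumes the pair):
+/// ```ignore
+/// let (bot_ctxs, matrix_shutdown_rx) = build_bot_contexts(&startup_root, &services);
+/// spawn_startup_announcements(&bot_ctxs);
+/// // matrix_shutdown_rx is later moved into chat::transport::matrix::spawn_bot.
+/// ```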
+pub(crate) fn build_bot_contexts(
+    startup_root: &Option<PathBuf>,
+    services: &Arc<Services>,
+) -> (
+    BotContexts,
+    tokio::sync::watch::Receiver<Option<ShutdownReason>>,
+) {
+    let whatsapp_ctx: Option<Arc<chat::transport::whatsapp::WhatsAppWebhookContext>> = startup_root
+        .as_ref()
+        .and_then(|root| chat::transport::matrix::BotConfig::load(root))
+        .filter(|cfg| cfg.transport == "whatsapp")
+        .map(|cfg| {
+            let provider = cfg.whatsapp_provider.clone();
+            let transport: Arc<dyn chat::ChatTransport> = if provider == "twilio" {
+                Arc::new(chat::transport::whatsapp::TwilioWhatsAppTransport::new(
+                    cfg.twilio_account_sid.clone().unwrap_or_default(),
+                    cfg.twilio_auth_token.clone().unwrap_or_default(),
+                    cfg.twilio_whatsapp_number.clone().unwrap_or_default(),
+                ))
+            } else {
+                let template_name = cfg
+                    .whatsapp_notification_template
+                    .clone()
+                    .unwrap_or_else(|| "pipeline_notification".to_string());
+                Arc::new(chat::transport::whatsapp::WhatsAppTransport::new(
+                    cfg.whatsapp_phone_number_id.clone().unwrap_or_default(),
+                    cfg.whatsapp_access_token.clone().unwrap_or_default(),
+                    template_name,
+                ))
+            };
+            let root = startup_root.clone().unwrap();
+            let history = chat::transport::whatsapp::load_whatsapp_history(&root);
+            Arc::new(chat::transport::whatsapp::WhatsAppWebhookContext {
+                services: Arc::clone(services),
+                verify_token: cfg.whatsapp_verify_token.clone().unwrap_or_default(),
+                provider,
+                transport,
+                history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
+                history_size: cfg.history_size,
+                window_tracker: Arc::new(chat::transport::whatsapp::MessagingWindowTracker::new()),
+                allowed_phones: cfg.whatsapp_allowed_phones.clone(),
+            })
+        });
+
+    let slack_ctx: Option<Arc<chat::transport::slack::SlackWebhookContext>> = startup_root
+        .as_ref()
+        .and_then(|root| chat::transport::matrix::BotConfig::load(root))
+        .filter(|cfg| cfg.transport == "slack")
+        .map(|cfg| {
+            let transport = Arc::new(chat::transport::slack::SlackTransport::new(
+                cfg.slack_bot_token.clone().unwrap_or_default(),
+            ));
+            let root = startup_root.clone().unwrap();
+            let history = chat::transport::slack::load_slack_history(&root);
+            let channel_ids: std::collections::HashSet<String> =
+                cfg.slack_channel_ids.iter().cloned().collect();
+            Arc::new(chat::transport::slack::SlackWebhookContext {
+                services: Arc::clone(services),
+                signing_secret: cfg.slack_signing_secret.clone().unwrap_or_default(),
+                transport,
+                history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
+                history_size: cfg.history_size,
+                channel_ids,
+            })
+        });
+
+    let discord_ctx: Option<Arc<chat::transport::discord::DiscordContext>> = startup_root
+        .as_ref()
+        .and_then(|root| chat::transport::matrix::BotConfig::load(root))
+        .filter(|cfg| cfg.transport == "discord")
+        .map(|cfg| {
+            let transport = Arc::new(chat::transport::discord::DiscordTransport::new(
+                cfg.discord_bot_token.clone().unwrap_or_default(),
+            ));
+            let root = startup_root.clone().unwrap();
+            let history = chat::transport::discord::load_discord_history(&root);
+            let channel_ids: std::collections::HashSet<String> =
+                cfg.discord_channel_ids.iter().cloned().collect();
+            let allowed_users: std::collections::HashSet<String> =
+                cfg.discord_allowed_users.iter().cloned().collect();
+            Arc::new(chat::transport::discord::DiscordContext {
+                services: Arc::clone(services),
+                bot_token: cfg.discord_bot_token.clone().unwrap_or_default(),
+                transport,
+                history: std::sync::Arc::new(tokio::sync::Mutex::new(history)),
+                history_size: cfg.history_size,
+                channel_ids,
+                allowed_users,
+            })
+        });
+
+    // Build a best-effort shutdown notifier.
+    // Slack and Discord have fixed channel lists; WhatsApp rooms are tracked at
+    // runtime via ambient_rooms and handled separately in `notify_shutdown`.
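+    //
+    // What the notifier does with that list is a plain fan-out; a sketch,
+    // assuming the `send_message(target, plain, html)` transport method used
+    // by the status consumers elsewhere in this module:
+    //
+    //     for channel in &channels {
+    //         let _ = transport.send_message(channel, &text, "").await;
+    //     }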
+    let shutdown_notifier: Option<Arc<BotShutdownNotifier>> = if let Some(ref ctx) = slack_ctx {
+        let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
+        Some(Arc::new(BotShutdownNotifier::new(
+            Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>,
+            channels,
+            ctx.services.bot_name.clone(),
+        )))
+    } else if let Some(ref ctx) = discord_ctx {
+        let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
+        Some(Arc::new(BotShutdownNotifier::new(
+            Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>,
+            channels,
+            ctx.services.bot_name.clone(),
+        )))
+    } else {
+        None
+    };
+
+    let whatsapp_ctx_for_shutdown = whatsapp_ctx.clone();
+    let (matrix_shutdown_tx, matrix_shutdown_rx) =
+        tokio::sync::watch::channel::<Option<ShutdownReason>>(None);
+    let matrix_shutdown_tx = Arc::new(matrix_shutdown_tx);
+
+    (
+        BotContexts {
+            whatsapp_ctx,
+            slack_ctx,
+            discord_ctx,
+            shutdown_notifier,
+            whatsapp_ctx_for_shutdown,
+            matrix_shutdown_tx,
+        },
+        matrix_shutdown_rx,
+    )
+}
+
+/// Send `"{bot_name} is online."` to all known WhatsApp, Slack, and Discord
+/// contacts so users know the bot is ready after a (re)start.
+pub(crate) fn spawn_startup_announcements(ctxs: &BotContexts) {
+    use chat::transport::whatsapp::WhatsAppConversationHistory;
+
+    if let Some(ref ctx) = ctxs.whatsapp_ctx {
+        let transport = Arc::clone(&ctx.transport);
+        let bot_name = ctx.services.bot_name.clone();
+        let history: WhatsAppConversationHistory = Arc::clone(&ctx.history);
+        tokio::spawn(async move {
+            let senders: Vec<String> = history.lock().await.keys().cloned().collect();
+            if senders.is_empty() {
+                return;
+            }
+            let notifier = BotShutdownNotifier::new(transport, senders, bot_name);
+            notifier.notify_startup().await;
+        });
+    }
+
+    if let Some(ref ctx) = ctxs.slack_ctx {
+        let transport = Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>;
+        let bot_name = ctx.services.bot_name.clone();
+        let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
+        tokio::spawn(async move {
+            if channels.is_empty() {
+                return;
+            }
+            let notifier = BotShutdownNotifier::new(transport, channels, bot_name);
+            notifier.notify_startup().await;
+        });
+    }
+
+    if let Some(ref ctx) = ctxs.discord_ctx {
+        let transport = Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>;
+        let bot_name = ctx.services.bot_name.clone();
+        let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
+        tokio::spawn(async move {
+            if channels.is_empty() {
+                return;
+            }
+            let notifier = BotShutdownNotifier::new(transport, channels, bot_name);
+            notifier.notify_startup().await;
+        });
+    }
+}
+
+/// Spawn stage-transition notification listeners and status-event consumers for
+/// all configured chat transports (WhatsApp, Slack, Discord).
+///
+/// Watcher receivers that have no matching transport are dropped immediately.
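+///
+/// Every status consumer spawned here shares one fan-out shape (sketch;
+/// `targets` stands in for the ambient rooms or fixed channel lists):
+/// ```ignore
+/// while let Some(event) = sub.recv().await {
+///     let plain = format_status_event(&event);
+///     for target in targets() {
+///         let _ = transport.send_message(&target, &plain, "").await;
+///     }
+/// }
+/// ```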
+pub(crate) fn spawn_notification_listeners(
+    ctxs: &BotContexts,
+    startup_root: &Option<PathBuf>,
+    watcher_rx_for_whatsapp: broadcast::Receiver<crate::io::watcher::WatcherEvent>,
+    watcher_rx_for_slack: broadcast::Receiver<crate::io::watcher::WatcherEvent>,
+    watcher_rx_for_discord: broadcast::Receiver<crate::io::watcher::WatcherEvent>,
+) {
+    if let (Some(ctx), Some(root)) = (&ctxs.whatsapp_ctx, startup_root) {
+        let ambient_rooms = Arc::clone(&ctx.services.ambient_rooms);
+        service::notifications::spawn_notification_listener(
+            Arc::clone(&ctx.transport),
+            move || ambient_rooms.lock().unwrap().iter().cloned().collect(),
+            watcher_rx_for_whatsapp,
+            root.clone(),
+        );
+
+        {
+            use crate::service::status::format::format_status_event;
+
+            let status_enabled = config::ProjectConfig::load(root)
+                .map(|c| c.whatsapp_status_consumer)
+                .unwrap_or(true);
+
+            if status_enabled {
+                let mut sub = ctx.services.status.subscribe();
+                let transport = Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>;
+                let ambient_rooms = Arc::clone(&ctx.services.ambient_rooms);
+                tokio::spawn(async move {
+                    while let Some(event) = sub.recv().await {
+                        let plain = format_status_event(&event);
+                        let rooms: Vec<String> =
+                            ambient_rooms.lock().unwrap().iter().cloned().collect();
+                        for room in &rooms {
+                            if let Err(e) = transport.send_message(room, &plain, "").await {
+                                crate::slog!(
+                                    "[whatsapp] Failed to send status event to {room}: {e}"
+                                );
+                            }
+                        }
+                    }
+                    crate::slog!("[whatsapp] Status subscriber task exiting — broadcaster dropped");
+                });
+            }
+        }
+    } else {
+        drop(watcher_rx_for_whatsapp);
+    }
+
+    if let (Some(ctx), Some(root)) = (&ctxs.slack_ctx, startup_root) {
+        let channel_ids: Vec<String> = ctx.channel_ids.iter().cloned().collect();
+        service::notifications::spawn_notification_listener(
+            Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>,
+            move || channel_ids.clone(),
+            watcher_rx_for_slack,
+            root.clone(),
+        );
+
+        {
+            use crate::service::status::format::format_status_event;
+
+            let status_enabled = config::ProjectConfig::load(root)
+                .map(|c| c.slack_status_consumer)
+                .unwrap_or(true);
+
+            if status_enabled {
+                let mut sub = ctx.services.status.subscribe();
+                let transport = Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>;
+                let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
+                tokio::spawn(async move {
+                    while let Some(event) = sub.recv().await {
+                        let plain = format_status_event(&event);
+                        for channel in &channels {
+                            if let Err(e) = transport.send_message(channel, &plain, "").await {
+                                crate::slog!(
+                                    "[slack] Failed to send status event to {channel}: {e}"
+                                );
+                            }
+                        }
+                    }
+                    crate::slog!("[slack] Status subscriber task exiting — broadcaster dropped");
+                });
+            }
+        }
+    } else {
+        drop(watcher_rx_for_slack);
+    }
+
+    if let (Some(ctx), Some(root)) = (&ctxs.discord_ctx, startup_root) {
+        chat::transport::discord::gateway::spawn_gateway(Arc::clone(ctx));
+
+        let channel_ids: Vec<String> = ctx.channel_ids.iter().cloned().collect();
+        service::notifications::spawn_notification_listener(
+            Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>,
+            move || channel_ids.clone(),
+            watcher_rx_for_discord,
+            root.clone(),
+        );
+
+        {
+            use crate::service::status::format::format_status_event;
+
+            let status_enabled = config::ProjectConfig::load(root)
+                .map(|c| c.discord_status_consumer)
+                .unwrap_or(true);
+
+            if status_enabled {
+                let mut sub = ctx.services.status.subscribe();
+                let transport = Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>;
+                let channels: Vec<String> = ctx.channel_ids.iter().cloned().collect();
+                tokio::spawn(async move {
+                    while let Some(event) = sub.recv().await {
+                        let plain = format_status_event(&event);
+                        for channel in &channels {
+                            if let Err(e) = transport.send_message(channel, &plain, "").await {
+                                crate::slog!(
+                                    "[discord] Failed to send status event to {channel}: {e}"
+                                );
+                            }
+                        }
+                    }
+                    crate::slog!("[discord] Status subscriber task exiting — broadcaster dropped");
+                });
+            }
+        }
+    } else {
+        drop(watcher_rx_for_discord);
+    }
+}
+
+/// Send shutdown notifications to all active bot channels (best-effort).
+///
+/// Called after the HTTP server stops accepting connections so that
+/// network I/O can still complete before PTY children are killed.
+pub(crate) async fn notify_shutdown(ctxs: &BotContexts) {
+    // Slack / Discord: notifier holds the fixed channel list.
+    if let Some(ref notifier) = ctxs.shutdown_notifier {
+        notifier.notify(ShutdownReason::Manual).await;
+    }
+
+    // WhatsApp: read the current set of ambient rooms and notify each sender.
+    if let Some(ref ctx) = ctxs.whatsapp_ctx_for_shutdown {
+        let rooms: Vec<String> = ctx
+            .services
+            .ambient_rooms
+            .lock()
+            .unwrap()
+            .iter()
+            .cloned()
+            .collect();
+        if !rooms.is_empty() {
+            let wa_notifier = BotShutdownNotifier::new(
+                Arc::clone(&ctx.transport) as Arc<dyn chat::ChatTransport>,
+                rooms,
+                ctx.services.bot_name.clone(),
+            );
+            wa_notifier.notify(ShutdownReason::Manual).await;
+        }
+    }
+}
diff --git a/server/src/startup/mod.rs b/server/src/startup/mod.rs
new file mode 100644
index 00000000..ed7a4534
--- /dev/null
+++ b/server/src/startup/mod.rs
@@ -0,0 +1,5 @@
+//! Server startup helpers: project initialisation, background tasks, and bot setup.
+
+pub(crate) mod bots;
+pub(crate) mod project;
+pub(crate) mod tick_loop;
diff --git a/server/src/startup/project.rs b/server/src/startup/project.rs
new file mode 100644
index 00000000..856f36f6
--- /dev/null
+++ b/server/src/startup/project.rs
@@ -0,0 +1,213 @@
+//! Project-root discovery, subsystem initialisation (log, identity, DB, CRDT),
+//! and CRDT-sync configuration.
+
+use crate::config;
+use crate::crdt_state;
+use crate::crdt_sync;
+use crate::db;
+use crate::io::fs::find_story_kit_root;
+use crate::log_buffer;
+use crate::node_identity;
+use crate::state::SessionState;
+use crate::store::JsonFileStore;
+use crate::worktree;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+
+/// Open (or scaffold) the project root according to the CLI flags and CWD.
+///
+/// Handles `huskies init`, an explicit path argument, and the default
+/// auto-detect behaviour. Modifies `app_state.project_root` as a side effect.
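+///
+/// Resolution order, matching the branches below (sketch of the CLI shapes):
+/// ```ignore
+/// // huskies init [PATH]  -> scaffold PATH, creating the directory if needed
+/// // huskies PATH         -> open PATH; it must already exist
+/// // huskies              -> find_story_kit_root(cwd), else scaffold the cwd
+/// open_project_root(is_init, explicit_path, &cwd, &app_state, &store, port).await;
+/// ```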
+pub(crate) async fn open_project_root(
+    is_init: bool,
+    explicit_path: Option<PathBuf>,
+    cwd: &Path,
+    app_state: &Arc<SessionState>,
+    store: &Arc<JsonFileStore>,
+    port: u16,
+) {
+    if is_init {
+        let init_root = explicit_path.unwrap_or_else(|| cwd.to_path_buf());
+        if !init_root.exists() {
+            std::fs::create_dir_all(&init_root).unwrap_or_else(|e| {
+                eprintln!(
+                    "error: cannot create directory {}: {e}",
+                    init_root.display()
+                );
+                std::process::exit(1);
+            });
+        }
+        match crate::io::fs::open_project(
+            init_root.to_string_lossy().to_string(),
+            app_state,
+            store.as_ref(),
+            port,
+        )
+        .await
+        {
+            Ok(_) => {
+                if let Some(root) = app_state.project_root.lock().unwrap().as_ref() {
+                    config::ProjectConfig::load(root)
+                        .unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
+                    crate::io::wizard::WizardState::init_if_missing(root);
+                }
+            }
+            Err(e) => {
+                eprintln!("error: {e}");
+                std::process::exit(1);
+            }
+        }
+    } else if let Some(explicit_root) = explicit_path {
+        match crate::io::fs::open_project(
+            explicit_root.to_string_lossy().to_string(),
+            app_state,
+            store.as_ref(),
+            port,
+        )
+        .await
+        {
+            Ok(_) => {
+                if let Some(root) = app_state.project_root.lock().unwrap().as_ref() {
+                    config::ProjectConfig::load(root)
+                        .unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
+                }
+            }
+            Err(e) => {
+                eprintln!("error: {e}");
+                std::process::exit(1);
+            }
+        }
+    } else if let Some(project_root) = find_story_kit_root(cwd) {
+        crate::io::fs::open_project(
+            project_root.to_string_lossy().to_string(),
+            app_state,
+            store.as_ref(),
+            port,
+        )
+        .await
+        .unwrap_or_else(|e| {
+            crate::slog!("Warning: failed to auto-open project at {project_root:?}: {e}");
+            project_root.to_string_lossy().to_string()
+        });
+
+        config::ProjectConfig::load(&project_root)
+            .unwrap_or_else(|e| panic!("Invalid project.toml: {e}"));
+    } else {
+        crate::io::fs::open_project(
+            cwd.to_string_lossy().to_string(),
+            app_state,
+            store.as_ref(),
+            port,
+        )
+        .await
+        .unwrap_or_else(|e| {
+            crate::slog!("Warning: failed to scaffold project at {cwd:?}: {e}");
+            cwd.to_string_lossy().to_string()
+        });
+    }
+}
+
+/// Set up the server log file, node identity keypair, pipeline DB, and CRDT state.
+pub(crate) async fn init_subsystems(app_state: &Arc<SessionState>, cwd: &Path) {
+    // Enable persistent server log file now that the project root is known.
+    if let Some(ref root) = *app_state.project_root.lock().unwrap() {
+        let log_dir = root.join(".huskies").join("logs");
+        let _ = std::fs::create_dir_all(&log_dir);
+        log_buffer::global().set_log_file(log_dir.join("server.log"));
+    }
+
+    // Initialise the node's Ed25519 identity keypair (file-based, mode 0600).
+    // The key is stored at .huskies/node_identity.key and persisted across restarts.
+    {
+        let key_path = app_state
+            .project_root
+            .lock()
+            .unwrap()
+            .as_ref()
+            .map(|root| root.join(".huskies").join("node_identity.key"))
+            .unwrap_or_else(|| cwd.join(".huskies").join("node_identity.key"));
+        if let Err(e) = node_identity::init_identity(&key_path) {
+            crate::slog!("[identity] Failed to initialise node identity keypair: {e}");
+        } else if let Some(id) = node_identity::get_identity() {
+            crate::slog!("[identity] Node ID: {}", id.node_id);
+        }
+    }
+
+    // Initialise the SQLite pipeline shadow-write database and CRDT state layer.
+    // Clone the path out before the await so we don't hold the MutexGuard across
+    // an await point.
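+    // A sketch of the pattern, for contrast with holding the guard:
+    //
+    //     let path = state.lock().unwrap().clone(); // guard dropped at end of statement
+    //     do_async_work(&path).await;               // safe: no MutexGuard across .await
+    //
+    // (`do_async_work` is a stand-in, not a real function in this crate.)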
+    let pipeline_db_path = app_state
+        .project_root
+        .lock()
+        .unwrap()
+        .as_ref()
+        .map(|root| root.join(".huskies").join("pipeline.db"));
+
+    if let Some(ref db_path) = pipeline_db_path {
+        if let Err(e) = db::init(db_path).await {
+            crate::slog!("[db] Failed to initialise pipeline.db: {e}");
+        }
+        if let Err(e) = crdt_state::init(db_path).await {
+            crate::slog!("[crdt] Failed to initialise CRDT state layer: {e}");
+        } else {
+            crdt_state::migrate_names_from_slugs();
+            let id_migrations = crdt_state::migrate_story_ids_to_numeric();
+            if !id_migrations.is_empty()
+                && let Some(project_root) = db_path.parent().and_then(|p| p.parent())
+            {
+                worktree::migrate_slug_paths(project_root, &id_migrations);
+            }
+        }
+    }
+}
+
+/// Wire up CRDT sync: trusted keys, token auth, and the rendezvous client.
+///
+/// In agent mode the rendezvous URL comes from the CLI; otherwise it is read
+/// from `project.toml`.
+pub(crate) fn configure_crdt_sync(
+    app_state: &Arc<SessionState>,
+    is_agent: bool,
+    agent_rendezvous: Option<String>,
+    crdt_join_token: Option<String>,
+) {
+    let sync_config = if is_agent {
+        agent_rendezvous
+            .clone()
+            .map(|url| (url, Vec::new(), false, Vec::new()))
+    } else {
+        app_state
+            .project_root
+            .lock()
+            .unwrap()
+            .as_ref()
+            .and_then(|root| config::ProjectConfig::load(root).ok())
+            .and_then(|cfg| {
+                cfg.rendezvous.map(|url| {
+                    (
+                        url,
+                        cfg.trusted_keys,
+                        cfg.crdt_require_token,
+                        cfg.crdt_tokens,
+                    )
+                })
+            })
+    };
+
+    if let Some((rendezvous_url, trusted_keys, require_token, crdt_tokens)) = sync_config {
+        crdt_sync::init_trusted_keys(trusted_keys);
+        crdt_sync::init_token_auth(require_token, crdt_tokens);
+        crdt_sync::spawn_rendezvous_client(rendezvous_url, crdt_join_token);
+    } else {
+        let (keys, require_token, crdt_tokens) = app_state
+            .project_root
+            .lock()
+            .unwrap()
+            .as_ref()
+            .and_then(|root| config::ProjectConfig::load(root).ok())
+            .map(|cfg| (cfg.trusted_keys, cfg.crdt_require_token, cfg.crdt_tokens))
+            .unwrap_or_default();
+        crdt_sync::init_trusted_keys(keys);
+        crdt_sync::init_token_auth(require_token, crdt_tokens);
+    }
+}
diff --git a/server/src/startup/tick_loop.rs b/server/src/startup/tick_loop.rs
new file mode 100644
index 00000000..356d1217
--- /dev/null
+++ b/server/src/startup/tick_loop.rs
@@ -0,0 +1,187 @@
+//! Background tasks: CRDT-event bridge, auto-assign subscriber, unified tick
+//! loop, gateway relay, and startup reconciliation.
+
+use crate::agents::{AgentPool, ReconciliationEvent};
+use crate::config;
+use crate::gateway_relay;
+use crate::io;
+use crate::pipeline_state;
+use crate::service;
+use crate::service::status::StatusBroadcaster;
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio::sync::broadcast;
+
+/// Bridge CRDT state-transition events to the watcher broadcast channel and
+/// spawn the auto-assign subscriber that triggers on active-stage transitions.
+pub(crate) fn spawn_event_bridges(
+    watcher_tx: broadcast::Sender<io::watcher::WatcherEvent>,
+    project_root: Option<PathBuf>,
+    agents: Arc<AgentPool>,
+) {
+    // CRDT → watcher bridge: translate CRDT stage-transition events into
+    // WatcherEvent::WorkItem so downstream consumers (WebSocket, auto-assign)
+    // see a uniform stream regardless of whether the event originated from the
+    // filesystem watcher or from a CRDT sync peer.
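+    //
+    // Shape of the translation, one CRDT transition in, one WatcherEvent out
+    // (field values illustrative, not taken from a real run):
+    //
+    //     { story_id: "664", from_stage: "2_current", to_stage: "3_qa" }
+    //         => WatcherEvent::WorkItem { stage: "3_qa", item_id: "664",
+    //                action: "...", commit_msg: "...", from_stage: "2_current" }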
+    {
+        let crdt_watcher_tx = watcher_tx.clone();
+        let crdt_prune_root = project_root.clone();
+        if let Some(mut crdt_rx) = crate::crdt_state::subscribe() {
+            tokio::spawn(async move {
+                while let Ok(evt) = crdt_rx.recv().await {
+                    if crate::pipeline_state::Stage::from_dir(&evt.to_stage)
+                        .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Archived { .. }))
+                        && let Some(root) = crdt_prune_root.as_ref().cloned()
+                    {
+                        let story_id = evt.story_id.clone();
+                        tokio::task::spawn_blocking(move || {
+                            if let Err(e) = crate::worktree::prune_worktree_sync(&root, &story_id) {
+                                crate::slog!("[crdt] worktree prune failed for {story_id}: {e}");
+                            }
+                        });
+                    }
+                    let (action, commit_msg) =
+                        io::watcher::stage_metadata(&evt.to_stage, &evt.story_id)
+                            .unwrap_or(("update", format!("huskies: update {}", evt.story_id)));
+                    let watcher_evt = io::watcher::WatcherEvent::WorkItem {
+                        stage: evt.to_stage,
+                        item_id: evt.story_id,
+                        action: action.to_string(),
+                        commit_msg,
+                        from_stage: evt.from_stage,
+                    };
+                    let _ = crdt_watcher_tx.send(watcher_evt);
+                }
+            });
+        }
+    }
+
+    // Auto-assign: trigger `auto_assign_available_work` whenever a work item
+    // enters an active pipeline stage (2_current/, 3_qa/, 4_merge/).
+    if let Some(root) = project_root {
+        let watcher_auto_rx = watcher_tx.subscribe();
+        let watcher_auto_agents = Arc::clone(&agents);
+        tokio::spawn(async move {
+            let mut rx = watcher_auto_rx;
+            while let Ok(event) = rx.recv().await {
+                if let io::watcher::WatcherEvent::WorkItem { ref stage, .. } = event
+                    && pipeline_state::Stage::from_dir(stage.as_str())
+                        .is_some_and(|s| s.is_active())
+                {
+                    crate::slog!(
+                        "[auto-assign] CRDT transition detected in {stage}/; \
+                         triggering auto-assign."
+                    );
+                    watcher_auto_agents.auto_assign_available_work(&root).await;
+                }
+            }
+        });
+    }
+}
+
+/// Spawn the unified 1-second background tick loop.
+///
+/// Fires due timers, runs the agent watchdog every 30 ticks, and promotes
+/// done→archived items every `sweep_interval_secs` ticks.
+pub(crate) fn spawn_tick_loop(
+    agents: Arc<AgentPool>,
+    timer_store: Arc<service::timer::TimerStore>,
+    root: Option<PathBuf>,
+) {
+    let sweep_cfg = root
+        .as_ref()
+        .and_then(|r| config::ProjectConfig::load(r).ok())
+        .map(|c| c.watcher)
+        .unwrap_or_default();
+    let sweep_every = sweep_cfg.sweep_interval_secs.max(1);
+    let done_retention = std::time::Duration::from_secs(sweep_cfg.done_retention_secs);
+    let pending_count = timer_store.list().len();
+    crate::slog!("[tick] Unified tick loop started; {pending_count} pending timer(s)");
+
+    tokio::spawn(async move {
+        let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
+        let mut tick_count: u64 = 0;
+        loop {
+            interval.tick().await;
+            tick_count = tick_count.wrapping_add(1);
+
+            // Timer: fire due timers every second.
+            if let Some(ref r) = root {
+                let result = service::timer::tick_once(&timer_store, &agents, r).await;
+                if let Err(msg) = result {
+                    crate::slog_error!("[tick] Timer tick panicked: {msg}");
+                }
+            }
+
+            // Watchdog: detect orphaned Running agents every 30 ticks.
+            if tick_count.is_multiple_of(30) {
+                let found = agents.run_watchdog_pass(root.as_deref());
+                if found > 0 {
+                    crate::slog!(
+                        "[tick] {found} orphaned agent(s) detected; triggering auto-assign."
+                    );
+                    if let Some(ref r) = root {
+                        agents.auto_assign_available_work(r).await;
+                    }
+                }
+            }
+
+            // Sweep: promote done→archived every sweep_interval_secs ticks.
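+            // With the 1 s tick, sweep_every equals sweep_interval_secs: e.g. a
+            // configured value of 300 sweeps roughly every five minutes. The
+            // .max(1) above also keeps is_multiple_of from ever being given 0.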
+            if tick_count.is_multiple_of(sweep_every) {
+                io::watcher::sweep_done_to_archived(done_retention);
+            }
+        }
+    });
+}
+
+/// Spawn the gateway relay task if `gateway_url` is configured in
+/// `project.toml` or the `HUSKIES_GATEWAY_URL` environment variable.
+pub(crate) fn spawn_gateway_relay(startup_root: &Option<PathBuf>, status: Arc<StatusBroadcaster>) {
+    let relay_gateway_url = startup_root
+        .as_ref()
+        .and_then(|r| config::ProjectConfig::load(r).ok())
+        .and_then(|c| c.gateway_url)
+        .or_else(|| std::env::var("HUSKIES_GATEWAY_URL").ok())
+        .unwrap_or_default();
+
+    if !relay_gateway_url.is_empty() {
+        let relay_project_name = startup_root
+            .as_ref()
+            .and_then(|r| config::ProjectConfig::load(r).ok())
+            .and_then(|c| c.gateway_project)
+            .or_else(|| std::env::var("HUSKIES_GATEWAY_PROJECT").ok())
+            .or_else(|| {
+                startup_root
+                    .as_ref()
+                    .and_then(|r| r.file_name())
+                    .map(|n| n.to_string_lossy().into_owned())
+            })
+            .unwrap_or_else(|| "project".to_string());
+
+        gateway_relay::spawn_relay_task(
+            relay_gateway_url,
+            relay_project_name,
+            status,
+            reqwest::Client::new(),
+        );
+    }
+}
+
+/// Spawn the startup reconciliation task: reconcile any stories whose agent
+/// work was committed while the server was offline, then auto-assign free agents.
+pub(crate) fn spawn_startup_reconciliation(
+    startup_root: Option<PathBuf>,
+    startup_agents: Arc<AgentPool>,
+    startup_reconciliation_tx: broadcast::Sender<ReconciliationEvent>,
+) {
+    if let Some(root) = startup_root {
+        tokio::spawn(async move {
+            crate::slog!("[startup] Reconciling completed worktrees from previous session.");
+            startup_agents
+                .reconcile_on_startup(&root, &startup_reconciliation_tx)
+                .await;
+            crate::slog!("[auto-assign] Scanning pipeline stages for unassigned work.");
+            startup_agents.auto_assign_available_work(&root).await;
+        });
+    }
+}
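
--
Note on the precedence chain used by spawn_gateway_relay above: a project.toml
value wins over the environment, which wins over a derived default. The same
shape as a standalone sketch (function name hypothetical; the real code reads
ProjectConfig and falls back to the project root's file_name):

    fn resolve_gateway_project(cfg_value: Option<String>, root_name: Option<String>) -> String {
        cfg_value
            .or_else(|| std::env::var("HUSKIES_GATEWAY_PROJECT").ok())
            .or(root_name)
            .unwrap_or_else(|| "project".to_string())
    }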