//! Headless build-agent mode for distributed, rendezvous-based story processing.
//!
//! When invoked via `huskies agent --rendezvous ws://host:3001/crdt-sync`, this
//! module runs a headless loop that:
//!
//! 1. Syncs CRDT state with the rendezvous peer.
//! 2. Writes a heartbeat to the CRDT `nodes` list.
//! 3. Scans for unclaimed stories in `2_current` and claims them via CRDT.
//! 4. Spawns Claude Code locally for each claimed story.
//! 5. Pushes the feature branch to the git remote when done.
//! 6. Reports completion by advancing the story stage via CRDT.
//! 7. Handles offline/reconnect: CRDT merges on reconnect, interrupted work
//!    is reclaimed after a timeout.
//!
//! A minimal HTTP server is started on the agent's port to serve the
//! `/crdt-sync` WebSocket endpoint, enabling other agents to connect for
//! peer mesh discovery.

mod claim;
mod context;
mod loop_ops;

pub use claim::SCAN_INTERVAL_SECS;

use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

use poem::EndpointExt as _;
use tokio::sync::broadcast;

use crate::agents::AgentPool;
use crate::config::ProjectConfig;
use crate::crdt_state;
use crate::io::watcher;
use crate::mesh;
use crate::slog;

use context::{build_agent_app_context, register_with_gateway};
use loop_ops::{
    check_completions_and_push, detect_conflicts, reclaim_timed_out_work, scan_and_claim,
    write_heartbeat,
};

/// Run the headless build agent loop.
///
/// This function never returns under normal operation — it runs until the
/// process is terminated (SIGINT/SIGTERM).
///
/// If `join_token` and `gateway_url` are both provided the agent will register
/// itself with the gateway on startup using the one-time token.
pub async fn run( project_root: Option, rendezvous_url: String, port: u16, join_token: Option, gateway_url: Option, ) -> Result<(), std::io::Error> { let project_root = match project_root { Some(r) => r, None => { eprintln!("error: agent mode requires a project root (no .huskies/ found)"); std::process::exit(1); } }; println!("\x1b[96;1m[agent-mode]\x1b[0m Starting headless build agent"); println!("\x1b[96;1m[agent-mode]\x1b[0m Rendezvous: {rendezvous_url}"); println!( "\x1b[96;1m[agent-mode]\x1b[0m Project: {}", project_root.display() ); // Validate project config. let config = ProjectConfig::load(&project_root).unwrap_or_else(|e| { eprintln!("error: invalid project config: {e}"); std::process::exit(1); }); slog!( "[agent-mode] Loaded config with {} agents", config.agent.len() ); // Event bus for pipeline lifecycle events. let (watcher_tx, _) = broadcast::channel::(1024); let agents = Arc::new(AgentPool::new(port, watcher_tx.clone())); // Start filesystem watcher for config hot-reload. watcher::start_watcher(project_root.clone(), watcher_tx.clone()); // Bridge CRDT events to watcher channel (same as main server). { let crdt_watcher_tx = watcher_tx.clone(); let crdt_prune_root = Some(project_root.clone()); if let Some(mut crdt_rx) = crdt_state::subscribe() { tokio::spawn(async move { while let Ok(evt) = crdt_rx.recv().await { if crate::pipeline_state::Stage::from_dir(&evt.to_stage) .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Archived { .. 
})) && let Some(root) = crdt_prune_root.as_ref().cloned() { let story_id = evt.story_id.clone(); tokio::task::spawn_blocking(move || { if let Err(e) = crate::worktree::prune_worktree_sync(&root, &story_id) { slog!("[agent-mode] worktree prune failed for {story_id}: {e}"); } }); } let (action, commit_msg) = watcher::stage_metadata(&evt.to_stage, &evt.story_id) .unwrap_or(("update", format!("huskies: update {}", evt.story_id))); let watcher_evt = watcher::WatcherEvent::WorkItem { stage: evt.to_stage, item_id: evt.story_id, action: action.to_string(), commit_msg, from_stage: evt.from_stage, }; let _ = crdt_watcher_tx.send(watcher_evt); } }); } } // Subscribe to watcher events to trigger auto-assign on stage transitions. { let auto_rx = watcher_tx.subscribe(); let auto_agents = Arc::clone(&agents); let auto_root = project_root.clone(); tokio::spawn(async move { let mut rx = auto_rx; while let Ok(event) = rx.recv().await { if let watcher::WatcherEvent::WorkItem { ref stage, .. } = event && crate::pipeline_state::Stage::from_dir(stage.as_str()) .is_some_and(|s| s.is_active()) { slog!("[agent-mode] CRDT transition in {stage}/; triggering auto-assign."); auto_agents.auto_assign_available_work(&auto_root).await; } } }); } // ── Start minimal HTTP server for /crdt-sync endpoint ───────────── // // Other agents discover this endpoint via the CRDT `nodes` list and // open supplementary mesh connections for resilience. { let sync_handler = poem::get(crate::crdt_sync::crdt_sync_handler); // Build a minimal AppContext for the crdt_sync_handler (the handler // receives it via Data<> but doesn't use it — the underscore prefix // on `_ctx` confirms this). 
let agent_ctx = build_agent_app_context(&project_root, port, watcher_tx.clone()); let agent_ctx_arc = Arc::new(agent_ctx); let app = poem::Route::new() .at("/crdt-sync", sync_handler) .data(agent_ctx_arc); let bind_addr = format!("0.0.0.0:{port}"); slog!("[agent-mode] Starting /crdt-sync endpoint on {bind_addr}"); tokio::spawn(async move { if let Err(e) = poem::Server::new(poem::listener::TcpListener::bind(&bind_addr)) .run(app) .await { slog!("[agent-mode] HTTP server error: {e}"); } }); } // Write initial heartbeat. write_heartbeat(&rendezvous_url, port); // Register with gateway if a join token and gateway URL were provided. if let (Some(token), Some(url)) = (join_token.clone(), gateway_url) { let node_id = crdt_state::our_node_id().unwrap_or_else(|| "unknown".to_string()); let label = format!("build-agent-{}", &node_id[..node_id.len().min(8)]); let address = format!("ws://0.0.0.0:{port}/crdt-sync"); register_with_gateway(&url, &token, &label, &address).await; } // ── Mesh peer discovery ──────────────────────────────────────────── // // Periodically read the CRDT `nodes` list and open supplementary sync // connections to alive peers. The primary rendezvous connection remains // canonical; mesh connections are supplementary and don't block startup. let _mesh_handle = { let our_node_id = crdt_state::our_node_id().unwrap_or_default(); let max_mesh_peers = config.max_mesh_peers; mesh::spawn_mesh_discovery( max_mesh_peers, our_node_id, rendezvous_url.clone(), join_token, ) }; // Reconcile any committed work from a previous session. { let recon_agents = Arc::clone(&agents); let recon_root = project_root.clone(); let (recon_tx, _) = broadcast::channel(64); slog!("[agent-mode] Reconciling completed worktrees from previous session."); recon_agents .reconcile_on_startup(&recon_root, &recon_tx) .await; } // Run initial auto-assign. 
slog!("[agent-mode] Initial auto-assign scan."); agents.auto_assign_available_work(&project_root).await; // Track which stories we've claimed so we can detect conflicts. let mut our_claims: HashMap = HashMap::new(); // Main loop: heartbeat, scan, claim, detect conflicts. let mut interval = tokio::time::interval(std::time::Duration::from_secs(SCAN_INTERVAL_SECS)); loop { interval.tick().await; // Write heartbeat. write_heartbeat(&rendezvous_url, port); // Scan CRDT for claimable work. scan_and_claim(&agents, &project_root, &mut our_claims).await; // Detect claim conflicts: if another node overwrote our claim, stop our agent. detect_conflicts(&agents, &project_root, &mut our_claims).await; // Reclaim timed-out work from dead nodes. reclaim_timed_out_work(&project_root); // Check for completed agents and push their branches. check_completions_and_push(&agents, &project_root).await; } } // ── Tests ──────────────────────────────────────────────────────────────── #[cfg(test)] mod tests { use crate::config::ProjectConfig; use crate::mesh; // ── Mesh discovery integration tests ──────────────────────────────── /// AC7 (mesh storm cap): With 6 alive peers, the MeshManager enforces a /// cap of 3 connections. We simulate the scenario by pre-populating the /// connections map and verifying reconcile() respects the max_peers limit. #[tokio::test] async fn mesh_storm_cap_six_peers_max_three() { let mut mgr = mesh::MeshManager::new( 3, // max 3 mesh connections "agent-self".to_string(), "ws://server:3001/crdt-sync".to_string(), None, ); // Simulate 6 peer connections (long-running tasks). 
let peer_ids: Vec = (0..6).map(|i| format!("peer-{i}")).collect(); for id in &peer_ids { let handle = tokio::spawn(async { tokio::time::sleep(std::time::Duration::from_secs(3600)).await; }); mgr.connections.insert(id.clone(), handle); } assert_eq!(mgr.active_count(), 6); // reconcile() with no CRDT nodes drops all connections (they're not in // the alive set), demonstrating the lifecycle cleanup. mgr.reconcile(); assert_eq!(mgr.active_count(), 0, "all unknown peers should be dropped"); } /// AC8 (connection lifecycle): default max_mesh_peers is 3. #[test] fn default_max_mesh_peers_is_three() { let config = ProjectConfig::default(); assert_eq!(config.max_mesh_peers, 3); } }