//! Headless build-agent mode for distributed, rendezvous-based story processing.
//!
//! When invoked via `huskies agent --rendezvous ws://host:3001/crdt-sync`, this
//! module runs a headless loop that:
//!
//! 1. Syncs CRDT state with the rendezvous peer.
//! 2. Writes a heartbeat to the CRDT `nodes` list.
//! 3. Scans for unclaimed stories in `2_current` and claims them via CRDT.
//! 4. Spawns Claude Code locally for each claimed story.
//! 5. Pushes the feature branch to the git remote when done.
//! 6. Reports completion by advancing the story stage via CRDT.
//! 7. Handles offline/reconnect: CRDT state merges on reconnect, and
//!    interrupted work is reclaimed after a timeout.
//!
//! No web UI, HTTP server, or chat interface is started.

use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use tokio::sync::broadcast;

use crate::agents::AgentPool;
use crate::config::ProjectConfig;
use crate::crdt_state;
use crate::io::watcher;
use crate::slog;

/// Default claim timeout in seconds. If a node has not updated its heartbeat
/// within this window, other nodes may reclaim the story.
const CLAIM_TIMEOUT_SECS: f64 = 600.0; // 10 minutes

/// Interval in seconds between heartbeat writes and work scans.
const SCAN_INTERVAL_SECS: u64 = 15;

/// Run the headless build agent loop.
///
/// This function never returns under normal operation; it runs until the
/// process is terminated (SIGINT/SIGTERM).
///
/// If `join_token` and `gateway_url` are both provided, the agent registers
/// itself with the gateway on startup using the one-time token.
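///
/// # Example
///
/// A minimal sketch of calling this from an async entrypoint. The crate and
/// module paths here are assumptions for illustration, not this crate's
/// actual public API:
///
/// ```ignore
/// huskies::agent_mode::run(
///     Some(std::path::PathBuf::from(".")),    // project root containing .huskies/
///     "ws://host:3001/crdt-sync".to_string(), // rendezvous peer
///     3001,                                   // local crdt-sync port
///     None,                                   // join_token: skip gateway registration
///     None,                                   // gateway_url
/// )
/// .await?;
/// ```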
pub async fn run(
    project_root: Option<PathBuf>,
    rendezvous_url: String,
    port: u16,
    join_token: Option<String>,
    gateway_url: Option<String>,
) -> Result<(), std::io::Error> {
    let project_root = match project_root {
        Some(r) => r,
        None => {
            eprintln!("error: agent mode requires a project root (no .huskies/ found)");
            std::process::exit(1);
        }
    };

    println!("\x1b[96;1m[agent-mode]\x1b[0m Starting headless build agent");
    println!("\x1b[96;1m[agent-mode]\x1b[0m Rendezvous: {rendezvous_url}");
    println!(
        "\x1b[96;1m[agent-mode]\x1b[0m Project: {}",
        project_root.display()
    );

    // Validate project config.
    let config = ProjectConfig::load(&project_root).unwrap_or_else(|e| {
        eprintln!("error: invalid project config: {e}");
        std::process::exit(1);
    });
    slog!(
        "[agent-mode] Loaded config with {} agents",
        config.agent.len()
    );

    // Event bus for pipeline lifecycle events.
    let (watcher_tx, _) = broadcast::channel::<watcher::WatcherEvent>(1024);
    let agents = Arc::new(AgentPool::new(port, watcher_tx.clone()));

    // Start filesystem watcher for config hot-reload.
    watcher::start_watcher(project_root.clone(), watcher_tx.clone());

    // Bridge CRDT events to the watcher channel (same as the main server).
    {
        let crdt_watcher_tx = watcher_tx.clone();
        let crdt_prune_root = Some(project_root.clone());
        if let Some(mut crdt_rx) = crdt_state::subscribe() {
            tokio::spawn(async move {
                while let Ok(evt) = crdt_rx.recv().await {
                    if evt.to_stage == "6_archived"
                        && let Some(root) = crdt_prune_root.as_ref().cloned()
                    {
                        let story_id = evt.story_id.clone();
                        tokio::task::spawn_blocking(move || {
                            if let Err(e) = crate::worktree::prune_worktree_sync(&root, &story_id) {
                                slog!("[agent-mode] worktree prune failed for {story_id}: {e}");
                            }
                        });
                    }
                    let (action, commit_msg) = watcher::stage_metadata(&evt.to_stage, &evt.story_id)
                        .unwrap_or(("update", format!("huskies: update {}", evt.story_id)));
                    let watcher_evt = watcher::WatcherEvent::WorkItem {
                        stage: evt.to_stage,
                        item_id: evt.story_id,
                        action: action.to_string(),
                        commit_msg,
                        from_stage: evt.from_stage,
                    };
                    let _ = crdt_watcher_tx.send(watcher_evt);
                }
            });
        }
    }

    // Subscribe to watcher events to trigger auto-assign on stage transitions.
    {
        let auto_rx = watcher_tx.subscribe();
        let auto_agents = Arc::clone(&agents);
        let auto_root = project_root.clone();
        tokio::spawn(async move {
            let mut rx = auto_rx;
            while let Ok(event) = rx.recv().await {
                if let watcher::WatcherEvent::WorkItem { ref stage, .. } = event
                    && matches!(stage.as_str(), "2_current" | "3_qa" | "4_merge")
                {
                    slog!("[agent-mode] CRDT transition in {stage}/; triggering auto-assign.");
                    auto_agents.auto_assign_available_work(&auto_root).await;
                }
            }
        });
    }

    // Write initial heartbeat.
    write_heartbeat(&rendezvous_url, port);

    // Register with the gateway if a join token and gateway URL were provided.
    if let (Some(token), Some(url)) = (join_token, gateway_url) {
        let node_id = crdt_state::our_node_id().unwrap_or_else(|| "unknown".to_string());
        let label = format!("build-agent-{}", &node_id[..node_id.len().min(8)]);
        let address = format!("ws://0.0.0.0:{port}/crdt-sync");
        register_with_gateway(&url, &token, &label, &address).await;
    }

    // Reconcile any committed work from a previous session.
    {
        let recon_agents = Arc::clone(&agents);
        let recon_root = project_root.clone();
        let (recon_tx, _) = broadcast::channel(64);
        slog!("[agent-mode] Reconciling completed worktrees from previous session.");
        recon_agents
            .reconcile_on_startup(&recon_root, &recon_tx)
            .await;
    }

    // Run initial auto-assign.
    slog!("[agent-mode] Initial auto-assign scan.");
    agents.auto_assign_available_work(&project_root).await;

    // Track which stories we've claimed so we can detect conflicts.
    let mut our_claims: HashMap<String, f64> = HashMap::new();

    // Main loop: heartbeat, scan, claim, detect conflicts.
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(SCAN_INTERVAL_SECS));
    loop {
        interval.tick().await;

        // Write heartbeat.
        write_heartbeat(&rendezvous_url, port);

        // Scan CRDT for claimable work.
        scan_and_claim(&agents, &project_root, &mut our_claims).await;

        // Detect claim conflicts: if another node overwrote our claim, stop our agent.
        detect_conflicts(&agents, &project_root, &mut our_claims).await;

        // Reclaim timed-out work from dead nodes.
        reclaim_timed_out_work(&project_root);

        // Check for completed agents and push their branches.
        check_completions_and_push(&agents, &project_root).await;
    }
}

/// Write this node's heartbeat to the CRDT `nodes` list.
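///
/// Peers evaluate this record with the liveness rule in `is_node_alive`: a
/// node counts as alive only if its presence record is marked alive *and*
/// its heartbeat is younger than `CLAIM_TIMEOUT_SECS`. Roughly:
///
/// ```text
/// alive(node, now) = node.alive && (now - node.last_seen) < CLAIM_TIMEOUT_SECS
/// ```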
fn write_heartbeat(rendezvous_url: &str, port: u16) {
    let Some(node_id) = crdt_state::our_node_id() else {
        return;
    };
    let now = chrono::Utc::now().timestamp() as f64;
    // Advertise our crdt-sync endpoint.
    let address = format!("ws://0.0.0.0:{port}/crdt-sync");
    crdt_state::write_node_presence(&node_id, &address, now, true);
    slog!(
        "[agent-mode] Heartbeat written: node={:.12}… rendezvous={rendezvous_url}",
        &node_id
    );
}

/// Scan the CRDT pipeline for unclaimed stories and claim them.
async fn scan_and_claim(
    agents: &AgentPool,
    project_root: &Path,
    our_claims: &mut HashMap<String, f64>,
) {
    let Some(items) = crdt_state::read_all_items() else {
        return;
    };
    let Some(our_node) = crdt_state::our_node_id() else {
        return;
    };

    for item in &items {
        // Only claim stories in active stages.
        if !matches!(item.stage.as_str(), "2_current" | "3_qa" | "4_merge") {
            continue;
        }
        // Skip blocked stories.
        if item.blocked == Some(true) {
            continue;
        }
        // If already claimed by us, skip.
        if item.claimed_by.as_deref() == Some(our_node.as_str()) {
            continue;
        }
        // If claimed by another alive node and the claim is fresh, skip.
        if let Some(ref claimer) = item.claimed_by
            && !claimer.is_empty()
            && claimer != &our_node
            && let Some(claimed_at) = item.claimed_at
        {
            let now = chrono::Utc::now().timestamp() as f64;
            if now - claimed_at < CLAIM_TIMEOUT_SECS && is_node_alive(claimer) {
                continue;
            }
        }

        // Try to claim this story.
        slog!(
            "[agent-mode] Claiming story '{}' for this node",
            item.story_id
        );
        if crdt_state::write_claim(&item.story_id) {
            let now = chrono::Utc::now().timestamp() as f64;
            our_claims.insert(item.story_id.clone(), now);
        }
    }

    // Trigger auto-assign to start agents for newly claimed work.
    agents.auto_assign_available_work(project_root).await;
}
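// A descriptive sketch of the claim lifecycle driven by the main loop
// (summarizing scan_and_claim, detect_conflicts, and reclaim_timed_out_work;
// it adds no behavior of its own):
//
//   unclaimed ── write_claim ──▶ claimed by us ── agent done ──▶ released
//   claimed by us ── peer wins CRDT merge ──▶ lost: stop agent, release claim
//   claimed by dead peer ── heartbeat stale ≥ CLAIM_TIMEOUT_SECS ──▶ released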
/// Detect whether another node overwrote our claims (CRDT conflict
/// resolution). If so, stop our local agent for that story.
async fn detect_conflicts(
    agents: &AgentPool,
    project_root: &Path,
    our_claims: &mut HashMap<String, f64>,
) {
    let lost: Vec<String> = our_claims
        .keys()
        .filter(|story_id| !crdt_state::is_claimed_by_us(story_id))
        .cloned()
        .collect();

    for story_id in lost {
        slog!(
            "[agent-mode] Lost claim on '{}' to another node; stopping local agent.",
            story_id
        );
        our_claims.remove(&story_id);

        // Stop any local agent for this story by looking up its name.
        if let Ok(agent_list) = agents.list_agents() {
            for info in agent_list {
                if info.story_id == story_id {
                    let _ = agents
                        .stop_agent(project_root, &story_id, &info.agent_name)
                        .await;
                    break;
                }
            }
        }

        // Release our claim (in case it wasn't fully overwritten).
        crdt_state::release_claim(&story_id);
    }
}

/// Reclaim work from nodes that have timed out (stale heartbeat).
fn reclaim_timed_out_work(_project_root: &Path) {
    let Some(items) = crdt_state::read_all_items() else {
        return;
    };
    let now = chrono::Utc::now().timestamp() as f64;

    for item in &items {
        if !matches!(item.stage.as_str(), "2_current" | "3_qa" | "4_merge") {
            continue;
        }
        // Check whether the claim has timed out.
        if let Some(ref claimer) = item.claimed_by {
            if claimer.is_empty() {
                continue;
            }
            if let Some(claimed_at) = item.claimed_at
                && now - claimed_at >= CLAIM_TIMEOUT_SECS
                && !is_node_alive(claimer)
            {
                slog!(
                    "[agent-mode] Reclaiming timed-out story '{}' from dead node {:.12}…",
                    item.story_id,
                    claimer
                );
                crdt_state::release_claim(&item.story_id);
            }
        }
    }
}

/// Check whether a node is alive according to the CRDT nodes list.
fn is_node_alive(node_id: &str) -> bool {
    let Some(nodes) = crdt_state::read_all_node_presence() else {
        return false;
    };
    let now = chrono::Utc::now().timestamp() as f64;
    for node in &nodes {
        if node.node_id == node_id {
            // A node is considered alive if it is marked alive AND its
            // heartbeat is within the timeout window.
            return node.alive && (now - node.last_seen) < CLAIM_TIMEOUT_SECS;
        }
    }
    false
}

/// Check for completed agents, push their feature branches to the remote,
/// and report completion via CRDT.
async fn check_completions_and_push(agents: &AgentPool, _project_root: &Path) {
    let Ok(agent_list) = agents.list_agents() else {
        return;
    };

    for info in agent_list {
        if !matches!(
            info.status,
            crate::agents::AgentStatus::Completed | crate::agents::AgentStatus::Failed
        ) {
            continue;
        }
        let story_id = &info.story_id;

        // Only push if this node still owns the claim.
        if !crdt_state::is_claimed_by_us(story_id) {
            continue;
        }

        slog!(
            "[agent-mode] Agent {} for '{}'; pushing feature branch.",
            if matches!(info.status, crate::agents::AgentStatus::Completed) {
                "completed"
            } else {
                "failed"
            },
            story_id
        );

        // Push the feature branch to the remote.
        if let Some(ref wt) = info.worktree_path {
            match push_feature_branch(wt, story_id) {
                Ok(()) => {
                    slog!("[agent-mode] Pushed feature branch for '{story_id}' to remote.");
                }
                Err(e) => {
                    slog!("[agent-mode] Failed to push '{story_id}': {e}");
                }
            }
        }

        // Release the claim now that work is done.
        crdt_state::release_claim(story_id);
    }
}

/// Push the feature branch of a worktree to the git remote.
fn push_feature_branch(worktree_path: &str, story_id: &str) -> Result<(), String> {
    let branch = format!("feature/story-{story_id}");

    // Try to push to 'origin' first.
    let output = std::process::Command::new("git")
        .args(["push", "origin", &branch])
        .current_dir(worktree_path)
        .output()
        .map_err(|e| format!("Failed to run git push: {e}"))?;

    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    // If 'origin' doesn't exist, fall back to the first configured remote.
    if stderr.contains("does not appear to be a git repository")
        || stderr.contains("No such remote")
    {
        let remotes = std::process::Command::new("git")
            .args(["remote"])
            .current_dir(worktree_path)
            .output()
            .map_err(|e| format!("Failed to list remotes: {e}"))?;
        let remote_list = String::from_utf8_lossy(&remotes.stdout);
        if let Some(remote) = remote_list.lines().next() {
            let retry = std::process::Command::new("git")
                .args(["push", remote.trim(), &branch])
                .current_dir(worktree_path)
                .output()
                .map_err(|e| format!("Failed to push to {remote}: {e}"))?;
            if retry.status.success() {
                return Ok(());
            }
            return Err(format!(
                "git push to '{remote}' failed: {}",
                String::from_utf8_lossy(&retry.stderr)
            ));
        }
        // No remotes configured; not an error in agent mode, just skip.
        slog!("[agent-mode] No git remote configured; skipping push for '{story_id}'.");
        Ok(())
    } else {
        Err(format!("git push failed: {stderr}"))
    }
}

// ── Gateway registration ──────────────────────────────────────────────────

/// Register this build agent with a gateway using a one-time join token.
///
/// POSTs `{ token, label, address }` to `{gateway_url}/gateway/register`. On
/// success the gateway stores the agent, and it will appear in the gateway UI.
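///
/// A sketch of the request body as built below (the values shown are
/// illustrative, not real credentials):
///
/// ```json
/// {
///   "token": "one-time-join-token",
///   "label": "build-agent-ab12cd34",
///   "address": "ws://0.0.0.0:3001/crdt-sync"
/// }
/// ```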
async fn register_with_gateway(gateway_url: &str, token: &str, label: &str, address: &str) {
    let client = reqwest::Client::new();
    let url = format!("{}/gateway/register", gateway_url.trim_end_matches('/'));
    let body = serde_json::json!({
        "token": token,
        "label": label,
        "address": address,
    });

    match client.post(&url).json(&body).send().await {
        Ok(resp) if resp.status().is_success() => {
            slog!("[agent-mode] Registered with gateway at {gateway_url}");
        }
        Ok(resp) => {
            slog!(
                "[agent-mode] Gateway registration failed: HTTP {}",
                resp.status()
            );
        }
        Err(e) => {
            slog!("[agent-mode] Gateway registration error: {e}");
        }
    }
}

// ── Tests ──────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn is_node_alive_returns_false_for_unknown_node() {
        // Without CRDT init, this should return false.
        assert!(!is_node_alive("nonexistent_node_id"));
    }

    #[test]
    fn push_feature_branch_handles_missing_worktree() {
        let result = push_feature_branch("/nonexistent/path", "test_story");
        assert!(result.is_err());
    }

    #[test]
    fn claim_timeout_is_ten_minutes() {
        assert_eq!(CLAIM_TIMEOUT_SECS, 600.0);
    }
}
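
#[cfg(test)]
mod timing_tests {
    use super::*;

    /// An added sanity check for an invariant the module relies on but does
    /// not assert elsewhere: the heartbeat/scan cadence must stay well under
    /// the claim timeout, or a healthy node's claims could look stale to its
    /// peers between scans.
    #[test]
    fn scan_interval_is_well_under_claim_timeout() {
        assert!((SCAN_INTERVAL_SECS as f64) * 2.0 < CLAIM_TIMEOUT_SECS);
    }
}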