diff --git a/server/src/agent_mode.rs b/server/src/agent_mode.rs deleted file mode 100644 index db7dc1f2..00000000 --- a/server/src/agent_mode.rs +++ /dev/null @@ -1,929 +0,0 @@ -//! Headless build-agent mode for distributed, rendezvous-based story processing. -/// Headless build agent mode. -/// -/// When invoked via `huskies agent --rendezvous ws://host:3001/crdt-sync`, this -/// module runs a headless loop that: -/// -/// 1. Syncs CRDT state with the rendezvous peer. -/// 2. Writes a heartbeat to the CRDT `nodes` list. -/// 3. Scans for unclaimed stories in `2_current` and claims them via CRDT. -/// 4. Spawns Claude Code locally for each claimed story. -/// 5. Pushes the feature branch to the git remote when done. -/// 6. Reports completion by advancing the story stage via CRDT. -/// 7. Handles offline/reconnect: CRDT merges on reconnect, interrupted work -/// is reclaimed after a timeout. -/// -/// A minimal HTTP server is started on the agent's port to serve the -/// `/crdt-sync` WebSocket endpoint, enabling other agents to connect for -/// peer mesh discovery. -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use tokio::sync::broadcast; - -use poem::EndpointExt as _; - -use crate::agents::AgentPool; -use crate::config::ProjectConfig; -use crate::crdt_state; -use crate::io::watcher; -use crate::mesh; -use crate::slog; - -/// Default claim TTL in seconds. If a claim has not been refreshed within this -/// window, other nodes may displace the stale holder and claim the story. -/// A node actively working on a story should refresh its claim periodically. -pub(crate) const CLAIM_TIMEOUT_SECS: f64 = 1800.0; // 30 minutes - -// ── Hash-based tie-break ────────────────────────────────────────────────── - -/// Compute the claim-priority hash for a `(node_id, story_id)` pair. -/// -/// Uses SHA-256(`node_id` bytes ++ `story_id` bytes), truncated to the first -/// 8 bytes interpreted as a big-endian `u64`. This function is: -/// -/// * **Deterministic** — same inputs always produce the same output. -/// * **Stable across restarts** — depends only on the node's persistent id -/// and the story id, not on wall-clock time or random state. -/// * **Cross-implementation portable** — SHA-256 is a standard primitive; any -/// conforming implementation will produce identical values. -fn claim_hash(node_id: &str, story_id: &str) -> u64 { - use sha2::{Digest, Sha256}; - let mut hasher = Sha256::new(); - hasher.update(node_id.as_bytes()); - hasher.update(story_id.as_bytes()); - let digest = hasher.finalize(); - u64::from_be_bytes(digest[..8].try_into().expect("sha256 is 32 bytes")) -} - -/// Decide whether this node should be the one to claim `story_id`. -/// -/// Returns `true` iff `claim_hash(self_node_id, story_id)` is **strictly -/// lower** than the hash of every alive peer. When there are no alive peers -/// (single-node cluster) the result is always `true`. -/// -/// # Trade-off note -/// Because the winning node is determined purely by the hash of its id and the -/// story id, the distribution is uniform per story but a given node may -/// consistently "win" or "lose" across a set of stories depending on how its -/// id happens to hash. For 2–5 node clusters this imbalance is negligible in -/// practice: any node is the lowest-hash winner with probability ≈ 1/N for a -/// random story id, so the long-run distribution is approximately fair. For -/// clusters with many nodes (e.g. 
>10) the expected variance is larger and -/// operators may want a different work-distribution strategy. -pub fn should_self_claim( - self_node_id: &str, - story_id: &str, - alive_peer_node_ids: &[String], -) -> bool { - let my_hash = claim_hash(self_node_id, story_id); - for peer_id in alive_peer_node_ids { - // Skip self if it appears in the peer list. - if peer_id == self_node_id { - continue; - } - if claim_hash(peer_id, story_id) <= my_hash { - return false; - } - } - true -} - -/// Interval between heartbeat writes and work scans. -pub const SCAN_INTERVAL_SECS: u64 = 15; - -/// Run the headless build agent loop. -/// -/// This function never returns under normal operation — it runs until the -/// process is terminated (SIGINT/SIGTERM). -/// -/// If `join_token` and `gateway_url` are both provided the agent will register -/// itself with the gateway on startup using the one-time token. -pub async fn run( - project_root: Option<PathBuf>, - rendezvous_url: String, - port: u16, - join_token: Option<String>, - gateway_url: Option<String>, -) -> Result<(), std::io::Error> { - let project_root = match project_root { - Some(r) => r, - None => { - eprintln!("error: agent mode requires a project root (no .huskies/ found)"); - std::process::exit(1); - } - }; - - println!("\x1b[96;1m[agent-mode]\x1b[0m Starting headless build agent"); - println!("\x1b[96;1m[agent-mode]\x1b[0m Rendezvous: {rendezvous_url}"); - println!( - "\x1b[96;1m[agent-mode]\x1b[0m Project: {}", - project_root.display() - ); - - // Validate project config. - let config = ProjectConfig::load(&project_root).unwrap_or_else(|e| { - eprintln!("error: invalid project config: {e}"); - std::process::exit(1); - }); - slog!( - "[agent-mode] Loaded config with {} agents", - config.agent.len() - ); - - // Event bus for pipeline lifecycle events. - let (watcher_tx, _) = broadcast::channel::<watcher::WatcherEvent>(1024); - let agents = Arc::new(AgentPool::new(port, watcher_tx.clone())); - - // Start filesystem watcher for config hot-reload. - watcher::start_watcher(project_root.clone(), watcher_tx.clone()); - - // Bridge CRDT events to watcher channel (same as main server). - { - let crdt_watcher_tx = watcher_tx.clone(); - let crdt_prune_root = Some(project_root.clone()); - if let Some(mut crdt_rx) = crdt_state::subscribe() { - tokio::spawn(async move { - while let Ok(evt) = crdt_rx.recv().await { - if crate::pipeline_state::Stage::from_dir(&evt.to_stage) - .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Archived { .. })) - && let Some(root) = crdt_prune_root.as_ref().cloned() - { - let story_id = evt.story_id.clone(); - tokio::task::spawn_blocking(move || { - if let Err(e) = crate::worktree::prune_worktree_sync(&root, &story_id) { - slog!("[agent-mode] worktree prune failed for {story_id}: {e}"); - } - }); - } - let (action, commit_msg) = - watcher::stage_metadata(&evt.to_stage, &evt.story_id) - .unwrap_or(("update", format!("huskies: update {}", evt.story_id))); - let watcher_evt = watcher::WatcherEvent::WorkItem { - stage: evt.to_stage, - item_id: evt.story_id, - action: action.to_string(), - commit_msg, - from_stage: evt.from_stage, - }; - let _ = crdt_watcher_tx.send(watcher_evt); - } - }); - } - } - - // Subscribe to watcher events to trigger auto-assign on stage transitions. - { - let auto_rx = watcher_tx.subscribe(); - let auto_agents = Arc::clone(&agents); - let auto_root = project_root.clone(); - tokio::spawn(async move { - let mut rx = auto_rx; - while let Ok(event) = rx.recv().await { - if let watcher::WatcherEvent::WorkItem { ref stage, ..
} = event - && crate::pipeline_state::Stage::from_dir(stage.as_str()) - .is_some_and(|s| s.is_active()) - { - slog!("[agent-mode] CRDT transition in {stage}/; triggering auto-assign."); - auto_agents.auto_assign_available_work(&auto_root).await; - } - } - }); - } - - // ── Start minimal HTTP server for /crdt-sync endpoint ───────────── - // - // Other agents discover this endpoint via the CRDT `nodes` list and - // open supplementary mesh connections for resilience. - { - let sync_handler = poem::get(crate::crdt_sync::crdt_sync_handler); - - // Build a minimal AppContext for the crdt_sync_handler (the handler - // receives it via Data<> but doesn't use it — the underscore prefix - // on `_ctx` confirms this). - let agent_ctx = build_agent_app_context(&project_root, port, watcher_tx.clone()); - let agent_ctx_arc = Arc::new(agent_ctx); - - let app = poem::Route::new() - .at("/crdt-sync", sync_handler) - .data(agent_ctx_arc); - - let bind_addr = format!("0.0.0.0:{port}"); - slog!("[agent-mode] Starting /crdt-sync endpoint on {bind_addr}"); - tokio::spawn(async move { - if let Err(e) = poem::Server::new(poem::listener::TcpListener::bind(&bind_addr)) - .run(app) - .await - { - slog!("[agent-mode] HTTP server error: {e}"); - } - }); - } - - // Write initial heartbeat. - write_heartbeat(&rendezvous_url, port); - - // Register with gateway if a join token and gateway URL were provided. - if let (Some(token), Some(url)) = (join_token.clone(), gateway_url) { - let node_id = crdt_state::our_node_id().unwrap_or_else(|| "unknown".to_string()); - let label = format!("build-agent-{}", &node_id[..node_id.len().min(8)]); - let address = format!("ws://0.0.0.0:{port}/crdt-sync"); - register_with_gateway(&url, &token, &label, &address).await; - } - - // ── Mesh peer discovery ──────────────────────────────────────────── - // - // Periodically read the CRDT `nodes` list and open supplementary sync - // connections to alive peers. The primary rendezvous connection remains - // canonical; mesh connections are supplementary and don't block startup. - let _mesh_handle = { - let our_node_id = crdt_state::our_node_id().unwrap_or_default(); - let max_mesh_peers = config.max_mesh_peers; - mesh::spawn_mesh_discovery( - max_mesh_peers, - our_node_id, - rendezvous_url.clone(), - join_token, - ) - }; - - // Reconcile any committed work from a previous session. - { - let recon_agents = Arc::clone(&agents); - let recon_root = project_root.clone(); - let (recon_tx, _) = broadcast::channel(64); - slog!("[agent-mode] Reconciling completed worktrees from previous session."); - recon_agents - .reconcile_on_startup(&recon_root, &recon_tx) - .await; - } - - // Run initial auto-assign. - slog!("[agent-mode] Initial auto-assign scan."); - agents.auto_assign_available_work(&project_root).await; - - // Track which stories we've claimed so we can detect conflicts. - let mut our_claims: HashMap<String, f64> = HashMap::new(); - - // Main loop: heartbeat, scan, claim, detect conflicts. - let mut interval = tokio::time::interval(std::time::Duration::from_secs(SCAN_INTERVAL_SECS)); - loop { - interval.tick().await; - - // Write heartbeat. - write_heartbeat(&rendezvous_url, port); - - // Scan CRDT for claimable work. - scan_and_claim(&agents, &project_root, &mut our_claims).await; - - // Detect claim conflicts: if another node overwrote our claim, stop our agent. - detect_conflicts(&agents, &project_root, &mut our_claims).await; - - // Reclaim timed-out work from dead nodes.
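The cadence here matters: heartbeats and scans land every SCAN_INTERVAL_SECS, so a healthy claim holder refreshes roughly 120 times inside one TTL window before displacement becomes possible. A minimal sketch of that margin (the constants mirror the ones defined in this module; the helper name is illustrative, not part of this diff):

const SCAN_INTERVAL_SECS: u64 = 15;
const CLAIM_TIMEOUT_SECS: f64 = 1800.0;

/// Scan ticks a holder can miss before its claim becomes displaceable.
fn missed_ticks_before_stale() -> u64 {
    (CLAIM_TIMEOUT_SECS as u64) / SCAN_INTERVAL_SECS
}

fn main() {
    // 1800 / 15 = 120 consecutive missed scans before another node may claim.
    assert_eq!(missed_ticks_before_stale(), 120);
}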
- reclaim_timed_out_work(&project_root); - - // Check for completed agents and push their branches. - check_completions_and_push(&agents, &project_root).await; - } -} - -/// Write this node's heartbeat to the CRDT `nodes` list. -fn write_heartbeat(rendezvous_url: &str, port: u16) { - let Some(node_id) = crdt_state::our_node_id() else { - return; - }; - let now = chrono::Utc::now().timestamp() as f64; - let now_ms = chrono::Utc::now().timestamp_millis() as f64; - // Advertise our crdt-sync endpoint. - let address = format!("ws://0.0.0.0:{port}/crdt-sync"); - crdt_state::write_node_presence(&node_id, &address, now, true); - // Write millisecond-precision timestamp via LWW register. - crdt_state::write_node_metadata(&node_id, "", None, now_ms); - slog!( - "[agent-mode] Heartbeat written: node={:.12}… rendezvous={rendezvous_url}", - &node_id - ); -} - -/// Scan CRDT pipeline for unclaimed stories and claim them. -async fn scan_and_claim( - agents: &AgentPool, - project_root: &Path, - our_claims: &mut HashMap<String, f64>, -) { - let Some(items) = crdt_state::read_all_items() else { - return; - }; - let Some(our_node) = crdt_state::our_node_id() else { - return; - }; - - for item in &items { - // Only claim stories in active stages. - if !crate::pipeline_state::Stage::from_dir(&item.stage).is_some_and(|s| s.is_active()) { - continue; - } - - // Skip blocked stories. - if item.blocked == Some(true) { - continue; - } - - // If already claimed by us, skip. - if item.claimed_by.as_deref() == Some(&our_node) { - continue; - } - - // If claimed by another node, respect the claim while it is fresh. - // Once the TTL expires the claim is considered stale regardless of - // whether the holder appears alive — displacement is purely TTL-driven. - if let Some(ref claimer) = item.claimed_by - && !claimer.is_empty() - && claimer != &our_node - && let Some(claimed_at) = item.claimed_at - { - let now = chrono::Utc::now().timestamp() as f64; - let age = now - claimed_at; - if age < CLAIM_TIMEOUT_SECS { - // Claim is still fresh — respect it. - continue; - } - // Claim TTL has expired: displace the stale holder. - slog!( - "[agent-mode] Displacing stale claim on '{}' held by {:.12}… \ - (age {}s > TTL {}s)", - item.story_id, - claimer, - age as u64, - CLAIM_TIMEOUT_SECS as u64, - ); - } - - // Pre-spawn hash-based tie-break: only the node whose - // SHA-256(node_id || story_id) is strictly lowest among all alive - // candidates should write the CRDT claim. This eliminates the - // thundering-herd of simultaneous LWW conflicts while keeping the - // existing LWW + reclaim-stale logic as a safety net for clock skew - // and partial alive-list views. - let alive_peers: Vec<String> = crdt_state::read_all_node_presence() - .unwrap_or_default() - .into_iter() - .filter(|n| { - let now_ms = chrono::Utc::now().timestamp_millis() as f64; - let last_ms = n.last_seen_ms.unwrap_or(n.last_seen * 1000.0); - n.alive && (now_ms - last_ms) / 1000.0 < CLAIM_TIMEOUT_SECS - }) - .map(|n| n.node_id) - .collect(); - if !should_self_claim(&our_node, &item.story_id, &alive_peers) { - slog!( - "[agent-mode] Hash tie-break: deferring claim on '{}' to lower-hash peer", - item.story_id - ); - continue; - } - - // Try to claim this story. - slog!( - "[agent-mode] Claiming story '{}' for this node", - item.story_id - ); - if crdt_state::write_claim(&item.story_id) { - let now = chrono::Utc::now().timestamp() as f64; - our_claims.insert(item.story_id.clone(), now); - } - } - - // Trigger auto-assign to start agents for newly claimed work.
- agents.auto_assign_available_work(project_root).await; -} - -/// Detect if another node overwrote our claims (CRDT conflict resolution). -/// If so, stop our local agent for that story. -async fn detect_conflicts( - agents: &AgentPool, - project_root: &Path, - our_claims: &mut HashMap<String, f64>, -) { - let lost: Vec<String> = our_claims - .keys() - .filter(|story_id| !crdt_state::is_claimed_by_us(story_id)) - .cloned() - .collect(); - - for story_id in lost { - slog!( - "[agent-mode] Lost claim on '{}' to another node; stopping local agent.", - story_id - ); - our_claims.remove(&story_id); - - // Stop any local agent for this story by looking up its name. - if let Ok(agent_list) = agents.list_agents() { - for info in agent_list { - if info.story_id == story_id { - let _ = agents - .stop_agent(project_root, &story_id, &info.agent_name) - .await; - break; - } - } - } - - // Release our claim (in case it wasn't fully overwritten). - crdt_state::release_claim(&story_id); - } -} - -/// Reclaim work from nodes that have timed out (stale heartbeat). -fn reclaim_timed_out_work(_project_root: &Path) { - let Some(items) = crdt_state::read_all_items() else { - return; - }; - let now = chrono::Utc::now().timestamp() as f64; - - for item in &items { - if !crate::pipeline_state::Stage::from_dir(&item.stage).is_some_and(|s| s.is_active()) { - continue; - } - - // Release the claim if the TTL has expired — regardless of whether the - // holder is still alive. A node actively working should refresh its - // claim before the TTL window closes. - if let Some(ref claimer) = item.claimed_by { - if claimer.is_empty() { - continue; - } - if let Some(claimed_at) = item.claimed_at - && now - claimed_at >= CLAIM_TIMEOUT_SECS - { - slog!( - "[agent-mode] Releasing stale claim on '{}' held by {:.12}… (age {}s)", - item.story_id, - claimer, - (now - claimed_at) as u64, - ); - crdt_state::release_claim(&item.story_id); - } - } - } -} - -/// Check for completed agents, push their feature branches to the remote, -/// and report completion via CRDT. -async fn check_completions_and_push(agents: &AgentPool, _project_root: &Path) { - let Ok(agent_list) = agents.list_agents() else { - return; - }; - - for info in agent_list { - if !matches!( - info.status, - crate::agents::AgentStatus::Completed | crate::agents::AgentStatus::Failed - ) { - continue; - } - - let story_id = &info.story_id; - - // Only push if this node still owns the claim. - if !crdt_state::is_claimed_by_us(story_id) { - continue; - } - - slog!( - "[agent-mode] Agent {} for '{}'; pushing feature branch.", - if matches!(info.status, crate::agents::AgentStatus::Completed) { - "completed" - } else { - "failed" - }, - story_id - ); - - // Push the feature branch to the remote. - if let Some(ref wt) = info.worktree_path { - let push_result = push_feature_branch(wt, story_id); - match push_result { - Ok(()) => { - slog!("[agent-mode] Pushed feature branch for '{story_id}' to remote."); - } - Err(e) => { - slog!("[agent-mode] Failed to push '{story_id}': {e}"); - } - } - } - - // Release the claim now that work is done. - crdt_state::release_claim(story_id); - } -} - -/// Push the feature branch of a worktree to the git remote. -fn push_feature_branch(worktree_path: &str, story_id: &str) -> Result<(), String> { - let branch = format!("feature/story-{story_id}"); - - // Try to push to 'origin'. If origin doesn't exist, try the first remote.
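The fallback below reduces to a small, pure selection rule: prefer `origin`, otherwise the first configured remote, and treat "no remotes" as a skip rather than a failure. A self-contained sketch of that rule (the function name is illustrative, not part of this diff):

/// Pick the remote to push to: prefer "origin", else the first listed remote.
/// `None` means no remote is configured and the push is skipped.
fn pick_push_remote<'a>(remotes: &[&'a str]) -> Option<&'a str> {
    if remotes.contains(&"origin") {
        return Some("origin");
    }
    remotes.first().copied()
}

fn main() {
    assert_eq!(pick_push_remote(&["origin", "backup"]), Some("origin"));
    assert_eq!(pick_push_remote(&["upstream"]), Some("upstream"));
    assert_eq!(pick_push_remote(&[]), None); // no remote: skip, don't fail
}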
- let output = std::process::Command::new("git") - .args(["push", "origin", &branch]) - .current_dir(worktree_path) - .output() - .map_err(|e| format!("Failed to run git push: {e}"))?; - - if output.status.success() { - Ok(()) - } else { - let stderr = String::from_utf8_lossy(&output.stderr); - // If 'origin' doesn't exist, try to find any remote. - if stderr.contains("does not appear to be a git repository") - || stderr.contains("No such remote") - { - let remotes = std::process::Command::new("git") - .args(["remote"]) - .current_dir(worktree_path) - .output() - .map_err(|e| format!("Failed to list remotes: {e}"))?; - - let remote_list = String::from_utf8_lossy(&remotes.stdout); - let first_remote = remote_list.lines().next(); - - if let Some(remote) = first_remote { - let retry = std::process::Command::new("git") - .args(["push", remote.trim(), &branch]) - .current_dir(worktree_path) - .output() - .map_err(|e| format!("Failed to push to {remote}: {e}"))?; - - if retry.status.success() { - return Ok(()); - } - return Err(format!( - "git push to '{remote}' failed: {}", - String::from_utf8_lossy(&retry.stderr) - )); - } - - // No remotes configured — not an error in agent mode, just skip. - slog!("[agent-mode] No git remote configured; skipping push for '{story_id}'."); - Ok(()) - } else { - Err(format!("git push failed: {stderr}")) - } - } -} - -// ── Gateway registration ────────────────────────────────────────────────── - -/// Register this build agent with a gateway using a one-time join token. -/// -/// POSTs `{ token, label, address }` to `{gateway_url}/gateway/register`. On -/// success the gateway stores the agent and it will appear in the gateway UI. -async fn register_with_gateway(gateway_url: &str, token: &str, label: &str, address: &str) { - let client = reqwest::Client::new(); - let url = format!("{}/gateway/register", gateway_url.trim_end_matches('/')); - let body = serde_json::json!({ - "token": token, - "label": label, - "address": address, - }); - match client.post(&url).json(&body).send().await { - Ok(resp) if resp.status().is_success() => { - slog!("[agent-mode] Registered with gateway at {gateway_url}"); - } - Ok(resp) => { - slog!( - "[agent-mode] Gateway registration failed: HTTP {}", - resp.status() - ); - } - Err(e) => { - slog!("[agent-mode] Gateway registration error: {e}"); - } - } -} - -/// Build a minimal [`AppContext`] for the agent-mode HTTP server. -/// -/// The `/crdt-sync` handler receives `Data<&Arc<AppContext>>` but doesn't -/// actually use it (the parameter is named `_ctx`). We construct a -/// lightweight context with just enough state to satisfy Poem's data -/// extractor.
-fn build_agent_app_context( - project_root: &Path, - port: u16, - watcher_tx: broadcast::Sender<watcher::WatcherEvent>, -) -> crate::http::context::AppContext { - let state = crate::state::SessionState::default(); - *state.project_root.lock().unwrap() = Some(project_root.to_path_buf()); - let store_path = project_root.join(".huskies").join("store.json"); - let store = Arc::new( - crate::store::JsonFileStore::from_path(store_path) - .unwrap_or_else(|e| panic!("Failed to open store: {e}")), - ); - let (reconciliation_tx, _) = broadcast::channel(64); - let (perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel(); - let timer_store = Arc::new(crate::service::timer::TimerStore::load( - project_root.join(".huskies").join("timers.json"), - )); - let agents = Arc::new(AgentPool::new(port, watcher_tx.clone())); - let services = Arc::new(crate::services::Services { - project_root: project_root.to_path_buf(), - agents: Arc::clone(&agents), - bot_name: "Agent".to_string(), - bot_user_id: String::new(), - ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())), - perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), - pending_perm_replies: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), - permission_timeout_secs: 120, - status: agents.status_broadcaster(), - }); - crate::http::context::AppContext { - state: Arc::new(state), - store, - workflow: Arc::new(std::sync::Mutex::new( - crate::workflow::WorkflowState::default(), - )), - services, - watcher_tx, - reconciliation_tx, - perm_tx, - qa_app_process: Arc::new(std::sync::Mutex::new(None)), - bot_shutdown: None, - matrix_shutdown_tx: None, - timer_store, - } -} - -// ── Tests ──────────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn push_feature_branch_handles_missing_worktree() { - let result = push_feature_branch("/nonexistent/path", "test_story"); - assert!(result.is_err()); - } - - #[test] - fn claim_timeout_is_thirty_minutes() { - assert_eq!(CLAIM_TIMEOUT_SECS, 1800.0); - } - - /// AC: seed a stale claim older than the TTL, attempt a new claim from a - /// different agent, assert the new claim succeeds and displacement is logged. - #[test] - fn stale_claim_displaced_and_logged() { - use crate::crdt_state::{init_for_test, our_node_id, read_item, write_claim, write_item}; - - init_for_test(); - - let story_id = "718_test_stale_displacement"; - let stale_holder = "staledeadbeef0000000000000000000000000000"; - // Place claimed_at well beyond the TTL so the claim is unambiguously stale. - let stale_time = chrono::Utc::now().timestamp() as f64 - CLAIM_TIMEOUT_SECS - 300.0; - - // Seed the story with a stale claim from a foreign node. - write_item( - story_id, - "2_current", - Some("Stale Claim Displacement Test"), - None, - None, - None, - None, - Some(stale_holder), - Some(stale_time), - None, - ); - - // Confirm the stale claim is in place. - let before = read_item(story_id).expect("item should exist"); - assert_eq!( - before.claimed_by.as_deref(), - Some(stale_holder), - "pre-condition: item should be claimed by the stale holder" - ); - let age = chrono::Utc::now().timestamp() as f64 - before.claimed_at.unwrap_or(0.0); - assert!( - age >= CLAIM_TIMEOUT_SECS, - "pre-condition: claim age ({age}s) must exceed TTL ({CLAIM_TIMEOUT_SECS}s)" - ); - - // Log the displacement (this is what scan_and_claim does before write_claim).
- crate::slog!( - "[agent-mode] Displacing stale claim on '{}' held by {:.12}… \ - (age {}s > TTL {}s)", - story_id, - stale_holder, - age as u64, - CLAIM_TIMEOUT_SECS as u64, - ); - - // The new agent writes its claim, overwriting the stale one via LWW. - let success = write_claim(story_id); - assert!( - success, - "write_claim must succeed for a story with a stale claim" - ); - - // Verify the new claim belongs to this node, not the stale holder. - let our_id = our_node_id().expect("node id should be available after init_for_test"); - let after = read_item(story_id).expect("item should still exist"); - assert_eq!( - after.claimed_by.as_deref(), - Some(our_id.as_str()), - "new claim should have displaced the stale holder" - ); - assert_ne!( - after.claimed_by.as_deref(), - Some(stale_holder), - "stale holder must no longer own the claim" - ); - - // Verify the displacement was logged. - let logs = - crate::log_buffer::global().get_recent(100, Some("Displacing stale claim"), None); - assert!( - !logs.is_empty(), - "displacement must be written to the server log" - ); - let last_log = logs.last().unwrap(); - assert!( - last_log.contains(story_id), - "log entry must name the story; got: {last_log}" - ); - assert!( - last_log.contains(&stale_holder[..12]), - "log entry must include the stale holder's id prefix; got: {last_log}" - ); - } - - // ── should_self_claim unit tests ────────────────────────────────────── - - /// AC1 + AC6: single-node cluster always claims (no peers → trivially lowest). - #[test] - fn should_self_claim_single_node_always_claims() { - assert!(should_self_claim("node-a", "story-1", &[])); - assert!(should_self_claim("node-a", "story-2", &[])); - assert!(should_self_claim("any-node", "any-story", &[])); - } - - /// AC1: self wins when its hash is strictly lower than a peer's hash. - /// We compute the actual hashes to construct a deterministic test. - #[test] - fn should_self_claim_lower_hash_wins() { - let self_id = "node-alpha"; - let peer_id = "node-beta"; - let story_id = "99_story_test"; - - let self_hash = claim_hash(self_id, story_id); - let peer_hash = claim_hash(peer_id, story_id); - - let result = should_self_claim(self_id, story_id, &[peer_id.to_string()]); - // Result must agree with the actual hash comparison. - assert_eq!(result, self_hash < peer_hash); - } - - /// AC1: self loses when a peer has a strictly lower hash. - #[test] - fn should_self_claim_higher_hash_loses() { - let self_id = "node-beta"; - let peer_id = "node-alpha"; - let story_id = "99_story_test"; - - let self_hash = claim_hash(self_id, story_id); - let peer_hash = claim_hash(peer_id, story_id); - - let result = should_self_claim(self_id, story_id, &[peer_id.to_string()]); - assert_eq!(result, self_hash < peer_hash); - } - - /// AC2: hash is stable — calling with the same inputs always returns the same result. - #[test] - fn claim_hash_is_deterministic() { - let h1 = claim_hash("stable-node", "stable-story"); - let h2 = claim_hash("stable-node", "stable-story"); - assert_eq!(h1, h2); - } - - /// AC2: SHA-256("node-a" ++ "story-1") first 8 bytes == known constant. - /// This pins the exact hash output so regressions are caught immediately. - #[test] - fn claim_hash_known_value() { - // sha256("node-astory-1") first 8 bytes, big-endian u64. - // Pre-computed: echo -n "node-astory-1" | sha256sum - // = 5c1e7c8e7d9f1a3b... - // We verify by round-tripping: compute once and assert stability. 
- let h = claim_hash("node-a", "story-1"); - assert_eq!(claim_hash("node-a", "story-1"), h, "hash must be stable"); - // The value is non-zero (sanity check). - assert_ne!(h, 0, "hash should not be zero"); - } - - /// AC1: self appears in peer list (shouldn't happen in practice but must - /// be handled correctly — self entry is skipped, so it still wins if it's - /// the only entry). - #[test] - fn should_self_claim_ignores_self_in_peer_list() { - let node_id = "node-solo"; - let story_id = "42_story_x"; - // Self appears in peer list — must be ignored so result is true. - assert!(should_self_claim(node_id, story_id, &[node_id.to_string()])); - } - - /// AC5: integration test — two nodes, deterministic in both orders. - /// - /// Both "node-left" and "node-right" independently evaluate - /// `should_self_claim`. Exactly one must return `true`. The winner must - /// be the same regardless of which node's perspective we evaluate first. - #[test] - fn two_nodes_exactly_one_wins_deterministically() { - let node_a = "node-left"; - let node_b = "node-right"; - let story = "100_story_contested"; - - let a_claims = should_self_claim(node_a, story, &[node_b.to_string()]); - let b_claims = should_self_claim(node_b, story, &[node_a.to_string()]); - - // Exactly one must win. - assert_ne!( - a_claims, b_claims, - "exactly one of the two nodes must win the tie-break" - ); - - // Result is stable: re-evaluating in the opposite order gives the same winner. - let a_again = should_self_claim(node_a, story, &[node_b.to_string()]); - let b_again = should_self_claim(node_b, story, &[node_a.to_string()]); - assert_eq!( - a_claims, a_again, - "should_self_claim must be deterministic for node_a" - ); - assert_eq!( - b_claims, b_again, - "should_self_claim must be deterministic for node_b" - ); - } - - /// AC5: verify with multiple stories — each story has exactly one winner. - #[test] - fn two_nodes_each_story_has_exactly_one_winner() { - let node_a = "build-agent-aabbcc"; - let node_b = "build-agent-ddeeff"; - let stories = [ - "1_story_alpha", - "2_story_beta", - "3_story_gamma", - "4_story_delta", - "5_story_epsilon", - ]; - - for story in &stories { - let a_wins = should_self_claim(node_a, story, &[node_b.to_string()]); - let b_wins = should_self_claim(node_b, story, &[node_a.to_string()]); - assert_ne!( - a_wins, b_wins, - "story '{story}': exactly one node must win, got a={a_wins} b={b_wins}" - ); - } - } - - // ── Mesh discovery integration tests ──────────────────────────────── - - /// AC7 (mesh storm cap): With 6 alive peers, the MeshManager enforces a - /// cap of 3 connections. We simulate the scenario by pre-populating the - /// connections map and verifying reconcile() respects the max_peers limit. - #[tokio::test] - async fn mesh_storm_cap_six_peers_max_three() { - let mut mgr = mesh::MeshManager::new( - 3, // max 3 mesh connections - "agent-self".to_string(), - "ws://server:3001/crdt-sync".to_string(), - None, - ); - - // Simulate 6 peer connections (long-running tasks). - let peer_ids: Vec<String> = (0..6).map(|i| format!("peer-{i}")).collect(); - for id in &peer_ids { - let handle = tokio::spawn(async { - tokio::time::sleep(std::time::Duration::from_secs(3600)).await; - }); - mgr.connections.insert(id.clone(), handle); - } - - assert_eq!(mgr.active_count(), 6); - - // reconcile() with no CRDT nodes drops all connections (they're not in - // the alive set), demonstrating the lifecycle cleanup.
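The reconcile step exercised here follows a standard set-reconciliation shape: drop connections whose peer has left the alive set, then admit new alive peers only while under the cap. A standalone sketch of the pattern (illustrative types; this is not the MeshManager API):

use std::collections::{HashMap, HashSet};

fn reconcile(conns: &mut HashMap<String, ()>, alive: &HashSet<String>, max_peers: usize) {
    // 1. Drop connections to peers that are no longer alive.
    conns.retain(|peer, _| alive.contains(peer));
    // 2. Admit new alive peers, but never exceed the cap.
    for peer in alive {
        if conns.len() >= max_peers {
            break;
        }
        conns.entry(peer.clone()).or_insert(());
    }
}

fn main() {
    let mut conns: HashMap<String, ()> = (0..6).map(|i| (format!("peer-{i}"), ())).collect();
    // An empty alive set drops everything, mirroring the assertion in the test.
    reconcile(&mut conns, &HashSet::new(), 3);
    assert!(conns.is_empty());
}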
- mgr.reconcile(); - assert_eq!(mgr.active_count(), 0, "all unknown peers should be dropped"); - } - - /// AC8 (connection lifecycle): default max_mesh_peers is 3. - #[test] - fn default_max_mesh_peers_is_three() { - let config = ProjectConfig::default(); - assert_eq!(config.max_mesh_peers, 3); - } -} diff --git a/server/src/agent_mode/claim.rs b/server/src/agent_mode/claim.rs new file mode 100644 index 00000000..8f86aa37 --- /dev/null +++ b/server/src/agent_mode/claim.rs @@ -0,0 +1,293 @@ +//! Claim ownership logic: deterministic hash-based tie-breaking and TTL constants. + +/// Default claim TTL in seconds. If a claim has not been refreshed within this +/// window, other nodes may displace the stale holder and claim the story. +/// A node actively working on a story should refresh its claim periodically. +pub(crate) const CLAIM_TIMEOUT_SECS: f64 = 1800.0; // 30 minutes + +/// Interval between heartbeat writes and work scans. +pub const SCAN_INTERVAL_SECS: u64 = 15; + +// ── Hash-based tie-break ────────────────────────────────────────────────── + +/// Compute the claim-priority hash for a `(node_id, story_id)` pair. +/// +/// Uses SHA-256(`node_id` bytes ++ `story_id` bytes), truncated to the first +/// 8 bytes interpreted as a big-endian `u64`. This function is: +/// +/// * **Deterministic** — same inputs always produce the same output. +/// * **Stable across restarts** — depends only on the node's persistent id +/// and the story id, not on wall-clock time or random state. +/// * **Cross-implementation portable** — SHA-256 is a standard primitive; any +/// conforming implementation will produce identical values. +pub(super) fn claim_hash(node_id: &str, story_id: &str) -> u64 { + use sha2::{Digest, Sha256}; + let mut hasher = Sha256::new(); + hasher.update(node_id.as_bytes()); + hasher.update(story_id.as_bytes()); + let digest = hasher.finalize(); + u64::from_be_bytes(digest[..8].try_into().expect("sha256 is 32 bytes")) +} + +/// Decide whether this node should be the one to claim `story_id`. +/// +/// Returns `true` iff `claim_hash(self_node_id, story_id)` is **strictly +/// lower** than the hash of every alive peer. When there are no alive peers +/// (single-node cluster) the result is always `true`. +/// +/// # Trade-off note +/// Because the winning node is determined purely by the hash of its id and the +/// story id, the distribution is uniform per story but a given node may +/// consistently "win" or "lose" across a set of stories depending on how its +/// id happens to hash. For 2–5 node clusters this imbalance is negligible in +/// practice: any node is the lowest-hash winner with probability ≈ 1/N for a +/// random story id, so the long-run distribution is approximately fair. For +/// clusters with many nodes (e.g. >10) the expected variance is larger and +/// operators may want a different work-distribution strategy. +pub fn should_self_claim( + self_node_id: &str, + story_id: &str, + alive_peer_node_ids: &[String], +) -> bool { + let my_hash = claim_hash(self_node_id, story_id); + for peer_id in alive_peer_node_ids { + // Skip self if it appears in the peer list. 
+ if peer_id == self_node_id { + continue; + } + if claim_hash(peer_id, story_id) <= my_hash { + return false; + } + } + true +} + +// ── Tests ──────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn claim_timeout_is_thirty_minutes() { + assert_eq!(CLAIM_TIMEOUT_SECS, 1800.0); + } + + /// AC: seed a stale claim older than the TTL, attempt a new claim from a + /// different agent, assert the new claim succeeds and displacement is logged. + #[test] + fn stale_claim_displaced_and_logged() { + use crate::crdt_state::{init_for_test, our_node_id, read_item, write_claim, write_item}; + + init_for_test(); + + let story_id = "718_test_stale_displacement"; + let stale_holder = "staledeadbeef0000000000000000000000000000"; + // Place claimed_at well beyond the TTL so the claim is unambiguously stale. + let stale_time = chrono::Utc::now().timestamp() as f64 - CLAIM_TIMEOUT_SECS - 300.0; + + // Seed the story with a stale claim from a foreign node. + write_item( + story_id, + "2_current", + Some("Stale Claim Displacement Test"), + None, + None, + None, + None, + Some(stale_holder), + Some(stale_time), + None, + ); + + // Confirm the stale claim is in place. + let before = read_item(story_id).expect("item should exist"); + assert_eq!( + before.claimed_by.as_deref(), + Some(stale_holder), + "pre-condition: item should be claimed by the stale holder" + ); + let age = chrono::Utc::now().timestamp() as f64 - before.claimed_at.unwrap_or(0.0); + assert!( + age >= CLAIM_TIMEOUT_SECS, + "pre-condition: claim age ({age}s) must exceed TTL ({CLAIM_TIMEOUT_SECS}s)" + ); + + // Log the displacement (this is what scan_and_claim does before write_claim). + crate::slog!( + "[agent-mode] Displacing stale claim on '{}' held by {:.12}… \ + (age {}s > TTL {}s)", + story_id, + stale_holder, + age as u64, + CLAIM_TIMEOUT_SECS as u64, + ); + + // The new agent writes its claim, overwriting the stale one via LWW. + let success = write_claim(story_id); + assert!( + success, + "write_claim must succeed for a story with a stale claim" + ); + + // Verify the new claim belongs to this node, not the stale holder. + let our_id = our_node_id().expect("node id should be available after init_for_test"); + let after = read_item(story_id).expect("item should still exist"); + assert_eq!( + after.claimed_by.as_deref(), + Some(our_id.as_str()), + "new claim should have displaced the stale holder" + ); + assert_ne!( + after.claimed_by.as_deref(), + Some(stale_holder), + "stale holder must no longer own the claim" + ); + + // Verify the displacement was logged. + let logs = + crate::log_buffer::global().get_recent(100, Some("Displacing stale claim"), None); + assert!( + !logs.is_empty(), + "displacement must be written to the server log" + ); + let last_log = logs.last().unwrap(); + assert!( + last_log.contains(story_id), + "log entry must name the story; got: {last_log}" + ); + assert!( + last_log.contains(&stale_holder[..12]), + "log entry must include the stale holder's id prefix; got: {last_log}" + ); + } + + // ── should_self_claim unit tests ────────────────────────────────────── + + /// AC1 + AC6: single-node cluster always claims (no peers → trivially lowest). 
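Viewed as an API, the tie-break is symmetric: every node evaluates the same pure function from its own perspective, and exactly one sees itself as the winner. A usage sketch against the functions above (node and story ids are made up):

fn example_contested_claim() {
    let (a, b) = ("node-alpha", "node-beta");
    let story = "7_example_story";
    let a_wins = should_self_claim(a, story, &[b.to_string()]);
    let b_wins = should_self_claim(b, story, &[a.to_string()]);
    // One side writes the CRDT claim; the other defers to the next scan tick.
    assert!(a_wins != b_wins);
}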
+ #[test] + fn should_self_claim_single_node_always_claims() { + assert!(should_self_claim("node-a", "story-1", &[])); + assert!(should_self_claim("node-a", "story-2", &[])); + assert!(should_self_claim("any-node", "any-story", &[])); + } + + /// AC1: self wins when its hash is strictly lower than a peer's hash. + /// We compute the actual hashes to construct a deterministic test. + #[test] + fn should_self_claim_lower_hash_wins() { + let self_id = "node-alpha"; + let peer_id = "node-beta"; + let story_id = "99_story_test"; + + let self_hash = claim_hash(self_id, story_id); + let peer_hash = claim_hash(peer_id, story_id); + + let result = should_self_claim(self_id, story_id, &[peer_id.to_string()]); + // Result must agree with the actual hash comparison. + assert_eq!(result, self_hash < peer_hash); + } + + /// AC1: self loses when a peer has a strictly lower hash. + #[test] + fn should_self_claim_higher_hash_loses() { + let self_id = "node-beta"; + let peer_id = "node-alpha"; + let story_id = "99_story_test"; + + let self_hash = claim_hash(self_id, story_id); + let peer_hash = claim_hash(peer_id, story_id); + + let result = should_self_claim(self_id, story_id, &[peer_id.to_string()]); + assert_eq!(result, self_hash < peer_hash); + } + + /// AC2: hash is stable — calling with the same inputs always returns the same result. + #[test] + fn claim_hash_is_deterministic() { + let h1 = claim_hash("stable-node", "stable-story"); + let h2 = claim_hash("stable-node", "stable-story"); + assert_eq!(h1, h2); + } + + /// AC2: SHA-256("node-a" ++ "story-1") first 8 bytes == known constant. + /// This pins the exact hash output so regressions are caught immediately. + #[test] + fn claim_hash_known_value() { + // sha256("node-astory-1") first 8 bytes, big-endian u64. + // Pre-computed: echo -n "node-astory-1" | sha256sum + // = 5c1e7c8e7d9f1a3b... + // We verify by round-tripping: compute once and assert stability. + let h = claim_hash("node-a", "story-1"); + assert_eq!(claim_hash("node-a", "story-1"), h, "hash must be stable"); + // The value is non-zero (sanity check). + assert_ne!(h, 0, "hash should not be zero"); + } + + /// AC1: self appears in peer list (shouldn't happen in practice but must + /// be handled correctly — self entry is skipped, so it still wins if it's + /// the only entry). + #[test] + fn should_self_claim_ignores_self_in_peer_list() { + let node_id = "node-solo"; + let story_id = "42_story_x"; + // Self appears in peer list — must be ignored so result is true. + assert!(should_self_claim(node_id, story_id, &[node_id.to_string()])); + } + + /// AC5: integration test — two nodes, deterministic in both orders. + /// + /// Both "node-left" and "node-right" independently evaluate + /// `should_self_claim`. Exactly one must return `true`. The winner must + /// be the same regardless of which node's perspective we evaluate first. + #[test] + fn two_nodes_exactly_one_wins_deterministically() { + let node_a = "node-left"; + let node_b = "node-right"; + let story = "100_story_contested"; + + let a_claims = should_self_claim(node_a, story, &[node_b.to_string()]); + let b_claims = should_self_claim(node_b, story, &[node_a.to_string()]); + + // Exactly one must win. + assert_ne!( + a_claims, b_claims, + "exactly one of the two nodes must win the tie-break" + ); + + // Result is stable: re-evaluating in the opposite order gives the same winner. 
+ let a_again = should_self_claim(node_a, story, &[node_b.to_string()]); + let b_again = should_self_claim(node_b, story, &[node_a.to_string()]); + assert_eq!( + a_claims, a_again, + "should_self_claim must be deterministic for node_a" + ); + assert_eq!( + b_claims, b_again, + "should_self_claim must be deterministic for node_b" + ); + } + + /// AC5: verify with multiple stories — each story has exactly one winner. + #[test] + fn two_nodes_each_story_has_exactly_one_winner() { + let node_a = "build-agent-aabbcc"; + let node_b = "build-agent-ddeeff"; + let stories = [ + "1_story_alpha", + "2_story_beta", + "3_story_gamma", + "4_story_delta", + "5_story_epsilon", + ]; + + for story in &stories { + let a_wins = should_self_claim(node_a, story, &[node_b.to_string()]); + let b_wins = should_self_claim(node_b, story, &[node_a.to_string()]); + assert_ne!( + a_wins, b_wins, + "story '{story}': exactly one node must win, got a={a_wins} b={b_wins}" + ); + } + } +} diff --git a/server/src/agent_mode/context.rs b/server/src/agent_mode/context.rs new file mode 100644 index 00000000..a048dd0a --- /dev/null +++ b/server/src/agent_mode/context.rs @@ -0,0 +1,94 @@ +//! Agent-mode HTTP context construction and gateway registration. + +use std::path::Path; +use std::sync::Arc; +use tokio::sync::broadcast; + +use crate::agents::AgentPool; +use crate::io::watcher; +use crate::slog; + +/// Register this build agent with a gateway using a one-time join token. +/// +/// POSTs `{ token, label, address }` to `{gateway_url}/gateway/register`. On +/// success the gateway stores the agent and it will appear in the gateway UI. +pub(super) async fn register_with_gateway( + gateway_url: &str, + token: &str, + label: &str, + address: &str, +) { + let client = reqwest::Client::new(); + let url = format!("{}/gateway/register", gateway_url.trim_end_matches('/')); + let body = serde_json::json!({ + "token": token, + "label": label, + "address": address, + }); + match client.post(&url).json(&body).send().await { + Ok(resp) if resp.status().is_success() => { + slog!("[agent-mode] Registered with gateway at {gateway_url}"); + } + Ok(resp) => { + slog!( + "[agent-mode] Gateway registration failed: HTTP {}", + resp.status() + ); + } + Err(e) => { + slog!("[agent-mode] Gateway registration error: {e}"); + } + } +} + +/// Build a minimal [`AppContext`] for the agent-mode HTTP server. +/// +/// The `/crdt-sync` handler receives `Data<&Arc<AppContext>>` but doesn't +/// actually use it (the parameter is named `_ctx`). We construct a +/// lightweight context with just enough state to satisfy Poem's data +/// extractor.
+pub(super) fn build_agent_app_context( + project_root: &Path, + port: u16, + watcher_tx: broadcast::Sender<watcher::WatcherEvent>, +) -> crate::http::context::AppContext { + let state = crate::state::SessionState::default(); + *state.project_root.lock().unwrap() = Some(project_root.to_path_buf()); + let store_path = project_root.join(".huskies").join("store.json"); + let store = Arc::new( + crate::store::JsonFileStore::from_path(store_path) + .unwrap_or_else(|e| panic!("Failed to open store: {e}")), + ); + let (reconciliation_tx, _) = broadcast::channel(64); + let (perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel(); + let timer_store = Arc::new(crate::service::timer::TimerStore::load( + project_root.join(".huskies").join("timers.json"), + )); + let agents = Arc::new(AgentPool::new(port, watcher_tx.clone())); + let services = Arc::new(crate::services::Services { + project_root: project_root.to_path_buf(), + agents: Arc::clone(&agents), + bot_name: "Agent".to_string(), + bot_user_id: String::new(), + ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())), + perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), + pending_perm_replies: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())), + permission_timeout_secs: 120, + status: agents.status_broadcaster(), + }); + crate::http::context::AppContext { + state: Arc::new(state), + store, + workflow: Arc::new(std::sync::Mutex::new( + crate::workflow::WorkflowState::default(), + )), + services, + watcher_tx, + reconciliation_tx, + perm_tx, + qa_app_process: Arc::new(std::sync::Mutex::new(None)), + bot_shutdown: None, + matrix_shutdown_tx: None, + timer_store, + } +} diff --git a/server/src/agent_mode/loop_ops.rs b/server/src/agent_mode/loop_ops.rs new file mode 100644 index 00000000..bffa2d1e --- /dev/null +++ b/server/src/agent_mode/loop_ops.rs @@ -0,0 +1,308 @@ +//! Main-loop operations: heartbeat, claim scanning, conflict detection, and branch pushing. + +use std::collections::HashMap; +use std::path::Path; + +use crate::agents::AgentPool; +use crate::crdt_state; +use crate::slog; + +use super::claim::{CLAIM_TIMEOUT_SECS, should_self_claim}; + +/// Write this node's heartbeat to the CRDT `nodes` list. +pub(super) fn write_heartbeat(rendezvous_url: &str, port: u16) { + let Some(node_id) = crdt_state::our_node_id() else { + return; + }; + let now = chrono::Utc::now().timestamp() as f64; + let now_ms = chrono::Utc::now().timestamp_millis() as f64; + // Advertise our crdt-sync endpoint. + let address = format!("ws://0.0.0.0:{port}/crdt-sync"); + crdt_state::write_node_presence(&node_id, &address, now, true); + // Write millisecond-precision timestamp via LWW register. + crdt_state::write_node_metadata(&node_id, "", None, now_ms); + slog!( + "[agent-mode] Heartbeat written: node={:.12}… rendezvous={rendezvous_url}", + &node_id + ); +} + +/// Scan CRDT pipeline for unclaimed stories and claim them. +pub(super) async fn scan_and_claim( + agents: &AgentPool, + project_root: &Path, + our_claims: &mut HashMap<String, f64>, +) { + let Some(items) = crdt_state::read_all_items() else { + return; + }; + let Some(our_node) = crdt_state::our_node_id() else { + return; + }; + + for item in &items { + // Only claim stories in active stages. + if !crate::pipeline_state::Stage::from_dir(&item.stage).is_some_and(|s| s.is_active()) { + continue; + } + + // Skip blocked stories. + if item.blocked == Some(true) { + continue; + } + + // If already claimed by us, skip.
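The freshness rule applied in the branch below is a single predicate over two timestamps. A minimal sketch, assuming the same 1800-second TTL (the helper name is hypothetical):

/// True while a foreign claim must still be respected.
fn claim_is_fresh(claimed_at: f64, now: f64, ttl_secs: f64) -> bool {
    (now - claimed_at) < ttl_secs
}

fn main() {
    let now = 10_000.0;
    assert!(claim_is_fresh(now - 60.0, now, 1800.0)); // one minute old: respect it
    assert!(!claim_is_fresh(now - 2_000.0, now, 1800.0)); // past TTL: displaceable
}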
+ if item.claimed_by.as_deref() == Some(&our_node) { + continue; + } + + // If claimed by another node, respect the claim while it is fresh. + // Once the TTL expires the claim is considered stale regardless of + // whether the holder appears alive — displacement is purely TTL-driven. + if let Some(ref claimer) = item.claimed_by + && !claimer.is_empty() + && claimer != &our_node + && let Some(claimed_at) = item.claimed_at + { + let now = chrono::Utc::now().timestamp() as f64; + let age = now - claimed_at; + if age < CLAIM_TIMEOUT_SECS { + // Claim is still fresh — respect it. + continue; + } + // Claim TTL has expired: displace the stale holder. + slog!( + "[agent-mode] Displacing stale claim on '{}' held by {:.12}… \ + (age {}s > TTL {}s)", + item.story_id, + claimer, + age as u64, + CLAIM_TIMEOUT_SECS as u64, + ); + } + + // Pre-spawn hash-based tie-break: only the node whose + // SHA-256(node_id || story_id) is strictly lowest among all alive + // candidates should write the CRDT claim. This eliminates the + // thundering-herd of simultaneous LWW conflicts while keeping the + // existing LWW + reclaim-stale logic as a safety net for clock skew + // and partial alive-list views. + let alive_peers: Vec<String> = crdt_state::read_all_node_presence() + .unwrap_or_default() + .into_iter() + .filter(|n| { + let now_ms = chrono::Utc::now().timestamp_millis() as f64; + let last_ms = n.last_seen_ms.unwrap_or(n.last_seen * 1000.0); + n.alive && (now_ms - last_ms) / 1000.0 < CLAIM_TIMEOUT_SECS + }) + .map(|n| n.node_id) + .collect(); + if !should_self_claim(&our_node, &item.story_id, &alive_peers) { + slog!( + "[agent-mode] Hash tie-break: deferring claim on '{}' to lower-hash peer", + item.story_id + ); + continue; + } + + // Try to claim this story. + slog!( + "[agent-mode] Claiming story '{}' for this node", + item.story_id + ); + if crdt_state::write_claim(&item.story_id) { + let now = chrono::Utc::now().timestamp() as f64; + our_claims.insert(item.story_id.clone(), now); + } + } + + // Trigger auto-assign to start agents for newly claimed work. + agents.auto_assign_available_work(project_root).await; +} + +/// Detect if another node overwrote our claims (CRDT conflict resolution). +/// If so, stop our local agent for that story. +pub(super) async fn detect_conflicts( + agents: &AgentPool, + project_root: &Path, + our_claims: &mut HashMap<String, f64>, +) { + let lost: Vec<String> = our_claims + .keys() + .filter(|story_id| !crdt_state::is_claimed_by_us(story_id)) + .cloned() + .collect(); + + for story_id in lost { + slog!( + "[agent-mode] Lost claim on '{}' to another node; stopping local agent.", + story_id + ); + our_claims.remove(&story_id); + + // Stop any local agent for this story by looking up its name. + if let Ok(agent_list) = agents.list_agents() { + for info in agent_list { + if info.story_id == story_id { + let _ = agents + .stop_agent(project_root, &story_id, &info.agent_name) + .await; + break; + } + } + } + + // Release our claim (in case it wasn't fully overwritten). + crdt_state::release_claim(&story_id); + } +} + +/// Reclaim work from nodes that have timed out (stale heartbeat). +pub(super) fn reclaim_timed_out_work(_project_root: &Path) { + let Some(items) = crdt_state::read_all_items() else { + return; + }; + let now = chrono::Utc::now().timestamp() as f64; + + for item in &items { + if !crate::pipeline_state::Stage::from_dir(&item.stage).is_some_and(|s| s.is_active()) { + continue; + } + + // Release the claim if the TTL has expired — regardless of whether the + // holder is still alive.
A node actively working should refresh its + // claim before the TTL window closes. + if let Some(ref claimer) = item.claimed_by { + if claimer.is_empty() { + continue; + } + if let Some(claimed_at) = item.claimed_at + && now - claimed_at >= CLAIM_TIMEOUT_SECS + { + slog!( + "[agent-mode] Releasing stale claim on '{}' held by {:.12}… (age {}s)", + item.story_id, + claimer, + (now - claimed_at) as u64, + ); + crdt_state::release_claim(&item.story_id); + } + } + } +} + +/// Check for completed agents, push their feature branches to the remote, +/// and report completion via CRDT. +pub(super) async fn check_completions_and_push(agents: &AgentPool, _project_root: &Path) { + let Ok(agent_list) = agents.list_agents() else { + return; + }; + + for info in agent_list { + if !matches!( + info.status, + crate::agents::AgentStatus::Completed | crate::agents::AgentStatus::Failed + ) { + continue; + } + + let story_id = &info.story_id; + + // Only push if this node still owns the claim. + if !crdt_state::is_claimed_by_us(story_id) { + continue; + } + + slog!( + "[agent-mode] Agent {} for '{}'; pushing feature branch.", + if matches!(info.status, crate::agents::AgentStatus::Completed) { + "completed" + } else { + "failed" + }, + story_id + ); + + // Push the feature branch to the remote. + if let Some(ref wt) = info.worktree_path { + let push_result = push_feature_branch(wt, story_id); + match push_result { + Ok(()) => { + slog!("[agent-mode] Pushed feature branch for '{story_id}' to remote."); + } + Err(e) => { + slog!("[agent-mode] Failed to push '{story_id}': {e}"); + } + } + } + + // Release the claim now that work is done. + crdt_state::release_claim(story_id); + } +} + +/// Push the feature branch of a worktree to the git remote. +pub(super) fn push_feature_branch(worktree_path: &str, story_id: &str) -> Result<(), String> { + let branch = format!("feature/story-{story_id}"); + + // Try to push to 'origin'. If origin doesn't exist, try the first remote. + let output = std::process::Command::new("git") + .args(["push", "origin", &branch]) + .current_dir(worktree_path) + .output() + .map_err(|e| format!("Failed to run git push: {e}"))?; + + if output.status.success() { + Ok(()) + } else { + let stderr = String::from_utf8_lossy(&output.stderr); + // If 'origin' doesn't exist, try to find any remote. + if stderr.contains("does not appear to be a git repository") + || stderr.contains("No such remote") + { + let remotes = std::process::Command::new("git") + .args(["remote"]) + .current_dir(worktree_path) + .output() + .map_err(|e| format!("Failed to list remotes: {e}"))?; + + let remote_list = String::from_utf8_lossy(&remotes.stdout); + let first_remote = remote_list.lines().next(); + + if let Some(remote) = first_remote { + let retry = std::process::Command::new("git") + .args(["push", remote.trim(), &branch]) + .current_dir(worktree_path) + .output() + .map_err(|e| format!("Failed to push to {remote}: {e}"))?; + + if retry.status.success() { + return Ok(()); + } + return Err(format!( + "git push to '{remote}' failed: {}", + String::from_utf8_lossy(&retry.stderr) + )); + } + + // No remotes configured — not an error in agent mode, just skip. 
+ slog!("[agent-mode] No git remote configured; skipping push for '{story_id}'."); + Ok(()) + } else { + Err(format!("git push failed: {stderr}")) + } + } +} + +// ── Tests ──────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn push_feature_branch_handles_missing_worktree() { + let result = push_feature_branch("/nonexistent/path", "test_story"); + assert!(result.is_err()); + } +} diff --git a/server/src/agent_mode/mod.rs b/server/src/agent_mode/mod.rs new file mode 100644 index 00000000..f53e01f8 --- /dev/null +++ b/server/src/agent_mode/mod.rs @@ -0,0 +1,283 @@ +//! Headless build-agent mode for distributed, rendezvous-based story processing. +/// +/// When invoked via `huskies agent --rendezvous ws://host:3001/crdt-sync`, this +/// module runs a headless loop that: +/// +/// 1. Syncs CRDT state with the rendezvous peer. +/// 2. Writes a heartbeat to the CRDT `nodes` list. +/// 3. Scans for unclaimed stories in `2_current` and claims them via CRDT. +/// 4. Spawns Claude Code locally for each claimed story. +/// 5. Pushes the feature branch to the git remote when done. +/// 6. Reports completion by advancing the story stage via CRDT. +/// 7. Handles offline/reconnect: CRDT merges on reconnect, interrupted work +/// is reclaimed after a timeout. +/// +/// A minimal HTTP server is started on the agent's port to serve the +/// `/crdt-sync` WebSocket endpoint, enabling other agents to connect for +/// peer mesh discovery. +mod claim; +mod context; +mod loop_ops; + +pub use claim::SCAN_INTERVAL_SECS; + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::broadcast; + +use poem::EndpointExt as _; + +use crate::agents::AgentPool; +use crate::config::ProjectConfig; +use crate::crdt_state; +use crate::io::watcher; +use crate::mesh; +use crate::slog; + +use context::{build_agent_app_context, register_with_gateway}; +use loop_ops::{ + check_completions_and_push, detect_conflicts, reclaim_timed_out_work, scan_and_claim, + write_heartbeat, +}; + +/// Run the headless build agent loop. +/// +/// This function never returns under normal operation — it runs until the +/// process is terminated (SIGINT/SIGTERM). +/// +/// If `join_token` and `gateway_url` are both provided the agent will register +/// itself with the gateway on startup using the one-time token. +pub async fn run( + project_root: Option, + rendezvous_url: String, + port: u16, + join_token: Option, + gateway_url: Option, +) -> Result<(), std::io::Error> { + let project_root = match project_root { + Some(r) => r, + None => { + eprintln!("error: agent mode requires a project root (no .huskies/ found)"); + std::process::exit(1); + } + }; + + println!("\x1b[96;1m[agent-mode]\x1b[0m Starting headless build agent"); + println!("\x1b[96;1m[agent-mode]\x1b[0m Rendezvous: {rendezvous_url}"); + println!( + "\x1b[96;1m[agent-mode]\x1b[0m Project: {}", + project_root.display() + ); + + // Validate project config. + let config = ProjectConfig::load(&project_root).unwrap_or_else(|e| { + eprintln!("error: invalid project config: {e}"); + std::process::exit(1); + }); + slog!( + "[agent-mode] Loaded config with {} agents", + config.agent.len() + ); + + // Event bus for pipeline lifecycle events. + let (watcher_tx, _) = broadcast::channel::(1024); + let agents = Arc::new(AgentPool::new(port, watcher_tx.clone())); + + // Start filesystem watcher for config hot-reload. 
+ watcher::start_watcher(project_root.clone(), watcher_tx.clone()); + + // Bridge CRDT events to watcher channel (same as main server). + { + let crdt_watcher_tx = watcher_tx.clone(); + let crdt_prune_root = Some(project_root.clone()); + if let Some(mut crdt_rx) = crdt_state::subscribe() { + tokio::spawn(async move { + while let Ok(evt) = crdt_rx.recv().await { + if crate::pipeline_state::Stage::from_dir(&evt.to_stage) + .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Archived { .. })) + && let Some(root) = crdt_prune_root.as_ref().cloned() + { + let story_id = evt.story_id.clone(); + tokio::task::spawn_blocking(move || { + if let Err(e) = crate::worktree::prune_worktree_sync(&root, &story_id) { + slog!("[agent-mode] worktree prune failed for {story_id}: {e}"); + } + }); + } + let (action, commit_msg) = + watcher::stage_metadata(&evt.to_stage, &evt.story_id) + .unwrap_or(("update", format!("huskies: update {}", evt.story_id))); + let watcher_evt = watcher::WatcherEvent::WorkItem { + stage: evt.to_stage, + item_id: evt.story_id, + action: action.to_string(), + commit_msg, + from_stage: evt.from_stage, + }; + let _ = crdt_watcher_tx.send(watcher_evt); + } + }); + } + } + + // Subscribe to watcher events to trigger auto-assign on stage transitions. + { + let auto_rx = watcher_tx.subscribe(); + let auto_agents = Arc::clone(&agents); + let auto_root = project_root.clone(); + tokio::spawn(async move { + let mut rx = auto_rx; + while let Ok(event) = rx.recv().await { + if let watcher::WatcherEvent::WorkItem { ref stage, .. } = event + && crate::pipeline_state::Stage::from_dir(stage.as_str()) + .is_some_and(|s| s.is_active()) + { + slog!("[agent-mode] CRDT transition in {stage}/; triggering auto-assign."); + auto_agents.auto_assign_available_work(&auto_root).await; + } + } + }); + } + + // ── Start minimal HTTP server for /crdt-sync endpoint ───────────── + // + // Other agents discover this endpoint via the CRDT `nodes` list and + // open supplementary mesh connections for resilience. + { + let sync_handler = poem::get(crate::crdt_sync::crdt_sync_handler); + + // Build a minimal AppContext for the crdt_sync_handler (the handler + // receives it via Data<> but doesn't use it — the underscore prefix + // on `_ctx` confirms this). + let agent_ctx = build_agent_app_context(&project_root, port, watcher_tx.clone()); + let agent_ctx_arc = Arc::new(agent_ctx); + + let app = poem::Route::new() + .at("/crdt-sync", sync_handler) + .data(agent_ctx_arc); + + let bind_addr = format!("0.0.0.0:{port}"); + slog!("[agent-mode] Starting /crdt-sync endpoint on {bind_addr}"); + tokio::spawn(async move { + if let Err(e) = poem::Server::new(poem::listener::TcpListener::bind(&bind_addr)) + .run(app) + .await + { + slog!("[agent-mode] HTTP server error: {e}"); + } + }); + } + + // Write initial heartbeat. + write_heartbeat(&rendezvous_url, port); + + // Register with gateway if a join token and gateway URL were provided. + if let (Some(token), Some(url)) = (join_token.clone(), gateway_url) { + let node_id = crdt_state::our_node_id().unwrap_or_else(|| "unknown".to_string()); + let label = format!("build-agent-{}", &node_id[..node_id.len().min(8)]); + let address = format!("ws://0.0.0.0:{port}/crdt-sync"); + register_with_gateway(&url, &token, &label, &address).await; + } + + // ── Mesh peer discovery ──────────────────────────────────────────── + // + // Periodically read the CRDT `nodes` list and open supplementary sync + // connections to alive peers. 
The primary rendezvous connection remains + // canonical; mesh connections are supplementary and don't block startup. + let _mesh_handle = { + let our_node_id = crdt_state::our_node_id().unwrap_or_default(); + let max_mesh_peers = config.max_mesh_peers; + mesh::spawn_mesh_discovery( + max_mesh_peers, + our_node_id, + rendezvous_url.clone(), + join_token, + ) + }; + + // Reconcile any committed work from a previous session. + { + let recon_agents = Arc::clone(&agents); + let recon_root = project_root.clone(); + let (recon_tx, _) = broadcast::channel(64); + slog!("[agent-mode] Reconciling completed worktrees from previous session."); + recon_agents + .reconcile_on_startup(&recon_root, &recon_tx) + .await; + } + + // Run initial auto-assign. + slog!("[agent-mode] Initial auto-assign scan."); + agents.auto_assign_available_work(&project_root).await; + + // Track which stories we've claimed so we can detect conflicts. + let mut our_claims: HashMap<String, f64> = HashMap::new(); + + // Main loop: heartbeat, scan, claim, detect conflicts. + let mut interval = tokio::time::interval(std::time::Duration::from_secs(SCAN_INTERVAL_SECS)); + loop { + interval.tick().await; + + // Write heartbeat. + write_heartbeat(&rendezvous_url, port); + + // Scan CRDT for claimable work. + scan_and_claim(&agents, &project_root, &mut our_claims).await; + + // Detect claim conflicts: if another node overwrote our claim, stop our agent. + detect_conflicts(&agents, &project_root, &mut our_claims).await; + + // Reclaim timed-out work from dead nodes. + reclaim_timed_out_work(&project_root); + + // Check for completed agents and push their branches. + check_completions_and_push(&agents, &project_root).await; + } +} + +// ── Tests ──────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use crate::config::ProjectConfig; + use crate::mesh; + + // ── Mesh discovery integration tests ──────────────────────────────── + + /// AC7 (mesh storm cap): With 6 alive peers, the MeshManager enforces a + /// cap of 3 connections. We simulate the scenario by pre-populating the + /// connections map and verifying reconcile() respects the max_peers limit. + #[tokio::test] + async fn mesh_storm_cap_six_peers_max_three() { + let mut mgr = mesh::MeshManager::new( + 3, // max 3 mesh connections + "agent-self".to_string(), + "ws://server:3001/crdt-sync".to_string(), + None, + ); + + // Simulate 6 peer connections (long-running tasks). + let peer_ids: Vec<String> = (0..6).map(|i| format!("peer-{i}")).collect(); + for id in &peer_ids { + let handle = tokio::spawn(async { + tokio::time::sleep(std::time::Duration::from_secs(3600)).await; + }); + mgr.connections.insert(id.clone(), handle); + } + + assert_eq!(mgr.active_count(), 6); + + // reconcile() with no CRDT nodes drops all connections (they're not in + // the alive set), demonstrating the lifecycle cleanup. + mgr.reconcile(); + assert_eq!(mgr.active_count(), 0, "all unknown peers should be dropped"); + } + + /// AC8 (connection lifecycle): default max_mesh_peers is 3. + #[test] + fn default_max_mesh_peers_is_three() { + let config = ProjectConfig::default(); + assert_eq!(config.max_mesh_peers, 3); + } +}
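The fairness claim in the tie-break trade-off note is easy to sanity-check empirically: hash every (node, story) pair and count strictly-lowest winners per node over many synthetic story ids. A sketch of that check (it reuses the same sha2 construction as `claim_hash`; the node ids and story count are arbitrary):

use sha2::{Digest, Sha256};

fn claim_hash(node_id: &str, story_id: &str) -> u64 {
    let mut hasher = Sha256::new();
    hasher.update(node_id.as_bytes());
    hasher.update(story_id.as_bytes());
    let digest = hasher.finalize();
    u64::from_be_bytes(digest[..8].try_into().expect("sha256 is 32 bytes"))
}

fn main() {
    let nodes = ["node-a", "node-b", "node-c"];
    let mut wins = [0usize; 3];
    for i in 0..3000 {
        let story = format!("story-{i}");
        // Winner = strictly lowest hash, exactly as should_self_claim decides.
        let (idx, _) = nodes
            .iter()
            .enumerate()
            .min_by_key(|(_, n)| claim_hash(n, &story))
            .unwrap();
        wins[idx] += 1;
    }
    // Expect each node to win roughly a third of the stories (≈ 1/N).
    println!("wins per node: {wins:?}");
}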