From 6f7a0c7708c21042ac9b94dd2b64b694dbd41c4d Mon Sep 17 00:00:00 2001
From: dave
Date: Fri, 10 Apr 2026 18:46:44 +0000
Subject: [PATCH] huskies: merge 479_story_build_agent_mode_with_crdt_based_work_claiming

---
 server/src/agent_mode.rs           | 449 +++++++++++++++++++++++++++++
 server/src/crdt_state.rs           | 144 +++++++++
 server/src/crdt_sync.rs            |  32 ++
 server/src/crdt_wire.rs            |   2 +
 server/src/db/mod.rs               |   4 +
 server/src/http/mcp/diagnostics.rs |   2 +
 server/src/http/mod.rs             |   2 +
 server/src/io/watcher.rs           |   2 +
 server/src/main.rs                 |  74 ++++-
 server/src/pipeline_state.rs       |  12 +
 10 files changed, 714 insertions(+), 9 deletions(-)
 create mode 100644 server/src/agent_mode.rs

diff --git a/server/src/agent_mode.rs b/server/src/agent_mode.rs
new file mode 100644
index 00000000..e4258d9e
--- /dev/null
+++ b/server/src/agent_mode.rs
@@ -0,0 +1,449 @@
+/// Headless build agent mode.
+///
+/// When invoked via `huskies agent --rendezvous ws://host:3001/crdt-sync`, this
+/// module runs a headless loop that:
+///
+/// 1. Syncs CRDT state with the rendezvous peer.
+/// 2. Writes a heartbeat to the CRDT `nodes` list.
+/// 3. Scans for unclaimed stories in `2_current` and claims them via CRDT.
+/// 4. Spawns Claude Code locally for each claimed story.
+/// 5. Pushes the feature branch to the git remote when done.
+/// 6. Reports completion by advancing the story stage via CRDT.
+/// 7. Handles offline/reconnect: CRDT state merges on reconnect, and
+///    interrupted work is reclaimed after a timeout.
+///
+/// No web UI, HTTP server, or chat interface is started.
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use tokio::sync::broadcast;
+
+use crate::agents::AgentPool;
+use crate::config::ProjectConfig;
+use crate::crdt_state;
+use crate::io::watcher;
+use crate::slog;
+
+/// Default claim timeout in seconds. If a node has not updated its heartbeat
+/// within this window, other nodes may reclaim the story.
+const CLAIM_TIMEOUT_SECS: f64 = 600.0; // 10 minutes
+
+/// Interval in seconds between heartbeat writes and work scans.
+const SCAN_INTERVAL_SECS: u64 = 15;
+
+/// Run the headless build agent loop.
+///
+/// This function never returns under normal operation — it runs until the
+/// process is terminated (SIGINT/SIGTERM).
+pub async fn run(
+    project_root: Option<PathBuf>,
+    rendezvous_url: String,
+    port: u16,
+) -> Result<(), std::io::Error> {
+    let project_root = match project_root {
+        Some(r) => r,
+        None => {
+            eprintln!("error: agent mode requires a project root (no .huskies/ found)");
+            std::process::exit(1);
+        }
+    };
+
+    println!("\x1b[96;1m[agent-mode]\x1b[0m Starting headless build agent");
+    println!("\x1b[96;1m[agent-mode]\x1b[0m Rendezvous: {rendezvous_url}");
+    println!("\x1b[96;1m[agent-mode]\x1b[0m Project: {}", project_root.display());
+
+    // Validate the project config.
+    let config = ProjectConfig::load(&project_root).unwrap_or_else(|e| {
+        eprintln!("error: invalid project config: {e}");
+        std::process::exit(1);
+    });
+    slog!("[agent-mode] Loaded config with {} agents", config.agent.len());
+
+    // Event bus for pipeline lifecycle events.
+    let (watcher_tx, _) = broadcast::channel::<watcher::WatcherEvent>(1024);
+    let agents = Arc::new(AgentPool::new(port, watcher_tx.clone()));
+
+    // Start the filesystem watcher for config hot-reload.
+    watcher::start_watcher(project_root.clone(), watcher_tx.clone());
+    // Bridge CRDT events to the watcher channel (same as the main server).
+    {
+        let crdt_watcher_tx = watcher_tx.clone();
+        let crdt_prune_root = Some(project_root.clone());
+        if let Some(mut crdt_rx) = crdt_state::subscribe() {
+            tokio::spawn(async move {
+                while let Ok(evt) = crdt_rx.recv().await {
+                    if evt.to_stage == "6_archived"
+                        && let Some(root) = crdt_prune_root.as_ref().cloned()
+                    {
+                        let story_id = evt.story_id.clone();
+                        tokio::task::spawn_blocking(move || {
+                            if let Err(e) =
+                                crate::worktree::prune_worktree_sync(&root, &story_id)
+                            {
+                                slog!("[agent-mode] worktree prune failed for {story_id}: {e}");
+                            }
+                        });
+                    }
+                    let (action, commit_msg) =
+                        watcher::stage_metadata(&evt.to_stage, &evt.story_id)
+                            .unwrap_or(("update", format!("huskies: update {}", evt.story_id)));
+                    let watcher_evt = watcher::WatcherEvent::WorkItem {
+                        stage: evt.to_stage,
+                        item_id: evt.story_id,
+                        action: action.to_string(),
+                        commit_msg,
+                        from_stage: evt.from_stage,
+                    };
+                    let _ = crdt_watcher_tx.send(watcher_evt);
+                }
+            });
+        }
+    }
+
+    // Subscribe to watcher events to trigger auto-assign on stage transitions.
+    {
+        let auto_rx = watcher_tx.subscribe();
+        let auto_agents = Arc::clone(&agents);
+        let auto_root = project_root.clone();
+        tokio::spawn(async move {
+            let mut rx = auto_rx;
+            while let Ok(event) = rx.recv().await {
+                if let watcher::WatcherEvent::WorkItem { ref stage, .. } = event
+                    && matches!(stage.as_str(), "2_current" | "3_qa" | "4_merge")
+                {
+                    slog!(
+                        "[agent-mode] CRDT transition in {stage}/; triggering auto-assign."
+                    );
+                    auto_agents.auto_assign_available_work(&auto_root).await;
+                }
+            }
+        });
+    }
+
+    // Write the initial heartbeat.
+    write_heartbeat(&rendezvous_url, port);
+
+    // Reconcile any committed work from a previous session.
+    {
+        let recon_agents = Arc::clone(&agents);
+        let recon_root = project_root.clone();
+        let (recon_tx, _) = broadcast::channel(64);
+        slog!("[agent-mode] Reconciling completed worktrees from previous session.");
+        recon_agents
+            .reconcile_on_startup(&recon_root, &recon_tx)
+            .await;
+    }
+
+    // Run the initial auto-assign.
+    slog!("[agent-mode] Initial auto-assign scan.");
+    agents.auto_assign_available_work(&project_root).await;
+
+    // Track which stories we've claimed so we can detect conflicts.
+    let mut our_claims: HashMap<String, f64> = HashMap::new();
+
+    // Main loop: heartbeat, scan, claim, detect conflicts.
+    let mut interval = tokio::time::interval(std::time::Duration::from_secs(SCAN_INTERVAL_SECS));
+    loop {
+        interval.tick().await;
+
+        // Write a heartbeat.
+        write_heartbeat(&rendezvous_url, port);
+
+        // Scan the CRDT for claimable work.
+        scan_and_claim(&agents, &project_root, &mut our_claims).await;
+
+        // Detect claim conflicts: if another node overwrote our claim, stop our agent.
+        detect_conflicts(&agents, &project_root, &mut our_claims).await;
+
+        // Reclaim timed-out work from dead nodes.
+        reclaim_timed_out_work(&project_root);
+
+        // Check for completed agents and push their branches.
+        check_completions_and_push(&agents, &project_root).await;
+    }
+}
+
+/// Write this node's heartbeat to the CRDT `nodes` list.
+fn write_heartbeat(rendezvous_url: &str, port: u16) {
+    let Some(node_id) = crdt_state::our_node_id() else {
+        return;
+    };
+    let now = chrono::Utc::now().timestamp() as f64;
+    // Advertise our crdt-sync endpoint.
+    let address = format!("ws://0.0.0.0:{port}/crdt-sync");
+    crdt_state::write_node_presence(&node_id, &address, now, true);
+    slog!(
+        "[agent-mode] Heartbeat written: node={:.12}… rendezvous={rendezvous_url}",
+        &node_id
+    );
+}
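The heartbeat written above is one half of the liveness protocol; `is_node_alive` further down consumes it. A self-contained sketch of the rule under the same timeout constant, with hypothetical `Presence`/`is_fresh` names standing in for the patch's CRDT presence entries:

```rust
// Liveness sketch: a peer counts as alive only while its heartbeat is
// fresher than the claim timeout. Names here are illustrative, not the
// types from this patch.

const CLAIM_TIMEOUT_SECS: f64 = 600.0;

struct Presence {
    alive: bool,    // the flag the peer last wrote about itself
    last_seen: f64, // Unix timestamp of its most recent heartbeat
}

// A crashed peer never flips `alive` back to false, so the timestamp
// comparison is what actually lets other nodes reclaim its work.
fn is_fresh(p: &Presence, now: f64) -> bool {
    p.alive && (now - p.last_seen) < CLAIM_TIMEOUT_SECS
}

fn main() {
    let now = 10_000.0;
    let live = Presence { alive: true, last_seen: now - 15.0 };
    let dead = Presence { alive: true, last_seen: now - 900.0 };
    assert!(is_fresh(&live, now));
    assert!(!is_fresh(&dead, now)); // stale heartbeat: reclaimable
    println!("liveness rule holds");
}
```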
+/// Scan the CRDT pipeline for unclaimed stories and claim them.
+async fn scan_and_claim(
+    agents: &AgentPool,
+    project_root: &Path,
+    our_claims: &mut HashMap<String, f64>,
+) {
+    let Some(items) = crdt_state::read_all_items() else {
+        return;
+    };
+    let Some(our_node) = crdt_state::our_node_id() else {
+        return;
+    };
+
+    for item in &items {
+        // Only claim stories in active stages.
+        if !matches!(item.stage.as_str(), "2_current" | "3_qa" | "4_merge") {
+            continue;
+        }
+
+        // Skip blocked stories.
+        if item.blocked == Some(true) {
+            continue;
+        }
+
+        // If already claimed by us, skip.
+        if item.claimed_by.as_deref() == Some(&our_node) {
+            continue;
+        }
+
+        // If claimed by another alive node and the claim is fresh, skip.
+        if let Some(ref claimer) = item.claimed_by
+            && !claimer.is_empty()
+            && claimer != &our_node
+            && let Some(claimed_at) = item.claimed_at
+        {
+            let now = chrono::Utc::now().timestamp() as f64;
+            if now - claimed_at < CLAIM_TIMEOUT_SECS && is_node_alive(claimer) {
+                continue;
+            }
+        }
+
+        // Try to claim this story.
+        slog!(
+            "[agent-mode] Claiming story '{}' for this node",
+            item.story_id
+        );
+        if crdt_state::write_claim(&item.story_id) {
+            let now = chrono::Utc::now().timestamp() as f64;
+            our_claims.insert(item.story_id.clone(), now);
+        }
+    }
+
+    // Trigger auto-assign to start agents for newly claimed work.
+    agents.auto_assign_available_work(project_root).await;
+}
+
+/// Detect if another node overwrote our claims (CRDT conflict resolution).
+/// If so, stop our local agent for that story.
+async fn detect_conflicts(
+    agents: &AgentPool,
+    project_root: &Path,
+    our_claims: &mut HashMap<String, f64>,
+) {
+    let lost: Vec<String> = our_claims
+        .keys()
+        .filter(|story_id| !crdt_state::is_claimed_by_us(story_id))
+        .cloned()
+        .collect();
+
+    for story_id in lost {
+        slog!(
+            "[agent-mode] Lost claim on '{}' to another node; stopping local agent.",
+            story_id
+        );
+        our_claims.remove(&story_id);
+
+        // Stop any local agent for this story by looking up its name.
+        if let Ok(agent_list) = agents.list_agents() {
+            for info in agent_list {
+                if info.story_id == story_id {
+                    let _ = agents
+                        .stop_agent(project_root, &story_id, &info.agent_name)
+                        .await;
+                    break;
+                }
+            }
+        }
+
+        // Release our claim (in case it wasn't fully overwritten).
+        crdt_state::release_claim(&story_id);
+    }
+}
+
+/// Reclaim work from nodes that have timed out (stale heartbeat).
+fn reclaim_timed_out_work(_project_root: &Path) {
+    let Some(items) = crdt_state::read_all_items() else {
+        return;
+    };
+    let now = chrono::Utc::now().timestamp() as f64;
+
+    for item in &items {
+        if !matches!(item.stage.as_str(), "2_current" | "3_qa" | "4_merge") {
+            continue;
+        }
+
+        // Check whether the claim has timed out.
+        if let Some(ref claimer) = item.claimed_by {
+            if claimer.is_empty() {
+                continue;
+            }
+            if let Some(claimed_at) = item.claimed_at
+                && now - claimed_at >= CLAIM_TIMEOUT_SECS
+                && !is_node_alive(claimer)
+            {
+                slog!(
+                    "[agent-mode] Reclaiming timed-out story '{}' from dead node {:.12}…",
+                    item.story_id,
+                    claimer
+                );
+                crdt_state::release_claim(&item.story_id);
+            }
+        }
+    }
+}
+
+/// Check if a node is alive according to the CRDT nodes list.
+fn is_node_alive(node_id: &str) -> bool {
+    let Some(nodes) = crdt_state::read_all_node_presence() else {
+        return false;
+    };
+    let now = chrono::Utc::now().timestamp() as f64;
+
+    for node in &nodes {
+        if node.node_id == node_id {
+            // A node is considered alive if it is marked alive AND its
+            // heartbeat is within the timeout window.
+ return node.alive && (now - node.last_seen) < CLAIM_TIMEOUT_SECS; + } + } + false +} + +/// Check for completed agents, push their feature branches to the remote, +/// and report completion via CRDT. +async fn check_completions_and_push(agents: &AgentPool, _project_root: &Path) { + let Ok(agent_list) = agents.list_agents() else { + return; + }; + + for info in agent_list { + if !matches!( + info.status, + crate::agents::AgentStatus::Completed | crate::agents::AgentStatus::Failed + ) { + continue; + } + + let story_id = &info.story_id; + + // Only push if this node still owns the claim. + if !crdt_state::is_claimed_by_us(story_id) { + continue; + } + + slog!( + "[agent-mode] Agent {} for '{}'; pushing feature branch.", + if matches!(info.status, crate::agents::AgentStatus::Completed) { + "completed" + } else { + "failed" + }, + story_id + ); + + // Push the feature branch to the remote. + if let Some(ref wt) = info.worktree_path { + let push_result = push_feature_branch(wt, story_id); + match push_result { + Ok(()) => { + slog!("[agent-mode] Pushed feature branch for '{story_id}' to remote."); + } + Err(e) => { + slog!("[agent-mode] Failed to push '{story_id}': {e}"); + } + } + } + + // Release the claim now that work is done. + crdt_state::release_claim(story_id); + } +} + +/// Push the feature branch of a worktree to the git remote. +fn push_feature_branch(worktree_path: &str, story_id: &str) -> Result<(), String> { + let branch = format!("feature/story-{story_id}"); + + // Try to push to 'origin'. If origin doesn't exist, try the first remote. + let output = std::process::Command::new("git") + .args(["push", "origin", &branch]) + .current_dir(worktree_path) + .output() + .map_err(|e| format!("Failed to run git push: {e}"))?; + + if output.status.success() { + Ok(()) + } else { + let stderr = String::from_utf8_lossy(&output.stderr); + // If 'origin' doesn't exist, try to find any remote. + if stderr.contains("does not appear to be a git repository") + || stderr.contains("No such remote") + { + let remotes = std::process::Command::new("git") + .args(["remote"]) + .current_dir(worktree_path) + .output() + .map_err(|e| format!("Failed to list remotes: {e}"))?; + + let remote_list = String::from_utf8_lossy(&remotes.stdout); + let first_remote = remote_list.lines().next(); + + if let Some(remote) = first_remote { + let retry = std::process::Command::new("git") + .args(["push", remote.trim(), &branch]) + .current_dir(worktree_path) + .output() + .map_err(|e| format!("Failed to push to {remote}: {e}"))?; + + if retry.status.success() { + return Ok(()); + } + return Err(format!( + "git push to '{remote}' failed: {}", + String::from_utf8_lossy(&retry.stderr) + )); + } + + // No remotes configured — not an error in agent mode, just skip. + slog!("[agent-mode] No git remote configured; skipping push for '{story_id}'."); + Ok(()) + } else { + Err(format!("git push failed: {stderr}")) + } + } +} + +// ── Tests ──────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn is_node_alive_returns_false_for_unknown_node() { + // Without CRDT init, should return false. 
+        assert!(!is_node_alive("nonexistent_node_id"));
+    }
+
+    #[test]
+    fn push_feature_branch_handles_missing_worktree() {
+        let result = push_feature_branch("/nonexistent/path", "test_story");
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn claim_timeout_is_ten_minutes() {
+        assert_eq!(CLAIM_TIMEOUT_SECS, 600.0);
+    }
+}
diff --git a/server/src/crdt_state.rs b/server/src/crdt_state.rs
index 0a9ac38a..4d1dccd0 100644
--- a/server/src/crdt_state.rs
+++ b/server/src/crdt_state.rs
@@ -91,6 +91,14 @@ pub struct PipelineItemCrdt {
     pub retry_count: LwwRegisterCrdt,
     pub blocked: LwwRegisterCrdt,
     pub depends_on: LwwRegisterCrdt,
+    /// Node ID (hex-encoded Ed25519 pubkey) of the node that claimed this item.
+    /// Used for distributed work claiming — the LWW register resolves conflicts
+    /// deterministically so all nodes converge on the same claimer.
+    pub claimed_by: LwwRegisterCrdt,
+    /// Unix timestamp (seconds) when the claim was written.
+    /// Used for timeout-based reclaim: if a node crashes, other nodes can
+    /// reclaim the item after the timeout expires.
+    pub claimed_at: LwwRegisterCrdt,
 }

 /// CRDT node that holds a single peer's presence entry.
@@ -119,6 +127,10 @@ pub struct PipelineItemView {
     pub retry_count: Option<u32>,
     pub blocked: Option<bool>,
     pub depends_on: Option<Vec<u32>>,
+    /// Node ID of the node that claimed this item (hex-encoded Ed25519 pubkey).
+    pub claimed_by: Option<String>,
+    /// Unix timestamp when the item was claimed.
+    pub claimed_at: Option<f64>,
 }

 /// A snapshot of a single node presence entry derived from the CRDT document.
@@ -366,6 +378,7 @@ where
 ///
 /// When the stage changes (or a new item is created), a [`CrdtEvent`] is
 /// broadcast so subscribers can react to the transition.
+#[allow(clippy::too_many_arguments)]
 pub fn write_item(
     story_id: &str,
     stage: &str,
@@ -374,6 +387,8 @@
     retry_count: Option<u32>,
     blocked: Option<bool>,
     depends_on: Option<&str>,
+    claimed_by: Option<&str>,
+    claimed_at: Option<f64>,
 ) {
     let Some(state_mutex) = CRDT_STATE.get() else {
         return;
@@ -419,6 +434,16 @@
             s.crdt.doc.items[idx].depends_on.set(d.to_string())
         });
     }
+    if let Some(cb) = claimed_by {
+        apply_and_persist(&mut state, |s| {
+            s.crdt.doc.items[idx].claimed_by.set(cb.to_string())
+        });
+    }
+    if let Some(ca) = claimed_at {
+        apply_and_persist(&mut state, |s| {
+            s.crdt.doc.items[idx].claimed_at.set(ca)
+        });
+    }

     // Broadcast a CrdtEvent if the stage actually changed.
     let stage_changed = old_stage.as_deref() != Some(stage);
@@ -445,6 +470,8 @@
         "retry_count": retry_count.unwrap_or(0) as f64,
         "blocked": blocked.unwrap_or(false),
         "depends_on": depends_on.unwrap_or(""),
+        "claimed_by": claimed_by.unwrap_or(""),
+        "claimed_at": claimed_at.unwrap_or(0.0),
     })
     .into();

@@ -573,6 +600,73 @@ pub fn our_node_id() -> Option<String> {
     Some(hex::encode(&state.crdt.id))
 }
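The doc comments above lean on the LWW register's deterministic conflict resolution. A standalone toy register (not the `bft_json_crdt` implementation this patch actually uses) shows why two simultaneous claims converge on the same winner on every replica:

```rust
// Toy last-writer-wins register: both replicas apply both writes, and the
// (timestamp, writer) ordering picks the same winner everywhere. Sketch of
// the concept only; names are illustrative.

#[derive(Clone, Debug, PartialEq)]
struct Lww {
    value: String,
    ts: u64,        // logical or wall-clock timestamp
    writer: String, // node id, used as a deterministic tie-break
}

impl Lww {
    fn merge(&mut self, other: &Lww) {
        // Higher timestamp wins; equal timestamps fall back to writer id.
        if (other.ts, other.writer.as_str()) > (self.ts, self.writer.as_str()) {
            *self = other.clone();
        }
    }
}

fn main() {
    // Two nodes claim the same story concurrently.
    let a = Lww { value: "claimed-by-a".into(), ts: 7, writer: "aa11".into() };
    let b = Lww { value: "claimed-by-b".into(), ts: 7, writer: "bb22".into() };

    // Each replica starts from its own write, then merges the other's.
    let (mut at_a, mut at_b) = (a.clone(), b.clone());
    at_a.merge(&b);
    at_b.merge(&a);

    assert_eq!(at_a, at_b); // both converge on the same claimer
    println!("winner: {}", at_a.value);
}
```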
+/// Write a claim on a pipeline item via CRDT.
+///
+/// Sets `claimed_by` to this node's ID and `claimed_at` to the current time.
+/// The LWW register ensures deterministic conflict resolution — if two nodes
+/// claim the same item simultaneously, both will converge to the same winner
+/// after CRDT sync.
+///
+/// Returns `true` if the claim was written, `false` if the item doesn't exist
+/// or CRDT is not initialised.
+pub fn write_claim(story_id: &str) -> bool {
+    let Some(node_id) = our_node_id() else {
+        return false;
+    };
+    let now = chrono::Utc::now().timestamp() as f64;
+
+    let Some(state_mutex) = CRDT_STATE.get() else {
+        return false;
+    };
+    let Ok(mut state) = state_mutex.lock() else {
+        return false;
+    };
+
+    let Some(&idx) = state.index.get(story_id) else {
+        return false;
+    };
+
+    apply_and_persist(&mut state, |s| {
+        s.crdt.doc.items[idx].claimed_by.set(node_id.clone())
+    });
+    apply_and_persist(&mut state, |s| {
+        s.crdt.doc.items[idx].claimed_at.set(now)
+    });
+
+    true
+}
+
+/// Release a claim on a pipeline item (clear `claimed_by` and `claimed_at`).
+pub fn release_claim(story_id: &str) {
+    let Some(state_mutex) = CRDT_STATE.get() else {
+        return;
+    };
+    let Ok(mut state) = state_mutex.lock() else {
+        return;
+    };
+    let Some(&idx) = state.index.get(story_id) else {
+        return;
+    };
+
+    apply_and_persist(&mut state, |s| {
+        s.crdt.doc.items[idx].claimed_by.set(String::new())
+    });
+    apply_and_persist(&mut state, |s| {
+        s.crdt.doc.items[idx].claimed_at.set(0.0)
+    });
+}
+
+/// Check if this node currently holds the claim on a pipeline item.
+pub fn is_claimed_by_us(story_id: &str) -> bool {
+    let Some(node_id) = our_node_id() else {
+        return false;
+    };
+    let Some(item) = read_item(story_id) else {
+        return false;
+    };
+    item.claimed_by.as_deref() == Some(&node_id)
+}
+
 /// Write or update a node presence entry in the CRDT.
 ///
 /// If a node with the given `node_id` already exists, only `last_seen`,
@@ -673,6 +767,8 @@ pub struct CrdtItemDump {
     pub retry_count: Option<u32>,
     pub blocked: Option<bool>,
     pub depends_on: Option<Vec<u32>>,
+    pub claimed_by: Option<String>,
+    pub claimed_at: Option<f64>,
     /// Hex-encoded OpId of the list insert op — cross-reference with `crdt_ops`.
     pub content_index: String,
     pub is_deleted: bool,
@@ -793,6 +889,15 @@ pub fn dump_crdt_state(story_id_filter: Option<&str>) -> CrdtStateDump {
         _ => None,
     };

+    let claimed_by = match item_crdt.claimed_by.view() {
+        JsonValue::String(s) if !s.is_empty() => Some(s),
+        _ => None,
+    };
+    let claimed_at = match item_crdt.claimed_at.view() {
+        JsonValue::Number(n) if n > 0.0 => Some(n),
+        _ => None,
+    };
+
     let content_index = op.id.iter().map(|b| format!("{b:02x}")).collect::<String>();

     items.push(CrdtItemDump {
@@ -803,6 +908,8 @@
         retry_count,
         blocked,
         depends_on,
+        claimed_by,
+        claimed_at,
         content_index,
         is_deleted: op.is_deleted,
     });
@@ -941,6 +1048,15 @@ fn extract_item_view(item: &PipelineItemCrdt) -> Option<PipelineItemView> {
         _ => None,
     };

+    let claimed_by = match item.claimed_by.view() {
+        JsonValue::String(s) if !s.is_empty() => Some(s),
+        _ => None,
+    };
+    let claimed_at = match item.claimed_at.view() {
+        JsonValue::Number(n) if n > 0.0 => Some(n),
+        _ => None,
+    };
+
     Some(PipelineItemView {
         story_id,
         stage,
@@ -949,6 +1065,8 @@
         retry_count,
         blocked,
         depends_on,
+        claimed_by,
+        claimed_at,
     })
 }
@@ -1049,6 +1167,8 @@ mod tests {
         "retry_count": 0.0,
         "blocked": false,
         "depends_on": "",
+        "claimed_by": "",
+        "claimed_at": 0.0,
     })
     .into();

@@ -1076,6 +1196,8 @@
         "retry_count": 0.0,
         "blocked": false,
         "depends_on": "",
+        "claimed_by": "",
+        "claimed_at": 0.0,
     })
     .into();

@@ -1106,6 +1228,8 @@
         "retry_count": 0.0,
         "blocked": false,
         "depends_on": "",
+        "claimed_by": "",
+        "claimed_at": 0.0,
     })
     .into();

@@ -1147,6 +1271,8 @@
         "retry_count": 2.0,
         "blocked": true,
         "depends_on": "[10,20]",
+        "claimed_by": "",
+        "claimed_at": 0.0,
}) .into(); @@ -1177,6 +1303,8 @@ mod tests { "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); @@ -1213,6 +1341,8 @@ mod tests { "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); @@ -1266,6 +1396,8 @@ mod tests { "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); @@ -1385,6 +1517,8 @@ mod tests { "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp); @@ -1406,6 +1540,8 @@ mod tests { "retry_count": 1.0, "blocked": false, "depends_on": "[10]", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp); @@ -1433,6 +1569,8 @@ mod tests { "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(bft_json_crdt::op::ROOT_ID, item).sign(&kp); @@ -1479,6 +1617,8 @@ mod tests { "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); @@ -1495,6 +1635,8 @@ mod tests { "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let insert_op = crdt.doc.items.insert(ROOT_ID, target_item).sign(&kp); @@ -1608,6 +1750,8 @@ mod tests { "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); diff --git a/server/src/crdt_sync.rs b/server/src/crdt_sync.rs index 976a174b..17ce32e1 100644 --- a/server/src/crdt_sync.rs +++ b/server/src/crdt_sync.rs @@ -398,6 +398,8 @@ mod tests { "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a); @@ -500,6 +502,8 @@ mod tests { "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); @@ -662,6 +666,8 @@ name = "test" "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); @@ -717,6 +723,8 @@ name = "test" "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); @@ -785,6 +793,8 @@ name = "test" "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); @@ -839,6 +849,8 @@ name = "test" "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let phantom_op = source @@ -913,6 +925,8 @@ name = "test" "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op_a1 = crdt_a.doc.items.insert(ROOT_ID, item_a).sign(&kp_a); @@ -934,6 +948,8 @@ name = "test" "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op_b1 = crdt_b.doc.items.insert(ROOT_ID, item_b).sign(&kp_b); @@ -1030,6 +1046,8 @@ name = "test" "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); @@ -1080,6 +1098,8 @@ name = "test" "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + 
"claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); @@ -1125,6 +1145,8 @@ name = "test" "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); @@ -1158,6 +1180,8 @@ name = "test" "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op1 = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); @@ -1237,6 +1261,8 @@ name = "test" "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op1 = crdt_a.doc.items.insert(ROOT_ID, item).sign(&kp_a); @@ -1302,6 +1328,8 @@ name = "test" "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op_b1 = crdt_b.doc.items.insert(ROOT_ID, item_b).sign(&kp_b); @@ -1368,6 +1396,8 @@ name = "test" "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op_a1 = crdt_a.doc.items.insert(ROOT_ID, item_a).sign(&kp_a); @@ -1392,6 +1422,8 @@ name = "test" "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op_b1 = crdt_b.doc.items.insert(ROOT_ID, item_b).sign(&kp_b); diff --git a/server/src/crdt_wire.rs b/server/src/crdt_wire.rs index 19aeb242..a375297a 100644 --- a/server/src/crdt_wire.rs +++ b/server/src/crdt_wire.rs @@ -131,6 +131,8 @@ mod tests { "retry_count": 0.0, "blocked": false, "depends_on": "", + "claimed_by": "", + "claimed_at": 0.0, }) .into(); let op = crdt.doc.items.insert(ROOT_ID, item).sign(&kp); diff --git a/server/src/db/mod.rs b/server/src/db/mod.rs index 86570b14..e436cc44 100644 --- a/server/src/db/mod.rs +++ b/server/src/db/mod.rs @@ -201,6 +201,8 @@ pub fn write_item_with_content( retry_count, blocked, depends_on.as_deref(), + None, + None, ); // Shadow: pipeline_items table (only when DB is initialised). @@ -267,6 +269,8 @@ pub fn move_item_stage( retry_count, blocked, depends_on.as_deref(), + None, + None, ); // Shadow table. 
diff --git a/server/src/http/mcp/diagnostics.rs b/server/src/http/mcp/diagnostics.rs
index faac56fd..f010d933 100644
--- a/server/src/http/mcp/diagnostics.rs
+++ b/server/src/http/mcp/diagnostics.rs
@@ -284,6 +284,8 @@ pub(super) fn tool_dump_crdt(args: &Value) -> Result {
         "retry_count": item.retry_count,
         "blocked": item.blocked,
         "depends_on": item.depends_on,
+        "claimed_by": item.claimed_by,
+        "claimed_at": item.claimed_at,
         "content_index": item.content_index,
         "is_deleted": item.is_deleted,
     })
diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs
index f2568a32..e65c9d93 100644
--- a/server/src/http/mod.rs
+++ b/server/src/http/mod.rs
@@ -161,6 +161,8 @@ pub fn debug_crdt_handler(req: &poem::Request) -> poem::Response {
         "retry_count": item.retry_count,
         "blocked": item.blocked,
         "depends_on": item.depends_on,
+        "claimed_by": item.claimed_by,
+        "claimed_at": item.claimed_at,
         "content_index": item.content_index,
         "is_deleted": item.is_deleted,
     })
diff --git a/server/src/io/watcher.rs b/server/src/io/watcher.rs
index 57f49ccd..b239aa3a 100644
--- a/server/src/io/watcher.rs
+++ b/server/src/io/watcher.rs
@@ -347,6 +347,8 @@ pub(crate) fn sweep_done_to_archived(done_retention: Duration) {
             None,
             Some(false),
             None,
+            None,
+            None,
         );
         slog!("[watcher] sweep: promoted {story_id} → 6_archived/");
     }
diff --git a/server/src/main.rs b/server/src/main.rs
index b9cf7892..9c02d514 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -3,6 +3,7 @@
 #![recursion_limit = "256"]

 mod agent_log;
+mod agent_mode;
 mod agents;
 mod chat;
 mod config;
@@ -46,6 +47,10 @@ struct CliArgs {
     path: Option<String>,
     /// Whether the `init` subcommand was given.
    init: bool,
+    /// Whether the `agent` subcommand was given.
+    agent: bool,
+    /// Rendezvous WebSocket URL for agent mode (e.g. `ws://host:3001/crdt-sync`).
+    rendezvous: Option<String>,
 }

 /// Parse CLI arguments into `CliArgs`, or exit early for `--help` / `--version`.
@@ -53,6 +58,8 @@ fn parse_cli_args(args: &[String]) -> Result<CliArgs, String> {
     let mut port: Option<u16> = None;
     let mut path: Option<String> = None;
     let mut init = false;
+    let mut agent = false;
+    let mut rendezvous: Option<String> = None;

     let mut i = 0;
     while i < args.len() {
@@ -82,9 +89,23 @@
                 Err(_) => return Err(format!("invalid port value: '{val}'")),
             }
         }
+        "--rendezvous" => {
+            i += 1;
+            if i >= args.len() {
+                return Err("--rendezvous requires a value".to_string());
+            }
+            rendezvous = Some(args[i].clone());
+        }
+        a if a.starts_with("--rendezvous=") => {
+            let val = &a["--rendezvous=".len()..];
+            rendezvous = Some(val.to_string());
+        }
         "init" => {
             init = true;
         }
+        "agent" => {
+            agent = true;
+        }
         a if a.starts_with('-') => {
             return Err(format!("unknown option: {a}"));
         }
@@ -98,17 +119,23 @@
         i += 1;
     }

-    Ok(CliArgs { port, path, init })
+    if agent && rendezvous.is_none() {
+        return Err("agent mode requires --rendezvous <URL>".to_string());
+    }
+
+    Ok(CliArgs { port, path, init, agent, rendezvous })
 }

 fn print_help() {
     println!("huskies [OPTIONS] [PATH]");
     println!("huskies init [OPTIONS] [PATH]");
+    println!("huskies agent --rendezvous <URL> [OPTIONS] [PATH]");
     println!();
     println!("Serve a huskies project.");
     println!();
     println!("COMMANDS:");
-    println!("  init   Scaffold a new .huskies/ project and start the interactive setup wizard.");
+    println!("  init   Scaffold a new .huskies/ project and start the interactive setup wizard.");
+    println!("  agent  Run as a headless build agent — syncs CRDT state, claims and runs work.");
     println!();
     println!("ARGS:");
     println!(
         "  
     );
     println!();
     println!("OPTIONS:");
-    println!("  -h, --help          Print this help and exit");
-    println!("  -V, --version       Print the version and exit");
-    println!("  --port <PORT>       Port to listen on (default: 3001). Persisted to project.toml.");
+    println!("  -h, --help          Print this help and exit");
+    println!("  -V, --version       Print the version and exit");
+    println!("  --port <PORT>       Port to listen on (default: 3001). Persisted to project.toml.");
+    println!("  --rendezvous <URL>  WebSocket URL of the rendezvous peer (agent mode only).");
+    println!("                      Example: ws://server:3001/crdt-sync");
 }

 /// Resolve the optional positional path argument into an absolute `PathBuf`.
@@ -169,6 +198,8 @@ async fn main() -> Result<(), std::io::Error> {
     };

     let is_init = cli.init;
+    let is_agent = cli.agent;
+    let agent_rendezvous = cli.rendezvous.clone();
     let explicit_path = resolve_path_arg(cli.path.as_deref(), &cwd);

     // Port resolution: CLI flag > project.toml (loaded later) > default.
@@ -309,13 +340,36 @@
     // (CRDT state layer is initialised above alongside the legacy pipeline.db.)
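The new CLI contract exercised by the `parse_cli_args` changes above can be restated in isolation: `agent` requires a rendezvous URL, accepted in both `--rendezvous <URL>` and `--rendezvous=<URL>` forms. A standalone sketch (the hypothetical `parse` helper below is not the parser from the patch):

```rust
// CLI-contract sketch for the new `agent` subcommand and --rendezvous flag.

fn parse(args: &[&str]) -> Result<(bool, Option<String>), String> {
    let (mut agent, mut rendezvous) = (false, None);
    let mut i = 0;
    while i < args.len() {
        match args[i] {
            "agent" => agent = true,
            "--rendezvous" => {
                i += 1;
                rendezvous = Some(
                    args.get(i).ok_or("--rendezvous requires a value")?.to_string(),
                );
            }
            a if a.starts_with("--rendezvous=") => {
                rendezvous = Some(a["--rendezvous=".len()..].to_string());
            }
            _ => {}
        }
        i += 1;
    }
    // Validation mirrors the patch: agent mode without a URL is an error.
    if agent && rendezvous.is_none() {
        return Err("agent mode requires --rendezvous <URL>".into());
    }
    Ok((agent, rendezvous))
}

fn main() {
    assert!(parse(&["agent"]).is_err());
    let (agent, url) = parse(&["agent", "--rendezvous=ws://server:3001/crdt-sync"]).unwrap();
    assert!(agent);
    assert_eq!(url.as_deref(), Some("ws://server:3001/crdt-sync"));
    println!("cli contract holds");
}
```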
     // Start the CRDT sync rendezvous client if configured in project.toml.
-    if let Some(ref root) = *app_state.project_root.lock().unwrap()
-        && let Ok(cfg) = config::ProjectConfig::load(root)
-        && let Some(rendezvous_url) = cfg.rendezvous
-    {
+    // In agent mode, the --rendezvous flag overrides project.toml.
+    let rendezvous_url_for_sync = if is_agent {
+        agent_rendezvous.clone()
+    } else {
+        app_state
+            .project_root
+            .lock()
+            .unwrap()
+            .as_ref()
+            .and_then(|root| config::ProjectConfig::load(root).ok())
+            .and_then(|cfg| cfg.rendezvous)
+    };
+    if let Some(rendezvous_url) = rendezvous_url_for_sync {
         crdt_sync::spawn_rendezvous_client(rendezvous_url);
     }

+    // ── Agent mode: headless build agent ────────────────────────────────
+    //
+    // When `huskies agent --rendezvous <URL>` is invoked, skip the web UI,
+    // chat bots, and HTTP server entirely. Instead, run a headless loop that:
+    // 1. Syncs CRDT state with the rendezvous peer.
+    // 2. Scans for unclaimed work and claims it via CRDT.
+    // 3. Runs Claude Code locally for claimed stories.
+    // 4. Pushes feature branches and reports completion via CRDT.
+    if is_agent {
+        let agent_root = app_state.project_root.lock().unwrap().clone();
+        let rendezvous = agent_rendezvous.expect("agent mode requires --rendezvous");
+        return agent_mode::run(agent_root, rendezvous, port).await;
+    }
+
     let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default()));

     // Event bus: broadcast channel for pipeline lifecycle events.
@@ -886,6 +940,8 @@ name = "coder"
     assert_eq!(result.port, None);
     assert_eq!(result.path, None);
     assert!(!result.init);
+    assert!(!result.agent);
+    assert_eq!(result.rendezvous, None);
 }

 #[test]
diff --git a/server/src/pipeline_state.rs b/server/src/pipeline_state.rs
index 2911b3ea..488550e7 100644
--- a/server/src/pipeline_state.rs
+++ b/server/src/pipeline_state.rs
@@ -1145,6 +1145,8 @@ mod tests {
         retry_count: None,
         blocked: None,
         depends_on: Some(vec![10, 20]),
+        claimed_by: None,
+        claimed_at: None,
     };
     let item = PipelineItem::try_from(&view).unwrap();
     assert_eq!(item.story_id, StoryId("42_story_test".to_string()));
@@ -1164,6 +1166,8 @@
         retry_count: Some(2),
         blocked: None,
         depends_on: None,
+        claimed_by: None,
+        claimed_at: None,
     };
     let item = PipelineItem::try_from(&view).unwrap();
     assert!(matches!(item.stage, Stage::Coding));
@@ -1180,6 +1184,8 @@
         retry_count: None,
         blocked: None,
         depends_on: None,
+        claimed_by: None,
+        claimed_at: None,
     };
     let item = PipelineItem::try_from(&view).unwrap();
     assert!(matches!(item.stage, Stage::Merge { .. }));
@@ -1203,6 +1209,8 @@
         retry_count: None,
         blocked: Some(true),
         depends_on: None,
+        claimed_by: None,
+        claimed_at: None,
     };
     let item = PipelineItem::try_from(&view).unwrap();
     assert!(matches!(
@@ -1224,6 +1232,8 @@
         retry_count: None,
         blocked: Some(false),
         depends_on: None,
+        claimed_by: None,
+        claimed_at: None,
     };
     let item = PipelineItem::try_from(&view).unwrap();
     assert!(matches!(
@@ -1245,6 +1255,8 @@
         retry_count: None,
         blocked: None,
         depends_on: None,
+        claimed_by: None,
+        claimed_at: None,
     };
     let result = PipelineItem::try_from(&view);
     assert!(matches!(
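For reference, the claim-skip rule that `scan_and_claim` applies can be stated in isolation: a foreign claim is respected only while it is both fresh and backed by a live heartbeat; otherwise the story is fair game. A sketch with simplified types (`Claim` and `may_claim` are illustrative names, not from the patch):

```rust
// Claim-skip sketch: reclaim is allowed once a claim is stale OR its owner
// looks dead, mirroring the condition in scan_and_claim.

const CLAIM_TIMEOUT_SECS: f64 = 600.0;

struct Claim {
    claimed_by: Option<String>,
    claimed_at: Option<f64>,
}

fn may_claim(c: &Claim, our_node: &str, now: f64, claimer_alive: impl Fn(&str) -> bool) -> bool {
    match (&c.claimed_by, c.claimed_at) {
        // Fresh claim by a live foreign node: hands off.
        (Some(node), Some(at)) if !node.is_empty() && node.as_str() != our_node => {
            !(now - at < CLAIM_TIMEOUT_SECS && claimer_alive(node))
        }
        // Unclaimed, or claimed by us: claimable (the real loop skips our
        // own claims earlier, before reaching this check).
        _ => true,
    }
}

fn main() {
    let now = 50_000.0;
    let fresh = Claim { claimed_by: Some("other".into()), claimed_at: Some(now - 30.0) };
    let stale = Claim { claimed_by: Some("other".into()), claimed_at: Some(now - 700.0) };
    assert!(!may_claim(&fresh, "us", now, |_| true)); // respected
    assert!(may_claim(&stale, "us", now, |_| true));  // timed out: reclaim
    assert!(may_claim(&fresh, "us", now, |_| false)); // dead node: reclaim
    println!("claim rules hold");
}
```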