// huskies/server/src/agent_mode.rs
//! Headless build-agent mode for distributed, rendezvous-based story processing.
//! Headless build agent mode.
//!
//! When invoked via `huskies agent --rendezvous ws://host:3001/crdt-sync`, this
//! module runs a headless loop that:
//!
//! 1. Syncs CRDT state with the rendezvous peer.
//! 2. Writes a heartbeat to the CRDT `nodes` list.
//! 3. Scans for unclaimed stories in `2_current` and claims them via CRDT.
//! 4. Spawns Claude Code locally for each claimed story.
//! 5. Pushes the feature branch to the git remote when done.
//! 6. Reports completion by advancing the story stage via CRDT.
//! 7. Handles offline/reconnect: CRDT merges on reconnect, interrupted work
//!    is reclaimed after a timeout.
//!
//! No web UI, HTTP server, or chat interface is started.
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::broadcast;
use crate::agents::AgentPool;
use crate::config::ProjectConfig;
use crate::crdt_state;
use crate::io::watcher;
use crate::slog;
/// Default claim timeout in seconds. If a node has not updated its heartbeat
/// within this window, other nodes may reclaim the story.
/// Also used by [`is_node_alive`] as the staleness window for node heartbeats.
const CLAIM_TIMEOUT_SECS: f64 = 600.0; // 10 minutes
/// Interval, in seconds, between heartbeat writes and work scans in the main
/// agent loop (see [`run`]).
const SCAN_INTERVAL_SECS: u64 = 15;
/// Run the headless build agent loop.
///
/// This function never returns under normal operation — it runs until the
/// process is terminated (SIGINT/SIGTERM).
///
/// Startup sequence: validate config, bridge CRDT events onto the watcher
/// bus, subscribe auto-assign to stage transitions, write an initial
/// heartbeat, reconcile leftover worktrees from a previous session, then
/// enter the periodic heartbeat/scan/claim loop.
///
/// # Parameters
/// - `project_root`: project directory; the process exits with status 1 when
///   absent, since agent mode cannot bootstrap a project.
/// - `rendezvous_url`: peer URL; only used for logging here (the sync
///   connection is presumably owned by the CRDT layer — TODO confirm).
/// - `port`: local port advertised as this node's `/crdt-sync` endpoint.
pub async fn run(
    project_root: Option<PathBuf>,
    rendezvous_url: String,
    port: u16,
) -> Result<(), std::io::Error> {
    let project_root = match project_root {
        Some(r) => r,
        None => {
            eprintln!("error: agent mode requires a project root (no .huskies/ found)");
            std::process::exit(1);
        }
    };
    println!("\x1b[96;1m[agent-mode]\x1b[0m Starting headless build agent");
    println!("\x1b[96;1m[agent-mode]\x1b[0m Rendezvous: {rendezvous_url}");
    println!(
        "\x1b[96;1m[agent-mode]\x1b[0m Project: {}",
        project_root.display()
    );
    // Validate project config. A broken config is fatal: there is no UI in
    // agent mode to surface the error interactively.
    let config = ProjectConfig::load(&project_root).unwrap_or_else(|e| {
        eprintln!("error: invalid project config: {e}");
        std::process::exit(1);
    });
    slog!(
        "[agent-mode] Loaded config with {} agents",
        config.agent.len()
    );
    // Event bus for pipeline lifecycle events.
    let (watcher_tx, _) = broadcast::channel::<watcher::WatcherEvent>(1024);
    let agents = Arc::new(AgentPool::new(port, watcher_tx.clone()));
    // Start filesystem watcher for config hot-reload.
    watcher::start_watcher(project_root.clone(), watcher_tx.clone());
    // Bridge CRDT events to watcher channel (same as main server).
    {
        let crdt_watcher_tx = watcher_tx.clone();
        let crdt_prune_root = Some(project_root.clone());
        if let Some(mut crdt_rx) = crdt_state::subscribe() {
            tokio::spawn(async move {
                while let Ok(evt) = crdt_rx.recv().await {
                    // Archived stories no longer need a worktree; prune on a
                    // blocking thread since pruning does filesystem work.
                    if evt.to_stage == "6_archived"
                        && let Some(root) = crdt_prune_root.as_ref().cloned()
                    {
                        let story_id = evt.story_id.clone();
                        tokio::task::spawn_blocking(move || {
                            if let Err(e) = crate::worktree::prune_worktree_sync(&root, &story_id) {
                                slog!("[agent-mode] worktree prune failed for {story_id}: {e}");
                            }
                        });
                    }
                    // Translate the CRDT transition into a WorkItem event so
                    // downstream consumers react exactly as in server mode.
                    let (action, commit_msg) =
                        watcher::stage_metadata(&evt.to_stage, &evt.story_id)
                            .unwrap_or(("update", format!("huskies: update {}", evt.story_id)));
                    let watcher_evt = watcher::WatcherEvent::WorkItem {
                        stage: evt.to_stage,
                        item_id: evt.story_id,
                        action: action.to_string(),
                        commit_msg,
                        from_stage: evt.from_stage,
                    };
                    // Send errors (no subscribers) are intentionally ignored.
                    let _ = crdt_watcher_tx.send(watcher_evt);
                }
            });
        }
    }
    // Subscribe to watcher events to trigger auto-assign on stage transitions.
    {
        let auto_rx = watcher_tx.subscribe();
        let auto_agents = Arc::clone(&agents);
        let auto_root = project_root.clone();
        tokio::spawn(async move {
            let mut rx = auto_rx;
            while let Ok(event) = rx.recv().await {
                // Only the active pipeline stages can yield assignable work.
                if let watcher::WatcherEvent::WorkItem { ref stage, .. } = event
                    && matches!(stage.as_str(), "2_current" | "3_qa" | "4_merge")
                {
                    slog!("[agent-mode] CRDT transition in {stage}/; triggering auto-assign.");
                    auto_agents.auto_assign_available_work(&auto_root).await;
                }
            }
        });
    }
    // Write initial heartbeat so peers see this node before the first scan.
    write_heartbeat(&rendezvous_url, port);
    // Reconcile any committed work from a previous session.
    {
        let recon_agents = Arc::clone(&agents);
        let recon_root = project_root.clone();
        // Throwaway channel: the receiver is dropped immediately, so events
        // emitted during reconciliation are intentionally discarded.
        let (recon_tx, _) = broadcast::channel(64);
        slog!("[agent-mode] Reconciling completed worktrees from previous session.");
        recon_agents
            .reconcile_on_startup(&recon_root, &recon_tx)
            .await;
    }
    // Run initial auto-assign.
    slog!("[agent-mode] Initial auto-assign scan.");
    agents.auto_assign_available_work(&project_root).await;
    // Track which stories we've claimed so we can detect conflicts.
    let mut our_claims: HashMap<String, f64> = HashMap::new();
    // Main loop: heartbeat, scan, claim, detect conflicts. Note that tokio's
    // interval completes its first tick immediately, so the first iteration
    // runs right away (writing a second heartbeat shortly after the initial one).
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(SCAN_INTERVAL_SECS));
    loop {
        interval.tick().await;
        // Write heartbeat.
        write_heartbeat(&rendezvous_url, port);
        // Scan CRDT for claimable work.
        scan_and_claim(&agents, &project_root, &mut our_claims).await;
        // Detect claim conflicts: if another node overwrote our claim, stop our agent.
        detect_conflicts(&agents, &project_root, &mut our_claims).await;
        // Reclaim timed-out work from dead nodes.
        reclaim_timed_out_work(&project_root);
        // Check for completed agents and push their branches.
        check_completions_and_push(&agents, &project_root).await;
    }
}
/// Record this node's liveness in the CRDT `nodes` list.
///
/// A no-op when the local CRDT layer has no node identity yet.
fn write_heartbeat(rendezvous_url: &str, port: u16) {
    // Without a node identity there is nothing to advertise.
    let node_id = match crdt_state::our_node_id() {
        Some(id) => id,
        None => return,
    };
    let timestamp = chrono::Utc::now().timestamp() as f64;
    // Advertise this node's crdt-sync endpoint alongside the timestamp.
    let endpoint = format!("ws://0.0.0.0:{port}/crdt-sync");
    crdt_state::write_node_presence(&node_id, &endpoint, timestamp, true);
    slog!(
        "[agent-mode] Heartbeat written: node={:.12}… rendezvous={rendezvous_url}",
        &node_id
    );
}
/// Scan CRDT pipeline for unclaimed stories and claim them.
async fn scan_and_claim(
agents: &AgentPool,
project_root: &Path,
our_claims: &mut HashMap<String, f64>,
) {
let Some(items) = crdt_state::read_all_items() else {
return;
};
let Some(our_node) = crdt_state::our_node_id() else {
return;
};
for item in &items {
// Only claim stories in active stages.
if !matches!(item.stage.as_str(), "2_current" | "3_qa" | "4_merge") {
continue;
}
// Skip blocked stories.
if item.blocked == Some(true) {
continue;
}
// If already claimed by us, skip.
if item.claimed_by.as_deref() == Some(&our_node) {
continue;
}
// If claimed by another alive node and claim is fresh, skip.
if let Some(ref claimer) = item.claimed_by
&& !claimer.is_empty()
&& claimer != &our_node
&& let Some(claimed_at) = item.claimed_at
{
let now = chrono::Utc::now().timestamp() as f64;
if now - claimed_at < CLAIM_TIMEOUT_SECS && is_node_alive(claimer) {
continue;
}
}
// Try to claim this story.
slog!(
"[agent-mode] Claiming story '{}' for this node",
item.story_id
);
if crdt_state::write_claim(&item.story_id) {
let now = chrono::Utc::now().timestamp() as f64;
our_claims.insert(item.story_id.clone(), now);
}
}
// Trigger auto-assign to start agents for newly claimed work.
agents.auto_assign_available_work(project_root).await;
}
/// Detect claims of ours that CRDT conflict resolution awarded to another
/// node; for each, stop the local agent and drop our bookkeeping.
async fn detect_conflicts(
    agents: &AgentPool,
    project_root: &Path,
    our_claims: &mut HashMap<String, f64>,
) {
    // Snapshot the stories we think we own but the CRDT says we don't.
    let mut lost = Vec::new();
    for story_id in our_claims.keys() {
        if !crdt_state::is_claimed_by_us(story_id) {
            lost.push(story_id.clone());
        }
    }
    for story_id in lost {
        slog!(
            "[agent-mode] Lost claim on '{}' to another node; stopping local agent.",
            story_id
        );
        our_claims.remove(&story_id);
        // Stop the first local agent (if any) working on this story.
        if let Ok(agent_list) = agents.list_agents() {
            if let Some(info) = agent_list.into_iter().find(|a| a.story_id == story_id) {
                let _ = agents
                    .stop_agent(project_root, &story_id, &info.agent_name)
                    .await;
            }
        }
        // Release our claim in case the overwrite was only partial.
        crdt_state::release_claim(&story_id);
    }
}
/// Reclaim work from nodes that have timed out (stale heartbeat).
fn reclaim_timed_out_work(_project_root: &Path) {
let Some(items) = crdt_state::read_all_items() else {
return;
};
let now = chrono::Utc::now().timestamp() as f64;
for item in &items {
if !matches!(item.stage.as_str(), "2_current" | "3_qa" | "4_merge") {
continue;
}
// Check if the claim has timed out.
if let Some(ref claimer) = item.claimed_by {
if claimer.is_empty() {
continue;
}
if let Some(claimed_at) = item.claimed_at
&& now - claimed_at >= CLAIM_TIMEOUT_SECS
&& !is_node_alive(claimer)
{
slog!(
"[agent-mode] Reclaiming timed-out story '{}' from dead node {:.12}…",
item.story_id,
claimer
);
crdt_state::release_claim(&item.story_id);
}
}
}
}
/// Check if a node is alive according to the CRDT nodes list.
///
/// A node counts as alive only when it is flagged `alive` AND its last
/// heartbeat falls within the claim-timeout window. An unknown node, or an
/// unavailable presence list, is treated as dead.
fn is_node_alive(node_id: &str) -> bool {
    let Some(nodes) = crdt_state::read_all_node_presence() else {
        return false;
    };
    let now = chrono::Utc::now().timestamp() as f64;
    nodes
        .iter()
        .find(|n| n.node_id == node_id)
        .is_some_and(|n| n.alive && (now - n.last_seen) < CLAIM_TIMEOUT_SECS)
}
/// Check for completed agents, push their feature branches to the remote,
/// and release each story's claim once the push attempt is done.
async fn check_completions_and_push(agents: &AgentPool, _project_root: &Path) {
    let agent_list = match agents.list_agents() {
        Ok(list) => list,
        Err(_) => return,
    };
    for info in agent_list {
        // Only terminal agents (completed or failed) are of interest.
        let finished = matches!(
            info.status,
            crate::agents::AgentStatus::Completed | crate::agents::AgentStatus::Failed
        );
        if !finished {
            continue;
        }
        let story_id = &info.story_id;
        // Only push if this node still owns the claim.
        if !crdt_state::is_claimed_by_us(story_id) {
            continue;
        }
        let outcome = if matches!(info.status, crate::agents::AgentStatus::Completed) {
            "completed"
        } else {
            "failed"
        };
        slog!(
            "[agent-mode] Agent {} for '{}'; pushing feature branch.",
            outcome,
            story_id
        );
        // Push the feature branch when the worktree location is known.
        if let Some(ref wt) = info.worktree_path {
            match push_feature_branch(wt, story_id) {
                Ok(()) => {
                    slog!("[agent-mode] Pushed feature branch for '{story_id}' to remote.");
                }
                Err(e) => {
                    slog!("[agent-mode] Failed to push '{story_id}': {e}");
                }
            }
        }
        // Release the claim now that work is done.
        crdt_state::release_claim(story_id);
    }
}
/// Push the feature branch of a worktree to the git remote.
///
/// Tries `origin` first; if `origin` does not exist, retries against the
/// first configured remote. Having no remotes at all is not an error in
/// agent mode — the push is logged and skipped.
///
/// # Errors
/// Returns a human-readable message when `git` cannot be spawned (e.g. the
/// worktree path does not exist) or every candidate remote rejects the push.
fn push_feature_branch(worktree_path: &str, story_id: &str) -> Result<(), String> {
    let branch = format!("feature/story-{story_id}");
    // First attempt: the conventional 'origin' remote.
    let output = run_git(worktree_path, &["push", "origin", &branch])
        .map_err(|e| format!("Failed to run git push: {e}"))?;
    if output.status.success() {
        return Ok(());
    }
    let stderr = String::from_utf8_lossy(&output.stderr);
    // Only fall back when the failure means 'origin' does not exist; any
    // other failure (auth, rejected push, …) is reported as-is.
    let origin_missing = stderr.contains("does not appear to be a git repository")
        || stderr.contains("No such remote");
    if !origin_missing {
        return Err(format!("git push failed: {stderr}"));
    }
    // Fallback: push to the first configured remote, if any. Trim the name
    // once so the push and any error message agree on the remote's spelling
    // (the original trimmed only the push argument, not the message).
    let remotes = run_git(worktree_path, &["remote"])
        .map_err(|e| format!("Failed to list remotes: {e}"))?;
    let remote_list = String::from_utf8_lossy(&remotes.stdout);
    match remote_list.lines().next().map(str::trim) {
        Some(remote) if !remote.is_empty() => {
            let retry = run_git(worktree_path, &["push", remote, &branch])
                .map_err(|e| format!("Failed to push to {remote}: {e}"))?;
            if retry.status.success() {
                Ok(())
            } else {
                Err(format!(
                    "git push to '{remote}' failed: {}",
                    String::from_utf8_lossy(&retry.stderr)
                ))
            }
        }
        _ => {
            // No remotes configured — not an error in agent mode, just skip.
            slog!("[agent-mode] No git remote configured; skipping push for '{story_id}'.");
            Ok(())
        }
    }
}

/// Run a git subcommand in `dir`, capturing its full output.
fn run_git(dir: &str, args: &[&str]) -> std::io::Result<std::process::Output> {
    std::process::Command::new("git")
        .args(args)
        .current_dir(dir)
        .output()
}
// ── Tests ────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::*;

    // With no CRDT presence list available, every node must be treated as
    // dead (conservative default: never honor a claim we cannot verify).
    #[test]
    fn is_node_alive_returns_false_for_unknown_node() {
        // Without CRDT init, should return false.
        assert!(!is_node_alive("nonexistent_node_id"));
    }

    // Spawning `git` with a nonexistent working directory must surface an
    // error rather than panic.
    #[test]
    fn push_feature_branch_handles_missing_worktree() {
        let result = push_feature_branch("/nonexistent/path", "test_story");
        assert!(result.is_err());
    }

    // Guard against accidental changes to the claim-timeout policy.
    #[test]
    fn claim_timeout_is_ten_minutes() {
        assert_eq!(CLAIM_TIMEOUT_SECS, 600.0);
    }
}