// Source file: huskies/server/src/agent_mode/loop_ops.rs
//! Main-loop operations: heartbeat, claim scanning, conflict detection, and branch pushing.
use std::collections::HashMap;
use std::path::Path;
use crate::agents::AgentPool;
use crate::crdt_state;
use crate::slog;
use super::claim::{CLAIM_TIMEOUT_SECS, should_self_claim};
/// Write this node's heartbeat to the CRDT `nodes` list.
/// Write this node's heartbeat to the CRDT `nodes` list.
///
/// Records both a second-precision presence timestamp and a
/// millisecond-precision LWW metadata timestamp. Both are derived from a
/// single clock sample so the two recorded values cannot disagree.
pub(super) fn write_heartbeat(rendezvous_url: &str, port: u16) {
    let Some(node_id) = crdt_state::our_node_id() else {
        return;
    };
    // Sample the clock once; deriving both precisions from the same instant
    // keeps the presence and metadata timestamps consistent.
    let now_utc = chrono::Utc::now();
    let now = now_utc.timestamp() as f64;
    let now_ms = now_utc.timestamp_millis() as f64;
    // Advertise our crdt-sync endpoint.
    let address = format!("ws://0.0.0.0:{port}/crdt-sync");
    crdt_state::write_node_presence(&node_id, &address, now, true);
    // Write millisecond-precision timestamp via LWW register.
    crdt_state::write_node_metadata(&node_id, "", None, now_ms);
    slog!(
        "[agent-mode] Heartbeat written: node={:.12}… rendezvous={rendezvous_url}",
        &node_id
    );
}
/// Scan CRDT pipeline for unclaimed stories and claim them.
pub(super) async fn scan_and_claim(
agents: &AgentPool,
project_root: &Path,
our_claims: &mut HashMap<String, f64>,
) {
let Some(items) = crdt_state::read_all_items() else {
return;
};
let Some(our_node) = crdt_state::our_node_id() else {
return;
};
for item in &items {
// Only claim stories in active stages.
2026-05-12 17:03:41 +00:00
if !crate::pipeline_state::Stage::from_dir(item.stage_str()).is_some_and(|s| s.is_active())
{
2026-04-28 18:59:10 +00:00
continue;
}
// Skip blocked stories.
2026-05-12 17:03:41 +00:00
if item.blocked() {
2026-04-28 18:59:10 +00:00
continue;
}
// If already claimed by us, skip.
2026-05-12 17:03:41 +00:00
if item.claimed_by() == Some(our_node.as_str()) {
2026-04-28 18:59:10 +00:00
continue;
}
// If claimed by another node, respect the claim while it is fresh.
// Once the TTL expires the claim is considered stale regardless of
// whether the holder appears alive — displacement is purely TTL-driven.
2026-05-12 17:03:41 +00:00
if let Some(claimer) = item.claimed_by()
2026-04-28 18:59:10 +00:00
&& !claimer.is_empty()
2026-05-12 17:03:41 +00:00
&& claimer != our_node.as_str()
&& let Some(claimed_at) = item.claimed_at()
2026-04-28 18:59:10 +00:00
{
let now = chrono::Utc::now().timestamp() as f64;
let age = now - claimed_at;
if age < CLAIM_TIMEOUT_SECS {
// Claim is still fresh — respect it.
continue;
}
// Claim TTL has expired: displace the stale holder.
slog!(
"[agent-mode] Displacing stale claim on '{}' held by {:.12}… \
(age {}s > TTL {}s)",
2026-05-12 17:03:41 +00:00
item.story_id(),
2026-04-28 18:59:10 +00:00
claimer,
age as u64,
CLAIM_TIMEOUT_SECS as u64,
);
}
// Pre-spawn hash-based tie-break: only the node whose
// SHA-256(node_id || story_id) is strictly lowest among all alive
// candidates should write the CRDT claim. This eliminates the
// thundering-herd of simultaneous LWW conflicts while keeping the
// existing LWW + reclaim-stale logic as a safety net for clock skew
// and partial alive-list views.
let alive_peers: Vec<String> = crdt_state::read_all_node_presence()
.unwrap_or_default()
.into_iter()
.filter(|n| {
let now_ms = chrono::Utc::now().timestamp_millis() as f64;
let last_ms = n.last_seen_ms.unwrap_or(n.last_seen * 1000.0);
n.alive && (now_ms - last_ms) / 1000.0 < CLAIM_TIMEOUT_SECS
})
.map(|n| n.node_id)
.collect();
2026-05-12 17:03:41 +00:00
if !should_self_claim(&our_node, item.story_id(), &alive_peers) {
2026-04-28 18:59:10 +00:00
slog!(
"[agent-mode] Hash tie-break: deferring claim on '{}' to lower-hash peer",
2026-05-12 17:03:41 +00:00
item.story_id()
2026-04-28 18:59:10 +00:00
);
continue;
}
// Try to claim this story.
slog!(
"[agent-mode] Claiming story '{}' for this node",
2026-05-12 17:03:41 +00:00
item.story_id()
2026-04-28 18:59:10 +00:00
);
2026-05-12 17:03:41 +00:00
if crdt_state::write_claim(item.story_id()) {
2026-04-28 18:59:10 +00:00
let now = chrono::Utc::now().timestamp() as f64;
2026-05-12 17:03:41 +00:00
our_claims.insert(item.story_id().to_string(), now);
2026-04-28 18:59:10 +00:00
}
}
// Trigger auto-assign to start agents for newly claimed work.
agents.auto_assign_available_work(project_root).await;
}
/// Detect if another node overwrote our claims (CRDT conflict resolution).
/// If so, stop our local agent for that story.
pub(super) async fn detect_conflicts(
agents: &AgentPool,
project_root: &Path,
our_claims: &mut HashMap<String, f64>,
) {
let lost: Vec<String> = our_claims
.keys()
.filter(|story_id| !crdt_state::is_claimed_by_us(story_id))
.cloned()
.collect();
for story_id in lost {
slog!(
"[agent-mode] Lost claim on '{}' to another node; stopping local agent.",
story_id
);
our_claims.remove(&story_id);
// Stop any local agent for this story by looking up its name.
if let Ok(agent_list) = agents.list_agents() {
for info in agent_list {
if info.story_id == story_id {
let _ = agents
.stop_agent(project_root, &story_id, &info.agent_name)
.await;
break;
}
}
}
// Release our claim (in case it wasn't fully overwritten).
crdt_state::release_claim(&story_id);
}
}
/// Reclaim work from nodes that have timed out (stale heartbeat).
pub(super) fn reclaim_timed_out_work(_project_root: &Path) {
let Some(items) = crdt_state::read_all_items() else {
return;
};
let now = chrono::Utc::now().timestamp() as f64;
for item in &items {
2026-05-12 17:03:41 +00:00
if !crate::pipeline_state::Stage::from_dir(item.stage_str()).is_some_and(|s| s.is_active())
{
2026-04-28 18:59:10 +00:00
continue;
}
// Release the claim if the TTL has expired — regardless of whether the
// holder is still alive. A node actively working should refresh its
// claim before the TTL window closes.
2026-05-12 17:03:41 +00:00
if let Some(claimer) = item.claimed_by() {
2026-04-28 18:59:10 +00:00
if claimer.is_empty() {
continue;
}
2026-05-12 17:03:41 +00:00
if let Some(claimed_at) = item.claimed_at()
2026-04-28 18:59:10 +00:00
&& now - claimed_at >= CLAIM_TIMEOUT_SECS
{
slog!(
"[agent-mode] Releasing stale claim on '{}' held by {:.12}… (age {}s)",
2026-05-12 17:03:41 +00:00
item.story_id(),
2026-04-28 18:59:10 +00:00
claimer,
(now - claimed_at) as u64,
);
2026-05-12 17:03:41 +00:00
crdt_state::release_claim(item.story_id());
2026-04-28 18:59:10 +00:00
}
}
}
}
/// Check for completed agents, push their feature branches to the remote,
/// and report completion via CRDT.
/// Check for completed agents, push their feature branches to the remote,
/// and report completion via CRDT.
pub(super) async fn check_completions_and_push(agents: &AgentPool, _project_root: &Path) {
    let Ok(agent_list) = agents.list_agents() else {
        return;
    };
    for info in agent_list {
        // Only agents that have reached a terminal state are of interest.
        let finished = matches!(
            info.status,
            crate::agents::AgentStatus::Completed | crate::agents::AgentStatus::Failed
        );
        if !finished {
            continue;
        }
        let story_id = &info.story_id;
        // Only push if this node still owns the claim.
        if !crdt_state::is_claimed_by_us(story_id) {
            continue;
        }
        let outcome = if matches!(info.status, crate::agents::AgentStatus::Completed) {
            "completed"
        } else {
            "failed"
        };
        slog!(
            "[agent-mode] Agent {} for '{}'; pushing feature branch.",
            outcome,
            story_id
        );
        // Push the feature branch to the remote.
        if let Some(ref wt) = info.worktree_path {
            match push_feature_branch(wt, story_id) {
                Ok(()) => {
                    slog!("[agent-mode] Pushed feature branch for '{story_id}' to remote.");
                }
                Err(e) => {
                    slog!("[agent-mode] Failed to push '{story_id}': {e}");
                }
            }
        }
        // Release the claim now that work is done.
        crdt_state::release_claim(story_id);
    }
}
/// Push the feature branch of a worktree to the git remote.
/// Push the feature branch of a worktree to the git remote.
///
/// Tries `origin` first; if `origin` is missing, falls back to the first
/// configured remote. A repository with no remotes at all is not an error in
/// agent mode — the push is simply skipped.
///
/// Returns `Err` with a human-readable message when git cannot be spawned or
/// the push is rejected by the remote.
pub(super) fn push_feature_branch(worktree_path: &str, story_id: &str) -> Result<(), String> {
    /// Run a git subcommand in `dir`, prefixing spawn failures with `err_ctx`.
    fn run_git(dir: &str, args: &[&str], err_ctx: &str) -> Result<std::process::Output, String> {
        std::process::Command::new("git")
            .args(args)
            .current_dir(dir)
            .output()
            .map_err(|e| format!("{err_ctx}: {e}"))
    }

    let branch = format!("feature/story-{story_id}");
    // Try to push to 'origin'. If origin doesn't exist, try the first remote.
    let output = run_git(
        worktree_path,
        &["push", "origin", &branch],
        "Failed to run git push",
    )?;
    if output.status.success() {
        return Ok(());
    }
    let stderr = String::from_utf8_lossy(&output.stderr);
    // Any failure other than a missing 'origin' remote is a hard error.
    if !(stderr.contains("does not appear to be a git repository")
        || stderr.contains("No such remote"))
    {
        return Err(format!("git push failed: {stderr}"));
    }
    // 'origin' doesn't exist — retry against the first configured remote.
    let remotes = run_git(worktree_path, &["remote"], "Failed to list remotes")?;
    let remote_list = String::from_utf8_lossy(&remotes.stdout);
    if let Some(remote) = remote_list.lines().next() {
        let retry = run_git(
            worktree_path,
            &["push", remote.trim(), &branch],
            &format!("Failed to push to {remote}"),
        )?;
        if retry.status.success() {
            return Ok(());
        }
        return Err(format!(
            "git push to '{remote}' failed: {}",
            String::from_utf8_lossy(&retry.stderr)
        ));
    }
    // No remotes configured — not an error in agent mode, just skip.
    slog!("[agent-mode] No git remote configured; skipping push for '{story_id}'.");
    Ok(())
}
// ── Tests ────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::*;

    /// `git` cannot be spawned with a nonexistent working directory, so the
    /// push must surface an error rather than panic.
    #[test]
    fn push_feature_branch_handles_missing_worktree() {
        assert!(push_feature_branch("/nonexistent/path", "test_story").is_err());
    }
}