//! Main-loop operations: heartbeat, claim scanning, conflict detection, and
//! branch pushing.

use std::collections::HashMap;
use std::path::Path;

use crate::agents::AgentPool;
use crate::crdt_state;
use crate::slog;

use super::claim::{CLAIM_TIMEOUT_SECS, should_self_claim};

/// Write this node's heartbeat to the CRDT `nodes` list.
pub(super) fn write_heartbeat(rendezvous_url: &str, port: u16) {
    let Some(node_id) = crdt_state::our_node_id() else {
        return;
    };
    let now = chrono::Utc::now().timestamp() as f64;
    let now_ms = chrono::Utc::now().timestamp_millis() as f64;

    // Advertise our crdt-sync endpoint.
    let address = format!("ws://0.0.0.0:{port}/crdt-sync");
    crdt_state::write_node_presence(&node_id, &address, now, true);

    // Write millisecond-precision timestamp via LWW register.
    crdt_state::write_node_metadata(&node_id, "", None, now_ms);

    slog!(
        "[agent-mode] Heartbeat written: node={:.12}… rendezvous={rendezvous_url}",
        &node_id
    );
}

/// Scan the CRDT pipeline for unclaimed stories and claim them.
pub(super) async fn scan_and_claim(
    agents: &AgentPool,
    project_root: &Path,
    our_claims: &mut HashMap<String, f64>,
) {
    let Some(items) = crdt_state::read_all_items() else {
        return;
    };
    let Some(our_node) = crdt_state::our_node_id() else {
        return;
    };

    for item in &items {
        // Only claim stories in execution stages (Coding, Qa, Merge).
        if !matches!(
            item.stage(),
            crate::pipeline_state::Stage::Coding { .. }
                | crate::pipeline_state::Stage::Qa
                | crate::pipeline_state::Stage::Merge { .. }
        ) {
            continue;
        }

        // Skip blocked stories (story 945: `Stage::Blocked` is the source of truth).
        if matches!(
            item.stage(),
            crate::pipeline_state::Stage::Blocked { .. }
                | crate::pipeline_state::Stage::MergeFailure { .. }
                | crate::pipeline_state::Stage::MergeFailureFinal { .. }
                | crate::pipeline_state::Stage::Archived {
                    reason: crate::pipeline_state::ArchiveReason::Blocked { .. },
                    ..
                }
        ) {
            continue;
        }

        let item_claim = match item.stage() {
            crate::pipeline_state::Stage::Coding { claim, .. } => claim.as_ref(),
            crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(),
            _ => None,
        };

        // If already claimed by us, skip.
        if item_claim.is_some_and(|c| c.agent.0 == our_node) {
            continue;
        }

        // If claimed by another node, respect the claim while it is fresh.
        // Once the TTL expires the claim is considered stale regardless of
        // whether the holder appears alive — displacement is purely TTL-driven.
        if let Some(claim) = item_claim
            && claim.agent.0 != our_node
        {
            let now = chrono::Utc::now().timestamp() as u64;
            let age = now.saturating_sub(claim.claimed_at.timestamp() as u64) as f64;
            if age < CLAIM_TIMEOUT_SECS {
                // Claim is still fresh — respect it.
                continue;
            }
            // Claim TTL has expired: displace the stale holder.
            slog!(
                "[agent-mode] Displacing stale claim on '{}' held by {:.12}… \
                 (age {}s > TTL {}s)",
                item.story_id(),
                claim.agent.0,
                age as u64,
                CLAIM_TIMEOUT_SECS as u64,
            );
        }

        // Pre-spawn hash-based tie-break: only the node whose
        // SHA-256(node_id || story_id) is strictly lowest among all alive
        // candidates should write the CRDT claim. This eliminates the
        // thundering herd of simultaneous LWW conflicts while keeping the
        // existing LWW + reclaim-stale logic as a safety net for clock skew
        // and partial alive-list views. (A property sketch of this tie-break
        // lives in the tests at the bottom of this file.)
        let alive_peers: Vec<String> = crdt_state::read_all_node_presence()
            .unwrap_or_default()
            .into_iter()
            .filter(|n| {
                let now_ms = chrono::Utc::now().timestamp_millis() as f64;
                let last_ms = n.last_seen_ms.unwrap_or(n.last_seen * 1000.0);
                n.alive && (now_ms - last_ms) / 1000.0 < CLAIM_TIMEOUT_SECS
            })
            .map(|n| n.node_id)
            .collect();
        if !should_self_claim(&our_node, item.story_id(), &alive_peers) {
            slog!(
                "[agent-mode] Hash tie-break: deferring claim on '{}' to lower-hash peer",
                item.story_id()
            );
            continue;
        }

        // Try to claim this story.
        slog!(
            "[agent-mode] Claiming story '{}' for this node",
            item.story_id()
        );
        if crdt_state::write_claim(item.story_id()) {
            let now = chrono::Utc::now().timestamp() as f64;
            our_claims.insert(item.story_id().to_string(), now);
        }
    }

    // Trigger auto-assign to start agents for newly claimed work.
    agents.auto_assign_available_work(project_root).await;
}

/// Detect whether another node overwrote our claims (CRDT conflict resolution).
/// If so, stop our local agent for that story.
pub(super) async fn detect_conflicts(
    agents: &AgentPool,
    project_root: &Path,
    our_claims: &mut HashMap<String, f64>,
) {
    let lost: Vec<String> = our_claims
        .keys()
        .filter(|story_id| !crdt_state::is_claimed_by_us(story_id))
        .cloned()
        .collect();

    for story_id in lost {
        slog!(
            "[agent-mode] Lost claim on '{}' to another node; stopping local agent.",
            story_id
        );
        our_claims.remove(&story_id);

        // Stop any local agent for this story by looking up its name.
        if let Ok(agent_list) = agents.list_agents() {
            for info in agent_list {
                if info.story_id == story_id {
                    let _ = agents
                        .stop_agent(project_root, &story_id, &info.agent_name)
                        .await;
                    break;
                }
            }
        }

        // Release our claim (in case it wasn't fully overwritten).
        crdt_state::release_claim(&story_id);
    }
}

/// Reclaim work from nodes that have timed out (stale heartbeat).
pub(super) fn reclaim_timed_out_work(_project_root: &Path) {
    let Some(items) = crdt_state::read_all_items() else {
        return;
    };
    let now = chrono::Utc::now().timestamp() as f64;

    for item in &items {
        if !matches!(
            item.stage(),
            crate::pipeline_state::Stage::Coding { .. }
                | crate::pipeline_state::Stage::Qa
                | crate::pipeline_state::Stage::Merge { .. }
        ) {
            continue;
        }

        // Release the claim if the TTL has expired — regardless of whether the
        // holder is still alive. A node actively working should refresh its
        // claim before the TTL window closes.
        let reclaim_claim = match item.stage() {
            crate::pipeline_state::Stage::Coding { claim, .. } => claim.as_ref(),
            crate::pipeline_state::Stage::Merge { claim, .. } => claim.as_ref(),
            _ => None,
        };
        if let Some(claim) = reclaim_claim {
            let claim_ts = claim.claimed_at.timestamp() as u64;
            let age = (now as u64).saturating_sub(claim_ts);
            if age as f64 >= CLAIM_TIMEOUT_SECS {
                slog!(
                    "[agent-mode] Releasing stale claim on '{}' held by {:.12}… (age {}s)",
                    item.story_id(),
                    claim.agent.0,
                    age,
                );
                crdt_state::release_claim(item.story_id());
            }
        }
    }
}

/// Check for completed agents, push their feature branches to the remote,
/// and report completion via CRDT.
pub(super) async fn check_completions_and_push(agents: &AgentPool, _project_root: &Path) {
    let Ok(agent_list) = agents.list_agents() else {
        return;
    };

    for info in agent_list {
        if !matches!(
            info.status,
            crate::agents::AgentStatus::Completed | crate::agents::AgentStatus::Failed
        ) {
            continue;
        }

        let story_id = &info.story_id;

        // Only push if this node still owns the claim.
        if !crdt_state::is_claimed_by_us(story_id) {
            continue;
        }

        slog!(
            "[agent-mode] Agent {} for '{}'; pushing feature branch.",
            if matches!(info.status, crate::agents::AgentStatus::Completed) {
                "completed"
            } else {
                "failed"
            },
            story_id
        );

        // Push the feature branch to the remote.
        if let Some(ref wt) = info.worktree_path {
            match push_feature_branch(wt, story_id) {
                Ok(()) => {
                    slog!("[agent-mode] Pushed feature branch for '{story_id}' to remote.");
                }
                Err(e) => {
                    slog!("[agent-mode] Failed to push '{story_id}': {e}");
                }
            }
        }

        // Release the claim now that work is done.
        crdt_state::release_claim(story_id);
    }
}

/// Push the feature branch of a worktree to the git remote.
pub(super) fn push_feature_branch(worktree_path: &str, story_id: &str) -> Result<(), String> {
    let branch = format!("feature/story-{story_id}");

    // Try to push to 'origin'. If origin doesn't exist, try the first remote.
    let output = std::process::Command::new("git")
        .args(["push", "origin", &branch])
        .current_dir(worktree_path)
        .output()
        .map_err(|e| format!("Failed to run git push: {e}"))?;

    if output.status.success() {
        Ok(())
    } else {
        let stderr = String::from_utf8_lossy(&output.stderr);
        // If 'origin' doesn't exist, try to find any remote.
        if stderr.contains("does not appear to be a git repository")
            || stderr.contains("No such remote")
        {
            let remotes = std::process::Command::new("git")
                .args(["remote"])
                .current_dir(worktree_path)
                .output()
                .map_err(|e| format!("Failed to list remotes: {e}"))?;
            let remote_list = String::from_utf8_lossy(&remotes.stdout);
            let first_remote = remote_list.lines().next();

            if let Some(remote) = first_remote {
                let retry = std::process::Command::new("git")
                    .args(["push", remote.trim(), &branch])
                    .current_dir(worktree_path)
                    .output()
                    .map_err(|e| format!("Failed to push to {remote}: {e}"))?;
                if retry.status.success() {
                    return Ok(());
                }
                return Err(format!(
                    "git push to '{remote}' failed: {}",
                    String::from_utf8_lossy(&retry.stderr)
                ));
            }

            // No remotes configured — not an error in agent mode, just skip.
            slog!("[agent-mode] No git remote configured; skipping push for '{story_id}'.");
            Ok(())
        } else {
            Err(format!("git push failed: {stderr}"))
        }
    }
}

// ── Tests ────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn push_feature_branch_handles_missing_worktree() {
        let result = push_feature_branch("/nonexistent/path", "test_story");
        assert!(result.is_err());
    }
}
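// A property sketch for the pre-spawn tie-break used in `scan_and_claim`. It is
// not a spec of `super::claim::should_self_claim`; it only checks two invariants
// implied by the "strictly lowest SHA-256(node_id || story_id)" rule documented
// there: for a given story at most one candidate elects itself, and the decision
// is deterministic. The node and story ids below are illustrative only.
#[cfg(test)]
mod tie_break_tests {
    use super::*;

    #[test]
    fn tie_break_elects_at_most_one_node_and_is_deterministic() {
        let peers: Vec<String> = ["node-a", "node-b", "node-c"]
            .iter()
            .map(|s| s.to_string())
            .collect();

        // "Strictly lowest" means two distinct candidates can never both win.
        let mut winners = 0;
        for node in &peers {
            if should_self_claim(node, "story-1", &peers) {
                winners += 1;
            }
        }
        assert!(winners <= 1);

        // The same inputs must always yield the same decision.
        for node in &peers {
            assert_eq!(
                should_self_claim(node, "story-1", &peers),
                should_self_claim(node, "story-1", &peers)
            );
        }
    }
}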
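// A hedged companion to the test above: `push_feature_branch` should also fail
// cleanly when the directory exists but is not a git worktree. This assumes the
// system temp dir is not inside a git repository; if `git` itself is missing
// from PATH, spawning the command errors and the assertion still holds.
#[cfg(test)]
mod push_tests {
    use super::*;

    #[test]
    fn push_feature_branch_fails_outside_a_git_worktree() {
        let dir = std::env::temp_dir();
        let result = push_feature_branch(&dir.to_string_lossy(), "test_story");
        assert!(result.is_err());
    }
}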