Files
huskies/server/src/agent_mode.rs
T
2026-04-28 10:51:59 +00:00

930 lines
35 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! Headless build-agent mode for distributed, rendezvous-based story processing.
//!
//! When invoked via `huskies agent --rendezvous ws://host:3001/crdt-sync`, this
//! module runs a headless loop that:
//!
//! 1. Syncs CRDT state with the rendezvous peer.
//! 2. Writes a heartbeat to the CRDT `nodes` list.
//! 3. Scans for unclaimed stories in `2_current` and claims them via CRDT.
//! 4. Spawns Claude Code locally for each claimed story.
//! 5. Pushes the feature branch to the git remote when done.
//! 6. Reports completion by advancing the story stage via CRDT.
//! 7. Handles offline/reconnect: CRDT merges on reconnect, interrupted work
//!    is reclaimed after a timeout.
//!
//! A minimal HTTP server is started on the agent's port to serve the
//! `/crdt-sync` WebSocket endpoint, enabling other agents to connect for
//! peer mesh discovery.
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::broadcast;
use poem::EndpointExt as _;
use crate::agents::AgentPool;
use crate::config::ProjectConfig;
use crate::crdt_state;
use crate::io::watcher;
use crate::mesh;
use crate::slog;
/// Default claim TTL in seconds. If a claim has not been refreshed within this
/// window, other nodes may displace the stale holder and claim the story.
/// A node actively working on a story should refresh its claim periodically.
///
/// Stored as `f64` to match the CRDT's float timestamps (`claimed_at`,
/// `last_seen`) that it is compared against.
pub(crate) const CLAIM_TIMEOUT_SECS: f64 = 1800.0; // 30 minutes
// ── Hash-based tie-break ──────────────────────────────────────────────────
/// Compute the claim-priority hash for a `(node_id, story_id)` pair.
///
/// The value is SHA-256 over the concatenated `node_id` and `story_id` bytes,
/// with the leading 8 digest bytes read as a big-endian `u64`. This function
/// is:
///
/// * **Deterministic** — identical inputs always yield the identical output.
/// * **Stable across restarts** — depends only on the node's persistent id
///   and the story id, never on wall-clock time or random state.
/// * **Cross-implementation portable** — SHA-256 is a standard primitive; any
///   conforming implementation produces identical values.
fn claim_hash(node_id: &str, story_id: &str) -> u64 {
    use sha2::{Digest, Sha256};
    let digest = Sha256::new()
        .chain_update(node_id.as_bytes())
        .chain_update(story_id.as_bytes())
        .finalize();
    // Take the first 8 digest bytes as a big-endian u64.
    let mut prefix = [0u8; 8];
    prefix.copy_from_slice(&digest[..8]);
    u64::from_be_bytes(prefix)
}
/// Decide whether this node should be the one to claim `story_id`.
///
/// Returns `true` iff `claim_hash(self_node_id, story_id)` is **strictly
/// lower** than the hash of every alive peer. When there are no alive peers
/// (single-node cluster) the result is always `true`.
///
/// # Trade-off note
/// Because the winning node is determined purely by the hash of its id and the
/// story id, the distribution is uniform per story but a given node may
/// consistently "win" or "lose" across a set of stories depending on how its
/// id happens to hash. For small (2–5 node) clusters this imbalance is
/// negligible in practice: any node is the lowest-hash winner with probability
/// ≈ 1/N for a random story id, so the long-run distribution is approximately
/// fair. For clusters with many nodes (e.g. >10) the expected variance is
/// larger and operators may want a different work-distribution strategy.
pub fn should_self_claim(
    self_node_id: &str,
    story_id: &str,
    alive_peer_node_ids: &[String],
) -> bool {
    let my_hash = claim_hash(self_node_id, story_id);
    // Self may appear in the peer list; ignore it. We win only if every
    // other alive peer hashes strictly higher than we do.
    alive_peer_node_ids
        .iter()
        .filter(|peer_id| peer_id.as_str() != self_node_id)
        .all(|peer_id| claim_hash(peer_id, story_id) > my_hash)
}
/// Interval in seconds between heartbeat writes and work scans in the
/// main agent loop (see `run`).
pub const SCAN_INTERVAL_SECS: u64 = 15;
/// Run the headless build agent loop.
///
/// This function never returns under normal operation — it runs until the
/// process is terminated (SIGINT/SIGTERM).
///
/// If `join_token` and `gateway_url` are both provided the agent will register
/// itself with the gateway on startup using the one-time token.
pub async fn run(
    project_root: Option<PathBuf>,
    rendezvous_url: String,
    port: u16,
    join_token: Option<String>,
    gateway_url: Option<String>,
) -> Result<(), std::io::Error> {
    // A project root is mandatory in agent mode; exit with a user-facing
    // message rather than returning an error the CLI would just reprint.
    let project_root = match project_root {
        Some(r) => r,
        None => {
            eprintln!("error: agent mode requires a project root (no .huskies/ found)");
            std::process::exit(1);
        }
    };
    println!("\x1b[96;1m[agent-mode]\x1b[0m Starting headless build agent");
    println!("\x1b[96;1m[agent-mode]\x1b[0m Rendezvous: {rendezvous_url}");
    println!(
        "\x1b[96;1m[agent-mode]\x1b[0m Project: {}",
        project_root.display()
    );
    // Validate project config (exits the process on a malformed config).
    let config = ProjectConfig::load(&project_root).unwrap_or_else(|e| {
        eprintln!("error: invalid project config: {e}");
        std::process::exit(1);
    });
    slog!(
        "[agent-mode] Loaded config with {} agents",
        config.agent.len()
    );
    // Event bus for pipeline lifecycle events.
    let (watcher_tx, _) = broadcast::channel::<watcher::WatcherEvent>(1024);
    let agents = Arc::new(AgentPool::new(port, watcher_tx.clone()));
    // Start filesystem watcher for config hot-reload.
    watcher::start_watcher(project_root.clone(), watcher_tx.clone());
    // Bridge CRDT events to watcher channel (same as main server).
    {
        let crdt_watcher_tx = watcher_tx.clone();
        let crdt_prune_root = Some(project_root.clone());
        if let Some(mut crdt_rx) = crdt_state::subscribe() {
            tokio::spawn(async move {
                while let Ok(evt) = crdt_rx.recv().await {
                    // When a story transitions into the Archived stage, prune
                    // its git worktree on a blocking thread (worktree removal
                    // is synchronous filesystem/git work).
                    if crate::pipeline_state::Stage::from_dir(&evt.to_stage)
                        .is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Archived { .. }))
                        && let Some(root) = crdt_prune_root.as_ref().cloned()
                    {
                        let story_id = evt.story_id.clone();
                        tokio::task::spawn_blocking(move || {
                            if let Err(e) = crate::worktree::prune_worktree_sync(&root, &story_id) {
                                slog!("[agent-mode] worktree prune failed for {story_id}: {e}");
                            }
                        });
                    }
                    // Re-emit the CRDT transition as a WatcherEvent so the
                    // rest of the pipeline reacts exactly as it would to a
                    // local filesystem change.
                    let (action, commit_msg) =
                        watcher::stage_metadata(&evt.to_stage, &evt.story_id)
                            .unwrap_or(("update", format!("huskies: update {}", evt.story_id)));
                    let watcher_evt = watcher::WatcherEvent::WorkItem {
                        stage: evt.to_stage,
                        item_id: evt.story_id,
                        action: action.to_string(),
                        commit_msg,
                        from_stage: evt.from_stage,
                    };
                    let _ = crdt_watcher_tx.send(watcher_evt);
                }
            });
        }
    }
    // Subscribe to watcher events to trigger auto-assign on stage transitions.
    {
        let auto_rx = watcher_tx.subscribe();
        let auto_agents = Arc::clone(&agents);
        let auto_root = project_root.clone();
        tokio::spawn(async move {
            let mut rx = auto_rx;
            while let Ok(event) = rx.recv().await {
                // Only transitions into an *active* stage can create
                // assignable work.
                if let watcher::WatcherEvent::WorkItem { ref stage, .. } = event
                    && crate::pipeline_state::Stage::from_dir(stage.as_str())
                        .is_some_and(|s| s.is_active())
                {
                    slog!("[agent-mode] CRDT transition in {stage}/; triggering auto-assign.");
                    auto_agents.auto_assign_available_work(&auto_root).await;
                }
            }
        });
    }
    // ── Start minimal HTTP server for /crdt-sync endpoint ─────────────
    //
    // Other agents discover this endpoint via the CRDT `nodes` list and
    // open supplementary mesh connections for resilience.
    {
        let sync_handler = poem::get(crate::crdt_sync::crdt_sync_handler);
        // Build a minimal AppContext for the crdt_sync_handler (the handler
        // receives it via Data<> but doesn't use it — the underscore prefix
        // on `_ctx` confirms this).
        let agent_ctx = build_agent_app_context(&project_root, port, watcher_tx.clone());
        let agent_ctx_arc = Arc::new(agent_ctx);
        let app = poem::Route::new()
            .at("/crdt-sync", sync_handler)
            .data(agent_ctx_arc);
        let bind_addr = format!("0.0.0.0:{port}");
        slog!("[agent-mode] Starting /crdt-sync endpoint on {bind_addr}");
        // Server errors are logged, not fatal: the primary rendezvous
        // connection still works without the local endpoint.
        tokio::spawn(async move {
            if let Err(e) = poem::Server::new(poem::listener::TcpListener::bind(&bind_addr))
                .run(app)
                .await
            {
                slog!("[agent-mode] HTTP server error: {e}");
            }
        });
    }
    // Write initial heartbeat.
    write_heartbeat(&rendezvous_url, port);
    // Register with gateway if a join token and gateway URL were provided.
    if let (Some(token), Some(url)) = (join_token.clone(), gateway_url) {
        let node_id = crdt_state::our_node_id().unwrap_or_else(|| "unknown".to_string());
        // NOTE(review): byte-slicing the node id assumes ASCII ids — confirm
        // node ids are always hex strings so this cannot split a char boundary.
        let label = format!("build-agent-{}", &node_id[..node_id.len().min(8)]);
        let address = format!("ws://0.0.0.0:{port}/crdt-sync");
        register_with_gateway(&url, &token, &label, &address).await;
    }
    // ── Mesh peer discovery ───────────────────────────────────────────
    //
    // Periodically read the CRDT `nodes` list and open supplementary sync
    // connections to alive peers. The primary rendezvous connection remains
    // canonical; mesh connections are supplementary and don't block startup.
    let _mesh_handle = {
        let our_node_id = crdt_state::our_node_id().unwrap_or_default();
        let max_mesh_peers = config.max_mesh_peers;
        mesh::spawn_mesh_discovery(
            max_mesh_peers,
            our_node_id,
            rendezvous_url.clone(),
            join_token,
        )
    };
    // Reconcile any committed work from a previous session.
    {
        let recon_agents = Arc::clone(&agents);
        let recon_root = project_root.clone();
        // Throwaway channel: reconciliation events have no consumer here.
        let (recon_tx, _) = broadcast::channel(64);
        slog!("[agent-mode] Reconciling completed worktrees from previous session.");
        recon_agents
            .reconcile_on_startup(&recon_root, &recon_tx)
            .await;
    }
    // Run initial auto-assign.
    slog!("[agent-mode] Initial auto-assign scan.");
    agents.auto_assign_available_work(&project_root).await;
    // Track which stories we've claimed so we can detect conflicts.
    let mut our_claims: HashMap<String, f64> = HashMap::new();
    // Main loop: heartbeat, scan, claim, detect conflicts. Runs every
    // SCAN_INTERVAL_SECS until the process is killed.
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(SCAN_INTERVAL_SECS));
    loop {
        interval.tick().await;
        // Write heartbeat.
        write_heartbeat(&rendezvous_url, port);
        // Scan CRDT for claimable work.
        scan_and_claim(&agents, &project_root, &mut our_claims).await;
        // Detect claim conflicts: if another node overwrote our claim, stop our agent.
        detect_conflicts(&agents, &project_root, &mut our_claims).await;
        // Reclaim timed-out work from dead nodes.
        reclaim_timed_out_work(&project_root);
        // Check for completed agents and push their branches.
        check_completions_and_push(&agents, &project_root).await;
    }
}
/// Write this node's heartbeat to the CRDT `nodes` list.
///
/// No-op when the local node id is not yet available. Writes both a
/// second-precision presence record and a millisecond-precision LWW
/// metadata timestamp.
fn write_heartbeat(rendezvous_url: &str, port: u16) {
    let node_id = match crdt_state::our_node_id() {
        Some(id) => id,
        None => return,
    };
    let secs = chrono::Utc::now().timestamp() as f64;
    let millis = chrono::Utc::now().timestamp_millis() as f64;
    // Advertise our crdt-sync endpoint.
    let endpoint = format!("ws://0.0.0.0:{port}/crdt-sync");
    crdt_state::write_node_presence(&node_id, &endpoint, secs, true);
    // Write millisecond-precision timestamp via LWW register.
    crdt_state::write_node_metadata(&node_id, "", None, millis);
    slog!(
        "[agent-mode] Heartbeat written: node={:.12}… rendezvous={rendezvous_url}",
        &node_id
    );
}
/// Scan CRDT pipeline for unclaimed stories and claim them.
///
/// A story is claimable when it sits in an active stage, is not blocked, and
/// is either unclaimed or held by a claim whose TTL has expired. The hash
/// tie-break ensures only one alive node attempts the CRDT write per story.
async fn scan_and_claim(
    agents: &AgentPool,
    project_root: &Path,
    our_claims: &mut HashMap<String, f64>,
) {
    let Some(items) = crdt_state::read_all_items() else {
        return;
    };
    let Some(our_node) = crdt_state::our_node_id() else {
        return;
    };
    // Snapshot the alive-peer set once per scan. Previously this was
    // recomputed inside the per-item loop (O(items × nodes) CRDT reads per
    // scan); nothing in the loop mutates node presence, so one snapshot is
    // equivalent and cheaper.
    //
    // The peer set feeds the pre-spawn hash-based tie-break: only the node
    // whose SHA-256(node_id || story_id) is strictly lowest among all alive
    // candidates should write the CRDT claim. This eliminates the
    // thundering-herd of simultaneous LWW conflicts while keeping the
    // existing LWW + reclaim-stale logic as a safety net for clock skew
    // and partial alive-list views.
    let now_ms = chrono::Utc::now().timestamp_millis() as f64;
    let alive_peers: Vec<String> = crdt_state::read_all_node_presence()
        .unwrap_or_default()
        .into_iter()
        .filter(|n| {
            // Prefer the millisecond heartbeat; fall back to the coarse one.
            let last_ms = n.last_seen_ms.unwrap_or(n.last_seen * 1000.0);
            n.alive && (now_ms - last_ms) / 1000.0 < CLAIM_TIMEOUT_SECS
        })
        .map(|n| n.node_id)
        .collect();
    for item in &items {
        // Only claim stories in active stages.
        if !crate::pipeline_state::Stage::from_dir(&item.stage).is_some_and(|s| s.is_active()) {
            continue;
        }
        // Skip blocked stories.
        if item.blocked == Some(true) {
            continue;
        }
        // If already claimed by us, skip.
        if item.claimed_by.as_deref() == Some(&our_node) {
            continue;
        }
        // If claimed by another node, respect the claim while it is fresh.
        // Once the TTL expires the claim is considered stale regardless of
        // whether the holder appears alive — displacement is purely TTL-driven.
        if let Some(ref claimer) = item.claimed_by
            && !claimer.is_empty()
            && claimer != &our_node
            && let Some(claimed_at) = item.claimed_at
        {
            let now = chrono::Utc::now().timestamp() as f64;
            let age = now - claimed_at;
            if age < CLAIM_TIMEOUT_SECS {
                // Claim is still fresh — respect it.
                continue;
            }
            // Claim TTL has expired: displace the stale holder.
            slog!(
                "[agent-mode] Displacing stale claim on '{}' held by {:.12}… \
                 (age {}s > TTL {}s)",
                item.story_id,
                claimer,
                age as u64,
                CLAIM_TIMEOUT_SECS as u64,
            );
        }
        if !should_self_claim(&our_node, &item.story_id, &alive_peers) {
            slog!(
                "[agent-mode] Hash tie-break: deferring claim on '{}' to lower-hash peer",
                item.story_id
            );
            continue;
        }
        // Try to claim this story.
        slog!(
            "[agent-mode] Claiming story '{}' for this node",
            item.story_id
        );
        if crdt_state::write_claim(&item.story_id) {
            let now = chrono::Utc::now().timestamp() as f64;
            our_claims.insert(item.story_id.clone(), now);
        }
    }
    // Trigger auto-assign to start agents for newly claimed work.
    agents.auto_assign_available_work(project_root).await;
}
/// Detect if another node overwrote our claims (CRDT conflict resolution).
/// If so, stop our local agent for that story.
async fn detect_conflicts(
    agents: &AgentPool,
    project_root: &Path,
    our_claims: &mut HashMap<String, f64>,
) {
    // Collect first so we can mutate `our_claims` while iterating the losses.
    let mut lost = Vec::new();
    for story_id in our_claims.keys() {
        if !crdt_state::is_claimed_by_us(story_id) {
            lost.push(story_id.clone());
        }
    }
    for story_id in lost {
        slog!(
            "[agent-mode] Lost claim on '{}' to another node; stopping local agent.",
            story_id
        );
        our_claims.remove(&story_id);
        // Stop any local agent working this story, identified by its name.
        if let Ok(agent_list) = agents.list_agents() {
            if let Some(info) = agent_list.into_iter().find(|i| i.story_id == story_id) {
                let _ = agents
                    .stop_agent(project_root, &story_id, &info.agent_name)
                    .await;
            }
        }
        // Release our claim (in case it wasn't fully overwritten).
        crdt_state::release_claim(&story_id);
    }
}
/// Reclaim work from nodes that have timed out (stale heartbeat).
fn reclaim_timed_out_work(_project_root: &Path) {
let Some(items) = crdt_state::read_all_items() else {
return;
};
let now = chrono::Utc::now().timestamp() as f64;
for item in &items {
if !crate::pipeline_state::Stage::from_dir(&item.stage).is_some_and(|s| s.is_active()) {
continue;
}
// Release the claim if the TTL has expired — regardless of whether the
// holder is still alive. A node actively working should refresh its
// claim before the TTL window closes.
if let Some(ref claimer) = item.claimed_by {
if claimer.is_empty() {
continue;
}
if let Some(claimed_at) = item.claimed_at
&& now - claimed_at >= CLAIM_TIMEOUT_SECS
{
slog!(
"[agent-mode] Releasing stale claim on '{}' held by {:.12}… (age {}s)",
item.story_id,
claimer,
(now - claimed_at) as u64,
);
crdt_state::release_claim(&item.story_id);
}
}
}
}
/// Check for completed agents, push their feature branches to the remote,
/// and report completion via CRDT.
async fn check_completions_and_push(agents: &AgentPool, _project_root: &Path) {
    let Ok(agent_list) = agents.list_agents() else {
        return;
    };
    for info in agent_list {
        // Only terminal agents (completed or failed) are of interest.
        let finished = matches!(
            info.status,
            crate::agents::AgentStatus::Completed | crate::agents::AgentStatus::Failed
        );
        if !finished {
            continue;
        }
        let story_id = &info.story_id;
        // Only push if this node still owns the claim.
        if !crdt_state::is_claimed_by_us(story_id) {
            continue;
        }
        let outcome = if matches!(info.status, crate::agents::AgentStatus::Completed) {
            "completed"
        } else {
            "failed"
        };
        slog!(
            "[agent-mode] Agent {} for '{}'; pushing feature branch.",
            outcome,
            story_id
        );
        // Push the feature branch to the remote.
        if let Some(ref wt) = info.worktree_path {
            match push_feature_branch(wt, story_id) {
                Ok(()) => {
                    slog!("[agent-mode] Pushed feature branch for '{story_id}' to remote.");
                }
                Err(e) => {
                    slog!("[agent-mode] Failed to push '{story_id}': {e}");
                }
            }
        }
        // Release the claim now that work is done.
        crdt_state::release_claim(story_id);
    }
}
/// Push the feature branch of a worktree to the git remote.
///
/// Tries `origin` first; if `origin` does not exist, falls back to the first
/// configured remote. When no remote is configured at all, the push is
/// skipped and `Ok(())` is returned (not an error in agent mode).
///
/// # Errors
/// Returns `Err` with a human-readable message when `git` cannot be spawned
/// or the push is rejected for any reason other than a missing `origin`.
fn push_feature_branch(worktree_path: &str, story_id: &str) -> Result<(), String> {
    let branch = format!("feature/story-{story_id}");
    let output = std::process::Command::new("git")
        .args(["push", "origin", &branch])
        .current_dir(worktree_path)
        .output()
        .map_err(|e| format!("Failed to run git push: {e}"))?;
    if output.status.success() {
        return Ok(());
    }
    let stderr = String::from_utf8_lossy(&output.stderr);
    // Only fall back when the failure is "origin does not exist"; any other
    // failure (auth, rejected push, network) is surfaced to the caller.
    if !stderr.contains("does not appear to be a git repository")
        && !stderr.contains("No such remote")
    {
        return Err(format!("git push failed: {stderr}"));
    }
    let remotes = std::process::Command::new("git")
        .args(["remote"])
        .current_dir(worktree_path)
        .output()
        .map_err(|e| format!("Failed to list remotes: {e}"))?;
    let remote_list = String::from_utf8_lossy(&remotes.stdout);
    // Skip blank lines so empty/whitespace-only `git remote` output is
    // treated as "no remotes configured" instead of pushing to "".
    let first_remote = remote_list
        .lines()
        .map(str::trim)
        .find(|line| !line.is_empty());
    let Some(remote) = first_remote else {
        // No remotes configured — not an error in agent mode, just skip.
        slog!("[agent-mode] No git remote configured; skipping push for '{story_id}'.");
        return Ok(());
    };
    let retry = std::process::Command::new("git")
        .args(["push", remote, &branch])
        .current_dir(worktree_path)
        .output()
        .map_err(|e| format!("Failed to push to {remote}: {e}"))?;
    if retry.status.success() {
        Ok(())
    } else {
        Err(format!(
            "git push to '{remote}' failed: {}",
            String::from_utf8_lossy(&retry.stderr)
        ))
    }
}
// ── Gateway registration ──────────────────────────────────────────────────
/// Register this build agent with a gateway using a one-time join token.
///
/// POSTs `{ token, label, address }` to `{gateway_url}/gateway/register`. On
/// success the gateway stores the agent and it will appear in the gateway UI.
/// Failures are logged, never fatal.
async fn register_with_gateway(gateway_url: &str, token: &str, label: &str, address: &str) {
    let endpoint = format!("{}/gateway/register", gateway_url.trim_end_matches('/'));
    let payload = serde_json::json!({
        "token": token,
        "label": label,
        "address": address,
    });
    let response = reqwest::Client::new()
        .post(&endpoint)
        .json(&payload)
        .send()
        .await;
    match response {
        Ok(resp) if resp.status().is_success() => {
            slog!("[agent-mode] Registered with gateway at {gateway_url}");
        }
        Ok(resp) => {
            slog!(
                "[agent-mode] Gateway registration failed: HTTP {}",
                resp.status()
            );
        }
        Err(e) => {
            slog!("[agent-mode] Gateway registration error: {e}");
        }
    }
}
/// Build a minimal [`AppContext`] for the agent-mode HTTP server.
///
/// The `/crdt-sync` handler receives `Data<&Arc<AppContext>>` but doesn't
/// actually use it (the parameter is named `_ctx`). We construct a
/// lightweight context with just enough state to satisfy Poem's data
/// extractor.
///
/// # Panics
/// Panics if the JSON store at `.huskies/store.json` cannot be opened.
fn build_agent_app_context(
    project_root: &Path,
    port: u16,
    watcher_tx: broadcast::Sender<watcher::WatcherEvent>,
) -> crate::http::context::AppContext {
    // Default session state with only the project root populated.
    let state = crate::state::SessionState::default();
    *state.project_root.lock().unwrap() = Some(project_root.to_path_buf());
    let store_path = project_root.join(".huskies").join("store.json");
    let store = Arc::new(
        crate::store::JsonFileStore::from_path(store_path)
            .unwrap_or_else(|e| panic!("Failed to open store: {e}")),
    );
    // Channels the context type requires; their receivers go unused in
    // agent mode.
    let (reconciliation_tx, _) = broadcast::channel(64);
    let (perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel();
    let timer_store = Arc::new(crate::service::timer::TimerStore::load(
        project_root.join(".huskies").join("timers.json"),
    ));
    // NOTE(review): this builds a second AgentPool, separate from the one
    // driving the main loop in `run` — confirm the duplication is intended.
    let agents = Arc::new(AgentPool::new(port, watcher_tx.clone()));
    let services = Arc::new(crate::services::Services {
        project_root: project_root.to_path_buf(),
        agents: Arc::clone(&agents),
        bot_name: "Agent".to_string(),
        bot_user_id: String::new(),
        ambient_rooms: Arc::new(std::sync::Mutex::new(std::collections::HashSet::new())),
        perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)),
        pending_perm_replies: Arc::new(tokio::sync::Mutex::new(std::collections::HashMap::new())),
        permission_timeout_secs: 120,
        status: agents.status_broadcaster(),
    });
    crate::http::context::AppContext {
        state: Arc::new(state),
        store,
        workflow: Arc::new(std::sync::Mutex::new(
            crate::workflow::WorkflowState::default(),
        )),
        services,
        watcher_tx,
        reconciliation_tx,
        perm_tx,
        qa_app_process: Arc::new(std::sync::Mutex::new(None)),
        bot_shutdown: None,
        matrix_shutdown_tx: None,
        timer_store,
    }
}
// ── Tests ────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    use super::*;

    /// `push_feature_branch` must fail gracefully (Err, not panic) when the
    /// worktree path does not exist: the `git` subprocess spawn itself errors.
    #[test]
    fn push_feature_branch_handles_missing_worktree() {
        let result = push_feature_branch("/nonexistent/path", "test_story");
        assert!(result.is_err());
    }

    /// Pins the TTL constant so an accidental change is caught in review.
    #[test]
    fn claim_timeout_is_thirty_minutes() {
        assert_eq!(CLAIM_TIMEOUT_SECS, 1800.0);
    }

    /// AC: seed a stale claim older than the TTL, attempt a new claim from a
    /// different agent, assert the new claim succeeds and displacement is logged.
    #[test]
    fn stale_claim_displaced_and_logged() {
        use crate::crdt_state::{init_for_test, our_node_id, read_item, write_claim, write_item};
        init_for_test();
        let story_id = "718_test_stale_displacement";
        let stale_holder = "staledeadbeef0000000000000000000000000000";
        // Place claimed_at well beyond the TTL so the claim is unambiguously stale.
        let stale_time = chrono::Utc::now().timestamp() as f64 - CLAIM_TIMEOUT_SECS - 300.0;
        // Seed the story with a stale claim from a foreign node.
        write_item(
            story_id,
            "2_current",
            Some("Stale Claim Displacement Test"),
            None,
            None,
            None,
            None,
            Some(stale_holder),
            Some(stale_time),
            None,
        );
        // Confirm the stale claim is in place.
        let before = read_item(story_id).expect("item should exist");
        assert_eq!(
            before.claimed_by.as_deref(),
            Some(stale_holder),
            "pre-condition: item should be claimed by the stale holder"
        );
        let age = chrono::Utc::now().timestamp() as f64 - before.claimed_at.unwrap_or(0.0);
        assert!(
            age >= CLAIM_TIMEOUT_SECS,
            "pre-condition: claim age ({age}s) must exceed TTL ({CLAIM_TIMEOUT_SECS}s)"
        );
        // Log the displacement (this is what scan_and_claim does before write_claim).
        crate::slog!(
            "[agent-mode] Displacing stale claim on '{}' held by {:.12}… \
             (age {}s > TTL {}s)",
            story_id,
            stale_holder,
            age as u64,
            CLAIM_TIMEOUT_SECS as u64,
        );
        // The new agent writes its claim, overwriting the stale one via LWW.
        let success = write_claim(story_id);
        assert!(
            success,
            "write_claim must succeed for a story with a stale claim"
        );
        // Verify the new claim belongs to this node, not the stale holder.
        let our_id = our_node_id().expect("node id should be available after init_for_test");
        let after = read_item(story_id).expect("item should still exist");
        assert_eq!(
            after.claimed_by.as_deref(),
            Some(our_id.as_str()),
            "new claim should have displaced the stale holder"
        );
        assert_ne!(
            after.claimed_by.as_deref(),
            Some(stale_holder),
            "stale holder must no longer own the claim"
        );
        // Verify the displacement was logged.
        let logs =
            crate::log_buffer::global().get_recent(100, Some("Displacing stale claim"), None);
        assert!(
            !logs.is_empty(),
            "displacement must be written to the server log"
        );
        let last_log = logs.last().unwrap();
        assert!(
            last_log.contains(story_id),
            "log entry must name the story; got: {last_log}"
        );
        assert!(
            last_log.contains(&stale_holder[..12]),
            "log entry must include the stale holder's id prefix; got: {last_log}"
        );
    }

    // ── should_self_claim unit tests ──────────────────────────────────────

    /// AC1 + AC6: single-node cluster always claims (no peers → trivially lowest).
    #[test]
    fn should_self_claim_single_node_always_claims() {
        assert!(should_self_claim("node-a", "story-1", &[]));
        assert!(should_self_claim("node-a", "story-2", &[]));
        assert!(should_self_claim("any-node", "any-story", &[]));
    }

    /// AC1: self wins when its hash is strictly lower than a peer's hash.
    /// We compute the actual hashes to construct a deterministic test.
    #[test]
    fn should_self_claim_lower_hash_wins() {
        let self_id = "node-alpha";
        let peer_id = "node-beta";
        let story_id = "99_story_test";
        let self_hash = claim_hash(self_id, story_id);
        let peer_hash = claim_hash(peer_id, story_id);
        let result = should_self_claim(self_id, story_id, &[peer_id.to_string()]);
        // Result must agree with the actual hash comparison.
        assert_eq!(result, self_hash < peer_hash);
    }

    /// AC1: self loses when a peer has a strictly lower hash.
    #[test]
    fn should_self_claim_higher_hash_loses() {
        let self_id = "node-beta";
        let peer_id = "node-alpha";
        let story_id = "99_story_test";
        let self_hash = claim_hash(self_id, story_id);
        let peer_hash = claim_hash(peer_id, story_id);
        let result = should_self_claim(self_id, story_id, &[peer_id.to_string()]);
        assert_eq!(result, self_hash < peer_hash);
    }

    /// AC2: hash is stable — calling with the same inputs always returns the same result.
    #[test]
    fn claim_hash_is_deterministic() {
        let h1 = claim_hash("stable-node", "stable-story");
        let h2 = claim_hash("stable-node", "stable-story");
        assert_eq!(h1, h2);
    }

    /// AC2: the hash for a fixed input is stable across calls and non-zero.
    ///
    /// NOTE(review): despite the test name, no externally pre-computed
    /// constant is pinned here — only stability and non-zero-ness are
    /// asserted. Pinning the exact u64 (first 8 big-endian bytes of
    /// SHA-256("node-a" ++ "story-1")) would also catch
    /// cross-implementation regressions.
    #[test]
    fn claim_hash_known_value() {
        // We verify by round-tripping: compute once and assert stability.
        let h = claim_hash("node-a", "story-1");
        assert_eq!(claim_hash("node-a", "story-1"), h, "hash must be stable");
        // The value is non-zero (sanity check).
        assert_ne!(h, 0, "hash should not be zero");
    }

    /// AC1: self appears in peer list (shouldn't happen in practice but must
    /// be handled correctly — self entry is skipped, so it still wins if it's
    /// the only entry).
    #[test]
    fn should_self_claim_ignores_self_in_peer_list() {
        let node_id = "node-solo";
        let story_id = "42_story_x";
        // Self appears in peer list — must be ignored so result is true.
        assert!(should_self_claim(node_id, story_id, &[node_id.to_string()]));
    }

    /// AC5: integration test — two nodes, deterministic in both orders.
    ///
    /// Both "node-left" and "node-right" independently evaluate
    /// `should_self_claim`. Exactly one must return `true`. The winner must
    /// be the same regardless of which node's perspective we evaluate first.
    #[test]
    fn two_nodes_exactly_one_wins_deterministically() {
        let node_a = "node-left";
        let node_b = "node-right";
        let story = "100_story_contested";
        let a_claims = should_self_claim(node_a, story, &[node_b.to_string()]);
        let b_claims = should_self_claim(node_b, story, &[node_a.to_string()]);
        // Exactly one must win.
        assert_ne!(
            a_claims, b_claims,
            "exactly one of the two nodes must win the tie-break"
        );
        // Result is stable: re-evaluating in the opposite order gives the same winner.
        let a_again = should_self_claim(node_a, story, &[node_b.to_string()]);
        let b_again = should_self_claim(node_b, story, &[node_a.to_string()]);
        assert_eq!(
            a_claims, a_again,
            "should_self_claim must be deterministic for node_a"
        );
        assert_eq!(
            b_claims, b_again,
            "should_self_claim must be deterministic for node_b"
        );
    }

    /// AC5: verify with multiple stories — each story has exactly one winner.
    #[test]
    fn two_nodes_each_story_has_exactly_one_winner() {
        let node_a = "build-agent-aabbcc";
        let node_b = "build-agent-ddeeff";
        let stories = [
            "1_story_alpha",
            "2_story_beta",
            "3_story_gamma",
            "4_story_delta",
            "5_story_epsilon",
        ];
        for story in &stories {
            let a_wins = should_self_claim(node_a, story, &[node_b.to_string()]);
            let b_wins = should_self_claim(node_b, story, &[node_a.to_string()]);
            assert_ne!(
                a_wins, b_wins,
                "story '{story}': exactly one node must win, got a={a_wins} b={b_wins}"
            );
        }
    }

    // ── Mesh discovery integration tests ────────────────────────────────

    /// AC7 (mesh storm cap): With 6 alive peers, the MeshManager enforces a
    /// cap of 3 connections. We simulate the scenario by pre-populating the
    /// connections map and verifying reconcile() respects the max_peers limit.
    #[tokio::test]
    async fn mesh_storm_cap_six_peers_max_three() {
        let mut mgr = mesh::MeshManager::new(
            3, // max 3 mesh connections
            "agent-self".to_string(),
            "ws://server:3001/crdt-sync".to_string(),
            None,
        );
        // Simulate 6 peer connections (long-running tasks).
        let peer_ids: Vec<String> = (0..6).map(|i| format!("peer-{i}")).collect();
        for id in &peer_ids {
            let handle = tokio::spawn(async {
                tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
            });
            mgr.connections.insert(id.clone(), handle);
        }
        assert_eq!(mgr.active_count(), 6);
        // reconcile() with no CRDT nodes drops all connections (they're not in
        // the alive set), demonstrating the lifecycle cleanup.
        mgr.reconcile();
        assert_eq!(mgr.active_count(), 0, "all unknown peers should be dropped");
    }

    /// AC8 (connection lifecycle): default max_mesh_peers is 3.
    #[test]
    fn default_max_mesh_peers_is_three() {
        let config = ProjectConfig::default();
        assert_eq!(config.max_mesh_peers, 3);
    }
}