huskies: merge 802
This commit is contained in:
@@ -0,0 +1,283 @@
|
||||
//! Headless build-agent mode for distributed, rendezvous-based story processing.
//!
//! When invoked via `huskies agent --rendezvous ws://host:3001/crdt-sync`, this
//! module runs a headless loop that:
//!
//! 1. Syncs CRDT state with the rendezvous peer.
//! 2. Writes a heartbeat to the CRDT `nodes` list.
//! 3. Scans for unclaimed stories in `2_current` and claims them via CRDT.
//! 4. Spawns Claude Code locally for each claimed story.
//! 5. Pushes the feature branch to the git remote when done.
//! 6. Reports completion by advancing the story stage via CRDT.
//! 7. Handles offline/reconnect: CRDT merges on reconnect, interrupted work
//!    is reclaimed after a timeout.
//!
//! A minimal HTTP server is started on the agent's port to serve the
//! `/crdt-sync` WebSocket endpoint, enabling other agents to connect for
//! peer mesh discovery.
|
||||
mod claim;
|
||||
mod context;
|
||||
mod loop_ops;
|
||||
|
||||
pub use claim::SCAN_INTERVAL_SECS;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use poem::EndpointExt as _;
|
||||
|
||||
use crate::agents::AgentPool;
|
||||
use crate::config::ProjectConfig;
|
||||
use crate::crdt_state;
|
||||
use crate::io::watcher;
|
||||
use crate::mesh;
|
||||
use crate::slog;
|
||||
|
||||
use context::{build_agent_app_context, register_with_gateway};
|
||||
use loop_ops::{
|
||||
check_completions_and_push, detect_conflicts, reclaim_timed_out_work, scan_and_claim,
|
||||
write_heartbeat,
|
||||
};
|
||||
|
||||
/// Run the headless build agent loop.
|
||||
///
|
||||
/// This function never returns under normal operation — it runs until the
|
||||
/// process is terminated (SIGINT/SIGTERM).
|
||||
///
|
||||
/// If `join_token` and `gateway_url` are both provided the agent will register
|
||||
/// itself with the gateway on startup using the one-time token.
|
||||
pub async fn run(
|
||||
project_root: Option<PathBuf>,
|
||||
rendezvous_url: String,
|
||||
port: u16,
|
||||
join_token: Option<String>,
|
||||
gateway_url: Option<String>,
|
||||
) -> Result<(), std::io::Error> {
|
||||
let project_root = match project_root {
|
||||
Some(r) => r,
|
||||
None => {
|
||||
eprintln!("error: agent mode requires a project root (no .huskies/ found)");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
println!("\x1b[96;1m[agent-mode]\x1b[0m Starting headless build agent");
|
||||
println!("\x1b[96;1m[agent-mode]\x1b[0m Rendezvous: {rendezvous_url}");
|
||||
println!(
|
||||
"\x1b[96;1m[agent-mode]\x1b[0m Project: {}",
|
||||
project_root.display()
|
||||
);
|
||||
|
||||
// Validate project config.
|
||||
let config = ProjectConfig::load(&project_root).unwrap_or_else(|e| {
|
||||
eprintln!("error: invalid project config: {e}");
|
||||
std::process::exit(1);
|
||||
});
|
||||
slog!(
|
||||
"[agent-mode] Loaded config with {} agents",
|
||||
config.agent.len()
|
||||
);
|
||||
|
||||
// Event bus for pipeline lifecycle events.
|
||||
let (watcher_tx, _) = broadcast::channel::<watcher::WatcherEvent>(1024);
|
||||
let agents = Arc::new(AgentPool::new(port, watcher_tx.clone()));
|
||||
|
||||
// Start filesystem watcher for config hot-reload.
|
||||
watcher::start_watcher(project_root.clone(), watcher_tx.clone());
|
||||
|
||||
// Bridge CRDT events to watcher channel (same as main server).
|
||||
{
|
||||
let crdt_watcher_tx = watcher_tx.clone();
|
||||
let crdt_prune_root = Some(project_root.clone());
|
||||
if let Some(mut crdt_rx) = crdt_state::subscribe() {
|
||||
tokio::spawn(async move {
|
||||
while let Ok(evt) = crdt_rx.recv().await {
|
||||
if crate::pipeline_state::Stage::from_dir(&evt.to_stage)
|
||||
.is_some_and(|s| matches!(s, crate::pipeline_state::Stage::Archived { .. }))
|
||||
&& let Some(root) = crdt_prune_root.as_ref().cloned()
|
||||
{
|
||||
let story_id = evt.story_id.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
if let Err(e) = crate::worktree::prune_worktree_sync(&root, &story_id) {
|
||||
slog!("[agent-mode] worktree prune failed for {story_id}: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
let (action, commit_msg) =
|
||||
watcher::stage_metadata(&evt.to_stage, &evt.story_id)
|
||||
.unwrap_or(("update", format!("huskies: update {}", evt.story_id)));
|
||||
let watcher_evt = watcher::WatcherEvent::WorkItem {
|
||||
stage: evt.to_stage,
|
||||
item_id: evt.story_id,
|
||||
action: action.to_string(),
|
||||
commit_msg,
|
||||
from_stage: evt.from_stage,
|
||||
};
|
||||
let _ = crdt_watcher_tx.send(watcher_evt);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe to watcher events to trigger auto-assign on stage transitions.
|
||||
{
|
||||
let auto_rx = watcher_tx.subscribe();
|
||||
let auto_agents = Arc::clone(&agents);
|
||||
let auto_root = project_root.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut rx = auto_rx;
|
||||
while let Ok(event) = rx.recv().await {
|
||||
if let watcher::WatcherEvent::WorkItem { ref stage, .. } = event
|
||||
&& crate::pipeline_state::Stage::from_dir(stage.as_str())
|
||||
.is_some_and(|s| s.is_active())
|
||||
{
|
||||
slog!("[agent-mode] CRDT transition in {stage}/; triggering auto-assign.");
|
||||
auto_agents.auto_assign_available_work(&auto_root).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// ── Start minimal HTTP server for /crdt-sync endpoint ─────────────
|
||||
//
|
||||
// Other agents discover this endpoint via the CRDT `nodes` list and
|
||||
// open supplementary mesh connections for resilience.
|
||||
{
|
||||
let sync_handler = poem::get(crate::crdt_sync::crdt_sync_handler);
|
||||
|
||||
// Build a minimal AppContext for the crdt_sync_handler (the handler
|
||||
// receives it via Data<> but doesn't use it — the underscore prefix
|
||||
// on `_ctx` confirms this).
|
||||
let agent_ctx = build_agent_app_context(&project_root, port, watcher_tx.clone());
|
||||
let agent_ctx_arc = Arc::new(agent_ctx);
|
||||
|
||||
let app = poem::Route::new()
|
||||
.at("/crdt-sync", sync_handler)
|
||||
.data(agent_ctx_arc);
|
||||
|
||||
let bind_addr = format!("0.0.0.0:{port}");
|
||||
slog!("[agent-mode] Starting /crdt-sync endpoint on {bind_addr}");
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = poem::Server::new(poem::listener::TcpListener::bind(&bind_addr))
|
||||
.run(app)
|
||||
.await
|
||||
{
|
||||
slog!("[agent-mode] HTTP server error: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Write initial heartbeat.
|
||||
write_heartbeat(&rendezvous_url, port);
|
||||
|
||||
// Register with gateway if a join token and gateway URL were provided.
|
||||
if let (Some(token), Some(url)) = (join_token.clone(), gateway_url) {
|
||||
let node_id = crdt_state::our_node_id().unwrap_or_else(|| "unknown".to_string());
|
||||
let label = format!("build-agent-{}", &node_id[..node_id.len().min(8)]);
|
||||
let address = format!("ws://0.0.0.0:{port}/crdt-sync");
|
||||
register_with_gateway(&url, &token, &label, &address).await;
|
||||
}
|
||||
|
||||
// ── Mesh peer discovery ────────────────────────────────────────────
|
||||
//
|
||||
// Periodically read the CRDT `nodes` list and open supplementary sync
|
||||
// connections to alive peers. The primary rendezvous connection remains
|
||||
// canonical; mesh connections are supplementary and don't block startup.
|
||||
let _mesh_handle = {
|
||||
let our_node_id = crdt_state::our_node_id().unwrap_or_default();
|
||||
let max_mesh_peers = config.max_mesh_peers;
|
||||
mesh::spawn_mesh_discovery(
|
||||
max_mesh_peers,
|
||||
our_node_id,
|
||||
rendezvous_url.clone(),
|
||||
join_token,
|
||||
)
|
||||
};
|
||||
|
||||
// Reconcile any committed work from a previous session.
|
||||
{
|
||||
let recon_agents = Arc::clone(&agents);
|
||||
let recon_root = project_root.clone();
|
||||
let (recon_tx, _) = broadcast::channel(64);
|
||||
slog!("[agent-mode] Reconciling completed worktrees from previous session.");
|
||||
recon_agents
|
||||
.reconcile_on_startup(&recon_root, &recon_tx)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Run initial auto-assign.
|
||||
slog!("[agent-mode] Initial auto-assign scan.");
|
||||
agents.auto_assign_available_work(&project_root).await;
|
||||
|
||||
// Track which stories we've claimed so we can detect conflicts.
|
||||
let mut our_claims: HashMap<String, f64> = HashMap::new();
|
||||
|
||||
// Main loop: heartbeat, scan, claim, detect conflicts.
|
||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(SCAN_INTERVAL_SECS));
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
// Write heartbeat.
|
||||
write_heartbeat(&rendezvous_url, port);
|
||||
|
||||
// Scan CRDT for claimable work.
|
||||
scan_and_claim(&agents, &project_root, &mut our_claims).await;
|
||||
|
||||
// Detect claim conflicts: if another node overwrote our claim, stop our agent.
|
||||
detect_conflicts(&agents, &project_root, &mut our_claims).await;
|
||||
|
||||
// Reclaim timed-out work from dead nodes.
|
||||
reclaim_timed_out_work(&project_root);
|
||||
|
||||
// Check for completed agents and push their branches.
|
||||
check_completions_and_push(&agents, &project_root).await;
|
||||
}
|
||||
}
|
||||
|
||||
// ── Tests ────────────────────────────────────────────────────────────────
|
||||
|
||||
#[cfg(test)]
mod tests {
    use crate::config::ProjectConfig;
    use crate::mesh;

    // ── Mesh discovery integration tests ────────────────────────────────

    /// AC7 (mesh storm cap): With 6 alive peers, the MeshManager enforces a
    /// cap of 3 connections. The scenario is simulated by seeding the
    /// connections map directly and checking that reconcile() applies the
    /// max_peers lifecycle rules.
    #[tokio::test]
    async fn mesh_storm_cap_six_peers_max_three() {
        let mut manager = mesh::MeshManager::new(
            3, // max 3 mesh connections
            "agent-self".to_string(),
            "ws://server:3001/crdt-sync".to_string(),
            None,
        );

        // Seed six fake peer connections, each backed by a long-running task.
        for i in 0..6 {
            let task = tokio::spawn(async {
                tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
            });
            manager.connections.insert(format!("peer-{i}"), task);
        }

        assert_eq!(manager.active_count(), 6);

        // With an empty CRDT node list, none of the seeded peers are in the
        // alive set, so reconcile() must drop every connection.
        manager.reconcile();
        assert_eq!(
            manager.active_count(),
            0,
            "all unknown peers should be dropped"
        );
    }

    /// AC8 (connection lifecycle): default max_mesh_peers is 3.
    #[test]
    fn default_max_mesh_peers_is_three() {
        assert_eq!(ProjectConfig::default().max_mesh_peers, 3);
    }
}
|
||||
Reference in New Issue
Block a user