diff --git a/server/src/agents.rs b/server/src/agents.rs
index ea062dc..800bb32 100644
--- a/server/src/agents.rs
+++ b/server/src/agents.rs
@@ -9,8 +9,12 @@
 use std::io::{BufRead, BufReader};
 use std::path::{Path, PathBuf};
 use std::process::Command;
 use std::sync::{Arc, Mutex};
+use std::time::Instant;
 use tokio::sync::broadcast;
+/// Default TTL for completed/failed agent entries: 1 hour.
+pub const DEFAULT_AGENT_TTL_SECS: u64 = 3600;
+
 /// Events emitted during server startup reconciliation to broadcast real-time
 /// progress to connected WebSocket clients.
 #[derive(Debug, Clone, Serialize)]
@@ -193,6 +197,9 @@ struct StoryAgent {
     project_root: Option<PathBuf>,
     /// UUID identifying the log file for this session.
     log_session_id: Option<String>,
+    /// Timestamp when the agent entered a terminal state (Completed/Failed).
+    /// Used by the TTL reaper to remove stale entries.
+    completed_at: Option<Instant>,
 }
 
 /// Build an `AgentInfo` snapshot from a `StoryAgent` map entry.
@@ -337,6 +344,7 @@ impl AgentPool {
                 completion: None,
                 project_root: Some(project_root.to_path_buf()),
                 log_session_id: Some(log_session_id.clone()),
+                completed_at: None,
             },
         );
     }
@@ -419,6 +427,7 @@ impl AgentPool {
             && let Some(agent) = agents.get_mut(&key_clone)
         {
             agent.status = AgentStatus::Failed;
+            agent.completed_at = Some(Instant::now());
         }
         let _ = tx_clone.send(AgentEvent::Error {
             story_id: sid.clone(),
@@ -845,6 +854,7 @@ impl AgentPool {
         if let Err(e) = move_story_to_archived(&project_root, story_id) {
             slog!("[pipeline] Failed to archive '{story_id}': {e}");
         }
+        self.remove_agents_for_story(story_id);
         // Mergemaster slot is now free — pick up any other items in 4_merge/.
         self.auto_assign_available_work(&project_root).await;
         // TODO: Re-enable worktree cleanup once we have persistent agent logs.
@@ -963,6 +973,7 @@ impl AgentPool {
             } else {
                 AgentStatus::Failed
             };
+            agent.completed_at = Some(Instant::now());
             (agent.tx.clone(), agent.session_id.clone())
         };
 
@@ -1052,5 +1063,8 @@ impl AgentPool {
-        // Gates passed — archive the story.
+        // Gates passed — archive the story and clean up agent entries.
         let story_archived = move_story_to_archived(project_root, story_id).is_ok();
+        if story_archived {
+            self.remove_agents_for_story(story_id);
+        }
 
         // Clean up the worktree if it exists.
         let worktree_cleaned_up = if wt_path.exists() {
@@ -1117,6 +1131,11 @@ impl AgentPool {
         agent_name: &str,
         status: AgentStatus,
     ) -> broadcast::Sender<AgentEvent> {
+        let completed_at = if matches!(status, AgentStatus::Completed | AgentStatus::Failed) {
+            Some(Instant::now())
+        } else {
+            None
+        };
         let (tx, _) = broadcast::channel::<AgentEvent>(64);
         let key = composite_key(story_id, agent_name);
         let mut agents = self.agents.lock().unwrap();
@@ -1133,6 +1152,7 @@ impl AgentPool {
                 completion: None,
                 project_root: None,
                 log_session_id: None,
+                completed_at,
             },
         );
         tx
@@ -1148,6 +1168,11 @@ impl AgentPool {
         status: AgentStatus,
         worktree_path: PathBuf,
     ) -> broadcast::Sender<AgentEvent> {
+        let completed_at = if matches!(status, AgentStatus::Completed | AgentStatus::Failed) {
+            Some(Instant::now())
+        } else {
+            None
+        };
         let (tx, _) = broadcast::channel::<AgentEvent>(64);
         let key = composite_key(story_id, agent_name);
         let mut agents = self.agents.lock().unwrap();
@@ -1168,6 +1193,7 @@ impl AgentPool {
                 completion: None,
                 project_root: None,
                 log_session_id: None,
+                completed_at,
             },
         );
         tx
@@ -1494,6 +1520,11 @@ impl AgentPool {
         project_root: PathBuf,
         completion: CompletionReport,
     ) -> broadcast::Sender<AgentEvent> {
+        let completed_at = if matches!(status, AgentStatus::Completed | AgentStatus::Failed) {
+            Some(Instant::now())
+        } else {
+            None
+        };
         let (tx, _) = broadcast::channel::<AgentEvent>(64);
         let key = composite_key(story_id, agent_name);
         let mut agents = self.agents.lock().unwrap();
@@ -1510,6 +1541,7 @@ impl AgentPool {
                 completion: Some(completion),
                 project_root: Some(project_root),
                 log_session_id: None,
+                completed_at,
             },
         );
         tx
@@ -1569,6 +1601,64 @@ impl AgentPool {
             check_orphaned_agents(&agents);
             }
         });
     }
 
+    /// Remove all agent entries for a given story_id from the pool.
+    ///
+    /// Called when a story is archived so that stale entries don't accumulate.
+    /// Returns the number of entries removed.
+    pub fn remove_agents_for_story(&self, story_id: &str) -> usize {
+        let mut agents = match self.agents.lock() {
+            Ok(a) => a,
+            Err(e) => {
+                slog!("[agents] Failed to lock pool for cleanup of '{story_id}': {e}");
+                return 0;
+            }
+        };
+        let prefix = format!("{story_id}:");
+        let keys_to_remove: Vec<String> = agents
+            .keys()
+            .filter(|k| k.starts_with(&prefix))
+            .cloned()
+            .collect();
+        let count = keys_to_remove.len();
+        for key in &keys_to_remove {
+            agents.remove(key);
+        }
+        if count > 0 {
+            slog!("[agents] Removed {count} agent entries for archived story '{story_id}'");
+        }
+        count
+    }
+
+    /// Reap agent entries in terminal states (Completed/Failed) whose `completed_at`
+    /// timestamp is older than `ttl`. Returns the number of entries reaped.
+    pub fn reap_expired_agents(&self, ttl: std::time::Duration) -> usize {
+        let mut agents = match self.agents.lock() {
+            Ok(a) => a,
+            Err(e) => {
+                slog!("[reaper] Failed to lock pool for TTL reaping: {e}");
+                return 0;
+            }
+        };
+        let now = Instant::now();
+        let keys_to_remove: Vec<String> = agents
+            .iter()
+            .filter(|(_, agent)| {
+                matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
+                    && agent
+                        .completed_at
+                        .is_some_and(|t| now.duration_since(t) >= ttl)
+            })
+            .map(|(k, _)| k.clone())
+            .collect();
+        let count = keys_to_remove.len();
+        for key in &keys_to_remove {
+            agents.remove(key);
+        }
+        if count > 0 {
+            slog!("[reaper] Reaped {count} expired agent entries (TTL: {}s)", ttl.as_secs());
+        }
+        count
+    }
 }
@@ -1793,6 +1882,7 @@ async fn run_server_owned_completion(
         } else {
             AgentStatus::Failed
         };
+        agent.completed_at = Some(Instant::now());
         agent.tx.clone()
     };
 
@@ -5081,5 +5171,156 @@ theirs
         matches!(event, AgentEvent::Error { .. }),
         "expected AgentEvent::Error, got: {event:?}"
     );
     }
+
+    // ── remove_agents_for_story tests ────────────────────────────────────────
+
+    #[test]
+    fn remove_agents_for_story_removes_all_entries() {
+        let pool = AgentPool::new(3001);
+        pool.inject_test_agent("story_a", "coder-1", AgentStatus::Completed);
+        pool.inject_test_agent("story_a", "qa", AgentStatus::Failed);
+        pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running);
+
+        let removed = pool.remove_agents_for_story("story_a");
+        assert_eq!(removed, 2, "should remove both agents for story_a");
+
+        let agents = pool.list_agents().unwrap();
+        assert_eq!(agents.len(), 1, "only story_b agent should remain");
+        assert_eq!(agents[0].story_id, "story_b");
+    }
+
+    #[test]
+    fn remove_agents_for_story_returns_zero_when_no_match() {
+        let pool = AgentPool::new(3001);
+        pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running);
+
+        let removed = pool.remove_agents_for_story("nonexistent");
+        assert_eq!(removed, 0);
+
+        let agents = pool.list_agents().unwrap();
+        assert_eq!(agents.len(), 1, "existing agents should not be affected");
+    }
+
+    // ── reap_expired_agents tests ────────────────────────────────────────────
+
+    #[test]
+    fn reap_expired_agents_removes_old_completed_entries() {
+        let pool = AgentPool::new(3001);
+
+        // Inject a completed agent with an artificial old completed_at.
+        {
+            let (tx, _) = broadcast::channel::<AgentEvent>(64);
+            let key = composite_key("old_story", "coder-1");
+            let mut agents = pool.agents.lock().unwrap();
+            agents.insert(
+                key,
+                StoryAgent {
+                    agent_name: "coder-1".to_string(),
+                    status: AgentStatus::Completed,
+                    worktree_info: None,
+                    session_id: None,
+                    tx,
+                    task_handle: None,
+                    event_log: Arc::new(Mutex::new(Vec::new())),
+                    completion: None,
+                    project_root: None,
+                    log_session_id: None,
+                    // Set completed_at 2 hours ago.
+                    completed_at: Some(Instant::now() - std::time::Duration::from_secs(7200)),
+                },
+            );
+        }
+        // Inject a recently completed agent.
+        pool.inject_test_agent("new_story", "coder-1", AgentStatus::Completed);
+        // Inject a running agent (should not be reaped).
+        pool.inject_test_agent("active_story", "coder-2", AgentStatus::Running);
+
+        // Reap with a 1-hour TTL — only the old entry should be removed.
+        let reaped = pool.reap_expired_agents(std::time::Duration::from_secs(3600));
+        assert_eq!(reaped, 1, "should reap only the old completed entry");
+
+        let agents = pool.list_agents().unwrap();
+        assert_eq!(agents.len(), 2, "new_story and active_story should remain");
+        assert!(
+            agents.iter().all(|a| a.story_id != "old_story"),
+            "old_story should have been reaped"
+        );
+    }
+
+    #[test]
+    fn reap_expired_agents_removes_old_failed_entries() {
+        let pool = AgentPool::new(3001);
+
+        // Inject a failed agent with an old completed_at.
+        {
+            let (tx, _) = broadcast::channel::<AgentEvent>(64);
+            let key = composite_key("failed_old", "coder-1");
+            let mut agents = pool.agents.lock().unwrap();
+            agents.insert(
+                key,
+                StoryAgent {
+                    agent_name: "coder-1".to_string(),
+                    status: AgentStatus::Failed,
+                    worktree_info: None,
+                    session_id: None,
+                    tx,
+                    task_handle: None,
+                    event_log: Arc::new(Mutex::new(Vec::new())),
+                    completion: None,
+                    project_root: None,
+                    log_session_id: None,
+                    completed_at: Some(Instant::now() - std::time::Duration::from_secs(7200)),
+                },
+            );
+        }
+
+        let reaped = pool.reap_expired_agents(std::time::Duration::from_secs(3600));
+        assert_eq!(reaped, 1);
+        assert!(pool.list_agents().unwrap().is_empty());
+    }
+
+    #[test]
+    fn reap_expired_agents_skips_running_entries() {
+        let pool = AgentPool::new(3001);
+        pool.inject_test_agent("running_story", "coder-1", AgentStatus::Running);
+
+        let reaped = pool.reap_expired_agents(std::time::Duration::from_secs(0));
+        assert_eq!(reaped, 0, "running agents should never be reaped");
+    }
+
+    // ── archive + cleanup integration test ───────────────────────────────────
+
+    #[tokio::test]
+    async fn archiving_story_removes_agent_entries_from_pool() {
+        use std::fs;
+
+        let tmp = tempfile::tempdir().unwrap();
+        let root = tmp.path();
+
+        // Set up story in 2_current/
+        let current = root.join(".story_kit/work/2_current");
+        fs::create_dir_all(&current).unwrap();
+        fs::write(current.join("60_story_cleanup.md"), "test").unwrap();
+
+        let pool = AgentPool::new(3001);
+        pool.inject_test_agent("60_story_cleanup", "coder-1", AgentStatus::Completed);
+        pool.inject_test_agent("60_story_cleanup", "qa", AgentStatus::Completed);
+        pool.inject_test_agent("61_story_other", "coder-1", AgentStatus::Running);
+
+        // Verify all 3 agents exist.
+        assert_eq!(pool.list_agents().unwrap().len(), 3);
+
+        // Archive the story.
+        move_story_to_archived(root, "60_story_cleanup").unwrap();
+        pool.remove_agents_for_story("60_story_cleanup");
+
+        // Agent entries for the archived story should be gone.
+        let remaining = pool.list_agents().unwrap();
+        assert_eq!(remaining.len(), 1, "only the other story's agent should remain");
+        assert_eq!(remaining[0].story_id, "61_story_other");
+
+        // Story file should be in 5_archived/
+        assert!(root.join(".story_kit/work/5_archived/60_story_cleanup.md").exists());
+    }
 }
diff --git a/server/src/http/agents.rs b/server/src/http/agents.rs
index c1117d5..12bdced 100644
--- a/server/src/http/agents.rs
+++ b/server/src/http/agents.rs
@@ -66,7 +66,7 @@ struct WorktreeListEntry {
 /// Used to exclude agents for already-archived stories from the `list_agents`
 /// response so the agents panel is not cluttered with old completed items on
 /// frontend startup.
-fn story_is_archived(project_root: &path::Path, story_id: &str) -> bool {
+pub fn story_is_archived(project_root: &path::Path, story_id: &str) -> bool {
     project_root
         .join(".story_kit")
         .join("work")
diff --git a/server/src/http/mcp.rs b/server/src/http/mcp.rs
index ad1196e..fafe36e 100644
--- a/server/src/http/mcp.rs
+++ b/server/src/http/mcp.rs
@@ -1032,8 +1032,15 @@ async fn tool_stop_agent(args: &Value, ctx: &AppContext) -> Result
 
 fn tool_list_agents(ctx: &AppContext) -> Result {
+    let project_root = ctx.agents.get_project_root(&ctx.state).ok();
     let agents = ctx.agents.list_agents()?;
     serde_json::to_string_pretty(&json!(agents
         .iter()
+        .filter(|a| {
+            project_root
+                .as_deref()
+                .map(|root| !crate::http::agents::story_is_archived(root, &a.story_id))
+                .unwrap_or(true)
+        })
         .map(|a| json!({
             "story_id": a.story_id,
             "agent_name": a.agent_name,
@@ -1283,6 +1290,7 @@ fn tool_accept_story(args: &Value, ctx: &AppContext) -> Result {
     let project_root = ctx.agents.get_project_root(&ctx.state)?;
     move_story_to_archived(&project_root, story_id)?;
+    ctx.agents.remove_agents_for_story(story_id);
 
     Ok(format!(
         "Story '{story_id}' accepted, moved to archived/, and committed to master."
     ))
@@ -1381,6 +1389,7 @@ fn tool_close_bug(args: &Value, ctx: &AppContext) -> Result {
     let root = ctx.agents.get_project_root(&ctx.state)?;
     close_bug_to_archive(&root, bug_id)?;
+    ctx.agents.remove_agents_for_story(bug_id);
 
     Ok(format!(
         "Bug '{bug_id}' closed, moved to bugs/archive/, and committed to master."
     ))
diff --git a/server/src/main.rs b/server/src/main.rs
index d3505c1..48be123 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -95,6 +95,21 @@ async fn main() -> Result<(), std::io::Error> {
     let app = build_routes(ctx);
 
+    // Background reaper: periodically remove completed/failed agent entries
+    // that have exceeded the TTL.
+    {
+        let reaper_agents = Arc::clone(&startup_agents);
+        let ttl = std::time::Duration::from_secs(agents::DEFAULT_AGENT_TTL_SECS);
+        tokio::spawn(async move {
+            // Check every 5 minutes.
+            let interval = std::time::Duration::from_secs(300);
+            loop {
+                tokio::time::sleep(interval).await;
+                reaper_agents.reap_expired_agents(ttl);
+            }
+        });
+    }
+
     // On startup:
     // 1. Reconcile any stories whose agent work was committed while the server was
     //    offline (worktree has commits ahead of master but pipeline didn't advance).