story-kit: merge 133_story_clean_up_agent_state_on_story_archive_and_add_ttl_for_completed_entries

This commit is contained in:
Dave
2026-02-24 13:20:59 +00:00
parent e49be6b905
commit 6170a7d984
4 changed files with 267 additions and 13 deletions

View File

@@ -9,8 +9,12 @@ use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process::Command; use std::process::Command;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::broadcast; use tokio::sync::broadcast;
/// Default TTL for completed/failed agent entries: 1 hour.
pub const DEFAULT_AGENT_TTL_SECS: u64 = 3600;
/// Events emitted during server startup reconciliation to broadcast real-time /// Events emitted during server startup reconciliation to broadcast real-time
/// progress to connected WebSocket clients. /// progress to connected WebSocket clients.
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
@@ -193,6 +197,9 @@ struct StoryAgent {
project_root: Option<PathBuf>, project_root: Option<PathBuf>,
/// UUID identifying the log file for this session. /// UUID identifying the log file for this session.
log_session_id: Option<String>, log_session_id: Option<String>,
/// Timestamp when the agent entered a terminal state (Completed/Failed).
/// Used by the TTL reaper to remove stale entries.
completed_at: Option<Instant>,
} }
/// Build an `AgentInfo` snapshot from a `StoryAgent` map entry. /// Build an `AgentInfo` snapshot from a `StoryAgent` map entry.
@@ -337,6 +344,7 @@ impl AgentPool {
completion: None, completion: None,
project_root: Some(project_root.to_path_buf()), project_root: Some(project_root.to_path_buf()),
log_session_id: Some(log_session_id.clone()), log_session_id: Some(log_session_id.clone()),
completed_at: None,
}, },
); );
} }
@@ -419,6 +427,7 @@ impl AgentPool {
&& let Some(agent) = agents.get_mut(&key_clone) && let Some(agent) = agents.get_mut(&key_clone)
{ {
agent.status = AgentStatus::Failed; agent.status = AgentStatus::Failed;
agent.completed_at = Some(Instant::now());
} }
let _ = tx_clone.send(AgentEvent::Error { let _ = tx_clone.send(AgentEvent::Error {
story_id: sid.clone(), story_id: sid.clone(),
@@ -845,6 +854,7 @@ impl AgentPool {
if let Err(e) = move_story_to_archived(&project_root, story_id) { if let Err(e) = move_story_to_archived(&project_root, story_id) {
slog!("[pipeline] Failed to archive '{story_id}': {e}"); slog!("[pipeline] Failed to archive '{story_id}': {e}");
} }
self.remove_agents_for_story(story_id);
// Mergemaster slot is now free — pick up any other items in 4_merge/. // Mergemaster slot is now free — pick up any other items in 4_merge/.
self.auto_assign_available_work(&project_root).await; self.auto_assign_available_work(&project_root).await;
// TODO: Re-enable worktree cleanup once we have persistent agent logs. // TODO: Re-enable worktree cleanup once we have persistent agent logs.
@@ -963,6 +973,7 @@ impl AgentPool {
} else { } else {
AgentStatus::Failed AgentStatus::Failed
}; };
agent.completed_at = Some(Instant::now());
(agent.tx.clone(), agent.session_id.clone()) (agent.tx.clone(), agent.session_id.clone())
}; };
@@ -1051,8 +1062,11 @@ impl AgentPool {
}); });
} }
// Gates passed — archive the story. // Gates passed — archive the story and clean up agent entries.
let story_archived = move_story_to_archived(project_root, story_id).is_ok(); let story_archived = move_story_to_archived(project_root, story_id).is_ok();
if story_archived {
self.remove_agents_for_story(story_id);
}
// Clean up the worktree if it exists. // Clean up the worktree if it exists.
let worktree_cleaned_up = if wt_path.exists() { let worktree_cleaned_up = if wt_path.exists() {
@@ -1117,6 +1131,11 @@ impl AgentPool {
agent_name: &str, agent_name: &str,
status: AgentStatus, status: AgentStatus,
) -> broadcast::Sender<AgentEvent> { ) -> broadcast::Sender<AgentEvent> {
let completed_at = if matches!(status, AgentStatus::Completed | AgentStatus::Failed) {
Some(Instant::now())
} else {
None
};
let (tx, _) = broadcast::channel::<AgentEvent>(64); let (tx, _) = broadcast::channel::<AgentEvent>(64);
let key = composite_key(story_id, agent_name); let key = composite_key(story_id, agent_name);
let mut agents = self.agents.lock().unwrap(); let mut agents = self.agents.lock().unwrap();
@@ -1133,6 +1152,7 @@ impl AgentPool {
completion: None, completion: None,
project_root: None, project_root: None,
log_session_id: None, log_session_id: None,
completed_at,
}, },
); );
tx tx
@@ -1148,6 +1168,11 @@ impl AgentPool {
status: AgentStatus, status: AgentStatus,
worktree_path: PathBuf, worktree_path: PathBuf,
) -> broadcast::Sender<AgentEvent> { ) -> broadcast::Sender<AgentEvent> {
let completed_at = if matches!(status, AgentStatus::Completed | AgentStatus::Failed) {
Some(Instant::now())
} else {
None
};
let (tx, _) = broadcast::channel::<AgentEvent>(64); let (tx, _) = broadcast::channel::<AgentEvent>(64);
let key = composite_key(story_id, agent_name); let key = composite_key(story_id, agent_name);
let mut agents = self.agents.lock().unwrap(); let mut agents = self.agents.lock().unwrap();
@@ -1168,6 +1193,7 @@ impl AgentPool {
completion: None, completion: None,
project_root: None, project_root: None,
log_session_id: None, log_session_id: None,
completed_at,
}, },
); );
tx tx
@@ -1494,6 +1520,11 @@ impl AgentPool {
project_root: PathBuf, project_root: PathBuf,
completion: CompletionReport, completion: CompletionReport,
) -> broadcast::Sender<AgentEvent> { ) -> broadcast::Sender<AgentEvent> {
let completed_at = if matches!(status, AgentStatus::Completed | AgentStatus::Failed) {
Some(Instant::now())
} else {
None
};
let (tx, _) = broadcast::channel::<AgentEvent>(64); let (tx, _) = broadcast::channel::<AgentEvent>(64);
let key = composite_key(story_id, agent_name); let key = composite_key(story_id, agent_name);
let mut agents = self.agents.lock().unwrap(); let mut agents = self.agents.lock().unwrap();
@@ -1510,6 +1541,7 @@ impl AgentPool {
completion: Some(completion), completion: Some(completion),
project_root: Some(project_root), project_root: Some(project_root),
log_session_id: None, log_session_id: None,
completed_at,
}, },
); );
tx tx
@@ -1569,6 +1601,63 @@ impl AgentPool {
check_orphaned_agents(&agents); check_orphaned_agents(&agents);
} }
}); });
/// Remove all agent entries for a given story_id from the pool.
///
/// Called when a story is archived so that stale entries don't accumulate.
/// Returns the number of entries removed.
pub fn remove_agents_for_story(&self, story_id: &str) -> usize {
    // Best-effort: a poisoned pool mutex is logged and treated as
    // "nothing removed" instead of propagating a panic to the caller.
    let Ok(mut agents) = self.agents.lock().map_err(|e| {
        slog!("[agents] Failed to lock pool for cleanup of '{story_id}': {e}");
    }) else {
        return 0;
    };
    // Composite keys have the shape "<story_id>:<agent_name>", so a prefix
    // match on "<story_id>:" selects exactly this story's agents.
    let prefix = format!("{story_id}:");
    let before = agents.len();
    agents.retain(|key, _| !key.starts_with(&prefix));
    let count = before - agents.len();
    // Stay quiet when there was nothing to clean up.
    if count > 0 {
        slog!("[agents] Removed {count} agent entries for archived story '{story_id}'");
    }
    count
}
/// Reap agent entries in terminal states (Completed/Failed) whose `completed_at`
/// timestamp is older than `ttl`. Returns the number of entries reaped.
pub fn reap_expired_agents(&self, ttl: std::time::Duration) -> usize {
    // Best-effort: if the pool mutex is poisoned, log and report zero reaped
    // rather than panicking inside the background reaper task.
    let mut agents = match self.agents.lock() {
        Ok(a) => a,
        Err(e) => {
            slog!("[reaper] Failed to lock pool for TTL reaping: {e}");
            return 0;
        }
    };
    let now = Instant::now();
    // Two-phase removal: collect expired keys first, then remove them,
    // because the map cannot be mutated while it is being iterated.
    let keys_to_remove: Vec<String> = agents
        .iter()
        .filter(|(_, agent)| {
            // Only terminal agents are eligible; entries with no
            // `completed_at` timestamp (still running) are never reaped.
            matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
                && agent
                    .completed_at
                    .is_some_and(|t| now.duration_since(t) >= ttl)
        })
        .map(|(k, _)| k.clone())
        .collect();
    let count = keys_to_remove.len();
    for key in &keys_to_remove {
        agents.remove(key);
    }
    // Log only when something was actually removed to keep the log quiet.
    if count > 0 {
        slog!("[reaper] Reaped {count} expired agent entries (TTL: {}s)", ttl.as_secs());
    }
    count
} }
} }
@@ -1793,6 +1882,7 @@ async fn run_server_owned_completion(
} else { } else {
AgentStatus::Failed AgentStatus::Failed
}; };
agent.completed_at = Some(Instant::now());
agent.tx.clone() agent.tx.clone()
}; };
@@ -4651,13 +4741,10 @@ name = "qa"
fn resolve_simple_conflicts_additive() { fn resolve_simple_conflicts_additive() {
let input = "\ let input = "\
before before
<<<<<<< HEAD
ours line 1 ours line 1
ours line 2 ours line 2
=======
theirs line 1 theirs line 1
theirs line 2 theirs line 2
>>>>>>> feature/branch
after after
"; ";
let result = resolve_simple_conflicts(input).unwrap(); let result = resolve_simple_conflicts(input).unwrap();
@@ -4688,17 +4775,11 @@ after
fn resolve_simple_conflicts_multiple_blocks() { fn resolve_simple_conflicts_multiple_blocks() {
let input = "\ let input = "\
header header
<<<<<<< HEAD
ours block 1 ours block 1
=======
theirs block 1 theirs block 1
>>>>>>> feature
middle middle
<<<<<<< HEAD
ours block 2 ours block 2
=======
theirs block 2 theirs block 2
>>>>>>> feature
footer footer
"; ";
let result = resolve_simple_conflicts(input).unwrap(); let result = resolve_simple_conflicts(input).unwrap();
@@ -4715,7 +4796,6 @@ footer
#[test] #[test]
fn resolve_simple_conflicts_malformed_no_separator() { fn resolve_simple_conflicts_malformed_no_separator() {
let input = "\ let input = "\
<<<<<<< HEAD
ours ours
>>>>>>> feature >>>>>>> feature
"; ";
@@ -4728,7 +4808,6 @@ ours
let input = "\ let input = "\
<<<<<<< HEAD <<<<<<< HEAD
ours ours
=======
theirs theirs
"; ";
let result = resolve_simple_conflicts(input); let result = resolve_simple_conflicts(input);
@@ -5004,6 +5083,7 @@ theirs
assert!(report.had_conflicts, "should report conflicts"); assert!(report.had_conflicts, "should report conflicts");
} }
<<<<<<< HEAD
// ── process health monitoring tests ────────────────────────────────────── // ── process health monitoring tests ──────────────────────────────────────
/// Demonstrates that the PTY read-loop inactivity timeout fires when no output /// Demonstrates that the PTY read-loop inactivity timeout fires when no output
@@ -5081,5 +5161,155 @@ theirs
matches!(event, AgentEvent::Error { .. }), matches!(event, AgentEvent::Error { .. }),
"expected AgentEvent::Error, got: {event:?}" "expected AgentEvent::Error, got: {event:?}"
); );
=======
// ── remove_agents_for_story tests ────────────────────────────────────────
#[test]
fn remove_agents_for_story_removes_all_entries() {
    // Two terminal agents on story_a plus one live agent on story_b.
    let agent_pool = AgentPool::new(3001);
    agent_pool.inject_test_agent("story_a", "coder-1", AgentStatus::Completed);
    agent_pool.inject_test_agent("story_a", "qa", AgentStatus::Failed);
    agent_pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running);
    // Removing story_a must drop exactly its two entries and nothing else.
    assert_eq!(
        agent_pool.remove_agents_for_story("story_a"),
        2,
        "should remove both agents for story_a"
    );
    let remaining = agent_pool.list_agents().unwrap();
    assert_eq!(remaining.len(), 1, "only story_b agent should remain");
    assert_eq!(remaining[0].story_id, "story_b");
}
#[test]
fn remove_agents_for_story_returns_zero_when_no_match() {
    let agent_pool = AgentPool::new(3001);
    agent_pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running);
    // A story_id with no entries is a no-op that reports zero removals.
    assert_eq!(agent_pool.remove_agents_for_story("nonexistent"), 0);
    let survivors = agent_pool.list_agents().unwrap();
    assert_eq!(survivors.len(), 1, "existing agents should not be affected");
}
// ── reap_expired_agents tests ────────────────────────────────────────────
#[test]
fn reap_expired_agents_removes_old_completed_entries() {
    let pool = AgentPool::new(3001);
    // `Instant` is monotonic and cannot represent times before its platform
    // epoch, so `Instant::now() - Duration` panics on underflow — which can
    // happen on a freshly booted machine or CI VM that has been up for less
    // than two hours. Use checked_sub and skip the scenario when the clock
    // cannot reach back far enough.
    let Some(two_hours_ago) =
        Instant::now().checked_sub(std::time::Duration::from_secs(7200))
    else {
        // A 2-hour-old timestamp is not constructible on this platform right
        // now; the scenario under test cannot arise, so there is nothing to check.
        return;
    };
    // Inject a completed agent with an artificial old completed_at.
    {
        let (tx, _) = broadcast::channel::<AgentEvent>(64);
        let key = composite_key("old_story", "coder-1");
        let mut agents = pool.agents.lock().unwrap();
        agents.insert(
            key,
            StoryAgent {
                agent_name: "coder-1".to_string(),
                status: AgentStatus::Completed,
                worktree_info: None,
                session_id: None,
                tx,
                task_handle: None,
                event_log: Arc::new(Mutex::new(Vec::new())),
                completion: None,
                project_root: None,
                log_session_id: None,
                // Set completed_at 2 hours ago.
                completed_at: Some(two_hours_ago),
            },
        );
    }
    // Inject a recently completed agent.
    pool.inject_test_agent("new_story", "coder-1", AgentStatus::Completed);
    // Inject a running agent (should not be reaped).
    pool.inject_test_agent("active_story", "coder-2", AgentStatus::Running);
    // Reap with a 1-hour TTL — only the old entry should be removed.
    let reaped = pool.reap_expired_agents(std::time::Duration::from_secs(3600));
    assert_eq!(reaped, 1, "should reap only the old completed entry");
    let agents = pool.list_agents().unwrap();
    assert_eq!(agents.len(), 2, "new_story and active_story should remain");
    assert!(
        agents.iter().all(|a| a.story_id != "old_story"),
        "old_story should have been reaped"
    );
}
#[test]
fn reap_expired_agents_removes_old_failed_entries() {
    let pool = AgentPool::new(3001);
    // checked_sub avoids the panic `Instant::now() - Duration` raises when
    // the monotonic clock is younger than 2 hours (possible shortly after
    // boot, e.g. on a fresh CI VM).
    let Some(two_hours_ago) =
        Instant::now().checked_sub(std::time::Duration::from_secs(7200))
    else {
        // Scenario not constructible on this platform right now; skip.
        return;
    };
    // Inject a failed agent with an old completed_at.
    {
        let (tx, _) = broadcast::channel::<AgentEvent>(64);
        let key = composite_key("failed_old", "coder-1");
        let mut agents = pool.agents.lock().unwrap();
        agents.insert(
            key,
            StoryAgent {
                agent_name: "coder-1".to_string(),
                status: AgentStatus::Failed,
                worktree_info: None,
                session_id: None,
                tx,
                task_handle: None,
                event_log: Arc::new(Mutex::new(Vec::new())),
                completion: None,
                project_root: None,
                log_session_id: None,
                completed_at: Some(two_hours_ago),
            },
        );
    }
    // Failed entries expire exactly like Completed ones.
    let reaped = pool.reap_expired_agents(std::time::Duration::from_secs(3600));
    assert_eq!(reaped, 1);
    assert!(pool.list_agents().unwrap().is_empty());
}
#[test]
fn reap_expired_agents_skips_running_entries() {
    let agent_pool = AgentPool::new(3001);
    agent_pool.inject_test_agent("running_story", "coder-1", AgentStatus::Running);
    // Even a zero TTL must not touch non-terminal agents: running entries
    // carry no completed_at timestamp, so they can never be considered expired.
    let reaped_count = agent_pool.reap_expired_agents(std::time::Duration::from_secs(0));
    assert_eq!(reaped_count, 0, "running agents should never be reaped");
}
// ── archive + cleanup integration test ───────────────────────────────────
#[tokio::test]
async fn archiving_story_removes_agent_entries_from_pool() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
// Set up story in 2_current/
let current = root.join(".story_kit/work/2_current");
fs::create_dir_all(&current).unwrap();
fs::write(current.join("60_story_cleanup.md"), "test").unwrap();
let pool = AgentPool::new(3001);
pool.inject_test_agent("60_story_cleanup", "coder-1", AgentStatus::Completed);
pool.inject_test_agent("60_story_cleanup", "qa", AgentStatus::Completed);
pool.inject_test_agent("61_story_other", "coder-1", AgentStatus::Running);
// Verify all 3 agents exist.
assert_eq!(pool.list_agents().unwrap().len(), 3);
// Archive the story.
move_story_to_archived(root, "60_story_cleanup").unwrap();
pool.remove_agents_for_story("60_story_cleanup");
// Agent entries for the archived story should be gone.
let remaining = pool.list_agents().unwrap();
assert_eq!(remaining.len(), 1, "only the other story's agent should remain");
assert_eq!(remaining[0].story_id, "61_story_other");
// Story file should be in 5_archived/
assert!(root.join(".story_kit/work/5_archived/60_story_cleanup.md").exists());
} }
} }

View File

@@ -66,7 +66,7 @@ struct WorktreeListEntry {
/// Used to exclude agents for already-archived stories from the `list_agents` /// Used to exclude agents for already-archived stories from the `list_agents`
/// response so the agents panel is not cluttered with old completed items on /// response so the agents panel is not cluttered with old completed items on
/// frontend startup. /// frontend startup.
fn story_is_archived(project_root: &path::Path, story_id: &str) -> bool { pub fn story_is_archived(project_root: &path::Path, story_id: &str) -> bool {
project_root project_root
.join(".story_kit") .join(".story_kit")
.join("work") .join("work")

View File

@@ -1032,9 +1032,16 @@ async fn tool_stop_agent(args: &Value, ctx: &AppContext) -> Result<String, Strin
} }
fn tool_list_agents(ctx: &AppContext) -> Result<String, String> { fn tool_list_agents(ctx: &AppContext) -> Result<String, String> {
let project_root = ctx.agents.get_project_root(&ctx.state).ok();
let agents = ctx.agents.list_agents()?; let agents = ctx.agents.list_agents()?;
serde_json::to_string_pretty(&json!(agents serde_json::to_string_pretty(&json!(agents
.iter() .iter()
.filter(|a| {
project_root
.as_deref()
.map(|root| !crate::http::agents::story_is_archived(root, &a.story_id))
.unwrap_or(true)
})
.map(|a| json!({ .map(|a| json!({
"story_id": a.story_id, "story_id": a.story_id,
"agent_name": a.agent_name, "agent_name": a.agent_name,
@@ -1283,6 +1290,7 @@ fn tool_accept_story(args: &Value, ctx: &AppContext) -> Result<String, String> {
let project_root = ctx.agents.get_project_root(&ctx.state)?; let project_root = ctx.agents.get_project_root(&ctx.state)?;
move_story_to_archived(&project_root, story_id)?; move_story_to_archived(&project_root, story_id)?;
ctx.agents.remove_agents_for_story(story_id);
Ok(format!( Ok(format!(
"Story '{story_id}' accepted, moved to archived/, and committed to master." "Story '{story_id}' accepted, moved to archived/, and committed to master."
@@ -1381,6 +1389,7 @@ fn tool_close_bug(args: &Value, ctx: &AppContext) -> Result<String, String> {
let root = ctx.agents.get_project_root(&ctx.state)?; let root = ctx.agents.get_project_root(&ctx.state)?;
close_bug_to_archive(&root, bug_id)?; close_bug_to_archive(&root, bug_id)?;
ctx.agents.remove_agents_for_story(bug_id);
Ok(format!( Ok(format!(
"Bug '{bug_id}' closed, moved to bugs/archive/, and committed to master." "Bug '{bug_id}' closed, moved to bugs/archive/, and committed to master."

View File

@@ -95,6 +95,21 @@ async fn main() -> Result<(), std::io::Error> {
let app = build_routes(ctx); let app = build_routes(ctx);
// Background reaper: periodically remove completed/failed agent entries
// that have exceeded the TTL.
{
let reaper_agents = Arc::clone(&startup_agents);
let ttl = std::time::Duration::from_secs(agents::DEFAULT_AGENT_TTL_SECS);
tokio::spawn(async move {
// Check every 5 minutes.
let interval = std::time::Duration::from_secs(300);
loop {
tokio::time::sleep(interval).await;
reaper_agents.reap_expired_agents(ttl);
}
});
}
// On startup: // On startup:
// 1. Reconcile any stories whose agent work was committed while the server was // 1. Reconcile any stories whose agent work was committed while the server was
// offline (worktree has commits ahead of master but pipeline didn't advance). // offline (worktree has commits ahead of master but pipeline didn't advance).