huskies: merge 476_refactor_split_agents_pool_lifecycle_rs_into_submodules
This commit is contained in:
@@ -1,6 +1,8 @@
|
|||||||
mod auto_assign;
|
mod auto_assign;
|
||||||
mod lifecycle;
|
|
||||||
mod pipeline;
|
mod pipeline;
|
||||||
|
mod start;
|
||||||
|
mod stop;
|
||||||
|
mod wait;
|
||||||
mod process;
|
mod process;
|
||||||
mod query;
|
mod query;
|
||||||
mod types;
|
mod types;
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
use crate::agent_log::AgentLogWriter;
|
use crate::agent_log::AgentLogWriter;
|
||||||
use crate::config::ProjectConfig;
|
use crate::config::ProjectConfig;
|
||||||
use crate::slog;
|
|
||||||
use crate::slog_error;
|
use crate::slog_error;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
@@ -10,7 +9,7 @@ use super::super::{
|
|||||||
AgentEvent, AgentInfo, AgentStatus, PipelineStage, agent_config_stage,
|
AgentEvent, AgentInfo, AgentStatus, PipelineStage, agent_config_stage,
|
||||||
pipeline_stage,
|
pipeline_stage,
|
||||||
};
|
};
|
||||||
use super::types::{PendingGuard, StoryAgent, agent_info_from_entry, composite_key};
|
use super::types::{PendingGuard, StoryAgent, composite_key};
|
||||||
use super::{AgentPool, auto_assign};
|
use super::{AgentPool, auto_assign};
|
||||||
use super::worktree::find_active_story_stage;
|
use super::worktree::find_active_story_stage;
|
||||||
use super::super::runtime::{AgentRuntime, ClaudeCodeRuntime, GeminiRuntime, OpenAiRuntime, RuntimeContext};
|
use super::super::runtime::{AgentRuntime, ClaudeCodeRuntime, GeminiRuntime, OpenAiRuntime, RuntimeContext};
|
||||||
@@ -601,191 +600,6 @@ impl AgentPool {
|
|||||||
throttled: false,
|
throttled: false,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stop a running agent. Worktree is preserved for inspection.
|
|
||||||
pub async fn stop_agent(
|
|
||||||
&self,
|
|
||||||
_project_root: &Path,
|
|
||||||
story_id: &str,
|
|
||||||
agent_name: &str,
|
|
||||||
) -> Result<(), String> {
|
|
||||||
let key = composite_key(story_id, agent_name);
|
|
||||||
|
|
||||||
let (worktree_info, task_handle, tx) = {
|
|
||||||
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
||||||
let agent = agents
|
|
||||||
.get_mut(&key)
|
|
||||||
.ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?;
|
|
||||||
|
|
||||||
let wt = agent.worktree_info.clone();
|
|
||||||
let handle = agent.task_handle.take();
|
|
||||||
let tx = agent.tx.clone();
|
|
||||||
agent.status = AgentStatus::Failed;
|
|
||||||
(wt, handle, tx)
|
|
||||||
};
|
|
||||||
|
|
||||||
// Abort the task and kill the PTY child process.
|
|
||||||
// Note: aborting a spawn_blocking task handle does not interrupt the blocking
|
|
||||||
// thread, so we must also kill the child process directly via the killer registry.
|
|
||||||
if let Some(handle) = task_handle {
|
|
||||||
handle.abort();
|
|
||||||
let _ = handle.await;
|
|
||||||
}
|
|
||||||
self.kill_child_for_key(&key);
|
|
||||||
|
|
||||||
// Preserve worktree for inspection — don't destroy agent's work on stop.
|
|
||||||
if let Some(ref wt) = worktree_info {
|
|
||||||
slog!(
|
|
||||||
"[agents] Worktree preserved for {story_id}:{agent_name}: {}",
|
|
||||||
wt.path.display()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
let _ = tx.send(AgentEvent::Status {
|
|
||||||
story_id: story_id.to_string(),
|
|
||||||
agent_name: agent_name.to_string(),
|
|
||||||
status: "stopped".to_string(),
|
|
||||||
});
|
|
||||||
|
|
||||||
// Remove from map
|
|
||||||
{
|
|
||||||
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
||||||
agents.remove(&key);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Notify WebSocket clients so pipeline board and agent panel update.
|
|
||||||
Self::notify_agent_state_changed(&self.watcher_tx);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Block until the agent reaches a terminal state (completed, failed, stopped).
|
|
||||||
/// Returns the agent's final `AgentInfo`.
|
|
||||||
/// `timeout_ms` caps how long to wait; returns an error if the deadline passes.
|
|
||||||
pub async fn wait_for_agent(
|
|
||||||
&self,
|
|
||||||
story_id: &str,
|
|
||||||
agent_name: &str,
|
|
||||||
timeout_ms: u64,
|
|
||||||
) -> Result<AgentInfo, String> {
|
|
||||||
// Subscribe before checking status so we don't miss the terminal event
|
|
||||||
// if the agent completes in the window between the two operations.
|
|
||||||
let mut rx = self.subscribe(story_id, agent_name)?;
|
|
||||||
|
|
||||||
// Return immediately if already in a terminal state.
|
|
||||||
{
|
|
||||||
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
||||||
let key = composite_key(story_id, agent_name);
|
|
||||||
if let Some(agent) = agents.get(&key)
|
|
||||||
&& matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
|
|
||||||
{
|
|
||||||
return Ok(agent_info_from_entry(story_id, agent));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
|
|
||||||
if remaining.is_zero() {
|
|
||||||
return Err(format!(
|
|
||||||
"Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'"
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
match tokio::time::timeout(remaining, rx.recv()).await {
|
|
||||||
Ok(Ok(event)) => {
|
|
||||||
let is_terminal = match &event {
|
|
||||||
AgentEvent::Done { .. } | AgentEvent::Error { .. } => true,
|
|
||||||
AgentEvent::Status { status, .. } if status == "stopped" => true,
|
|
||||||
_ => false,
|
|
||||||
};
|
|
||||||
if is_terminal {
|
|
||||||
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
||||||
let key = composite_key(story_id, agent_name);
|
|
||||||
return Ok(if let Some(agent) = agents.get(&key) {
|
|
||||||
agent_info_from_entry(story_id, agent)
|
|
||||||
} else {
|
|
||||||
// Agent was removed from map (e.g. stop_agent removes it after
|
|
||||||
// the "stopped" status event is sent).
|
|
||||||
let (status, session_id) = match event {
|
|
||||||
AgentEvent::Done { session_id, .. } => {
|
|
||||||
(AgentStatus::Completed, session_id)
|
|
||||||
}
|
|
||||||
_ => (AgentStatus::Failed, None),
|
|
||||||
};
|
|
||||||
AgentInfo {
|
|
||||||
story_id: story_id.to_string(),
|
|
||||||
agent_name: agent_name.to_string(),
|
|
||||||
status,
|
|
||||||
session_id,
|
|
||||||
worktree_path: None,
|
|
||||||
base_branch: None,
|
|
||||||
completion: None,
|
|
||||||
log_session_id: None,
|
|
||||||
throttled: false,
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(Err(broadcast::error::RecvError::Lagged(_))) => {
|
|
||||||
// Missed some buffered events — check current status before resuming.
|
|
||||||
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
||||||
let key = composite_key(story_id, agent_name);
|
|
||||||
if let Some(agent) = agents.get(&key)
|
|
||||||
&& matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
|
|
||||||
{
|
|
||||||
return Ok(agent_info_from_entry(story_id, agent));
|
|
||||||
}
|
|
||||||
// Still running — continue the loop.
|
|
||||||
}
|
|
||||||
Ok(Err(broadcast::error::RecvError::Closed)) => {
|
|
||||||
// Channel closed: no more events will arrive. Return current state.
|
|
||||||
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
||||||
let key = composite_key(story_id, agent_name);
|
|
||||||
if let Some(agent) = agents.get(&key) {
|
|
||||||
return Ok(agent_info_from_entry(story_id, agent));
|
|
||||||
}
|
|
||||||
return Err(format!(
|
|
||||||
"Agent '{agent_name}' for story '{story_id}' channel closed unexpectedly"
|
|
||||||
));
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
return Err(format!(
|
|
||||||
"Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'"
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Remove all agent entries for a given story_id from the pool.
|
|
||||||
///
|
|
||||||
/// Called when a story is archived so that stale entries don't accumulate.
|
|
||||||
/// Returns the number of entries removed.
|
|
||||||
pub fn remove_agents_for_story(&self, story_id: &str) -> usize {
|
|
||||||
let mut agents = match self.agents.lock() {
|
|
||||||
Ok(a) => a,
|
|
||||||
Err(e) => {
|
|
||||||
slog_error!("[agents] Failed to lock pool for cleanup of '{story_id}': {e}");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let prefix = format!("{story_id}:");
|
|
||||||
let keys_to_remove: Vec<String> = agents
|
|
||||||
.keys()
|
|
||||||
.filter(|k| k.starts_with(&prefix))
|
|
||||||
.cloned()
|
|
||||||
.collect();
|
|
||||||
let count = keys_to_remove.len();
|
|
||||||
for key in &keys_to_remove {
|
|
||||||
agents.remove(key);
|
|
||||||
}
|
|
||||||
if count > 0 {
|
|
||||||
slog!("[agents] Removed {count} agent entries for archived story '{story_id}'");
|
|
||||||
}
|
|
||||||
count
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -793,83 +607,6 @@ mod tests {
|
|||||||
use super::super::AgentPool;
|
use super::super::AgentPool;
|
||||||
use crate::agents::{AgentEvent, AgentStatus};
|
use crate::agents::{AgentEvent, AgentStatus};
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn wait_for_agent_returns_immediately_if_completed() {
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
pool.inject_test_agent("s1", "bot", AgentStatus::Completed);
|
|
||||||
|
|
||||||
let info = pool.wait_for_agent("s1", "bot", 1000).await.unwrap();
|
|
||||||
assert_eq!(info.status, AgentStatus::Completed);
|
|
||||||
assert_eq!(info.story_id, "s1");
|
|
||||||
assert_eq!(info.agent_name, "bot");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn wait_for_agent_returns_immediately_if_failed() {
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
pool.inject_test_agent("s2", "bot", AgentStatus::Failed);
|
|
||||||
|
|
||||||
let info = pool.wait_for_agent("s2", "bot", 1000).await.unwrap();
|
|
||||||
assert_eq!(info.status, AgentStatus::Failed);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn wait_for_agent_completes_on_done_event() {
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running);
|
|
||||||
|
|
||||||
// Send Done event after a short delay
|
|
||||||
let tx_clone = tx.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
|
||||||
let _ = tx_clone.send(AgentEvent::Done {
|
|
||||||
story_id: "s3".to_string(),
|
|
||||||
agent_name: "bot".to_string(),
|
|
||||||
session_id: Some("sess-abc".to_string()),
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
let info = pool.wait_for_agent("s3", "bot", 2000).await.unwrap();
|
|
||||||
assert_eq!(info.story_id, "s3");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn wait_for_agent_times_out() {
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
pool.inject_test_agent("s4", "bot", AgentStatus::Running);
|
|
||||||
|
|
||||||
let result = pool.wait_for_agent("s4", "bot", 50).await;
|
|
||||||
assert!(result.is_err());
|
|
||||||
let msg = result.unwrap_err();
|
|
||||||
assert!(msg.contains("Timed out"), "unexpected message: {msg}");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn wait_for_agent_errors_for_nonexistent() {
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
let result = pool.wait_for_agent("no_story", "no_bot", 100).await;
|
|
||||||
assert!(result.is_err());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn wait_for_agent_completes_on_stopped_status_event() {
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running);
|
|
||||||
|
|
||||||
let tx_clone = tx.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(30)).await;
|
|
||||||
let _ = tx_clone.send(AgentEvent::Status {
|
|
||||||
story_id: "s5".to_string(),
|
|
||||||
agent_name: "bot".to_string(),
|
|
||||||
status: "stopped".to_string(),
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
let info = pool.wait_for_agent("s5", "bot", 2000).await.unwrap();
|
|
||||||
assert_eq!(info.story_id, "s5");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn start_agent_auto_selects_second_coder_when_first_busy() {
|
async fn start_agent_auto_selects_second_coder_when_first_busy() {
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
@@ -1640,35 +1377,6 @@ stage = "coder"
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── remove_agents_for_story tests ────────────────────────────────────────
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn remove_agents_for_story_removes_all_entries() {
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
pool.inject_test_agent("story_a", "coder-1", AgentStatus::Completed);
|
|
||||||
pool.inject_test_agent("story_a", "qa", AgentStatus::Failed);
|
|
||||||
pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running);
|
|
||||||
|
|
||||||
let removed = pool.remove_agents_for_story("story_a");
|
|
||||||
assert_eq!(removed, 2, "should remove both agents for story_a");
|
|
||||||
|
|
||||||
let agents = pool.list_agents().unwrap();
|
|
||||||
assert_eq!(agents.len(), 1, "only story_b agent should remain");
|
|
||||||
assert_eq!(agents[0].story_id, "story_b");
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn remove_agents_for_story_returns_zero_when_no_match() {
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running);
|
|
||||||
|
|
||||||
let removed = pool.remove_agents_for_story("nonexistent");
|
|
||||||
assert_eq!(removed, 0);
|
|
||||||
|
|
||||||
let agents = pool.list_agents().unwrap();
|
|
||||||
assert_eq!(agents.len(), 1, "existing agents should not be affected");
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── front matter agent preference (bug 379) ──────────────────────────────
|
// ── front matter agent preference (bug 379) ──────────────────────────────
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@@ -1770,43 +1478,4 @@ stage = "coder"
|
|||||||
"error should say agent is busy or story is queued: {err}"
|
"error should say agent is busy or story is queued: {err}"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── archive + cleanup integration test ───────────────────────────────────
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn archiving_story_removes_agent_entries_from_pool() {
|
|
||||||
use crate::agents::lifecycle::move_story_to_done;
|
|
||||||
use std::fs;
|
|
||||||
|
|
||||||
let tmp = tempfile::tempdir().unwrap();
|
|
||||||
let root = tmp.path();
|
|
||||||
|
|
||||||
let current = root.join(".huskies/work/2_current");
|
|
||||||
fs::create_dir_all(¤t).unwrap();
|
|
||||||
fs::write(current.join("60_story_cleanup.md"), "test").unwrap();
|
|
||||||
|
|
||||||
let pool = AgentPool::new_test(3001);
|
|
||||||
pool.inject_test_agent("60_story_cleanup", "coder-1", AgentStatus::Completed);
|
|
||||||
pool.inject_test_agent("60_story_cleanup", "qa", AgentStatus::Completed);
|
|
||||||
pool.inject_test_agent("61_story_other", "coder-1", AgentStatus::Running);
|
|
||||||
|
|
||||||
assert_eq!(pool.list_agents().unwrap().len(), 3);
|
|
||||||
|
|
||||||
move_story_to_done(root, "60_story_cleanup").unwrap();
|
|
||||||
pool.remove_agents_for_story("60_story_cleanup");
|
|
||||||
|
|
||||||
let remaining = pool.list_agents().unwrap();
|
|
||||||
assert_eq!(
|
|
||||||
remaining.len(),
|
|
||||||
1,
|
|
||||||
"only the other story's agent should remain"
|
|
||||||
);
|
|
||||||
assert_eq!(remaining[0].story_id, "61_story_other");
|
|
||||||
|
|
||||||
assert!(
|
|
||||||
root.join(".huskies/work/5_done/60_story_cleanup.md")
|
|
||||||
.exists()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -0,0 +1,167 @@
|
|||||||
|
use crate::slog;
|
||||||
|
use crate::slog_error;
|
||||||
|
use std::path::Path;
|
||||||
|
|
||||||
|
use super::super::{AgentEvent, AgentStatus};
|
||||||
|
use super::types::composite_key;
|
||||||
|
use super::AgentPool;
|
||||||
|
|
||||||
|
impl AgentPool {
|
||||||
|
/// Stop a running agent. Worktree is preserved for inspection.
|
||||||
|
pub async fn stop_agent(
|
||||||
|
&self,
|
||||||
|
_project_root: &Path,
|
||||||
|
story_id: &str,
|
||||||
|
agent_name: &str,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
let key = composite_key(story_id, agent_name);
|
||||||
|
|
||||||
|
let (worktree_info, task_handle, tx) = {
|
||||||
|
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
||||||
|
let agent = agents
|
||||||
|
.get_mut(&key)
|
||||||
|
.ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?;
|
||||||
|
|
||||||
|
let wt = agent.worktree_info.clone();
|
||||||
|
let handle = agent.task_handle.take();
|
||||||
|
let tx = agent.tx.clone();
|
||||||
|
agent.status = AgentStatus::Failed;
|
||||||
|
(wt, handle, tx)
|
||||||
|
};
|
||||||
|
|
||||||
|
// Abort the task and kill the PTY child process.
|
||||||
|
// Note: aborting a spawn_blocking task handle does not interrupt the blocking
|
||||||
|
// thread, so we must also kill the child process directly via the killer registry.
|
||||||
|
if let Some(handle) = task_handle {
|
||||||
|
handle.abort();
|
||||||
|
let _ = handle.await;
|
||||||
|
}
|
||||||
|
self.kill_child_for_key(&key);
|
||||||
|
|
||||||
|
// Preserve worktree for inspection — don't destroy agent's work on stop.
|
||||||
|
if let Some(ref wt) = worktree_info {
|
||||||
|
slog!(
|
||||||
|
"[agents] Worktree preserved for {story_id}:{agent_name}: {}",
|
||||||
|
wt.path.display()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = tx.send(AgentEvent::Status {
|
||||||
|
story_id: story_id.to_string(),
|
||||||
|
agent_name: agent_name.to_string(),
|
||||||
|
status: "stopped".to_string(),
|
||||||
|
});
|
||||||
|
|
||||||
|
// Remove from map
|
||||||
|
{
|
||||||
|
let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
|
||||||
|
agents.remove(&key);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify WebSocket clients so pipeline board and agent panel update.
|
||||||
|
Self::notify_agent_state_changed(&self.watcher_tx);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove all agent entries for a given story_id from the pool.
|
||||||
|
///
|
||||||
|
/// Called when a story is archived so that stale entries don't accumulate.
|
||||||
|
/// Returns the number of entries removed.
|
||||||
|
pub fn remove_agents_for_story(&self, story_id: &str) -> usize {
|
||||||
|
let mut agents = match self.agents.lock() {
|
||||||
|
Ok(a) => a,
|
||||||
|
Err(e) => {
|
||||||
|
slog_error!("[agents] Failed to lock pool for cleanup of '{story_id}': {e}");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let prefix = format!("{story_id}:");
|
||||||
|
let keys_to_remove: Vec<String> = agents
|
||||||
|
.keys()
|
||||||
|
.filter(|k| k.starts_with(&prefix))
|
||||||
|
.cloned()
|
||||||
|
.collect();
|
||||||
|
let count = keys_to_remove.len();
|
||||||
|
for key in &keys_to_remove {
|
||||||
|
agents.remove(key);
|
||||||
|
}
|
||||||
|
if count > 0 {
|
||||||
|
slog!("[agents] Removed {count} agent entries for archived story '{story_id}'");
|
||||||
|
}
|
||||||
|
count
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::super::AgentPool;
|
||||||
|
use crate::agents::AgentStatus;
|
||||||
|
|
||||||
|
// ── remove_agents_for_story tests ────────────────────────────────────────
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn remove_agents_for_story_removes_all_entries() {
|
||||||
|
let pool = AgentPool::new_test(3001);
|
||||||
|
pool.inject_test_agent("story_a", "coder-1", AgentStatus::Completed);
|
||||||
|
pool.inject_test_agent("story_a", "qa", AgentStatus::Failed);
|
||||||
|
pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running);
|
||||||
|
|
||||||
|
let removed = pool.remove_agents_for_story("story_a");
|
||||||
|
assert_eq!(removed, 2, "should remove both agents for story_a");
|
||||||
|
|
||||||
|
let agents = pool.list_agents().unwrap();
|
||||||
|
assert_eq!(agents.len(), 1, "only story_b agent should remain");
|
||||||
|
assert_eq!(agents[0].story_id, "story_b");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn remove_agents_for_story_returns_zero_when_no_match() {
|
||||||
|
let pool = AgentPool::new_test(3001);
|
||||||
|
pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running);
|
||||||
|
|
||||||
|
let removed = pool.remove_agents_for_story("nonexistent");
|
||||||
|
assert_eq!(removed, 0);
|
||||||
|
|
||||||
|
let agents = pool.list_agents().unwrap();
|
||||||
|
assert_eq!(agents.len(), 1, "existing agents should not be affected");
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── archive + cleanup integration test ───────────────────────────────────
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn archiving_story_removes_agent_entries_from_pool() {
|
||||||
|
use crate::agents::lifecycle::move_story_to_done;
|
||||||
|
use std::fs;
|
||||||
|
|
||||||
|
let tmp = tempfile::tempdir().unwrap();
|
||||||
|
let root = tmp.path();
|
||||||
|
|
||||||
|
let current = root.join(".huskies/work/2_current");
|
||||||
|
fs::create_dir_all(¤t).unwrap();
|
||||||
|
fs::write(current.join("60_story_cleanup.md"), "test").unwrap();
|
||||||
|
|
||||||
|
let pool = AgentPool::new_test(3001);
|
||||||
|
pool.inject_test_agent("60_story_cleanup", "coder-1", AgentStatus::Completed);
|
||||||
|
pool.inject_test_agent("60_story_cleanup", "qa", AgentStatus::Completed);
|
||||||
|
pool.inject_test_agent("61_story_other", "coder-1", AgentStatus::Running);
|
||||||
|
|
||||||
|
assert_eq!(pool.list_agents().unwrap().len(), 3);
|
||||||
|
|
||||||
|
move_story_to_done(root, "60_story_cleanup").unwrap();
|
||||||
|
pool.remove_agents_for_story("60_story_cleanup");
|
||||||
|
|
||||||
|
let remaining = pool.list_agents().unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
remaining.len(),
|
||||||
|
1,
|
||||||
|
"only the other story's agent should remain"
|
||||||
|
);
|
||||||
|
assert_eq!(remaining[0].story_id, "61_story_other");
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
root.join(".huskies/work/5_done/60_story_cleanup.md")
|
||||||
|
.exists()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,190 @@
|
|||||||
|
use super::super::{AgentEvent, AgentInfo, AgentStatus};
|
||||||
|
use super::types::{agent_info_from_entry, composite_key};
|
||||||
|
use super::AgentPool;
|
||||||
|
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
|
impl AgentPool {
|
||||||
|
/// Block until the agent reaches a terminal state (completed, failed, stopped).
|
||||||
|
/// Returns the agent's final `AgentInfo`.
|
||||||
|
/// `timeout_ms` caps how long to wait; returns an error if the deadline passes.
|
||||||
|
pub async fn wait_for_agent(
|
||||||
|
&self,
|
||||||
|
story_id: &str,
|
||||||
|
agent_name: &str,
|
||||||
|
timeout_ms: u64,
|
||||||
|
) -> Result<AgentInfo, String> {
|
||||||
|
// Subscribe before checking status so we don't miss the terminal event
|
||||||
|
// if the agent completes in the window between the two operations.
|
||||||
|
let mut rx = self.subscribe(story_id, agent_name)?;
|
||||||
|
|
||||||
|
// Return immediately if already in a terminal state.
|
||||||
|
{
|
||||||
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
||||||
|
let key = composite_key(story_id, agent_name);
|
||||||
|
if let Some(agent) = agents.get(&key)
|
||||||
|
&& matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
|
||||||
|
{
|
||||||
|
return Ok(agent_info_from_entry(story_id, agent));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
|
||||||
|
if remaining.is_zero() {
|
||||||
|
return Err(format!(
|
||||||
|
"Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
match tokio::time::timeout(remaining, rx.recv()).await {
|
||||||
|
Ok(Ok(event)) => {
|
||||||
|
let is_terminal = match &event {
|
||||||
|
AgentEvent::Done { .. } | AgentEvent::Error { .. } => true,
|
||||||
|
AgentEvent::Status { status, .. } if status == "stopped" => true,
|
||||||
|
_ => false,
|
||||||
|
};
|
||||||
|
if is_terminal {
|
||||||
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
||||||
|
let key = composite_key(story_id, agent_name);
|
||||||
|
return Ok(if let Some(agent) = agents.get(&key) {
|
||||||
|
agent_info_from_entry(story_id, agent)
|
||||||
|
} else {
|
||||||
|
// Agent was removed from map (e.g. stop_agent removes it after
|
||||||
|
// the "stopped" status event is sent).
|
||||||
|
let (status, session_id) = match event {
|
||||||
|
AgentEvent::Done { session_id, .. } => {
|
||||||
|
(AgentStatus::Completed, session_id)
|
||||||
|
}
|
||||||
|
_ => (AgentStatus::Failed, None),
|
||||||
|
};
|
||||||
|
AgentInfo {
|
||||||
|
story_id: story_id.to_string(),
|
||||||
|
agent_name: agent_name.to_string(),
|
||||||
|
status,
|
||||||
|
session_id,
|
||||||
|
worktree_path: None,
|
||||||
|
base_branch: None,
|
||||||
|
completion: None,
|
||||||
|
log_session_id: None,
|
||||||
|
throttled: false,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Err(broadcast::error::RecvError::Lagged(_))) => {
|
||||||
|
// Missed some buffered events — check current status before resuming.
|
||||||
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
||||||
|
let key = composite_key(story_id, agent_name);
|
||||||
|
if let Some(agent) = agents.get(&key)
|
||||||
|
&& matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
|
||||||
|
{
|
||||||
|
return Ok(agent_info_from_entry(story_id, agent));
|
||||||
|
}
|
||||||
|
// Still running — continue the loop.
|
||||||
|
}
|
||||||
|
Ok(Err(broadcast::error::RecvError::Closed)) => {
|
||||||
|
// Channel closed: no more events will arrive. Return current state.
|
||||||
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
||||||
|
let key = composite_key(story_id, agent_name);
|
||||||
|
if let Some(agent) = agents.get(&key) {
|
||||||
|
return Ok(agent_info_from_entry(story_id, agent));
|
||||||
|
}
|
||||||
|
return Err(format!(
|
||||||
|
"Agent '{agent_name}' for story '{story_id}' channel closed unexpectedly"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
return Err(format!(
|
||||||
|
"Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::super::AgentPool;
|
||||||
|
use crate::agents::{AgentEvent, AgentStatus};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn wait_for_agent_returns_immediately_if_completed() {
|
||||||
|
let pool = AgentPool::new_test(3001);
|
||||||
|
pool.inject_test_agent("s1", "bot", AgentStatus::Completed);
|
||||||
|
|
||||||
|
let info = pool.wait_for_agent("s1", "bot", 1000).await.unwrap();
|
||||||
|
assert_eq!(info.status, AgentStatus::Completed);
|
||||||
|
assert_eq!(info.story_id, "s1");
|
||||||
|
assert_eq!(info.agent_name, "bot");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn wait_for_agent_returns_immediately_if_failed() {
|
||||||
|
let pool = AgentPool::new_test(3001);
|
||||||
|
pool.inject_test_agent("s2", "bot", AgentStatus::Failed);
|
||||||
|
|
||||||
|
let info = pool.wait_for_agent("s2", "bot", 1000).await.unwrap();
|
||||||
|
assert_eq!(info.status, AgentStatus::Failed);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn wait_for_agent_completes_on_done_event() {
|
||||||
|
let pool = AgentPool::new_test(3001);
|
||||||
|
let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running);
|
||||||
|
|
||||||
|
// Send Done event after a short delay
|
||||||
|
let tx_clone = tx.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||||
|
let _ = tx_clone.send(AgentEvent::Done {
|
||||||
|
story_id: "s3".to_string(),
|
||||||
|
agent_name: "bot".to_string(),
|
||||||
|
session_id: Some("sess-abc".to_string()),
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
let info = pool.wait_for_agent("s3", "bot", 2000).await.unwrap();
|
||||||
|
assert_eq!(info.story_id, "s3");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn wait_for_agent_times_out() {
|
||||||
|
let pool = AgentPool::new_test(3001);
|
||||||
|
pool.inject_test_agent("s4", "bot", AgentStatus::Running);
|
||||||
|
|
||||||
|
let result = pool.wait_for_agent("s4", "bot", 50).await;
|
||||||
|
assert!(result.is_err());
|
||||||
|
let msg = result.unwrap_err();
|
||||||
|
assert!(msg.contains("Timed out"), "unexpected message: {msg}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn wait_for_agent_errors_for_nonexistent() {
|
||||||
|
let pool = AgentPool::new_test(3001);
|
||||||
|
let result = pool.wait_for_agent("no_story", "no_bot", 100).await;
|
||||||
|
assert!(result.is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn wait_for_agent_completes_on_stopped_status_event() {
|
||||||
|
let pool = AgentPool::new_test(3001);
|
||||||
|
let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running);
|
||||||
|
|
||||||
|
let tx_clone = tx.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(30)).await;
|
||||||
|
let _ = tx_clone.send(AgentEvent::Status {
|
||||||
|
story_id: "s5".to_string(),
|
||||||
|
agent_name: "bot".to_string(),
|
||||||
|
status: "stopped".to_string(),
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
let info = pool.wait_for_agent("s5", "bot", 2000).await.unwrap();
|
||||||
|
assert_eq!(info.story_id, "s5");
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user