diff --git a/server/src/agents/pool/mod.rs b/server/src/agents/pool/mod.rs
index 722ab91d..79dce61f 100644
--- a/server/src/agents/pool/mod.rs
+++ b/server/src/agents/pool/mod.rs
@@ -1,6 +1,8 @@
 mod auto_assign;
-mod lifecycle;
 mod pipeline;
+mod start;
+mod stop;
+mod wait;
 mod process;
 mod query;
 mod types;
diff --git a/server/src/agents/pool/lifecycle.rs b/server/src/agents/pool/start.rs
similarity index 80%
rename from server/src/agents/pool/lifecycle.rs
rename to server/src/agents/pool/start.rs
index 76bd0bbb..9c3bd514 100644
--- a/server/src/agents/pool/lifecycle.rs
+++ b/server/src/agents/pool/start.rs
@@ -1,6 +1,5 @@
 use crate::agent_log::AgentLogWriter;
 use crate::config::ProjectConfig;
-use crate::slog;
 use crate::slog_error;
 use std::path::Path;
 use std::sync::{Arc, Mutex};
@@ -10,7 +9,7 @@ use super::super::{
     AgentEvent, AgentInfo, AgentStatus, PipelineStage, agent_config_stage, pipeline_stage,
 };
-use super::types::{PendingGuard, StoryAgent, agent_info_from_entry, composite_key};
+use super::types::{PendingGuard, StoryAgent, composite_key};
 use super::{AgentPool, auto_assign};
 use super::worktree::find_active_story_stage;
 use super::super::runtime::{AgentRuntime, ClaudeCodeRuntime, GeminiRuntime, OpenAiRuntime, RuntimeContext};

@@ -601,191 +600,6 @@ impl AgentPool {
             throttled: false,
         })
     }
-
-    /// Stop a running agent. Worktree is preserved for inspection.
-    pub async fn stop_agent(
-        &self,
-        _project_root: &Path,
-        story_id: &str,
-        agent_name: &str,
-    ) -> Result<(), String> {
-        let key = composite_key(story_id, agent_name);
-
-        let (worktree_info, task_handle, tx) = {
-            let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
-            let agent = agents
-                .get_mut(&key)
-                .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?;
-
-            let wt = agent.worktree_info.clone();
-            let handle = agent.task_handle.take();
-            let tx = agent.tx.clone();
-            agent.status = AgentStatus::Failed;
-            (wt, handle, tx)
-        };
-
-        // Abort the task and kill the PTY child process.
-        // Note: aborting a spawn_blocking task handle does not interrupt the blocking
-        // thread, so we must also kill the child process directly via the killer registry.
-        if let Some(handle) = task_handle {
-            handle.abort();
-            let _ = handle.await;
-        }
-        self.kill_child_for_key(&key);
-
-        // Preserve worktree for inspection — don't destroy agent's work on stop.
-        if let Some(ref wt) = worktree_info {
-            slog!(
-                "[agents] Worktree preserved for {story_id}:{agent_name}: {}",
-                wt.path.display()
-            );
-        }
-
-        let _ = tx.send(AgentEvent::Status {
-            story_id: story_id.to_string(),
-            agent_name: agent_name.to_string(),
-            status: "stopped".to_string(),
-        });
-
-        // Remove from map
-        {
-            let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
-            agents.remove(&key);
-        }
-
-        // Notify WebSocket clients so pipeline board and agent panel update.
-        Self::notify_agent_state_changed(&self.watcher_tx);
-
-        Ok(())
-    }
-
-    /// Block until the agent reaches a terminal state (completed, failed, stopped).
-    /// Returns the agent's final `AgentInfo`.
-    /// `timeout_ms` caps how long to wait; returns an error if the deadline passes.
-    pub async fn wait_for_agent(
-        &self,
-        story_id: &str,
-        agent_name: &str,
-        timeout_ms: u64,
-    ) -> Result<AgentInfo, String> {
-        // Subscribe before checking status so we don't miss the terminal event
-        // if the agent completes in the window between the two operations.
-        let mut rx = self.subscribe(story_id, agent_name)?;
-
-        // Return immediately if already in a terminal state.
-        {
-            let agents = self.agents.lock().map_err(|e| e.to_string())?;
-            let key = composite_key(story_id, agent_name);
-            if let Some(agent) = agents.get(&key)
-                && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
-            {
-                return Ok(agent_info_from_entry(story_id, agent));
-            }
-        }
-
-        let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
-
-        loop {
-            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
-            if remaining.is_zero() {
-                return Err(format!(
-                    "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'"
-                ));
-            }
-
-            match tokio::time::timeout(remaining, rx.recv()).await {
-                Ok(Ok(event)) => {
-                    let is_terminal = match &event {
-                        AgentEvent::Done { .. } | AgentEvent::Error { .. } => true,
-                        AgentEvent::Status { status, .. } if status == "stopped" => true,
-                        _ => false,
-                    };
-                    if is_terminal {
-                        let agents = self.agents.lock().map_err(|e| e.to_string())?;
-                        let key = composite_key(story_id, agent_name);
-                        return Ok(if let Some(agent) = agents.get(&key) {
-                            agent_info_from_entry(story_id, agent)
-                        } else {
-                            // Agent was removed from map (e.g. stop_agent removes it after
-                            // the "stopped" status event is sent).
-                            let (status, session_id) = match event {
-                                AgentEvent::Done { session_id, .. } => {
-                                    (AgentStatus::Completed, session_id)
-                                }
-                                _ => (AgentStatus::Failed, None),
-                            };
-                            AgentInfo {
-                                story_id: story_id.to_string(),
-                                agent_name: agent_name.to_string(),
-                                status,
-                                session_id,
-                                worktree_path: None,
-                                base_branch: None,
-                                completion: None,
-                                log_session_id: None,
-                                throttled: false,
-                            }
-                        });
-                    }
-                }
-                Ok(Err(broadcast::error::RecvError::Lagged(_))) => {
-                    // Missed some buffered events — check current status before resuming.
-                    let agents = self.agents.lock().map_err(|e| e.to_string())?;
-                    let key = composite_key(story_id, agent_name);
-                    if let Some(agent) = agents.get(&key)
-                        && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
-                    {
-                        return Ok(agent_info_from_entry(story_id, agent));
-                    }
-                    // Still running — continue the loop.
-                }
-                Ok(Err(broadcast::error::RecvError::Closed)) => {
-                    // Channel closed: no more events will arrive. Return current state.
-                    let agents = self.agents.lock().map_err(|e| e.to_string())?;
-                    let key = composite_key(story_id, agent_name);
-                    if let Some(agent) = agents.get(&key) {
-                        return Ok(agent_info_from_entry(story_id, agent));
-                    }
-                    return Err(format!(
-                        "Agent '{agent_name}' for story '{story_id}' channel closed unexpectedly"
-                    ));
-                }
-                Err(_) => {
-                    return Err(format!(
-                        "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'"
-                    ));
-                }
-            }
-        }
-    }
-
-    /// Remove all agent entries for a given story_id from the pool.
-    ///
-    /// Called when a story is archived so that stale entries don't accumulate.
-    /// Returns the number of entries removed.
-    pub fn remove_agents_for_story(&self, story_id: &str) -> usize {
-        let mut agents = match self.agents.lock() {
-            Ok(a) => a,
-            Err(e) => {
-                slog_error!("[agents] Failed to lock pool for cleanup of '{story_id}': {e}");
-                return 0;
-            }
-        };
-        let prefix = format!("{story_id}:");
-        let keys_to_remove: Vec<String> = agents
-            .keys()
-            .filter(|k| k.starts_with(&prefix))
-            .cloned()
-            .collect();
-        let count = keys_to_remove.len();
-        for key in &keys_to_remove {
-            agents.remove(key);
-        }
-        if count > 0 {
-            slog!("[agents] Removed {count} agent entries for archived story '{story_id}'");
-        }
-        count
-    }
 }

 #[cfg(test)]
@@ -793,83 +607,6 @@ mod tests {
     use super::super::AgentPool;
     use crate::agents::{AgentEvent, AgentStatus};

-    #[tokio::test]
-    async fn wait_for_agent_returns_immediately_if_completed() {
-        let pool = AgentPool::new_test(3001);
-        pool.inject_test_agent("s1", "bot", AgentStatus::Completed);
-
-        let info = pool.wait_for_agent("s1", "bot", 1000).await.unwrap();
-        assert_eq!(info.status, AgentStatus::Completed);
-        assert_eq!(info.story_id, "s1");
-        assert_eq!(info.agent_name, "bot");
-    }
-
-    #[tokio::test]
-    async fn wait_for_agent_returns_immediately_if_failed() {
-        let pool = AgentPool::new_test(3001);
-        pool.inject_test_agent("s2", "bot", AgentStatus::Failed);
-
-        let info = pool.wait_for_agent("s2", "bot", 1000).await.unwrap();
-        assert_eq!(info.status, AgentStatus::Failed);
-    }
-
-    #[tokio::test]
-    async fn wait_for_agent_completes_on_done_event() {
-        let pool = AgentPool::new_test(3001);
-        let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running);
-
-        // Send Done event after a short delay
-        let tx_clone = tx.clone();
-        tokio::spawn(async move {
-            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
-            let _ = tx_clone.send(AgentEvent::Done {
-                story_id: "s3".to_string(),
-                agent_name: "bot".to_string(),
-                session_id: Some("sess-abc".to_string()),
-            });
-        });
-
-        let info = pool.wait_for_agent("s3", "bot", 2000).await.unwrap();
-        assert_eq!(info.story_id, "s3");
-    }
-
-    #[tokio::test]
-    async fn wait_for_agent_times_out() {
-        let pool = AgentPool::new_test(3001);
-        pool.inject_test_agent("s4", "bot", AgentStatus::Running);
-
-        let result = pool.wait_for_agent("s4", "bot", 50).await;
-        assert!(result.is_err());
-        let msg = result.unwrap_err();
-        assert!(msg.contains("Timed out"), "unexpected message: {msg}");
-    }
-
-    #[tokio::test]
-    async fn wait_for_agent_errors_for_nonexistent() {
-        let pool = AgentPool::new_test(3001);
-        let result = pool.wait_for_agent("no_story", "no_bot", 100).await;
-        assert!(result.is_err());
-    }
-
-    #[tokio::test]
-    async fn wait_for_agent_completes_on_stopped_status_event() {
-        let pool = AgentPool::new_test(3001);
-        let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running);
-
-        let tx_clone = tx.clone();
-        tokio::spawn(async move {
-            tokio::time::sleep(std::time::Duration::from_millis(30)).await;
-            let _ = tx_clone.send(AgentEvent::Status {
-                story_id: "s5".to_string(),
-                agent_name: "bot".to_string(),
-                status: "stopped".to_string(),
-            });
-        });
-
-        let info = pool.wait_for_agent("s5", "bot", 2000).await.unwrap();
-        assert_eq!(info.story_id, "s5");
-    }
-
     #[tokio::test]
     async fn start_agent_auto_selects_second_coder_when_first_busy() {
         let tmp = tempfile::tempdir().unwrap();
@@ -1640,35 +1377,6 @@ stage = "coder"
         }
     }

-    // ── remove_agents_for_story tests ────────────────────────────────────────
-
-    #[test]
-    fn remove_agents_for_story_removes_all_entries() {
-        let pool = AgentPool::new_test(3001);
pool.inject_test_agent("story_a", "coder-1", AgentStatus::Completed); - pool.inject_test_agent("story_a", "qa", AgentStatus::Failed); - pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running); - - let removed = pool.remove_agents_for_story("story_a"); - assert_eq!(removed, 2, "should remove both agents for story_a"); - - let agents = pool.list_agents().unwrap(); - assert_eq!(agents.len(), 1, "only story_b agent should remain"); - assert_eq!(agents[0].story_id, "story_b"); - } - - #[test] - fn remove_agents_for_story_returns_zero_when_no_match() { - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running); - - let removed = pool.remove_agents_for_story("nonexistent"); - assert_eq!(removed, 0); - - let agents = pool.list_agents().unwrap(); - assert_eq!(agents.len(), 1, "existing agents should not be affected"); - } - // ── front matter agent preference (bug 379) ────────────────────────────── #[tokio::test] @@ -1770,43 +1478,4 @@ stage = "coder" "error should say agent is busy or story is queued: {err}" ); } - - // ── archive + cleanup integration test ─────────────────────────────────── - - #[tokio::test] - async fn archiving_story_removes_agent_entries_from_pool() { - use crate::agents::lifecycle::move_story_to_done; - use std::fs; - - let tmp = tempfile::tempdir().unwrap(); - let root = tmp.path(); - - let current = root.join(".huskies/work/2_current"); - fs::create_dir_all(¤t).unwrap(); - fs::write(current.join("60_story_cleanup.md"), "test").unwrap(); - - let pool = AgentPool::new_test(3001); - pool.inject_test_agent("60_story_cleanup", "coder-1", AgentStatus::Completed); - pool.inject_test_agent("60_story_cleanup", "qa", AgentStatus::Completed); - pool.inject_test_agent("61_story_other", "coder-1", AgentStatus::Running); - - assert_eq!(pool.list_agents().unwrap().len(), 3); - - move_story_to_done(root, "60_story_cleanup").unwrap(); - pool.remove_agents_for_story("60_story_cleanup"); - - let remaining = pool.list_agents().unwrap(); - assert_eq!( - remaining.len(), - 1, - "only the other story's agent should remain" - ); - assert_eq!(remaining[0].story_id, "61_story_other"); - - assert!( - root.join(".huskies/work/5_done/60_story_cleanup.md") - .exists() - ); - } - } diff --git a/server/src/agents/pool/stop.rs b/server/src/agents/pool/stop.rs new file mode 100644 index 00000000..1a6d74c3 --- /dev/null +++ b/server/src/agents/pool/stop.rs @@ -0,0 +1,167 @@ +use crate::slog; +use crate::slog_error; +use std::path::Path; + +use super::super::{AgentEvent, AgentStatus}; +use super::types::composite_key; +use super::AgentPool; + +impl AgentPool { + /// Stop a running agent. Worktree is preserved for inspection. + pub async fn stop_agent( + &self, + _project_root: &Path, + story_id: &str, + agent_name: &str, + ) -> Result<(), String> { + let key = composite_key(story_id, agent_name); + + let (worktree_info, task_handle, tx) = { + let mut agents = self.agents.lock().map_err(|e| e.to_string())?; + let agent = agents + .get_mut(&key) + .ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?; + + let wt = agent.worktree_info.clone(); + let handle = agent.task_handle.take(); + let tx = agent.tx.clone(); + agent.status = AgentStatus::Failed; + (wt, handle, tx) + }; + + // Abort the task and kill the PTY child process. + // Note: aborting a spawn_blocking task handle does not interrupt the blocking + // thread, so we must also kill the child process directly via the killer registry. 
+        if let Some(handle) = task_handle {
+            handle.abort();
+            let _ = handle.await;
+        }
+        self.kill_child_for_key(&key);
+
+        // Preserve worktree for inspection — don't destroy agent's work on stop.
+        if let Some(ref wt) = worktree_info {
+            slog!(
+                "[agents] Worktree preserved for {story_id}:{agent_name}: {}",
+                wt.path.display()
+            );
+        }
+
+        let _ = tx.send(AgentEvent::Status {
+            story_id: story_id.to_string(),
+            agent_name: agent_name.to_string(),
+            status: "stopped".to_string(),
+        });
+
+        // Remove from map
+        {
+            let mut agents = self.agents.lock().map_err(|e| e.to_string())?;
+            agents.remove(&key);
+        }
+
+        // Notify WebSocket clients so pipeline board and agent panel update.
+        Self::notify_agent_state_changed(&self.watcher_tx);
+
+        Ok(())
+    }
+
+    /// Remove all agent entries for a given story_id from the pool.
+    ///
+    /// Called when a story is archived so that stale entries don't accumulate.
+    /// Returns the number of entries removed.
+    pub fn remove_agents_for_story(&self, story_id: &str) -> usize {
+        let mut agents = match self.agents.lock() {
+            Ok(a) => a,
+            Err(e) => {
+                slog_error!("[agents] Failed to lock pool for cleanup of '{story_id}': {e}");
+                return 0;
+            }
+        };
+        let prefix = format!("{story_id}:");
+        let keys_to_remove: Vec<String> = agents
+            .keys()
+            .filter(|k| k.starts_with(&prefix))
+            .cloned()
+            .collect();
+        let count = keys_to_remove.len();
+        for key in &keys_to_remove {
+            agents.remove(key);
+        }
+        if count > 0 {
+            slog!("[agents] Removed {count} agent entries for archived story '{story_id}'");
+        }
+        count
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::super::AgentPool;
+    use crate::agents::AgentStatus;
+
+    // ── remove_agents_for_story tests ────────────────────────────────────────
+
+    #[test]
+    fn remove_agents_for_story_removes_all_entries() {
+        let pool = AgentPool::new_test(3001);
+        pool.inject_test_agent("story_a", "coder-1", AgentStatus::Completed);
+        pool.inject_test_agent("story_a", "qa", AgentStatus::Failed);
+        pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running);
+
+        let removed = pool.remove_agents_for_story("story_a");
+        assert_eq!(removed, 2, "should remove both agents for story_a");
+
+        let agents = pool.list_agents().unwrap();
+        assert_eq!(agents.len(), 1, "only story_b agent should remain");
+        assert_eq!(agents[0].story_id, "story_b");
+    }
+
+    #[test]
+    fn remove_agents_for_story_returns_zero_when_no_match() {
+        let pool = AgentPool::new_test(3001);
+        pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running);
+
+        let removed = pool.remove_agents_for_story("nonexistent");
+        assert_eq!(removed, 0);
+
+        let agents = pool.list_agents().unwrap();
+        assert_eq!(agents.len(), 1, "existing agents should not be affected");
+    }
+
+    // ── archive + cleanup integration test ───────────────────────────────────
+
+    #[tokio::test]
+    async fn archiving_story_removes_agent_entries_from_pool() {
+        use crate::agents::lifecycle::move_story_to_done;
+        use std::fs;
+
+        let tmp = tempfile::tempdir().unwrap();
+        let root = tmp.path();
+
+        let current = root.join(".huskies/work/2_current");
+        fs::create_dir_all(&current).unwrap();
+        fs::write(current.join("60_story_cleanup.md"), "test").unwrap();
+
+        let pool = AgentPool::new_test(3001);
+        pool.inject_test_agent("60_story_cleanup", "coder-1", AgentStatus::Completed);
+        pool.inject_test_agent("60_story_cleanup", "qa", AgentStatus::Completed);
+        pool.inject_test_agent("61_story_other", "coder-1", AgentStatus::Running);
+
+        assert_eq!(pool.list_agents().unwrap().len(), 3);
+
+        move_story_to_done(root, "60_story_cleanup").unwrap();
"60_story_cleanup").unwrap(); + pool.remove_agents_for_story("60_story_cleanup"); + + let remaining = pool.list_agents().unwrap(); + assert_eq!( + remaining.len(), + 1, + "only the other story's agent should remain" + ); + assert_eq!(remaining[0].story_id, "61_story_other"); + + assert!( + root.join(".huskies/work/5_done/60_story_cleanup.md") + .exists() + ); + } +} diff --git a/server/src/agents/pool/wait.rs b/server/src/agents/pool/wait.rs new file mode 100644 index 00000000..d20a572f --- /dev/null +++ b/server/src/agents/pool/wait.rs @@ -0,0 +1,190 @@ +use super::super::{AgentEvent, AgentInfo, AgentStatus}; +use super::types::{agent_info_from_entry, composite_key}; +use super::AgentPool; + +use tokio::sync::broadcast; + +impl AgentPool { + /// Block until the agent reaches a terminal state (completed, failed, stopped). + /// Returns the agent's final `AgentInfo`. + /// `timeout_ms` caps how long to wait; returns an error if the deadline passes. + pub async fn wait_for_agent( + &self, + story_id: &str, + agent_name: &str, + timeout_ms: u64, + ) -> Result { + // Subscribe before checking status so we don't miss the terminal event + // if the agent completes in the window between the two operations. + let mut rx = self.subscribe(story_id, agent_name)?; + + // Return immediately if already in a terminal state. + { + let agents = self.agents.lock().map_err(|e| e.to_string())?; + let key = composite_key(story_id, agent_name); + if let Some(agent) = agents.get(&key) + && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) + { + return Ok(agent_info_from_entry(story_id, agent)); + } + } + + let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms); + + loop { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + if remaining.is_zero() { + return Err(format!( + "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'" + )); + } + + match tokio::time::timeout(remaining, rx.recv()).await { + Ok(Ok(event)) => { + let is_terminal = match &event { + AgentEvent::Done { .. } | AgentEvent::Error { .. } => true, + AgentEvent::Status { status, .. } if status == "stopped" => true, + _ => false, + }; + if is_terminal { + let agents = self.agents.lock().map_err(|e| e.to_string())?; + let key = composite_key(story_id, agent_name); + return Ok(if let Some(agent) = agents.get(&key) { + agent_info_from_entry(story_id, agent) + } else { + // Agent was removed from map (e.g. stop_agent removes it after + // the "stopped" status event is sent). + let (status, session_id) = match event { + AgentEvent::Done { session_id, .. } => { + (AgentStatus::Completed, session_id) + } + _ => (AgentStatus::Failed, None), + }; + AgentInfo { + story_id: story_id.to_string(), + agent_name: agent_name.to_string(), + status, + session_id, + worktree_path: None, + base_branch: None, + completion: None, + log_session_id: None, + throttled: false, + } + }); + } + } + Ok(Err(broadcast::error::RecvError::Lagged(_))) => { + // Missed some buffered events — check current status before resuming. + let agents = self.agents.lock().map_err(|e| e.to_string())?; + let key = composite_key(story_id, agent_name); + if let Some(agent) = agents.get(&key) + && matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed) + { + return Ok(agent_info_from_entry(story_id, agent)); + } + // Still running — continue the loop. + } + Ok(Err(broadcast::error::RecvError::Closed)) => { + // Channel closed: no more events will arrive. 
+                    let agents = self.agents.lock().map_err(|e| e.to_string())?;
+                    let key = composite_key(story_id, agent_name);
+                    if let Some(agent) = agents.get(&key) {
+                        return Ok(agent_info_from_entry(story_id, agent));
+                    }
+                    return Err(format!(
+                        "Agent '{agent_name}' for story '{story_id}' channel closed unexpectedly"
+                    ));
+                }
+                Err(_) => {
+                    return Err(format!(
+                        "Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'"
+                    ));
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::super::AgentPool;
+    use crate::agents::{AgentEvent, AgentStatus};
+
+    #[tokio::test]
+    async fn wait_for_agent_returns_immediately_if_completed() {
+        let pool = AgentPool::new_test(3001);
+        pool.inject_test_agent("s1", "bot", AgentStatus::Completed);
+
+        let info = pool.wait_for_agent("s1", "bot", 1000).await.unwrap();
+        assert_eq!(info.status, AgentStatus::Completed);
+        assert_eq!(info.story_id, "s1");
+        assert_eq!(info.agent_name, "bot");
+    }
+
+    #[tokio::test]
+    async fn wait_for_agent_returns_immediately_if_failed() {
+        let pool = AgentPool::new_test(3001);
+        pool.inject_test_agent("s2", "bot", AgentStatus::Failed);
+
+        let info = pool.wait_for_agent("s2", "bot", 1000).await.unwrap();
+        assert_eq!(info.status, AgentStatus::Failed);
+    }
+
+    #[tokio::test]
+    async fn wait_for_agent_completes_on_done_event() {
+        let pool = AgentPool::new_test(3001);
+        let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running);
+
+        // Send Done event after a short delay
+        let tx_clone = tx.clone();
+        tokio::spawn(async move {
+            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+            let _ = tx_clone.send(AgentEvent::Done {
+                story_id: "s3".to_string(),
+                agent_name: "bot".to_string(),
+                session_id: Some("sess-abc".to_string()),
+            });
+        });
+
+        let info = pool.wait_for_agent("s3", "bot", 2000).await.unwrap();
+        assert_eq!(info.story_id, "s3");
+    }
+
+    #[tokio::test]
+    async fn wait_for_agent_times_out() {
+        let pool = AgentPool::new_test(3001);
+        pool.inject_test_agent("s4", "bot", AgentStatus::Running);
+
+        let result = pool.wait_for_agent("s4", "bot", 50).await;
+        assert!(result.is_err());
+        let msg = result.unwrap_err();
+        assert!(msg.contains("Timed out"), "unexpected message: {msg}");
+    }
+
+    #[tokio::test]
+    async fn wait_for_agent_errors_for_nonexistent() {
+        let pool = AgentPool::new_test(3001);
+        let result = pool.wait_for_agent("no_story", "no_bot", 100).await;
+        assert!(result.is_err());
+    }
+
+    #[tokio::test]
+    async fn wait_for_agent_completes_on_stopped_status_event() {
+        let pool = AgentPool::new_test(3001);
+        let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running);
+
+        let tx_clone = tx.clone();
+        tokio::spawn(async move {
+            tokio::time::sleep(std::time::Duration::from_millis(30)).await;
+            let _ = tx_clone.send(AgentEvent::Status {
+                story_id: "s5".to_string(),
+                agent_name: "bot".to_string(),
+                status: "stopped".to_string(),
+            });
+        });
+
+        let info = pool.wait_for_agent("s5", "bot", 2000).await.unwrap();
+        assert_eq!(info.story_id, "s5");
+    }
+}