845b85e7a7
cargo fmt without --all fails with "Failed to find targets" in workspace repos. This was blocking every story's gates. Also ran cargo fmt --all to fix all existing formatting issues. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
192 lines
7.9 KiB
Rust
192 lines
7.9 KiB
Rust
//! Agent wait — blocks until an agent reaches a terminal state or the caller's timeout elapses.
|
|
use super::super::{AgentEvent, AgentInfo, AgentStatus};
|
|
use super::AgentPool;
|
|
use super::types::{agent_info_from_entry, composite_key};
|
|
|
|
use tokio::sync::broadcast;
|
|
|
|
impl AgentPool {
|
|
/// Block until the agent reaches a terminal state (completed, failed, stopped).
|
|
/// Returns the agent's final `AgentInfo`.
|
|
/// `timeout_ms` caps how long to wait; returns an error if the deadline passes.
|
|
pub async fn wait_for_agent(
|
|
&self,
|
|
story_id: &str,
|
|
agent_name: &str,
|
|
timeout_ms: u64,
|
|
) -> Result<AgentInfo, String> {
|
|
// Subscribe before checking status so we don't miss the terminal event
|
|
// if the agent completes in the window between the two operations.
|
|
let mut rx = self.subscribe(story_id, agent_name)?;
|
|
|
|
// Return immediately if already in a terminal state.
|
|
{
|
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
let key = composite_key(story_id, agent_name);
|
|
if let Some(agent) = agents.get(&key)
|
|
&& matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
|
|
{
|
|
return Ok(agent_info_from_entry(story_id, agent));
|
|
}
|
|
}
|
|
|
|
let deadline = tokio::time::Instant::now() + std::time::Duration::from_millis(timeout_ms);
|
|
|
|
loop {
|
|
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
|
|
if remaining.is_zero() {
|
|
return Err(format!(
|
|
"Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'"
|
|
));
|
|
}
|
|
|
|
match tokio::time::timeout(remaining, rx.recv()).await {
|
|
Ok(Ok(event)) => {
|
|
let is_terminal = match &event {
|
|
AgentEvent::Done { .. } | AgentEvent::Error { .. } => true,
|
|
AgentEvent::Status { status, .. } if status == "stopped" => true,
|
|
_ => false,
|
|
};
|
|
if is_terminal {
|
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
let key = composite_key(story_id, agent_name);
|
|
return Ok(if let Some(agent) = agents.get(&key) {
|
|
agent_info_from_entry(story_id, agent)
|
|
} else {
|
|
// Agent was removed from map (e.g. stop_agent removes it after
|
|
// the "stopped" status event is sent).
|
|
let (status, session_id) = match event {
|
|
AgentEvent::Done { session_id, .. } => {
|
|
(AgentStatus::Completed, session_id)
|
|
}
|
|
_ => (AgentStatus::Failed, None),
|
|
};
|
|
AgentInfo {
|
|
story_id: story_id.to_string(),
|
|
agent_name: agent_name.to_string(),
|
|
status,
|
|
session_id,
|
|
worktree_path: None,
|
|
base_branch: None,
|
|
completion: None,
|
|
log_session_id: None,
|
|
throttled: false,
|
|
}
|
|
});
|
|
}
|
|
}
|
|
Ok(Err(broadcast::error::RecvError::Lagged(_))) => {
|
|
// Missed some buffered events — check current status before resuming.
|
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
let key = composite_key(story_id, agent_name);
|
|
if let Some(agent) = agents.get(&key)
|
|
&& matches!(agent.status, AgentStatus::Completed | AgentStatus::Failed)
|
|
{
|
|
return Ok(agent_info_from_entry(story_id, agent));
|
|
}
|
|
// Still running — continue the loop.
|
|
}
|
|
Ok(Err(broadcast::error::RecvError::Closed)) => {
|
|
// Channel closed: no more events will arrive. Return current state.
|
|
let agents = self.agents.lock().map_err(|e| e.to_string())?;
|
|
let key = composite_key(story_id, agent_name);
|
|
if let Some(agent) = agents.get(&key) {
|
|
return Ok(agent_info_from_entry(story_id, agent));
|
|
}
|
|
return Err(format!(
|
|
"Agent '{agent_name}' for story '{story_id}' channel closed unexpectedly"
|
|
));
|
|
}
|
|
Err(_) => {
|
|
return Err(format!(
|
|
"Timed out after {timeout_ms}ms waiting for agent '{agent_name}' on story '{story_id}'"
|
|
));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::super::AgentPool;
    use crate::agents::{AgentEvent, AgentStatus};
    use std::time::Duration;

    /// An agent injected as `Completed` is returned with its identifying fields intact.
    #[tokio::test]
    async fn wait_for_agent_returns_immediately_if_completed() {
        let pool = AgentPool::new_test(3001);
        pool.inject_test_agent("s1", "bot", AgentStatus::Completed);

        let outcome = pool.wait_for_agent("s1", "bot", 1000).await;
        let info = outcome.unwrap();

        assert_eq!(info.status, AgentStatus::Completed);
        assert_eq!(info.story_id, "s1");
        assert_eq!(info.agent_name, "bot");
    }

    /// An agent injected as `Failed` is returned with that status.
    #[tokio::test]
    async fn wait_for_agent_returns_immediately_if_failed() {
        let pool = AgentPool::new_test(3001);
        pool.inject_test_agent("s2", "bot", AgentStatus::Failed);

        let outcome = pool.wait_for_agent("s2", "bot", 1000).await;
        let info = outcome.unwrap();

        assert_eq!(info.status, AgentStatus::Failed);
    }

    /// A `Done` event sent while waiting ends the wait successfully.
    #[tokio::test]
    async fn wait_for_agent_completes_on_done_event() {
        let pool = AgentPool::new_test(3001);
        let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running);

        // Emit `Done` from a background task after a 50 ms delay.
        let sender = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            let done = AgentEvent::Done {
                story_id: "s3".to_string(),
                agent_name: "bot".to_string(),
                session_id: Some("sess-abc".to_string()),
            };
            let _ = sender.send(done);
        });

        let outcome = pool.wait_for_agent("s3", "bot", 2000).await;
        let info = outcome.unwrap();

        assert_eq!(info.story_id, "s3");
    }

    /// With no event sent for a `Running` agent, the wait fails with a timeout message.
    #[tokio::test]
    async fn wait_for_agent_times_out() {
        let pool = AgentPool::new_test(3001);
        pool.inject_test_agent("s4", "bot", AgentStatus::Running);

        let outcome = pool.wait_for_agent("s4", "bot", 50).await;

        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        assert!(message.contains("Timed out"), "unexpected message: {message}");
    }

    /// Waiting on an agent that was never injected yields an error.
    #[tokio::test]
    async fn wait_for_agent_errors_for_nonexistent() {
        let pool = AgentPool::new_test(3001);

        let outcome = pool.wait_for_agent("no_story", "no_bot", 100).await;

        assert!(outcome.is_err());
    }

    /// A `Status` event carrying `"stopped"` ends the wait successfully.
    #[tokio::test]
    async fn wait_for_agent_completes_on_stopped_status_event() {
        let pool = AgentPool::new_test(3001);
        let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running);

        // Emit the "stopped" status from a background task after a 30 ms delay.
        let sender = tx.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(30)).await;
            let stopped = AgentEvent::Status {
                story_id: "s5".to_string(),
                agent_name: "bot".to_string(),
                status: "stopped".to_string(),
            };
            let _ = sender.send(stopped);
        });

        let outcome = pool.wait_for_agent("s5", "bot", 2000).await;
        let info = outcome.unwrap();

        assert_eq!(info.story_id, "s5");
    }
}
|