Files
huskies/server/src/agents/pool/query.rs
T

167 lines
5.1 KiB
Rust
Raw Normal View History

use crate::config::ProjectConfig;
use std::path::PathBuf;
use tokio::sync::broadcast;
use super::super::{AgentEvent, AgentInfo, AgentStatus, PipelineStage, agent_config_stage};
use super::types::{agent_info_from_entry, composite_key};
use super::AgentPool;
impl AgentPool {
/// Return the names of configured agents for `stage` that are not currently
/// running or pending.
pub fn available_agents_for_stage(
&self,
config: &ProjectConfig,
stage: &PipelineStage,
) -> Result<Vec<String>, String> {
let agents = self.agents.lock().map_err(|e| e.to_string())?;
Ok(config
.agent
.iter()
.filter(|cfg| agent_config_stage(cfg) == *stage)
.filter(|cfg| {
!agents.values().any(|a| {
a.agent_name == cfg.name
&& matches!(a.status, AgentStatus::Running | AgentStatus::Pending)
})
})
.map(|cfg| cfg.name.clone())
.collect())
}
/// List all agents with their status.
pub fn list_agents(&self) -> Result<Vec<AgentInfo>, String> {
let agents = self.agents.lock().map_err(|e| e.to_string())?;
Ok(agents
.iter()
.map(|(key, agent)| {
// Extract story_id from composite key "story_id:agent_name"
let story_id = key
.rsplit_once(':')
.map(|(sid, _)| sid.to_string())
.unwrap_or_else(|| key.clone());
agent_info_from_entry(&story_id, agent)
})
.collect())
}
/// Subscribe to events for a story agent.
pub fn subscribe(
&self,
story_id: &str,
agent_name: &str,
) -> Result<broadcast::Receiver<AgentEvent>, String> {
let key = composite_key(story_id, agent_name);
let agents = self.agents.lock().map_err(|e| e.to_string())?;
let agent = agents
.get(&key)
.ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?;
Ok(agent.tx.subscribe())
}
/// Drain accumulated events for polling. Returns all events since the last drain.
pub fn drain_events(
&self,
story_id: &str,
agent_name: &str,
) -> Result<Vec<AgentEvent>, String> {
let key = composite_key(story_id, agent_name);
let agents = self.agents.lock().map_err(|e| e.to_string())?;
let agent = agents
.get(&key)
.ok_or_else(|| format!("No agent '{agent_name}' for story '{story_id}'"))?;
let mut log = agent.event_log.lock().map_err(|e| e.to_string())?;
Ok(log.drain(..).collect())
}
/// Get the log session ID and project root for an agent, if available.
///
/// Used by MCP tools to find the persistent log file for a completed agent.
pub fn get_log_info(&self, story_id: &str, agent_name: &str) -> Option<(String, PathBuf)> {
let key = composite_key(story_id, agent_name);
let agents = self.agents.lock().ok()?;
let agent = agents.get(&key)?;
let session_id = agent.log_session_id.clone()?;
let project_root = agent.project_root.clone()?;
Some((session_id, project_root))
}
}
#[cfg(test)]
mod tests {
use super::super::AgentPool;
use crate::agents::{AgentStatus, PipelineStage};
use crate::config::ProjectConfig;
fn make_config(toml_str: &str) -> ProjectConfig {
ProjectConfig::parse(toml_str).unwrap()
}
#[test]
fn available_agents_for_stage_returns_idle_agents() {
let config = make_config(
r#"
[[agent]]
name = "coder-1"
stage = "coder"
[[agent]]
name = "coder-2"
stage = "coder"
[[agent]]
name = "qa"
stage = "qa"
"#,
);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running);
let available = pool
.available_agents_for_stage(&config, &PipelineStage::Coder)
.unwrap();
assert_eq!(available, vec!["coder-2"]);
let available_qa = pool
.available_agents_for_stage(&config, &PipelineStage::Qa)
.unwrap();
assert_eq!(available_qa, vec!["qa"]);
}
#[test]
fn available_agents_for_stage_returns_empty_when_all_busy() {
let config = make_config(
r#"
[[agent]]
name = "coder-1"
stage = "coder"
"#,
);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("story-1", "coder-1", AgentStatus::Running);
let available = pool
.available_agents_for_stage(&config, &PipelineStage::Coder)
.unwrap();
assert!(available.is_empty());
}
#[test]
fn available_agents_for_stage_ignores_completed_agents() {
let config = make_config(
r#"
[[agent]]
name = "coder-1"
stage = "coder"
"#,
);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("story-1", "coder-1", AgentStatus::Completed);
let available = pool
.available_agents_for_stage(&config, &PipelineStage::Coder)
.unwrap();
assert_eq!(available, vec!["coder-1"]);
}
}