642 lines
20 KiB
Rust
642 lines
20 KiB
Rust
//! Scanning pipeline stages for work items and querying agent pool state.
|
|
|
|
use crate::config::ProjectConfig;
|
|
use std::collections::HashMap;
|
|
use std::path::Path;
|
|
|
|
use super::super::super::{AgentStatus, PipelineStage, agent_config_stage, pipeline_stage};
|
|
use super::super::StoryAgent;
|
|
|
|
/// Return `true` if `agent_name` has no active (pending/running) entry in the pool.
|
|
pub(in crate::agents::pool) fn is_agent_free(
|
|
agents: &HashMap<String, StoryAgent>,
|
|
agent_name: &str,
|
|
) -> bool {
|
|
!agents.values().any(|a| {
|
|
a.agent_name == agent_name
|
|
&& matches!(a.status, AgentStatus::Running | AgentStatus::Pending)
|
|
})
|
|
}
|
|
|
|
pub(super) fn scan_stage_items(_project_root: &Path, stage_dir: &str) -> Vec<String> {
|
|
use std::collections::BTreeSet;
|
|
let mut items = BTreeSet::new();
|
|
|
|
// Accept legacy directory-style strings (`"2_current"`, `"4_merge"`,
|
|
// etc.) at the boundary; `Stage::from_dir` itself is strict post-934
|
|
// stage 6, so we normalise here.
|
|
let normalised = match stage_dir {
|
|
"0_upcoming" => "upcoming",
|
|
"1_backlog" => "backlog",
|
|
"2_current" => "coding",
|
|
"2_blocked" => "blocked",
|
|
"3_qa" => "qa",
|
|
"4_merge" => "merge",
|
|
"4_merge_failure" => "merge_failure",
|
|
"5_done" => "done",
|
|
"6_archived" => "archived",
|
|
other => other,
|
|
};
|
|
let Some(want) = crate::pipeline_state::Stage::from_dir(normalised) else {
|
|
return Vec::new();
|
|
};
|
|
|
|
// CRDT is the only source of truth — no filesystem fallback.
|
|
for item in crate::pipeline_state::read_all_typed() {
|
|
if std::mem::discriminant(&item.stage) == std::mem::discriminant(&want) {
|
|
items.insert(item.story_id.0.clone());
|
|
}
|
|
}
|
|
|
|
items.into_iter().collect()
|
|
}
|
|
|
|
/// Return `true` if `story_id` has any active (pending/running) agent matching `stage`.
|
|
///
|
|
/// Uses the explicit `stage` config field when the agent is found in `config`;
|
|
/// falls back to the legacy name-based heuristic for unlisted agents.
|
|
pub(super) fn is_story_assigned_for_stage(
|
|
config: &ProjectConfig,
|
|
agents: &HashMap<String, StoryAgent>,
|
|
story_id: &str,
|
|
stage: &PipelineStage,
|
|
) -> bool {
|
|
agents.iter().any(|(key, agent)| {
|
|
// Composite key format: "{story_id}:{agent_name}"
|
|
let key_story_id = key.rsplit_once(':').map(|(sid, _)| sid).unwrap_or(key);
|
|
let agent_stage = config
|
|
.find_agent(&agent.agent_name)
|
|
.map(agent_config_stage)
|
|
.unwrap_or_else(|| pipeline_stage(&agent.agent_name));
|
|
key_story_id == story_id
|
|
&& agent_stage == *stage
|
|
&& matches!(agent.status, AgentStatus::Running | AgentStatus::Pending)
|
|
})
|
|
}
|
|
|
|
/// Count active (pending/running) agents for a given pipeline stage.
|
|
pub(super) fn count_active_agents_for_stage(
|
|
config: &ProjectConfig,
|
|
agents: &HashMap<String, StoryAgent>,
|
|
stage: &PipelineStage,
|
|
) -> usize {
|
|
agents
|
|
.values()
|
|
.filter(|a| {
|
|
matches!(a.status, AgentStatus::Running | AgentStatus::Pending)
|
|
&& config
|
|
.find_agent(&a.agent_name)
|
|
.map(|cfg| agent_config_stage(cfg) == *stage)
|
|
.unwrap_or_else(|| pipeline_stage(&a.agent_name) == *stage)
|
|
})
|
|
.count()
|
|
}
|
|
|
|
/// Find the first configured agent for `stage` that has no active (pending/running) assignment.
|
|
/// Returns `None` if all agents for that stage are busy, none are configured,
|
|
/// or the `max_coders` limit has been reached (for the Coder stage).
|
|
///
|
|
/// For the Coder stage, when `default_coder_model` is set, only considers agents whose
|
|
/// model matches the default. This ensures opus-class agents are reserved for explicit
|
|
/// front-matter requests.
|
|
pub(in crate::agents::pool) fn find_free_agent_for_stage<'a>(
|
|
config: &'a ProjectConfig,
|
|
agents: &HashMap<String, StoryAgent>,
|
|
stage: &PipelineStage,
|
|
) -> Option<&'a str> {
|
|
// Enforce max_coders limit for the Coder stage.
|
|
if *stage == PipelineStage::Coder
|
|
&& let Some(max) = config.max_coders
|
|
{
|
|
let active = count_active_agents_for_stage(config, agents, stage);
|
|
if active >= max {
|
|
return None;
|
|
}
|
|
}
|
|
|
|
for agent_config in &config.agent {
|
|
if agent_config_stage(agent_config) != *stage {
|
|
continue;
|
|
}
|
|
// When default_coder_model is set, only auto-assign coder agents whose
|
|
// model matches. This keeps opus agents reserved for explicit requests.
|
|
if *stage == PipelineStage::Coder
|
|
&& let Some(ref default_model) = config.default_coder_model
|
|
{
|
|
let agent_model = agent_config.model.as_deref().unwrap_or("");
|
|
if agent_model != default_model {
|
|
continue;
|
|
}
|
|
}
|
|
let is_busy = agents.values().any(|a| {
|
|
a.agent_name == agent_config.name
|
|
&& matches!(a.status, AgentStatus::Running | AgentStatus::Pending)
|
|
});
|
|
if !is_busy {
|
|
return Some(&agent_config.name);
|
|
}
|
|
}
|
|
None
|
|
}
|
|
|
|
// ── Tests ──────────────────────────────────────────────────────────────────
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use crate::config::ProjectConfig;
|
|
use std::sync::{Arc, Mutex};
|
|
use tokio::sync::broadcast;
|
|
|
|
use super::super::super::AgentPool;
|
|
|
|
fn make_config(toml_str: &str) -> ProjectConfig {
|
|
ProjectConfig::parse(toml_str).unwrap()
|
|
}
|
|
|
|
fn make_test_story_agent(agent_name: &str, status: AgentStatus) -> StoryAgent {
|
|
StoryAgent {
|
|
agent_name: agent_name.to_string(),
|
|
status,
|
|
worktree_info: None,
|
|
session_id: None,
|
|
tx: broadcast::channel(1).0,
|
|
task_handle: None,
|
|
event_log: Arc::new(Mutex::new(Vec::new())),
|
|
completion: None,
|
|
project_root: None,
|
|
log_session_id: None,
|
|
merge_failure_reported: false,
|
|
throttled: false,
|
|
termination_reason: None,
|
|
status_buffer: None,
|
|
}
|
|
}
|
|
|
|
// ── Bug 556: stale filesystem shadow must not override CRDT stage ──────────
|
|
//
|
|
// A story file left in 1_backlog/ on disk but tracked as 6_archived in the
|
|
// CRDT must NOT appear when scanning 1_backlog. Without the fix, the
|
|
// filesystem fallback would add it, causing promote_ready_backlog_stories to
|
|
// attempt to promote an archived story.
|
|
#[test]
|
|
fn scan_stage_items_skips_filesystem_item_known_to_crdt_at_different_stage() {
|
|
crate::db::ensure_content_store();
|
|
// Write the story into the CRDT as 6_archived.
|
|
crate::db::write_item_with_content(
|
|
"9970_story_archived",
|
|
"6_archived",
|
|
"---\nname: Archived\n---\n",
|
|
crate::db::ItemMeta::named("Archived"),
|
|
);
|
|
|
|
// Also place a stale .md file in a temp 1_backlog/ dir.
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
let backlog = tmp.path().join(".huskies/work/1_backlog");
|
|
std::fs::create_dir_all(&backlog).unwrap();
|
|
std::fs::write(
|
|
backlog.join("9970_story_archived.md"),
|
|
"---\nname: Archived\n---\n",
|
|
)
|
|
.unwrap();
|
|
|
|
let items = scan_stage_items(tmp.path(), "1_backlog");
|
|
assert!(
|
|
!items.contains(&"9970_story_archived".to_string()),
|
|
"archived CRDT story must not appear in 1_backlog scan via stale filesystem shadow"
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn scan_stage_items_returns_empty_for_missing_dir() {
|
|
// Use a unique stage name that no other test writes to, so
|
|
// the global CRDT store won't contribute stale items.
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
let items = scan_stage_items(tmp.path(), "9_nonexistent");
|
|
assert!(items.is_empty());
|
|
}
|
|
|
|
#[test]
|
|
fn scan_stage_items_returns_sorted_story_ids() {
|
|
// Write items via the CRDT store (the primary source of truth).
|
|
crate::db::ensure_content_store();
|
|
crate::db::write_item_with_content(
|
|
"9942_story_foo",
|
|
"2_current",
|
|
"---\nname: foo\n---",
|
|
crate::db::ItemMeta::named("foo"),
|
|
);
|
|
crate::db::write_item_with_content(
|
|
"9940_story_bar",
|
|
"2_current",
|
|
"---\nname: bar\n---",
|
|
crate::db::ItemMeta::named("bar"),
|
|
);
|
|
crate::db::write_item_with_content(
|
|
"9935_story_baz",
|
|
"2_current",
|
|
"---\nname: baz\n---",
|
|
crate::db::ItemMeta::named("baz"),
|
|
);
|
|
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
let items = scan_stage_items(tmp.path(), "2_current");
|
|
// The global CRDT may contain items from other tests, so check
|
|
// that our three items are present and appear in sorted order.
|
|
assert!(
|
|
items.iter().any(|id| id == "9935_story_baz"),
|
|
"9935_story_baz should be in results"
|
|
);
|
|
assert!(
|
|
items.iter().any(|id| id == "9940_story_bar"),
|
|
"9940_story_bar should be in results"
|
|
);
|
|
assert!(
|
|
items.iter().any(|id| id == "9942_story_foo"),
|
|
"9942_story_foo should be in results"
|
|
);
|
|
// Verify sorted order: BTreeSet produces lexicographic order.
|
|
let positions: Vec<usize> = ["9935_story_baz", "9940_story_bar", "9942_story_foo"]
|
|
.iter()
|
|
.filter_map(|id| items.iter().position(|x| x == id))
|
|
.collect();
|
|
assert_eq!(positions.len(), 3, "all three items must be found");
|
|
assert!(
|
|
positions[0] < positions[1] && positions[1] < positions[2],
|
|
"items should appear in sorted order: positions = {positions:?}"
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn is_story_assigned_returns_true_for_running_coder() {
|
|
let config = ProjectConfig::default();
|
|
let pool = AgentPool::new_test(3001);
|
|
pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running);
|
|
|
|
let agents = pool.agents.lock().unwrap();
|
|
assert!(is_story_assigned_for_stage(
|
|
&config,
|
|
&agents,
|
|
"42_story_foo",
|
|
&PipelineStage::Coder
|
|
));
|
|
// Same story but wrong stage — should be false
|
|
assert!(!is_story_assigned_for_stage(
|
|
&config,
|
|
&agents,
|
|
"42_story_foo",
|
|
&PipelineStage::Qa
|
|
));
|
|
// Different story — should be false
|
|
assert!(!is_story_assigned_for_stage(
|
|
&config,
|
|
&agents,
|
|
"99_story_other",
|
|
&PipelineStage::Coder
|
|
));
|
|
}
|
|
|
|
#[test]
|
|
fn is_story_assigned_returns_false_for_completed_agent() {
|
|
let config = ProjectConfig::default();
|
|
let pool = AgentPool::new_test(3001);
|
|
pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Completed);
|
|
|
|
let agents = pool.agents.lock().unwrap();
|
|
// Completed agents don't count as assigned
|
|
assert!(!is_story_assigned_for_stage(
|
|
&config,
|
|
&agents,
|
|
"42_story_foo",
|
|
&PipelineStage::Coder
|
|
));
|
|
}
|
|
|
|
#[test]
|
|
fn is_story_assigned_uses_config_stage_field_for_nonstandard_names() {
|
|
let config = ProjectConfig::parse(
|
|
r#"
|
|
[[agent]]
|
|
name = "qa-2"
|
|
stage = "qa"
|
|
"#,
|
|
)
|
|
.unwrap();
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
pool.inject_test_agent("42_story_foo", "qa-2", AgentStatus::Running);
|
|
|
|
let agents = pool.agents.lock().unwrap();
|
|
// qa-2 with stage=qa should be recognised as a QA agent
|
|
assert!(
|
|
is_story_assigned_for_stage(&config, &agents, "42_story_foo", &PipelineStage::Qa),
|
|
"qa-2 should be detected as assigned to QA stage"
|
|
);
|
|
// Should NOT appear as a coder
|
|
assert!(
|
|
!is_story_assigned_for_stage(&config, &agents, "42_story_foo", &PipelineStage::Coder),
|
|
"qa-2 should not be detected as a coder"
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn find_free_agent_returns_none_when_all_busy() {
|
|
let config = ProjectConfig::parse(
|
|
r#"
|
|
[[agent]]
|
|
name = "coder-1"
|
|
[[agent]]
|
|
name = "coder-2"
|
|
"#,
|
|
)
|
|
.unwrap();
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
pool.inject_test_agent("s1", "coder-1", AgentStatus::Running);
|
|
pool.inject_test_agent("s2", "coder-2", AgentStatus::Running);
|
|
|
|
let agents = pool.agents.lock().unwrap();
|
|
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
|
|
assert!(free.is_none(), "no free coders should be available");
|
|
}
|
|
|
|
#[test]
|
|
fn find_free_agent_returns_first_free_coder() {
|
|
let config = ProjectConfig::parse(
|
|
r#"
|
|
[[agent]]
|
|
name = "coder-1"
|
|
[[agent]]
|
|
name = "coder-2"
|
|
[[agent]]
|
|
name = "coder-3"
|
|
"#,
|
|
)
|
|
.unwrap();
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
// coder-1 is busy, coder-2 is free
|
|
pool.inject_test_agent("s1", "coder-1", AgentStatus::Running);
|
|
|
|
let agents = pool.agents.lock().unwrap();
|
|
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
|
|
assert_eq!(
|
|
free,
|
|
Some("coder-2"),
|
|
"coder-2 should be the first free coder"
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn find_free_agent_ignores_completed_agents() {
|
|
let config = ProjectConfig::parse(
|
|
r#"
|
|
[[agent]]
|
|
name = "coder-1"
|
|
"#,
|
|
)
|
|
.unwrap();
|
|
|
|
let pool = AgentPool::new_test(3001);
|
|
// coder-1 completed its previous story — it's free for a new one
|
|
pool.inject_test_agent("s1", "coder-1", AgentStatus::Completed);
|
|
|
|
let agents = pool.agents.lock().unwrap();
|
|
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
|
|
assert_eq!(free, Some("coder-1"), "completed coder-1 should be free");
|
|
}
|
|
|
|
#[test]
|
|
fn find_free_agent_returns_none_for_wrong_stage() {
|
|
let config = ProjectConfig::parse(
|
|
r#"
|
|
[[agent]]
|
|
name = "qa"
|
|
"#,
|
|
)
|
|
.unwrap();
|
|
|
|
let agents: HashMap<String, StoryAgent> = HashMap::new();
|
|
// Looking for a Coder but only QA is configured
|
|
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
|
|
assert!(free.is_none());
|
|
// Looking for QA should find it
|
|
let free_qa = find_free_agent_for_stage(&config, &agents, &PipelineStage::Qa);
|
|
assert_eq!(free_qa, Some("qa"));
|
|
}
|
|
|
|
#[test]
|
|
fn find_free_agent_uses_config_stage_field_not_name() {
|
|
// Agents named "qa-2" and "coder-opus" don't match the legacy name heuristic
|
|
// but should be picked up via their explicit stage field.
|
|
let config = ProjectConfig::parse(
|
|
r#"
|
|
[[agent]]
|
|
name = "qa-2"
|
|
stage = "qa"
|
|
|
|
[[agent]]
|
|
name = "coder-opus"
|
|
stage = "coder"
|
|
"#,
|
|
)
|
|
.unwrap();
|
|
|
|
let agents: HashMap<String, StoryAgent> = HashMap::new();
|
|
|
|
// qa-2 should be found for PipelineStage::Qa via config stage field
|
|
let free_qa = find_free_agent_for_stage(&config, &agents, &PipelineStage::Qa);
|
|
assert_eq!(free_qa, Some("qa-2"), "qa-2 with stage=qa should be found");
|
|
|
|
// coder-opus should be found for PipelineStage::Coder via config stage field
|
|
let free_coder = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
|
|
assert_eq!(
|
|
free_coder,
|
|
Some("coder-opus"),
|
|
"coder-opus with stage=coder should be found"
|
|
);
|
|
|
|
// Neither should match the other stage
|
|
let free_merge = find_free_agent_for_stage(&config, &agents, &PipelineStage::Mergemaster);
|
|
assert!(free_merge.is_none());
|
|
}
|
|
|
|
// ── find_free_agent_for_stage: default_coder_model filtering ─────────
|
|
|
|
#[test]
|
|
fn find_free_agent_skips_opus_when_default_coder_model_set() {
|
|
let config = make_config(
|
|
r#"
|
|
default_coder_model = "sonnet"
|
|
|
|
[[agent]]
|
|
name = "coder-1"
|
|
stage = "coder"
|
|
model = "sonnet"
|
|
|
|
[[agent]]
|
|
name = "coder-opus"
|
|
stage = "coder"
|
|
model = "opus"
|
|
"#,
|
|
);
|
|
|
|
let agents = HashMap::new();
|
|
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
|
|
assert_eq!(free, Some("coder-1"));
|
|
}
|
|
|
|
#[test]
|
|
fn find_free_agent_returns_opus_when_no_default_coder_model() {
|
|
let config = make_config(
|
|
r#"
|
|
[[agent]]
|
|
name = "coder-opus"
|
|
stage = "coder"
|
|
model = "opus"
|
|
"#,
|
|
);
|
|
|
|
let agents = HashMap::new();
|
|
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
|
|
assert_eq!(free, Some("coder-opus"));
|
|
}
|
|
|
|
#[test]
|
|
fn find_free_agent_returns_none_when_all_sonnet_coders_busy() {
|
|
let config = make_config(
|
|
r#"
|
|
default_coder_model = "sonnet"
|
|
|
|
[[agent]]
|
|
name = "coder-1"
|
|
stage = "coder"
|
|
model = "sonnet"
|
|
|
|
[[agent]]
|
|
name = "coder-opus"
|
|
stage = "coder"
|
|
model = "opus"
|
|
"#,
|
|
);
|
|
|
|
let mut agents = HashMap::new();
|
|
agents.insert(
|
|
"story1:coder-1".to_string(),
|
|
make_test_story_agent("coder-1", AgentStatus::Running),
|
|
);
|
|
|
|
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
|
|
assert_eq!(free, None, "opus agent should not be auto-assigned");
|
|
}
|
|
|
|
// ── find_free_agent_for_stage: max_coders limit ─────────────────────
|
|
|
|
#[test]
|
|
fn find_free_agent_respects_max_coders() {
|
|
let config = make_config(
|
|
r#"
|
|
max_coders = 1
|
|
|
|
[[agent]]
|
|
name = "coder-1"
|
|
stage = "coder"
|
|
model = "sonnet"
|
|
|
|
[[agent]]
|
|
name = "coder-2"
|
|
stage = "coder"
|
|
model = "sonnet"
|
|
"#,
|
|
);
|
|
|
|
let mut agents = HashMap::new();
|
|
agents.insert(
|
|
"story1:coder-1".to_string(),
|
|
make_test_story_agent("coder-1", AgentStatus::Running),
|
|
);
|
|
|
|
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
|
|
assert_eq!(free, None, "max_coders=1 should block second coder");
|
|
}
|
|
|
|
#[test]
|
|
fn find_free_agent_allows_within_max_coders() {
|
|
let config = make_config(
|
|
r#"
|
|
max_coders = 2
|
|
|
|
[[agent]]
|
|
name = "coder-1"
|
|
stage = "coder"
|
|
model = "sonnet"
|
|
|
|
[[agent]]
|
|
name = "coder-2"
|
|
stage = "coder"
|
|
model = "sonnet"
|
|
"#,
|
|
);
|
|
|
|
let mut agents = HashMap::new();
|
|
agents.insert(
|
|
"story1:coder-1".to_string(),
|
|
make_test_story_agent("coder-1", AgentStatus::Running),
|
|
);
|
|
|
|
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
|
|
assert_eq!(free, Some("coder-2"));
|
|
}
|
|
|
|
#[test]
|
|
fn max_coders_does_not_affect_qa_stage() {
|
|
let config = make_config(
|
|
r#"
|
|
max_coders = 1
|
|
|
|
[[agent]]
|
|
name = "qa"
|
|
stage = "qa"
|
|
model = "sonnet"
|
|
"#,
|
|
);
|
|
|
|
let agents = HashMap::new();
|
|
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Qa);
|
|
assert_eq!(free, Some("qa"));
|
|
}
|
|
|
|
// ── count_active_agents_for_stage ────────────────────────────────────
|
|
|
|
#[test]
|
|
fn count_active_agents_counts_running_and_pending() {
|
|
let config = make_config(
|
|
r#"
|
|
[[agent]]
|
|
name = "coder-1"
|
|
stage = "coder"
|
|
|
|
[[agent]]
|
|
name = "coder-2"
|
|
stage = "coder"
|
|
"#,
|
|
);
|
|
|
|
let mut agents = HashMap::new();
|
|
agents.insert(
|
|
"s1:coder-1".to_string(),
|
|
make_test_story_agent("coder-1", AgentStatus::Running),
|
|
);
|
|
agents.insert(
|
|
"s2:coder-2".to_string(),
|
|
make_test_story_agent("coder-2", AgentStatus::Completed),
|
|
);
|
|
|
|
let count = count_active_agents_for_stage(&config, &agents, &PipelineStage::Coder);
|
|
assert_eq!(
|
|
count, 1,
|
|
"Only Running coder should be counted, not Completed"
|
|
);
|
|
}
|
|
}
|