story-kit: start 88_story_auto_assign_agents_to_available_work_on_server_startup

This commit is contained in:
Dave
2026-02-23 18:20:24 +00:00
parent 8b5704c8f6
commit 225073649b
4 changed files with 340 additions and 94 deletions

View File

@@ -0,0 +1,26 @@
---
name: "Auto-assign agents to available work"
test_plan: pending
---
# Story 88: Auto-assign agents to available work
## User Story
As a user, I want the server to automatically assign free agents to queued work items, both on startup and whenever an agent finishes its current task, so that work progresses continuously without manual intervention.
## Acceptance Criteria
- [ ] On server startup, scan work/2_current/ for stories with no running coder agent and auto-assign available coders
- [ ] On server startup, scan work/3_qa/ for items with no running QA agent and auto-start the QA agent
- [ ] On server startup, scan work/4_merge/ for items with no running mergemaster agent and auto-start the mergemaster agent
- [ ] When a coder agent completes (moves story to QA), check work/2_current/ for unassigned stories and assign the now-free coder to the next one
- [ ] When a QA or mergemaster agent completes, similarly check their respective queues for more work
- [ ] Auto-assignment respects the agent roster limits from project.toml (don't exceed max concurrent agents per role)
- [ ] Items in work/1_upcoming/ are NOT auto-started (they require explicit user action)
- [ ] Auto-assignment logs clearly which stories are being picked up and which agents are assigned
## Out of Scope
- Automatic promotion of items from upcoming to current (that remains a user decision)
- Priority ordering of queued work items (FIFO by filename is fine)

View File

@@ -604,6 +604,8 @@ impl AgentPool {
{ {
eprintln!("[pipeline] Failed to start qa agent for '{story_id}': {e}"); eprintln!("[pipeline] Failed to start qa agent for '{story_id}': {e}");
} }
// Coder slot is now free — pick up any other unassigned work in 2_current/.
self.auto_assign_available_work(&project_root).await;
} else { } else {
eprintln!( eprintln!(
"[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting." "[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting."
@@ -655,6 +657,8 @@ impl AgentPool {
{ {
eprintln!("[pipeline] Failed to start mergemaster for '{story_id}': {e}"); eprintln!("[pipeline] Failed to start mergemaster for '{story_id}': {e}");
} }
// QA slot is now free — pick up any other unassigned work in 3_qa/.
self.auto_assign_available_work(&project_root).await;
} else { } else {
eprintln!( eprintln!(
"[pipeline] QA coverage gate failed for '{story_id}'. Restarting QA." "[pipeline] QA coverage gate failed for '{story_id}'. Restarting QA."
@@ -714,6 +718,8 @@ impl AgentPool {
if let Err(e) = move_story_to_archived(&project_root, story_id) { if let Err(e) = move_story_to_archived(&project_root, story_id) {
eprintln!("[pipeline] Failed to archive '{story_id}': {e}"); eprintln!("[pipeline] Failed to archive '{story_id}': {e}");
} }
// Mergemaster slot is now free — pick up any other items in 4_merge/.
self.auto_assign_available_work(&project_root).await;
// TODO: Re-enable worktree cleanup once we have persistent agent logs. // TODO: Re-enable worktree cleanup once we have persistent agent logs.
// Removing worktrees destroys evidence needed to debug empty-commit agents. // Removing worktrees destroys evidence needed to debug empty-commit agents.
// let config = // let config =
@@ -1019,6 +1025,89 @@ impl AgentPool {
tx tx
} }
/// Automatically assign free agents to stories waiting in the active pipeline stages.
///
/// Scans `work/2_current/`, `work/3_qa/`, and `work/4_merge/` for items that have no
/// active agent and assigns the first free agent of the appropriate role. Items in
/// `work/1_upcoming/` are never auto-started.
///
/// Respects the configured agent roster: the maximum number of concurrently active agents
/// per role is bounded by the count of agents of that role defined in `project.toml`.
pub async fn auto_assign_available_work(&self, project_root: &Path) {
let config = match ProjectConfig::load(project_root) {
Ok(c) => c,
Err(e) => {
eprintln!("[auto-assign] Failed to load project config: {e}");
return;
}
};
// Process each active pipeline stage in order.
let stages: [(&str, PipelineStage); 3] = [
("2_current", PipelineStage::Coder),
("3_qa", PipelineStage::Qa),
("4_merge", PipelineStage::Mergemaster),
];
for (stage_dir, stage) in &stages {
let items = scan_stage_items(project_root, stage_dir);
if items.is_empty() {
continue;
}
for story_id in &items {
// Re-acquire the lock on each iteration to see state changes
// from previous start_agent calls in the same pass.
let (already_assigned, free_agent) = {
let agents = match self.agents.lock() {
Ok(a) => a,
Err(e) => {
eprintln!("[auto-assign] Failed to lock agents: {e}");
break;
}
};
let assigned = is_story_assigned_for_stage(&agents, story_id, stage);
let free = if assigned {
None
} else {
find_free_agent_for_stage(&config, &agents, stage)
.map(|s| s.to_string())
};
(assigned, free)
};
if already_assigned {
// Story already has an active agent — skip silently.
continue;
}
match free_agent {
Some(agent_name) => {
eprintln!(
"[auto-assign] Assigning '{agent_name}' to '{story_id}' in {stage_dir}/"
);
if let Err(e) = self
.start_agent(project_root, story_id, Some(&agent_name), None)
.await
{
eprintln!(
"[auto-assign] Failed to start '{agent_name}' for '{story_id}': {e}"
);
}
}
None => {
// No free agents of this type — stop scanning this stage.
eprintln!(
"[auto-assign] All {:?} agents busy; remaining items in {stage_dir}/ will wait.",
stage
);
break;
}
}
}
}
}
/// Test helper: inject an agent with a completion report and project_root /// Test helper: inject an agent with a completion report and project_root
/// for testing pipeline advance logic without spawning real agents. /// for testing pipeline advance logic without spawning real agents.
#[cfg(test)] #[cfg(test)]
@@ -1051,6 +1140,68 @@ impl AgentPool {
} }
} }
/// Scan a work pipeline stage directory and return story IDs, sorted alphabetically.
/// Returns an empty `Vec` if the directory does not exist.
fn scan_stage_items(project_root: &Path, stage_dir: &str) -> Vec<String> {
let dir = project_root
.join(".story_kit")
.join("work")
.join(stage_dir);
if !dir.is_dir() {
return Vec::new();
}
let mut items = Vec::new();
if let Ok(entries) = std::fs::read_dir(&dir) {
for entry in entries.flatten() {
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) == Some("md")
&& let Some(stem) = path.file_stem().and_then(|s| s.to_str())
{
items.push(stem.to_string());
}
}
}
items.sort();
items
}
/// Return `true` if `story_id` has any active (pending/running) agent matching `stage`.
fn is_story_assigned_for_stage(
agents: &HashMap<String, StoryAgent>,
story_id: &str,
stage: &PipelineStage,
) -> bool {
agents.iter().any(|(key, agent)| {
// Composite key format: "{story_id}:{agent_name}"
let key_story_id = key.rsplit_once(':').map(|(sid, _)| sid).unwrap_or(key);
key_story_id == story_id
&& pipeline_stage(&agent.agent_name) == *stage
&& matches!(agent.status, AgentStatus::Running | AgentStatus::Pending)
})
}
/// Find the first configured agent for `stage` that has no active (pending/running) assignment.
/// Returns `None` if all agents for that stage are busy or none are configured.
fn find_free_agent_for_stage<'a>(
config: &'a ProjectConfig,
agents: &HashMap<String, StoryAgent>,
stage: &PipelineStage,
) -> Option<&'a str> {
for agent_config in &config.agent {
if pipeline_stage(&agent_config.name) != *stage {
continue;
}
let is_busy = agents.values().any(|a| {
a.agent_name == agent_config.name
&& matches!(a.status, AgentStatus::Running | AgentStatus::Pending)
});
if !is_busy {
return Some(&agent_config.name);
}
}
None
}
/// Server-owned completion: runs acceptance gates when an agent process exits /// Server-owned completion: runs acceptance gates when an agent process exits
/// normally, and advances the pipeline based on results. /// normally, and advances the pipeline based on results.
/// ///
@@ -2773,4 +2924,154 @@ mod tests {
"output should mention script/test_coverage" "output should mention script/test_coverage"
); );
} }
// ── auto-assign helper tests ───────────────────────────────────
#[test]
fn scan_stage_items_returns_empty_for_missing_dir() {
let tmp = tempfile::tempdir().unwrap();
let items = scan_stage_items(tmp.path(), "2_current");
assert!(items.is_empty());
}
#[test]
fn scan_stage_items_returns_sorted_story_ids() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let stage_dir = tmp.path().join(".story_kit").join("work").join("2_current");
fs::create_dir_all(&stage_dir).unwrap();
fs::write(stage_dir.join("42_story_foo.md"), "---\nname: foo\n---").unwrap();
fs::write(stage_dir.join("10_story_bar.md"), "---\nname: bar\n---").unwrap();
fs::write(stage_dir.join("5_story_baz.md"), "---\nname: baz\n---").unwrap();
// non-md file should be ignored
fs::write(stage_dir.join("README.txt"), "ignore me").unwrap();
let items = scan_stage_items(tmp.path(), "2_current");
assert_eq!(items, vec!["10_story_bar", "42_story_foo", "5_story_baz"]);
}
#[test]
fn is_story_assigned_returns_true_for_running_coder() {
let pool = AgentPool::new(3001);
pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running);
let agents = pool.agents.lock().unwrap();
assert!(is_story_assigned_for_stage(
&agents,
"42_story_foo",
&PipelineStage::Coder
));
// Same story but wrong stage — should be false
assert!(!is_story_assigned_for_stage(
&agents,
"42_story_foo",
&PipelineStage::Qa
));
// Different story — should be false
assert!(!is_story_assigned_for_stage(
&agents,
"99_story_other",
&PipelineStage::Coder
));
}
#[test]
fn is_story_assigned_returns_false_for_completed_agent() {
let pool = AgentPool::new(3001);
pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Completed);
let agents = pool.agents.lock().unwrap();
// Completed agents don't count as assigned
assert!(!is_story_assigned_for_stage(
&agents,
"42_story_foo",
&PipelineStage::Coder
));
}
#[test]
fn find_free_agent_returns_none_when_all_busy() {
use crate::config::ProjectConfig;
let config = ProjectConfig::parse(
r#"
[[agent]]
name = "coder-1"
[[agent]]
name = "coder-2"
"#,
)
.unwrap();
let pool = AgentPool::new(3001);
pool.inject_test_agent("s1", "coder-1", AgentStatus::Running);
pool.inject_test_agent("s2", "coder-2", AgentStatus::Running);
let agents = pool.agents.lock().unwrap();
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
assert!(free.is_none(), "no free coders should be available");
}
#[test]
fn find_free_agent_returns_first_free_coder() {
use crate::config::ProjectConfig;
let config = ProjectConfig::parse(
r#"
[[agent]]
name = "coder-1"
[[agent]]
name = "coder-2"
[[agent]]
name = "coder-3"
"#,
)
.unwrap();
let pool = AgentPool::new(3001);
// coder-1 is busy, coder-2 is free
pool.inject_test_agent("s1", "coder-1", AgentStatus::Running);
let agents = pool.agents.lock().unwrap();
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
assert_eq!(free, Some("coder-2"), "coder-2 should be the first free coder");
}
#[test]
fn find_free_agent_ignores_completed_agents() {
use crate::config::ProjectConfig;
let config = ProjectConfig::parse(
r#"
[[agent]]
name = "coder-1"
"#,
)
.unwrap();
let pool = AgentPool::new(3001);
// coder-1 completed its previous story — it's free for a new one
pool.inject_test_agent("s1", "coder-1", AgentStatus::Completed);
let agents = pool.agents.lock().unwrap();
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
assert_eq!(free, Some("coder-1"), "completed coder-1 should be free");
}
#[test]
fn find_free_agent_returns_none_for_wrong_stage() {
use crate::config::ProjectConfig;
let config = ProjectConfig::parse(
r#"
[[agent]]
name = "qa"
"#,
)
.unwrap();
let agents: HashMap<String, StoryAgent> = HashMap::new();
// Looking for a Coder but only QA is configured
let free = find_free_agent_for_stage(&config, &agents, &PipelineStage::Coder);
assert!(free.is_none());
// Looking for QA should find it
let free_qa = find_free_agent_for_stage(&config, &agents, &PipelineStage::Qa);
assert_eq!(free_qa, Some("qa"));
}
} }

View File

@@ -322,100 +322,6 @@ fn run_pty_session(
let _ = writeln!(pty_writer, "{}", response); let _ = writeln!(pty_writer, "{}", response);
} }
} }
// Claude Code is requesting user approval before executing a tool.
// Forward the request to the async context via permission_tx and
// block until the user responds (or a 5-minute timeout elapses).
"permission_request" => {
let request_id = json
.get("id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let tool_name = json
.get("tool_name")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string();
let tool_input = json
.get("input")
.cloned()
.unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
if let Some(ref ptx) = permission_tx {
let (resp_tx, resp_rx) = std::sync::mpsc::sync_channel(1);
let _ = ptx.send(PermissionReqMsg {
request_id: request_id.clone(),
tool_name,
tool_input,
response_tx: resp_tx,
});
// Block until the user responds or a 5-minute timeout elapses.
let approved = resp_rx
.recv_timeout(std::time::Duration::from_secs(300))
.unwrap_or(false);
let response = serde_json::json!({
"type": "permission_response",
"id": request_id,
"approved": approved,
});
let _ = writeln!(pty_writer, "{}", response);
} else {
// No handler configured — deny by default.
let response = serde_json::json!({
"type": "permission_response",
"id": request_id,
"approved": false,
});
let _ = writeln!(pty_writer, "{}", response);
}
}
// Claude Code is requesting user approval before executing a tool.
// Forward the request to the async context via permission_tx and
// block until the user responds (or a 5-minute timeout elapses).
"permission_request" => {
let request_id = json
.get("id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let tool_name = json
.get("tool_name")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string();
let tool_input = json
.get("input")
.cloned()
.unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
if let Some(ref ptx) = permission_tx {
let (resp_tx, resp_rx) = std::sync::mpsc::sync_channel(1);
let _ = ptx.send(PermissionReqMsg {
request_id: request_id.clone(),
tool_name,
tool_input,
response_tx: resp_tx,
});
// Block until the user responds or a 5-minute timeout elapses.
let approved = resp_rx
.recv_timeout(std::time::Duration::from_secs(300))
.unwrap_or(false);
let response = serde_json::json!({
"type": "permission_response",
"id": request_id,
"approved": approved,
});
let _ = writeln!(pty_writer, "{}", response);
} else {
// No handler configured — deny by default.
let response = serde_json::json!({
"type": "permission_response",
"id": request_id,
"approved": false,
});
let _ = writeln!(pty_writer, "{}", response);
}
}
_ => {} _ => {}
} }
} }

View File

@@ -64,6 +64,10 @@ async fn main() -> Result<(), std::io::Error> {
} }
} }
// Capture project root and agents Arc before ctx is consumed by build_routes.
let startup_root: Option<PathBuf> = app_state.project_root.lock().unwrap().clone();
let startup_agents = Arc::clone(&agents);
let ctx = AppContext { let ctx = AppContext {
state: app_state, state: app_state,
store, store,
@@ -73,6 +77,15 @@ async fn main() -> Result<(), std::io::Error> {
}; };
let app = build_routes(ctx); let app = build_routes(ctx);
// On startup, auto-assign free agents to any work already queued in the
// active pipeline stages (2_current/, 3_qa/, 4_merge/).
if let Some(root) = startup_root {
tokio::spawn(async move {
eprintln!("[auto-assign] Server startup: scanning pipeline stages for unassigned work.");
startup_agents.auto_assign_available_work(&root).await;
});
}
let addr = format!("127.0.0.1:{port}"); let addr = format!("127.0.0.1:{port}");
println!( println!(