storkit: merge 428_refactor_split_pool_pipeline_rs_into_submodules
This commit is contained in:
@@ -0,0 +1,785 @@
|
||||
use crate::config::ProjectConfig;
|
||||
use crate::slog;
|
||||
use crate::slog_error;
|
||||
use crate::slog_warn;
|
||||
use crate::io::watcher::WatcherEvent;
|
||||
use std::collections::HashMap;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use super::super::super::{
|
||||
CompletionReport, PipelineStage,
|
||||
agent_config_stage, pipeline_stage,
|
||||
};
|
||||
use super::super::{AgentPool, StoryAgent};
|
||||
|
||||
impl AgentPool {
|
||||
/// Pipeline advancement: after an agent completes, move the story to
|
||||
/// the next pipeline stage and start the appropriate agent.
|
||||
pub(super) async fn run_pipeline_advance(
|
||||
&self,
|
||||
story_id: &str,
|
||||
agent_name: &str,
|
||||
completion: CompletionReport,
|
||||
project_root: Option<PathBuf>,
|
||||
worktree_path: Option<PathBuf>,
|
||||
merge_failure_reported: bool,
|
||||
) {
|
||||
let project_root = match project_root {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
slog_warn!("[pipeline] No project_root for '{story_id}:{agent_name}'");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let config = ProjectConfig::load(&project_root).unwrap_or_default();
|
||||
let stage = config
|
||||
.find_agent(agent_name)
|
||||
.map(agent_config_stage)
|
||||
.unwrap_or_else(|| pipeline_stage(agent_name));
|
||||
|
||||
match stage {
|
||||
PipelineStage::Other => {
|
||||
// Supervisors and unknown agents do not advance the pipeline.
|
||||
}
|
||||
PipelineStage::Coder => {
|
||||
if completion.gates_passed {
|
||||
// Determine effective QA mode for this story.
|
||||
let qa_mode = {
|
||||
let item_type = crate::agents::lifecycle::item_type_from_id(story_id);
|
||||
if item_type == "spike" {
|
||||
crate::io::story_metadata::QaMode::Human
|
||||
} else {
|
||||
let default_qa = config.default_qa_mode();
|
||||
// Story is in 2_current/ when a coder completes.
|
||||
let story_path = project_root
|
||||
.join(".storkit/work/2_current")
|
||||
.join(format!("{story_id}.md"));
|
||||
crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa)
|
||||
}
|
||||
};
|
||||
|
||||
match qa_mode {
|
||||
crate::io::story_metadata::QaMode::Server => {
|
||||
slog!(
|
||||
"[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \
|
||||
qa: server — moving directly to merge."
|
||||
);
|
||||
if let Err(e) =
|
||||
crate::agents::lifecycle::move_story_to_merge(&project_root, story_id)
|
||||
{
|
||||
slog_error!(
|
||||
"[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"
|
||||
);
|
||||
} else if let Err(e) = self
|
||||
.start_agent(&project_root, story_id, Some("mergemaster"), None)
|
||||
.await
|
||||
{
|
||||
slog_error!(
|
||||
"[pipeline] Failed to start mergemaster for '{story_id}': {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
crate::io::story_metadata::QaMode::Agent => {
|
||||
slog!(
|
||||
"[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \
|
||||
qa: agent — moving to QA."
|
||||
);
|
||||
if let Err(e) = crate::agents::lifecycle::move_story_to_qa(&project_root, story_id) {
|
||||
slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}");
|
||||
} else if let Err(e) = self
|
||||
.start_agent(&project_root, story_id, Some("qa"), None)
|
||||
.await
|
||||
{
|
||||
slog_error!("[pipeline] Failed to start qa agent for '{story_id}': {e}");
|
||||
}
|
||||
}
|
||||
crate::io::story_metadata::QaMode::Human => {
|
||||
slog!(
|
||||
"[pipeline] Coder '{agent_name}' passed gates for '{story_id}'. \
|
||||
qa: human — holding for human review."
|
||||
);
|
||||
if let Err(e) = crate::agents::lifecycle::move_story_to_qa(&project_root, story_id) {
|
||||
slog_error!("[pipeline] Failed to move '{story_id}' to 3_qa/: {e}");
|
||||
} else {
|
||||
let qa_dir = project_root.join(".storkit/work/3_qa");
|
||||
let story_path = qa_dir.join(format!("{story_id}.md"));
|
||||
if let Err(e) =
|
||||
crate::io::story_metadata::write_review_hold(&story_path)
|
||||
{
|
||||
slog_error!(
|
||||
"[pipeline] Failed to set review_hold on '{story_id}': {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Increment retry count and check if blocked.
|
||||
let story_path = project_root
|
||||
.join(".storkit/work/2_current")
|
||||
.join(format!("{story_id}.md"));
|
||||
if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "coder") {
|
||||
// Story has exceeded retry limit — do not restart.
|
||||
let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
|
||||
story_id: story_id.to_string(),
|
||||
reason,
|
||||
});
|
||||
} else {
|
||||
slog!(
|
||||
"[pipeline] Coder '{agent_name}' failed gates for '{story_id}'. Restarting."
|
||||
);
|
||||
let context = format!(
|
||||
"\n\n---\n## Previous Attempt Failed\n\
|
||||
The acceptance gates failed with the following output:\n{}\n\n\
|
||||
Please review the failures above, fix the issues, and try again.",
|
||||
completion.gate_output
|
||||
);
|
||||
if let Err(e) = self
|
||||
.start_agent(&project_root, story_id, Some(agent_name), Some(&context))
|
||||
.await
|
||||
{
|
||||
slog_error!(
|
||||
"[pipeline] Failed to restart coder '{agent_name}' for '{story_id}': {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
PipelineStage::Qa => {
|
||||
if completion.gates_passed {
|
||||
// Run coverage gate in the QA worktree before advancing to merge.
|
||||
let coverage_path = worktree_path
|
||||
.clone()
|
||||
.unwrap_or_else(|| project_root.clone());
|
||||
let cp = coverage_path.clone();
|
||||
let coverage_result =
|
||||
tokio::task::spawn_blocking(move || crate::agents::gates::run_coverage_gate(&cp))
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
slog_warn!("[pipeline] Coverage gate task panicked: {e}");
|
||||
Ok((false, format!("Coverage gate task panicked: {e}")))
|
||||
});
|
||||
let (coverage_passed, coverage_output) = match coverage_result {
|
||||
Ok(pair) => pair,
|
||||
Err(e) => (false, e),
|
||||
};
|
||||
|
||||
if coverage_passed {
|
||||
// Check whether this item needs human review before merging.
|
||||
let needs_human_review = {
|
||||
let item_type = crate::agents::lifecycle::item_type_from_id(story_id);
|
||||
if item_type == "spike" {
|
||||
true // Spikes always need human review.
|
||||
} else {
|
||||
let qa_dir = project_root.join(".storkit/work/3_qa");
|
||||
let story_path = qa_dir.join(format!("{story_id}.md"));
|
||||
let default_qa = config.default_qa_mode();
|
||||
matches!(
|
||||
crate::io::story_metadata::resolve_qa_mode(&story_path, default_qa),
|
||||
crate::io::story_metadata::QaMode::Human
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
if needs_human_review {
|
||||
// Hold in 3_qa/ for human review.
|
||||
let qa_dir = project_root.join(".storkit/work/3_qa");
|
||||
let story_path = qa_dir.join(format!("{story_id}.md"));
|
||||
if let Err(e) =
|
||||
crate::io::story_metadata::write_review_hold(&story_path)
|
||||
{
|
||||
slog_error!(
|
||||
"[pipeline] Failed to set review_hold on '{story_id}': {e}"
|
||||
);
|
||||
}
|
||||
slog!(
|
||||
"[pipeline] QA passed for '{story_id}'. \
|
||||
Holding for human review. \
|
||||
Worktree preserved at: {worktree_path:?}"
|
||||
);
|
||||
} else {
|
||||
slog!(
|
||||
"[pipeline] QA passed gates and coverage for '{story_id}'. \
|
||||
Moving directly to merge."
|
||||
);
|
||||
if let Err(e) =
|
||||
crate::agents::lifecycle::move_story_to_merge(&project_root, story_id)
|
||||
{
|
||||
slog_error!(
|
||||
"[pipeline] Failed to move '{story_id}' to 4_merge/: {e}"
|
||||
);
|
||||
} else if let Err(e) = self
|
||||
.start_agent(&project_root, story_id, Some("mergemaster"), None)
|
||||
.await
|
||||
{
|
||||
slog_error!(
|
||||
"[pipeline] Failed to start mergemaster for '{story_id}': {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let story_path = project_root
|
||||
.join(".storkit/work/3_qa")
|
||||
.join(format!("{story_id}.md"));
|
||||
if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "qa-coverage") {
|
||||
// Story has exceeded retry limit — do not restart.
|
||||
let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
|
||||
story_id: story_id.to_string(),
|
||||
reason,
|
||||
});
|
||||
} else {
|
||||
slog!(
|
||||
"[pipeline] QA coverage gate failed for '{story_id}'. Restarting QA."
|
||||
);
|
||||
let context = format!(
|
||||
"\n\n---\n## Coverage Gate Failed\n\
|
||||
The coverage gate (script/test_coverage) failed with the following output:\n{}\n\n\
|
||||
Please improve test coverage until the coverage gate passes.",
|
||||
coverage_output
|
||||
);
|
||||
if let Err(e) = self
|
||||
.start_agent(&project_root, story_id, Some("qa"), Some(&context))
|
||||
.await
|
||||
{
|
||||
slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let story_path = project_root
|
||||
.join(".storkit/work/3_qa")
|
||||
.join(format!("{story_id}.md"));
|
||||
if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "qa") {
|
||||
// Story has exceeded retry limit — do not restart.
|
||||
let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
|
||||
story_id: story_id.to_string(),
|
||||
reason,
|
||||
});
|
||||
} else {
|
||||
slog!("[pipeline] QA failed gates for '{story_id}'. Restarting.");
|
||||
let context = format!(
|
||||
"\n\n---\n## Previous QA Attempt Failed\n\
|
||||
The acceptance gates failed with the following output:\n{}\n\n\
|
||||
Please re-run and fix the issues.",
|
||||
completion.gate_output
|
||||
);
|
||||
if let Err(e) = self
|
||||
.start_agent(&project_root, story_id, Some("qa"), Some(&context))
|
||||
.await
|
||||
{
|
||||
slog_error!("[pipeline] Failed to restart qa for '{story_id}': {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
PipelineStage::Mergemaster => {
|
||||
// Block advancement if the mergemaster explicitly reported a failure.
|
||||
// The server-owned gate check runs in the feature-branch worktree (not
|
||||
// master), so `gates_passed=true` is misleading when no code was merged.
|
||||
if merge_failure_reported {
|
||||
slog!(
|
||||
"[pipeline] Pipeline advancement blocked for '{story_id}': \
|
||||
mergemaster explicitly reported a merge failure. \
|
||||
Story stays in 4_merge/ for human review."
|
||||
);
|
||||
} else {
|
||||
// Run script/test on master (project_root) as the post-merge verification.
|
||||
slog!(
|
||||
"[pipeline] Mergemaster completed for '{story_id}'. Running post-merge tests on master."
|
||||
);
|
||||
let root = project_root.clone();
|
||||
let test_result =
|
||||
tokio::task::spawn_blocking(move || crate::agents::gates::run_project_tests(&root))
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
slog_warn!("[pipeline] Post-merge test task panicked: {e}");
|
||||
Ok((false, format!("Test task panicked: {e}")))
|
||||
});
|
||||
let (passed, output) = match test_result {
|
||||
Ok(pair) => pair,
|
||||
Err(e) => (false, e),
|
||||
};
|
||||
|
||||
if passed {
|
||||
slog!(
|
||||
"[pipeline] Post-merge tests passed for '{story_id}'. Moving to done."
|
||||
);
|
||||
if let Err(e) =
|
||||
crate::agents::lifecycle::move_story_to_archived(&project_root, story_id)
|
||||
{
|
||||
slog_error!("[pipeline] Failed to move '{story_id}' to done: {e}");
|
||||
}
|
||||
self.remove_agents_for_story(story_id);
|
||||
// TODO: Re-enable worktree cleanup once we have persistent agent logs.
|
||||
// Removing worktrees destroys evidence needed to debug empty-commit agents.
|
||||
// let config =
|
||||
// crate::config::ProjectConfig::load(&project_root).unwrap_or_default();
|
||||
// if let Err(e) =
|
||||
// worktree::remove_worktree_by_story_id(&project_root, story_id, &config)
|
||||
// .await
|
||||
// {
|
||||
// slog!(
|
||||
// "[pipeline] Failed to remove worktree for '{story_id}': {e}"
|
||||
// );
|
||||
// }
|
||||
slog!(
|
||||
"[pipeline] Story '{story_id}' done. Worktree preserved for inspection."
|
||||
);
|
||||
} else {
|
||||
let story_path = project_root
|
||||
.join(".storkit/work/4_merge")
|
||||
.join(format!("{story_id}.md"));
|
||||
if let Some(reason) = should_block_story(&story_path, config.max_retries, story_id, "mergemaster") {
|
||||
// Story has exceeded retry limit — do not restart.
|
||||
let _ = self.watcher_tx.send(WatcherEvent::StoryBlocked {
|
||||
story_id: story_id.to_string(),
|
||||
reason,
|
||||
});
|
||||
} else {
|
||||
slog!(
|
||||
"[pipeline] Post-merge tests failed for '{story_id}'. Restarting mergemaster."
|
||||
);
|
||||
let context = format!(
|
||||
"\n\n---\n## Post-Merge Test Failed\n\
|
||||
The tests on master failed with the following output:\n{}\n\n\
|
||||
Please investigate and resolve the failures, then call merge_agent_work again.",
|
||||
output
|
||||
);
|
||||
if let Err(e) = self
|
||||
.start_agent(
|
||||
&project_root,
|
||||
story_id,
|
||||
Some("mergemaster"),
|
||||
Some(&context),
|
||||
)
|
||||
.await
|
||||
{
|
||||
slog_error!(
|
||||
"[pipeline] Failed to restart mergemaster for '{story_id}': {e}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Always scan for unassigned work after any agent completes, regardless
|
||||
// of the outcome (success, failure, restart). This ensures stories that
|
||||
// failed agent assignment due to busy agents are retried when agents
|
||||
// become available (bug 295).
|
||||
self.auto_assign_available_work(&project_root).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn pipeline advancement as a background task.
|
||||
///
|
||||
/// This is a **non-async** function so it does not participate in the opaque
|
||||
/// type cycle between `start_agent` and `run_server_owned_completion`.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(super) fn spawn_pipeline_advance(
|
||||
agents: Arc<Mutex<HashMap<String, StoryAgent>>>,
|
||||
port: u16,
|
||||
story_id: &str,
|
||||
agent_name: &str,
|
||||
completion: CompletionReport,
|
||||
project_root: Option<PathBuf>,
|
||||
worktree_path: Option<PathBuf>,
|
||||
watcher_tx: broadcast::Sender<WatcherEvent>,
|
||||
merge_failure_reported: bool,
|
||||
) {
|
||||
let sid = story_id.to_string();
|
||||
let aname = agent_name.to_string();
|
||||
tokio::spawn(async move {
|
||||
let pool = AgentPool {
|
||||
agents,
|
||||
port,
|
||||
child_killers: Arc::new(Mutex::new(HashMap::new())),
|
||||
watcher_tx,
|
||||
merge_jobs: Arc::new(Mutex::new(HashMap::new())),
|
||||
};
|
||||
pool.run_pipeline_advance(
|
||||
&sid,
|
||||
&aname,
|
||||
completion,
|
||||
project_root,
|
||||
worktree_path,
|
||||
merge_failure_reported,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
/// Increment retry_count and block the story if it exceeds `max_retries`.
|
||||
///
|
||||
/// Returns `Some(reason)` if the story is now blocked (caller should NOT restart the agent).
|
||||
/// Returns `None` if the story may be retried.
|
||||
/// When `max_retries` is 0, retry limits are disabled.
|
||||
fn should_block_story(story_path: &Path, max_retries: u32, story_id: &str, stage_label: &str) -> Option<String> {
|
||||
use crate::io::story_metadata::{increment_retry_count, write_blocked};
|
||||
|
||||
if max_retries == 0 {
|
||||
// Retry limits disabled.
|
||||
return None;
|
||||
}
|
||||
|
||||
match increment_retry_count(story_path) {
|
||||
Ok(new_count) => {
|
||||
if new_count >= max_retries {
|
||||
slog_warn!(
|
||||
"[pipeline] Story '{story_id}' reached retry limit ({new_count}/{max_retries}) \
|
||||
at {stage_label} stage. Marking as blocked."
|
||||
);
|
||||
if let Err(e) = write_blocked(story_path) {
|
||||
slog_error!("[pipeline] Failed to write blocked flag for '{story_id}': {e}");
|
||||
}
|
||||
Some(format!(
|
||||
"Retry limit exceeded ({new_count}/{max_retries}) at {stage_label} stage"
|
||||
))
|
||||
} else {
|
||||
slog!(
|
||||
"[pipeline] Story '{story_id}' retry {new_count}/{max_retries} at {stage_label} stage."
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
slog_error!("[pipeline] Failed to increment retry_count for '{story_id}': {e}");
|
||||
None // Don't block on error — allow retry.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::super::super::AgentPool;
|
||||
use super::super::super::composite_key;
|
||||
use crate::agents::{AgentStatus, CompletionReport};
|
||||
use crate::io::watcher::WatcherEvent;
|
||||
|
||||
// ── pipeline advance tests ────────────────────────────────────────────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn pipeline_advance_coder_gates_pass_server_qa_moves_to_merge() {
|
||||
use std::fs;
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let root = tmp.path();
|
||||
|
||||
// Set up story in 2_current/ (no qa frontmatter → uses project default "server")
|
||||
let current = root.join(".storkit/work/2_current");
|
||||
fs::create_dir_all(¤t).unwrap();
|
||||
fs::write(current.join("50_story_test.md"), "test").unwrap();
|
||||
|
||||
let pool = AgentPool::new_test(3001);
|
||||
pool.run_pipeline_advance(
|
||||
"50_story_test",
|
||||
"coder-1",
|
||||
CompletionReport {
|
||||
summary: "done".to_string(),
|
||||
gates_passed: true,
|
||||
gate_output: String::new(),
|
||||
},
|
||||
Some(root.to_path_buf()),
|
||||
None,
|
||||
false,
|
||||
)
|
||||
.await;
|
||||
|
||||
// With default qa: server, story skips QA and goes straight to 4_merge/
|
||||
assert!(
|
||||
root.join(".storkit/work/4_merge/50_story_test.md")
|
||||
.exists(),
|
||||
"story should be in 4_merge/"
|
||||
);
|
||||
assert!(
|
||||
!current.join("50_story_test.md").exists(),
|
||||
"story should not still be in 2_current/"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn pipeline_advance_coder_gates_pass_agent_qa_moves_to_qa() {
|
||||
use std::fs;
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let root = tmp.path();
|
||||
|
||||
// Set up story in 2_current/ with qa: agent frontmatter
|
||||
let current = root.join(".storkit/work/2_current");
|
||||
fs::create_dir_all(¤t).unwrap();
|
||||
fs::write(
|
||||
current.join("50_story_test.md"),
|
||||
"---\nname: Test\nqa: agent\n---\ntest",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let pool = AgentPool::new_test(3001);
|
||||
pool.run_pipeline_advance(
|
||||
"50_story_test",
|
||||
"coder-1",
|
||||
CompletionReport {
|
||||
summary: "done".to_string(),
|
||||
gates_passed: true,
|
||||
gate_output: String::new(),
|
||||
},
|
||||
Some(root.to_path_buf()),
|
||||
None,
|
||||
false,
|
||||
)
|
||||
.await;
|
||||
|
||||
// With qa: agent, story should move to 3_qa/
|
||||
assert!(
|
||||
root.join(".storkit/work/3_qa/50_story_test.md").exists(),
|
||||
"story should be in 3_qa/"
|
||||
);
|
||||
assert!(
|
||||
!current.join("50_story_test.md").exists(),
|
||||
"story should not still be in 2_current/"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn pipeline_advance_qa_gates_pass_moves_story_to_merge() {
|
||||
use std::fs;
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let root = tmp.path();
|
||||
|
||||
// Set up story in 3_qa/
|
||||
let qa_dir = root.join(".storkit/work/3_qa");
|
||||
fs::create_dir_all(&qa_dir).unwrap();
|
||||
// qa: server so the story skips human review and goes straight to merge.
|
||||
fs::write(
|
||||
qa_dir.join("51_story_test.md"),
|
||||
"---\nname: Test\nqa: server\n---\ntest",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let pool = AgentPool::new_test(3001);
|
||||
pool.run_pipeline_advance(
|
||||
"51_story_test",
|
||||
"qa",
|
||||
CompletionReport {
|
||||
summary: "QA done".to_string(),
|
||||
gates_passed: true,
|
||||
gate_output: String::new(),
|
||||
},
|
||||
Some(root.to_path_buf()),
|
||||
None,
|
||||
false,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Story should have moved to 4_merge/
|
||||
assert!(
|
||||
root.join(".storkit/work/4_merge/51_story_test.md")
|
||||
.exists(),
|
||||
"story should be in 4_merge/"
|
||||
);
|
||||
assert!(
|
||||
!qa_dir.join("51_story_test.md").exists(),
|
||||
"story should not still be in 3_qa/"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn pipeline_advance_supervisor_does_not_advance() {
|
||||
use std::fs;
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let root = tmp.path();
|
||||
|
||||
let current = root.join(".storkit/work/2_current");
|
||||
fs::create_dir_all(¤t).unwrap();
|
||||
fs::write(current.join("52_story_test.md"), "test").unwrap();
|
||||
|
||||
let pool = AgentPool::new_test(3001);
|
||||
pool.run_pipeline_advance(
|
||||
"52_story_test",
|
||||
"supervisor",
|
||||
CompletionReport {
|
||||
summary: "supervised".to_string(),
|
||||
gates_passed: true,
|
||||
gate_output: String::new(),
|
||||
},
|
||||
Some(root.to_path_buf()),
|
||||
None,
|
||||
false,
|
||||
)
|
||||
.await;
|
||||
|
||||
// Story should NOT have moved (supervisors don't advance pipeline)
|
||||
assert!(
|
||||
current.join("52_story_test.md").exists(),
|
||||
"story should still be in 2_current/ for supervisor"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn pipeline_advance_sends_agent_state_changed_to_watcher_tx() {
|
||||
use std::fs;
|
||||
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let root = tmp.path();
|
||||
|
||||
// Set up story in 2_current/
|
||||
let current = root.join(".storkit/work/2_current");
|
||||
fs::create_dir_all(¤t).unwrap();
|
||||
fs::write(current.join("173_story_test.md"), "test").unwrap();
|
||||
// Ensure 3_qa/ exists for the move target
|
||||
fs::create_dir_all(root.join(".storkit/work/3_qa")).unwrap();
|
||||
// Ensure 1_backlog/ exists (start_agent calls move_story_to_current)
|
||||
fs::create_dir_all(root.join(".storkit/work/1_backlog")).unwrap();
|
||||
|
||||
// Write a project.toml with a qa agent so start_agent can resolve it.
|
||||
fs::create_dir_all(root.join(".storkit")).unwrap();
|
||||
fs::write(
|
||||
root.join(".storkit/project.toml"),
|
||||
r#"
|
||||
default_qa = "agent"
|
||||
|
||||
[[agent]]
|
||||
name = "coder-1"
|
||||
role = "Coder"
|
||||
command = "echo"
|
||||
args = ["noop"]
|
||||
prompt = "test"
|
||||
stage = "coder"
|
||||
|
||||
[[agent]]
|
||||
name = "qa"
|
||||
role = "QA"
|
||||
command = "echo"
|
||||
args = ["noop"]
|
||||
prompt = "test"
|
||||
stage = "qa"
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let pool = AgentPool::new_test(3001);
|
||||
// Subscribe to the watcher channel BEFORE the pipeline advance.
|
||||
let mut rx = pool.watcher_tx.subscribe();
|
||||
|
||||
pool.run_pipeline_advance(
|
||||
"173_story_test",
|
||||
"coder-1",
|
||||
CompletionReport {
|
||||
summary: "done".to_string(),
|
||||
gates_passed: true,
|
||||
gate_output: String::new(),
|
||||
},
|
||||
Some(root.to_path_buf()),
|
||||
None,
|
||||
false,
|
||||
)
|
||||
.await;
|
||||
|
||||
// The pipeline advance should have sent AgentStateChanged events via
|
||||
// the pool's watcher_tx (not a dummy channel). Collect all events.
|
||||
let mut got_agent_state_changed = false;
|
||||
while let Ok(evt) = rx.try_recv() {
|
||||
if matches!(evt, WatcherEvent::AgentStateChanged) {
|
||||
got_agent_state_changed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assert!(
|
||||
got_agent_state_changed,
|
||||
"pipeline advance should send AgentStateChanged through the real watcher_tx \
|
||||
(bug 173: lozenges must update when agents are assigned during pipeline advance)"
|
||||
);
|
||||
}
|
||||
|
||||
// ── bug 295: pipeline advance picks up waiting QA stories ──────────
|
||||
|
||||
#[tokio::test]
|
||||
async fn pipeline_advance_picks_up_waiting_qa_stories_after_completion() {
|
||||
use std::fs;
|
||||
use super::super::super::auto_assign::is_agent_free;
|
||||
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let root = tmp.path();
|
||||
|
||||
let sk = root.join(".storkit");
|
||||
let qa_dir = sk.join("work/3_qa");
|
||||
fs::create_dir_all(&qa_dir).unwrap();
|
||||
|
||||
// Configure a single QA agent.
|
||||
fs::write(
|
||||
sk.join("project.toml"),
|
||||
r#"
|
||||
[[agent]]
|
||||
name = "qa"
|
||||
stage = "qa"
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Story 292 is in QA with QA agent running (will "complete" via
|
||||
// run_pipeline_advance below). Story 293 is in QA with NO agent —
|
||||
// simulating the "stuck" state from bug 295.
|
||||
fs::write(
|
||||
qa_dir.join("292_story_first.md"),
|
||||
"---\nname: First\nqa: human\n---\n",
|
||||
)
|
||||
.unwrap();
|
||||
fs::write(
|
||||
qa_dir.join("293_story_second.md"),
|
||||
"---\nname: Second\nqa: human\n---\n",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let pool = AgentPool::new_test(3001);
|
||||
// QA is currently running on story 292.
|
||||
pool.inject_test_agent("292_story_first", "qa", AgentStatus::Running);
|
||||
|
||||
// Verify that 293 cannot get a QA agent right now (QA is busy).
|
||||
{
|
||||
let agents = pool.agents.lock().unwrap();
|
||||
assert!(
|
||||
!is_agent_free(&agents, "qa"),
|
||||
"qa should be busy on story 292"
|
||||
);
|
||||
}
|
||||
|
||||
// Simulate QA completing on story 292: remove the agent from the pool
|
||||
// (as run_server_owned_completion does) then run pipeline advance.
|
||||
{
|
||||
let mut agents = pool.agents.lock().unwrap();
|
||||
agents.remove(&composite_key("292_story_first", "qa"));
|
||||
}
|
||||
|
||||
pool.run_pipeline_advance(
|
||||
"292_story_first",
|
||||
"qa",
|
||||
CompletionReport {
|
||||
summary: "QA done".to_string(),
|
||||
gates_passed: true,
|
||||
gate_output: String::new(),
|
||||
},
|
||||
Some(root.to_path_buf()),
|
||||
None,
|
||||
false,
|
||||
)
|
||||
.await;
|
||||
|
||||
// After pipeline advance, auto_assign should have started QA on story 293.
|
||||
let agents = pool.agents.lock().unwrap();
|
||||
let qa_on_293 = agents.values().any(|a| {
|
||||
a.agent_name == "qa"
|
||||
&& matches!(a.status, AgentStatus::Pending | AgentStatus::Running)
|
||||
});
|
||||
assert!(
|
||||
qa_on_293,
|
||||
"auto_assign should have started qa for story 293 after 292's QA completed, \
|
||||
but no qa agent is pending/running. Pool: {:?}",
|
||||
agents
|
||||
.iter()
|
||||
.map(|(k, a)| format!("{k}: {} ({})", a.agent_name, a.status))
|
||||
.collect::<Vec<_>>()
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user