Refactor agents.rs (7631 lines) into agents/ module directory

Split the monolithic agents.rs into 6 focused modules:
- mod.rs: shared types (AgentEvent, AgentStatus, etc.) and re-exports
- pool.rs: AgentPool struct, all methods, and helper free functions
- pty.rs: PTY streaming (run_agent_pty_blocking, emit_event)
- lifecycle.rs: story movement functions (move_story_to_qa, etc.)
- gates.rs: acceptance gates (clippy, tests, coverage)
- merge.rs: squash-merge, conflict resolution, quality gates

All 121 original tests are preserved and distributed across modules.
Also adds clear_front_matter_field to story_metadata.rs to strip
stale merge_failure from front matter when stories move to done.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Dave
2026-03-16 23:06:14 +00:00
parent 225137fbdc
commit 7c3a756a5c
7 changed files with 4331 additions and 4149 deletions

393
server/src/agents/gates.rs Normal file
View File

@@ -0,0 +1,393 @@
use std::path::Path;
use std::process::Command;
/// Detect whether the base branch in a worktree is `master` or `main`.
/// Falls back to `"master"` if neither is found.
pub(crate) fn detect_worktree_base_branch(wt_path: &Path) -> String {
for branch in &["master", "main"] {
let ok = Command::new("git")
.args(["rev-parse", "--verify", branch])
.current_dir(wt_path)
.output()
.map(|o| o.status.success())
.unwrap_or(false);
if ok {
return branch.to_string();
}
}
"master".to_string()
}
/// Return `true` if the git worktree at `wt_path` has commits on its current
/// branch that are not present on the base branch (`master` or `main`).
///
/// Used during server startup reconciliation to detect stories whose agent work
/// was committed while the server was offline.
pub(crate) fn worktree_has_committed_work(wt_path: &Path) -> bool {
let base_branch = detect_worktree_base_branch(wt_path);
let output = Command::new("git")
.args(["log", &format!("{base_branch}..HEAD"), "--oneline"])
.current_dir(wt_path)
.output();
match output {
Ok(out) if out.status.success() => {
!String::from_utf8_lossy(&out.stdout).trim().is_empty()
}
_ => false,
}
}
/// Check whether the given directory has any uncommitted git changes.
/// Returns `Err` with a descriptive message if there are any.
pub(crate) fn check_uncommitted_changes(path: &Path) -> Result<(), String> {
let output = Command::new("git")
.args(["status", "--porcelain"])
.current_dir(path)
.output()
.map_err(|e| format!("Failed to run git status: {e}"))?;
let stdout = String::from_utf8_lossy(&output.stdout);
if !stdout.trim().is_empty() {
return Err(format!(
"Worktree has uncommitted changes. Please commit all work before \
the agent exits:\n{stdout}"
));
}
Ok(())
}
/// Run the project's test suite.
///
/// Uses `script/test` if present, treating it as the canonical single test entry point.
/// Falls back to `cargo nextest run` / `cargo test` when `script/test` is absent.
/// Returns `(tests_passed, output)`.
pub(crate) fn run_project_tests(path: &Path) -> Result<(bool, String), String> {
let script_test = path.join("script").join("test");
if script_test.exists() {
let mut output = String::from("=== script/test ===\n");
let result = Command::new(&script_test)
.current_dir(path)
.output()
.map_err(|e| format!("Failed to run script/test: {e}"))?;
let out = format!(
"{}{}",
String::from_utf8_lossy(&result.stdout),
String::from_utf8_lossy(&result.stderr)
);
output.push_str(&out);
output.push('\n');
return Ok((result.status.success(), output));
}
// Fallback: cargo nextest run / cargo test
let mut output = String::from("=== tests ===\n");
let (success, test_out) = match Command::new("cargo")
.args(["nextest", "run"])
.current_dir(path)
.output()
{
Ok(o) => {
let combined = format!(
"{}{}",
String::from_utf8_lossy(&o.stdout),
String::from_utf8_lossy(&o.stderr)
);
(o.status.success(), combined)
}
Err(_) => {
// nextest not available — fall back to cargo test
let o = Command::new("cargo")
.args(["test"])
.current_dir(path)
.output()
.map_err(|e| format!("Failed to run cargo test: {e}"))?;
let combined = format!(
"{}{}",
String::from_utf8_lossy(&o.stdout),
String::from_utf8_lossy(&o.stderr)
);
(o.status.success(), combined)
}
};
output.push_str(&test_out);
output.push('\n');
Ok((success, output))
}
/// Run `cargo clippy` and the project test suite (via `script/test` if present,
/// otherwise `cargo nextest run` / `cargo test`) in the given directory.
/// Returns `(gates_passed, combined_output)`.
pub(crate) fn run_acceptance_gates(path: &Path) -> Result<(bool, String), String> {
let mut all_output = String::new();
let mut all_passed = true;
// ── cargo clippy ──────────────────────────────────────────────
let clippy = Command::new("cargo")
.args(["clippy", "--all-targets", "--all-features"])
.current_dir(path)
.output()
.map_err(|e| format!("Failed to run cargo clippy: {e}"))?;
all_output.push_str("=== cargo clippy ===\n");
let clippy_stdout = String::from_utf8_lossy(&clippy.stdout);
let clippy_stderr = String::from_utf8_lossy(&clippy.stderr);
if !clippy_stdout.is_empty() {
all_output.push_str(&clippy_stdout);
}
if !clippy_stderr.is_empty() {
all_output.push_str(&clippy_stderr);
}
all_output.push('\n');
if !clippy.status.success() {
all_passed = false;
}
// ── tests (script/test if available, else cargo nextest/test) ─
let (test_success, test_out) = run_project_tests(path)?;
all_output.push_str(&test_out);
if !test_success {
all_passed = false;
}
Ok((all_passed, all_output))
}
/// Run `script/test_coverage` in the given directory if the script exists.
///
/// Used as a QA gate before advancing a story from `3_qa/` to `4_merge/`.
/// Returns `(passed, output)`. If the script does not exist, returns `(true, …)`.
pub(crate) fn run_coverage_gate(path: &Path) -> Result<(bool, String), String> {
let script = path.join("script").join("test_coverage");
if !script.exists() {
return Ok((
true,
"script/test_coverage not found; coverage gate skipped.\n".to_string(),
));
}
let mut output = String::from("=== script/test_coverage ===\n");
let result = Command::new(&script)
.current_dir(path)
.output()
.map_err(|e| format!("Failed to run script/test_coverage: {e}"))?;
let combined = format!(
"{}{}",
String::from_utf8_lossy(&result.stdout),
String::from_utf8_lossy(&result.stderr)
);
output.push_str(&combined);
output.push('\n');
Ok((result.status.success(), output))
}
#[cfg(test)]
mod tests {
use super::*;
fn init_git_repo(repo: &std::path::Path) {
Command::new("git")
.args(["init"])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["config", "user.email", "test@test.com"])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["config", "user.name", "Test"])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "--allow-empty", "-m", "init"])
.current_dir(repo)
.output()
.unwrap();
}
// ── run_project_tests tests ───────────────────────────────────
#[cfg(unix)]
#[test]
fn run_project_tests_uses_script_test_when_present_and_passes() {
use std::fs;
use std::os::unix::fs::PermissionsExt;
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let path = tmp.path();
let script_dir = path.join("script");
fs::create_dir_all(&script_dir).unwrap();
let script_test = script_dir.join("test");
fs::write(&script_test, "#!/usr/bin/env bash\necho 'all tests passed'\nexit 0\n").unwrap();
let mut perms = fs::metadata(&script_test).unwrap().permissions();
perms.set_mode(0o755);
fs::set_permissions(&script_test, perms).unwrap();
let (passed, output) = run_project_tests(path).unwrap();
assert!(passed, "script/test exiting 0 should pass");
assert!(output.contains("script/test"), "output should mention script/test");
}
#[cfg(unix)]
#[test]
fn run_project_tests_reports_failure_when_script_test_exits_nonzero() {
use std::fs;
use std::os::unix::fs::PermissionsExt;
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let path = tmp.path();
let script_dir = path.join("script");
fs::create_dir_all(&script_dir).unwrap();
let script_test = script_dir.join("test");
fs::write(&script_test, "#!/usr/bin/env bash\nexit 1\n").unwrap();
let mut perms = fs::metadata(&script_test).unwrap().permissions();
perms.set_mode(0o755);
fs::set_permissions(&script_test, perms).unwrap();
let (passed, output) = run_project_tests(path).unwrap();
assert!(!passed, "script/test exiting 1 should fail");
assert!(output.contains("script/test"), "output should mention script/test");
}
// ── run_coverage_gate tests ───────────────────────────────────────────────
#[cfg(unix)]
#[test]
fn coverage_gate_passes_when_script_absent() {
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let (passed, output) = run_coverage_gate(tmp.path()).unwrap();
assert!(passed, "coverage gate should pass when script is absent");
assert!(
output.contains("not found"),
"output should mention script not found"
);
}
#[cfg(unix)]
#[test]
fn coverage_gate_passes_when_script_exits_zero() {
use std::fs;
use std::os::unix::fs::PermissionsExt;
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let path = tmp.path();
let script_dir = path.join("script");
fs::create_dir_all(&script_dir).unwrap();
let script = script_dir.join("test_coverage");
fs::write(
&script,
"#!/usr/bin/env bash\necho 'Rust line coverage: 85%'\necho 'PASS: Coverage 85% meets threshold 0%'\nexit 0\n",
)
.unwrap();
let mut perms = fs::metadata(&script).unwrap().permissions();
perms.set_mode(0o755);
fs::set_permissions(&script, perms).unwrap();
let (passed, output) = run_coverage_gate(path).unwrap();
assert!(passed, "coverage gate should pass when script exits 0");
assert!(
output.contains("script/test_coverage"),
"output should mention script/test_coverage"
);
}
#[cfg(unix)]
#[test]
fn coverage_gate_fails_when_script_exits_nonzero() {
use std::fs;
use std::os::unix::fs::PermissionsExt;
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let path = tmp.path();
let script_dir = path.join("script");
fs::create_dir_all(&script_dir).unwrap();
let script = script_dir.join("test_coverage");
fs::write(
&script,
"#!/usr/bin/env bash\necho 'FAIL: Coverage 40% is below threshold 80%'\nexit 1\n",
)
.unwrap();
let mut perms = fs::metadata(&script).unwrap().permissions();
perms.set_mode(0o755);
fs::set_permissions(&script, perms).unwrap();
let (passed, output) = run_coverage_gate(path).unwrap();
assert!(!passed, "coverage gate should fail when script exits 1");
assert!(
output.contains("script/test_coverage"),
"output should mention script/test_coverage"
);
}
// ── worktree_has_committed_work tests ─────────────────────────────────────
#[test]
fn worktree_has_committed_work_false_on_fresh_repo() {
let tmp = tempfile::tempdir().unwrap();
let repo = tmp.path();
// init_git_repo creates the initial commit on the default branch.
// HEAD IS the base branch — no commits ahead.
init_git_repo(repo);
assert!(!worktree_has_committed_work(repo));
}
#[test]
fn worktree_has_committed_work_true_after_commit_on_feature_branch() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let project_root = tmp.path().join("project");
fs::create_dir_all(&project_root).unwrap();
init_git_repo(&project_root);
// Create a git worktree on a feature branch.
let wt_path = tmp.path().join("wt");
Command::new("git")
.args([
"worktree",
"add",
&wt_path.to_string_lossy(),
"-b",
"feature/story-99_test",
])
.current_dir(&project_root)
.output()
.unwrap();
// No commits on the feature branch yet — same as base branch.
assert!(!worktree_has_committed_work(&wt_path));
// Add a commit to the feature branch in the worktree.
fs::write(wt_path.join("work.txt"), "done").unwrap();
Command::new("git")
.args(["add", "."])
.current_dir(&wt_path)
.output()
.unwrap();
Command::new("git")
.args([
"-c",
"user.email=test@test.com",
"-c",
"user.name=Test",
"commit",
"-m",
"coder: implement story",
])
.current_dir(&wt_path)
.output()
.unwrap();
// Now the feature branch is ahead of the base branch.
assert!(worktree_has_committed_work(&wt_path));
}
}

View File

@@ -0,0 +1,556 @@
use std::path::{Path, PathBuf};
use std::process::Command;
use crate::io::story_metadata::clear_front_matter_field;
use crate::slog;
#[allow(dead_code)]
fn item_type_from_id(item_id: &str) -> &'static str {
// New format: {digits}_{type}_{slug}
let after_num = item_id.trim_start_matches(|c: char| c.is_ascii_digit());
if after_num.starts_with("_bug_") {
"bug"
} else if after_num.starts_with("_spike_") {
"spike"
} else {
"story"
}
}
/// Return the source directory path for a work item (always work/1_upcoming/).
fn item_source_dir(project_root: &Path, _item_id: &str) -> PathBuf {
project_root.join(".story_kit").join("work").join("1_upcoming")
}
/// Return the done directory path for a work item (always work/5_done/).
fn item_archive_dir(project_root: &Path, _item_id: &str) -> PathBuf {
project_root.join(".story_kit").join("work").join("5_done")
}
/// Move a work item (story, bug, or spike) from `work/1_upcoming/` to `work/2_current/`.
///
/// Idempotent: if the item is already in `2_current/`, returns Ok without committing.
/// If the item is not found in `1_upcoming/`, logs a warning and returns Ok.
pub fn move_story_to_current(project_root: &Path, story_id: &str) -> Result<(), String> {
let sk = project_root.join(".story_kit").join("work");
let current_dir = sk.join("2_current");
let current_path = current_dir.join(format!("{story_id}.md"));
if current_path.exists() {
// Already in 2_current/ — idempotent, nothing to do.
return Ok(());
}
let source_dir = item_source_dir(project_root, story_id);
let source_path = source_dir.join(format!("{story_id}.md"));
if !source_path.exists() {
slog!(
"[lifecycle] Work item '{story_id}' not found in {}; skipping move to 2_current/",
source_dir.display()
);
return Ok(());
}
std::fs::create_dir_all(&current_dir)
.map_err(|e| format!("Failed to create work/2_current/ directory: {e}"))?;
std::fs::rename(&source_path, &current_path)
.map_err(|e| format!("Failed to move '{story_id}' to 2_current/: {e}"))?;
slog!(
"[lifecycle] Moved '{story_id}' from {} to work/2_current/",
source_dir.display()
);
Ok(())
}
/// Check whether a feature branch `feature/story-{story_id}` exists and has
/// commits that are not yet on master. Returns `true` when there is unmerged
/// work, `false` when there is no branch or all its commits are already
/// reachable from master.
pub fn feature_branch_has_unmerged_changes(project_root: &Path, story_id: &str) -> bool {
let branch = format!("feature/story-{story_id}");
// Check if the branch exists.
let branch_check = Command::new("git")
.args(["rev-parse", "--verify", &branch])
.current_dir(project_root)
.output();
match branch_check {
Ok(out) if out.status.success() => {}
_ => return false, // No feature branch → nothing to merge.
}
// Check if the branch has commits not reachable from master.
let log = Command::new("git")
.args(["log", &format!("master..{branch}"), "--oneline"])
.current_dir(project_root)
.output();
match log {
Ok(out) => {
let stdout = String::from_utf8_lossy(&out.stdout);
!stdout.trim().is_empty()
}
Err(_) => false,
}
}
/// Move a story from `work/2_current/` to `work/5_done/` and auto-commit.
///
/// * If the story is in `2_current/`, it is moved to `5_done/` and committed.
/// * If the story is in `4_merge/`, it is moved to `5_done/` and committed.
/// * If the story is already in `5_done/` or `6_archived/`, this is a no-op (idempotent).
/// * If the story is not found in `2_current/`, `4_merge/`, `5_done/`, or `6_archived/`, an error is returned.
pub fn move_story_to_archived(project_root: &Path, story_id: &str) -> Result<(), String> {
let sk = project_root.join(".story_kit").join("work");
let current_path = sk.join("2_current").join(format!("{story_id}.md"));
let merge_path = sk.join("4_merge").join(format!("{story_id}.md"));
let done_dir = sk.join("5_done");
let done_path = done_dir.join(format!("{story_id}.md"));
let archived_path = sk.join("6_archived").join(format!("{story_id}.md"));
if done_path.exists() || archived_path.exists() {
// Already in done or archived — idempotent, nothing to do.
return Ok(());
}
// Check 2_current/ first, then 4_merge/
let source_path = if current_path.exists() {
current_path.clone()
} else if merge_path.exists() {
merge_path.clone()
} else {
return Err(format!(
"Story '{story_id}' not found in work/2_current/ or work/4_merge/. Cannot accept story."
));
};
std::fs::create_dir_all(&done_dir)
.map_err(|e| format!("Failed to create work/5_done/ directory: {e}"))?;
std::fs::rename(&source_path, &done_path)
.map_err(|e| format!("Failed to move story '{story_id}' to 5_done/: {e}"))?;
// Strip stale merge_failure from front matter now that the story is done.
if let Err(e) = clear_front_matter_field(&done_path, "merge_failure") {
slog!("[lifecycle] Warning: could not clear merge_failure from '{story_id}': {e}");
}
let from_dir = if source_path == current_path {
"work/2_current/"
} else {
"work/4_merge/"
};
slog!("[lifecycle] Moved story '{story_id}' from {from_dir} to work/5_done/");
Ok(())
}
/// Move a story/bug from `work/2_current/` or `work/3_qa/` to `work/4_merge/`.
///
/// This stages a work item as ready for the mergemaster to pick up and merge into master.
/// Idempotent: if already in `4_merge/`, returns Ok without committing.
pub fn move_story_to_merge(project_root: &Path, story_id: &str) -> Result<(), String> {
let sk = project_root.join(".story_kit").join("work");
let current_path = sk.join("2_current").join(format!("{story_id}.md"));
let qa_path = sk.join("3_qa").join(format!("{story_id}.md"));
let merge_dir = sk.join("4_merge");
let merge_path = merge_dir.join(format!("{story_id}.md"));
if merge_path.exists() {
// Already in 4_merge/ — idempotent, nothing to do.
return Ok(());
}
// Accept from 2_current/ (manual trigger) or 3_qa/ (pipeline advancement from QA stage).
let source_path = if current_path.exists() {
current_path.clone()
} else if qa_path.exists() {
qa_path.clone()
} else {
return Err(format!(
"Work item '{story_id}' not found in work/2_current/ or work/3_qa/. Cannot move to 4_merge/."
));
};
std::fs::create_dir_all(&merge_dir)
.map_err(|e| format!("Failed to create work/4_merge/ directory: {e}"))?;
std::fs::rename(&source_path, &merge_path)
.map_err(|e| format!("Failed to move '{story_id}' to 4_merge/: {e}"))?;
let from_dir = if source_path == current_path {
"work/2_current/"
} else {
"work/3_qa/"
};
slog!("[lifecycle] Moved '{story_id}' from {from_dir} to work/4_merge/");
Ok(())
}
/// Move a story/bug from `work/2_current/` to `work/3_qa/` and auto-commit.
///
/// This stages a work item for QA review before merging to master.
/// Idempotent: if already in `3_qa/`, returns Ok without committing.
pub fn move_story_to_qa(project_root: &Path, story_id: &str) -> Result<(), String> {
let sk = project_root.join(".story_kit").join("work");
let current_path = sk.join("2_current").join(format!("{story_id}.md"));
let qa_dir = sk.join("3_qa");
let qa_path = qa_dir.join(format!("{story_id}.md"));
if qa_path.exists() {
// Already in 3_qa/ — idempotent, nothing to do.
return Ok(());
}
if !current_path.exists() {
return Err(format!(
"Work item '{story_id}' not found in work/2_current/. Cannot move to 3_qa/."
));
}
std::fs::create_dir_all(&qa_dir)
.map_err(|e| format!("Failed to create work/3_qa/ directory: {e}"))?;
std::fs::rename(&current_path, &qa_path)
.map_err(|e| format!("Failed to move '{story_id}' to 3_qa/: {e}"))?;
slog!("[lifecycle] Moved '{story_id}' from work/2_current/ to work/3_qa/");
Ok(())
}
/// Move a bug from `work/2_current/` or `work/1_upcoming/` to `work/5_done/` and auto-commit.
///
/// * If the bug is in `2_current/`, it is moved to `5_done/` and committed.
/// * If the bug is still in `1_upcoming/` (never started), it is moved directly to `5_done/`.
/// * If the bug is already in `5_done/`, this is a no-op (idempotent).
/// * If the bug is not found anywhere, an error is returned.
pub fn close_bug_to_archive(project_root: &Path, bug_id: &str) -> Result<(), String> {
let sk = project_root.join(".story_kit").join("work");
let current_path = sk.join("2_current").join(format!("{bug_id}.md"));
let upcoming_path = sk.join("1_upcoming").join(format!("{bug_id}.md"));
let archive_dir = item_archive_dir(project_root, bug_id);
let archive_path = archive_dir.join(format!("{bug_id}.md"));
if archive_path.exists() {
return Ok(());
}
let source_path = if current_path.exists() {
current_path.clone()
} else if upcoming_path.exists() {
upcoming_path.clone()
} else {
return Err(format!(
"Bug '{bug_id}' not found in work/2_current/ or work/1_upcoming/. Cannot close bug."
));
};
std::fs::create_dir_all(&archive_dir)
.map_err(|e| format!("Failed to create work/5_done/ directory: {e}"))?;
std::fs::rename(&source_path, &archive_path)
.map_err(|e| format!("Failed to move bug '{bug_id}' to 5_done/: {e}"))?;
slog!(
"[lifecycle] Closed bug '{bug_id}' → work/5_done/"
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
// ── move_story_to_current tests ────────────────────────────────────────────
#[test]
fn move_story_to_current_moves_file() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let upcoming = root.join(".story_kit/work/1_upcoming");
let current = root.join(".story_kit/work/2_current");
fs::create_dir_all(&upcoming).unwrap();
fs::create_dir_all(&current).unwrap();
fs::write(upcoming.join("10_story_foo.md"), "test").unwrap();
move_story_to_current(root, "10_story_foo").unwrap();
assert!(!upcoming.join("10_story_foo.md").exists());
assert!(current.join("10_story_foo.md").exists());
}
#[test]
fn move_story_to_current_is_idempotent_when_already_current() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let current = root.join(".story_kit/work/2_current");
fs::create_dir_all(&current).unwrap();
fs::write(current.join("11_story_foo.md"), "test").unwrap();
move_story_to_current(root, "11_story_foo").unwrap();
assert!(current.join("11_story_foo.md").exists());
}
#[test]
fn move_story_to_current_noop_when_not_in_upcoming() {
let tmp = tempfile::tempdir().unwrap();
assert!(move_story_to_current(tmp.path(), "99_missing").is_ok());
}
#[test]
fn move_bug_to_current_moves_from_upcoming() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let upcoming = root.join(".story_kit/work/1_upcoming");
let current = root.join(".story_kit/work/2_current");
fs::create_dir_all(&upcoming).unwrap();
fs::create_dir_all(&current).unwrap();
fs::write(upcoming.join("1_bug_test.md"), "# Bug 1\n").unwrap();
move_story_to_current(root, "1_bug_test").unwrap();
assert!(!upcoming.join("1_bug_test.md").exists());
assert!(current.join("1_bug_test.md").exists());
}
// ── close_bug_to_archive tests ─────────────────────────────────────────────
#[test]
fn close_bug_moves_from_current_to_archive() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let current = root.join(".story_kit/work/2_current");
fs::create_dir_all(&current).unwrap();
fs::write(current.join("2_bug_test.md"), "# Bug 2\n").unwrap();
close_bug_to_archive(root, "2_bug_test").unwrap();
assert!(!current.join("2_bug_test.md").exists());
assert!(root.join(".story_kit/work/5_done/2_bug_test.md").exists());
}
#[test]
fn close_bug_moves_from_upcoming_when_not_started() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let upcoming = root.join(".story_kit/work/1_upcoming");
fs::create_dir_all(&upcoming).unwrap();
fs::write(upcoming.join("3_bug_test.md"), "# Bug 3\n").unwrap();
close_bug_to_archive(root, "3_bug_test").unwrap();
assert!(!upcoming.join("3_bug_test.md").exists());
assert!(root.join(".story_kit/work/5_done/3_bug_test.md").exists());
}
// ── item_type_from_id tests ────────────────────────────────────────────────
#[test]
fn item_type_from_id_detects_types() {
assert_eq!(item_type_from_id("1_bug_test"), "bug");
assert_eq!(item_type_from_id("1_spike_research"), "spike");
assert_eq!(item_type_from_id("50_story_my_story"), "story");
assert_eq!(item_type_from_id("1_story_simple"), "story");
}
// ── move_story_to_merge tests ──────────────────────────────────────────────
#[test]
fn move_story_to_merge_moves_file() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let current = root.join(".story_kit/work/2_current");
fs::create_dir_all(&current).unwrap();
fs::write(current.join("20_story_foo.md"), "test").unwrap();
move_story_to_merge(root, "20_story_foo").unwrap();
assert!(!current.join("20_story_foo.md").exists());
assert!(root.join(".story_kit/work/4_merge/20_story_foo.md").exists());
}
#[test]
fn move_story_to_merge_from_qa_dir() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let qa_dir = root.join(".story_kit/work/3_qa");
fs::create_dir_all(&qa_dir).unwrap();
fs::write(qa_dir.join("40_story_test.md"), "test").unwrap();
move_story_to_merge(root, "40_story_test").unwrap();
assert!(!qa_dir.join("40_story_test.md").exists());
assert!(root.join(".story_kit/work/4_merge/40_story_test.md").exists());
}
#[test]
fn move_story_to_merge_idempotent_when_already_in_merge() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let merge_dir = root.join(".story_kit/work/4_merge");
fs::create_dir_all(&merge_dir).unwrap();
fs::write(merge_dir.join("21_story_test.md"), "test").unwrap();
move_story_to_merge(root, "21_story_test").unwrap();
assert!(merge_dir.join("21_story_test.md").exists());
}
#[test]
fn move_story_to_merge_errors_when_not_in_current_or_qa() {
let tmp = tempfile::tempdir().unwrap();
let result = move_story_to_merge(tmp.path(), "99_nonexistent");
assert!(result.unwrap_err().contains("not found in work/2_current/ or work/3_qa/"));
}
// ── move_story_to_qa tests ────────────────────────────────────────────────
#[test]
fn move_story_to_qa_moves_file() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let current = root.join(".story_kit/work/2_current");
fs::create_dir_all(&current).unwrap();
fs::write(current.join("30_story_qa.md"), "test").unwrap();
move_story_to_qa(root, "30_story_qa").unwrap();
assert!(!current.join("30_story_qa.md").exists());
assert!(root.join(".story_kit/work/3_qa/30_story_qa.md").exists());
}
#[test]
fn move_story_to_qa_idempotent_when_already_in_qa() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let qa_dir = root.join(".story_kit/work/3_qa");
fs::create_dir_all(&qa_dir).unwrap();
fs::write(qa_dir.join("31_story_test.md"), "test").unwrap();
move_story_to_qa(root, "31_story_test").unwrap();
assert!(qa_dir.join("31_story_test.md").exists());
}
#[test]
fn move_story_to_qa_errors_when_not_in_current() {
let tmp = tempfile::tempdir().unwrap();
let result = move_story_to_qa(tmp.path(), "99_nonexistent");
assert!(result.unwrap_err().contains("not found in work/2_current/"));
}
// ── move_story_to_archived tests ──────────────────────────────────────────
#[test]
fn move_story_to_archived_finds_in_merge_dir() {
use std::fs;
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let merge_dir = root.join(".story_kit/work/4_merge");
fs::create_dir_all(&merge_dir).unwrap();
fs::write(merge_dir.join("22_story_test.md"), "test").unwrap();
move_story_to_archived(root, "22_story_test").unwrap();
assert!(!merge_dir.join("22_story_test.md").exists());
assert!(root.join(".story_kit/work/5_done/22_story_test.md").exists());
}
#[test]
fn move_story_to_archived_error_when_not_in_current_or_merge() {
let tmp = tempfile::tempdir().unwrap();
let result = move_story_to_archived(tmp.path(), "99_nonexistent");
assert!(result.unwrap_err().contains("4_merge"));
}
// ── feature_branch_has_unmerged_changes tests ────────────────────────────
fn init_git_repo(repo: &std::path::Path) {
Command::new("git")
.args(["init"])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["config", "user.email", "test@test.com"])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["config", "user.name", "Test"])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "--allow-empty", "-m", "init"])
.current_dir(repo)
.output()
.unwrap();
}
/// Bug 226: feature_branch_has_unmerged_changes returns true when the
/// feature branch has commits not on master.
#[test]
fn feature_branch_has_unmerged_changes_detects_unmerged_code() {
use std::fs;
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let repo = tmp.path();
init_git_repo(repo);
// Create a feature branch with a code commit.
Command::new("git")
.args(["checkout", "-b", "feature/story-50_story_test"])
.current_dir(repo)
.output()
.unwrap();
fs::write(repo.join("feature.rs"), "fn main() {}").unwrap();
Command::new("git")
.args(["add", "."])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["commit", "-m", "add feature"])
.current_dir(repo)
.output()
.unwrap();
Command::new("git")
.args(["checkout", "master"])
.current_dir(repo)
.output()
.unwrap();
assert!(
feature_branch_has_unmerged_changes(repo, "50_story_test"),
"should detect unmerged changes on feature branch"
);
}
/// Bug 226: feature_branch_has_unmerged_changes returns false when no
/// feature branch exists.
#[test]
fn feature_branch_has_unmerged_changes_false_when_no_branch() {
use tempfile::tempdir;
let tmp = tempdir().unwrap();
let repo = tmp.path();
init_git_repo(repo);
assert!(
!feature_branch_has_unmerged_changes(repo, "99_nonexistent"),
"should return false when no feature branch"
);
}
}

1638
server/src/agents/merge.rs Normal file

File diff suppressed because it is too large Load Diff

181
server/src/agents/mod.rs Normal file
View File

@@ -0,0 +1,181 @@
pub mod gates;
pub mod lifecycle;
pub mod merge;
mod pool;
mod pty;
use crate::config::AgentConfig;
use serde::Serialize;
pub use lifecycle::{
close_bug_to_archive, feature_branch_has_unmerged_changes, move_story_to_archived,
move_story_to_merge, move_story_to_qa,
};
pub use pool::AgentPool;
/// Events emitted during server startup reconciliation to broadcast real-time
/// progress to connected WebSocket clients.
#[derive(Debug, Clone, Serialize)]
pub struct ReconciliationEvent {
/// The story being reconciled, or empty string for the overall "done" event.
pub story_id: String,
/// Coarse status: "checking", "gates_running", "advanced", "skipped", "failed", "done"
pub status: String,
/// Human-readable details.
pub message: String,
}
/// Events streamed from a running agent to SSE clients.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum AgentEvent {
/// Agent status changed.
Status {
story_id: String,
agent_name: String,
status: String,
},
/// Raw text output from the agent process.
Output {
story_id: String,
agent_name: String,
text: String,
},
/// Agent produced a JSON event from `--output-format stream-json`.
AgentJson {
story_id: String,
agent_name: String,
data: serde_json::Value,
},
/// Agent finished.
Done {
story_id: String,
agent_name: String,
session_id: Option<String>,
},
/// Agent errored.
Error {
story_id: String,
agent_name: String,
message: String,
},
/// Thinking tokens from an extended-thinking block.
Thinking {
story_id: String,
agent_name: String,
text: String,
},
}
#[derive(Debug, Clone, Serialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum AgentStatus {
Pending,
Running,
Completed,
Failed,
}
impl std::fmt::Display for AgentStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Pending => write!(f, "pending"),
Self::Running => write!(f, "running"),
Self::Completed => write!(f, "completed"),
Self::Failed => write!(f, "failed"),
}
}
}
/// Pipeline stages for automatic story advancement.
#[derive(Debug, Clone, PartialEq)]
pub enum PipelineStage {
/// Coding agents (coder-1, coder-2, etc.)
Coder,
/// QA review agent
Qa,
/// Mergemaster agent
Mergemaster,
/// Supervisors and unknown agents — no automatic advancement.
Other,
}
/// Determine the pipeline stage from an agent name.
pub fn pipeline_stage(agent_name: &str) -> PipelineStage {
match agent_name {
"qa" => PipelineStage::Qa,
"mergemaster" => PipelineStage::Mergemaster,
name if name.starts_with("coder") => PipelineStage::Coder,
_ => PipelineStage::Other,
}
}
/// Determine the pipeline stage for a configured agent.
///
/// Prefers the explicit `stage` config field (added in Bug 150) over the
/// legacy name-based heuristic so that agents with non-standard names
/// (e.g. `qa-2`, `coder-opus`) are assigned to the correct stage.
pub(crate) fn agent_config_stage(cfg: &AgentConfig) -> PipelineStage {
match cfg.stage.as_deref() {
Some("coder") => PipelineStage::Coder,
Some("qa") => PipelineStage::Qa,
Some("mergemaster") => PipelineStage::Mergemaster,
Some(_) => PipelineStage::Other,
None => pipeline_stage(&cfg.name),
}
}
/// Completion report produced when acceptance gates are run.
///
/// Created automatically by the server when an agent process exits normally,
/// or via the internal `report_completion` method.
#[derive(Debug, Serialize, Clone)]
pub struct CompletionReport {
pub summary: String,
pub gates_passed: bool,
pub gate_output: String,
}
#[derive(Debug, Serialize, Clone)]
pub struct AgentInfo {
pub story_id: String,
pub agent_name: String,
pub status: AgentStatus,
pub session_id: Option<String>,
pub worktree_path: Option<String>,
pub base_branch: Option<String>,
pub completion: Option<CompletionReport>,
/// UUID identifying the persistent log file for this session.
pub log_session_id: Option<String>,
}
#[cfg(test)]
mod tests {
use super::*;
// ── pipeline_stage tests ──────────────────────────────────────────────────
#[test]
fn pipeline_stage_detects_coders() {
assert_eq!(pipeline_stage("coder-1"), PipelineStage::Coder);
assert_eq!(pipeline_stage("coder-2"), PipelineStage::Coder);
assert_eq!(pipeline_stage("coder-3"), PipelineStage::Coder);
}
#[test]
fn pipeline_stage_detects_qa() {
assert_eq!(pipeline_stage("qa"), PipelineStage::Qa);
}
#[test]
fn pipeline_stage_detects_mergemaster() {
assert_eq!(pipeline_stage("mergemaster"), PipelineStage::Mergemaster);
}
#[test]
fn pipeline_stage_supervisor_is_other() {
assert_eq!(pipeline_stage("supervisor"), PipelineStage::Other);
assert_eq!(pipeline_stage("default"), PipelineStage::Other);
assert_eq!(pipeline_stage("unknown"), PipelineStage::Other);
}
}

4470
server/src/agents/pool.rs Normal file

File diff suppressed because it is too large Load Diff

490
server/src/agents/pty.rs Normal file
View File

@@ -0,0 +1,490 @@
use std::collections::HashMap;
use std::io::{BufRead, BufReader};
use std::sync::{Arc, Mutex};
use portable_pty::{ChildKiller, CommandBuilder, PtySize, native_pty_system};
use tokio::sync::broadcast;
use super::AgentEvent;
use crate::agent_log::AgentLogWriter;
use crate::slog;
use crate::slog_warn;
fn composite_key(story_id: &str, agent_name: &str) -> String {
format!("{story_id}:{agent_name}")
}
struct ChildKillerGuard {
killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
key: String,
}
impl Drop for ChildKillerGuard {
fn drop(&mut self) {
if let Ok(mut killers) = self.killers.lock() {
killers.remove(&self.key);
}
}
}
/// Spawn claude agent in a PTY and stream events through the broadcast channel.
#[allow(clippy::too_many_arguments)]
pub(super) async fn run_agent_pty_streaming(
story_id: &str,
agent_name: &str,
command: &str,
args: &[String],
prompt: &str,
cwd: &str,
tx: &broadcast::Sender<AgentEvent>,
event_log: &Arc<Mutex<Vec<AgentEvent>>>,
log_writer: Option<Arc<Mutex<AgentLogWriter>>>,
inactivity_timeout_secs: u64,
child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
) -> Result<Option<String>, String> {
let sid = story_id.to_string();
let aname = agent_name.to_string();
let cmd = command.to_string();
let args = args.to_vec();
let prompt = prompt.to_string();
let cwd = cwd.to_string();
let tx = tx.clone();
let event_log = event_log.clone();
tokio::task::spawn_blocking(move || {
run_agent_pty_blocking(
&sid,
&aname,
&cmd,
&args,
&prompt,
&cwd,
&tx,
&event_log,
log_writer.as_deref(),
inactivity_timeout_secs,
&child_killers,
)
})
.await
.map_err(|e| format!("Agent task panicked: {e}"))?
}
/// Dispatch a `stream_event` from Claude Code's `--include-partial-messages` output.
///
/// Extracts `thinking_delta` and `text_delta` from `content_block_delta` events
/// and routes them as `AgentEvent::Thinking` and `AgentEvent::Output` respectively.
/// This ensures thinking traces flow through the dedicated `ThinkingBlock` UI
/// component rather than appearing as unbounded regular output.
fn handle_agent_stream_event(
event: &serde_json::Value,
story_id: &str,
agent_name: &str,
tx: &broadcast::Sender<AgentEvent>,
event_log: &Mutex<Vec<AgentEvent>>,
log_writer: Option<&Mutex<AgentLogWriter>>,
) {
let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or("");
if event_type == "content_block_delta"
&& let Some(delta) = event.get("delta")
{
let delta_type = delta.get("type").and_then(|t| t.as_str()).unwrap_or("");
match delta_type {
"thinking_delta" => {
if let Some(thinking) = delta.get("thinking").and_then(|t| t.as_str()) {
emit_event(
AgentEvent::Thinking {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
text: thinking.to_string(),
},
tx,
event_log,
log_writer,
);
}
}
"text_delta" => {
if let Some(text) = delta.get("text").and_then(|t| t.as_str()) {
emit_event(
AgentEvent::Output {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
text: text.to_string(),
},
tx,
event_log,
log_writer,
);
}
}
_ => {}
}
}
}
/// Helper to send an event to broadcast, event log, and optional persistent log file.
pub(super) fn emit_event(
event: AgentEvent,
tx: &broadcast::Sender<AgentEvent>,
event_log: &Mutex<Vec<AgentEvent>>,
log_writer: Option<&Mutex<AgentLogWriter>>,
) {
if let Ok(mut log) = event_log.lock() {
log.push(event.clone());
}
if let Some(writer) = log_writer
&& let Ok(mut w) = writer.lock()
&& let Err(e) = w.write_event(&event)
{
eprintln!("[agent_log] Failed to write event to log file: {e}");
}
let _ = tx.send(event);
}
#[allow(clippy::too_many_arguments)]
fn run_agent_pty_blocking(
story_id: &str,
agent_name: &str,
command: &str,
args: &[String],
prompt: &str,
cwd: &str,
tx: &broadcast::Sender<AgentEvent>,
event_log: &Mutex<Vec<AgentEvent>>,
log_writer: Option<&Mutex<AgentLogWriter>>,
inactivity_timeout_secs: u64,
child_killers: &Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
) -> Result<Option<String>, String> {
let pty_system = native_pty_system();
let pair = pty_system
.openpty(PtySize {
rows: 50,
cols: 200,
pixel_width: 0,
pixel_height: 0,
})
.map_err(|e| format!("Failed to open PTY: {e}"))?;
let mut cmd = CommandBuilder::new(command);
// -p <prompt> must come first
cmd.arg("-p");
cmd.arg(prompt);
// Add configured args (e.g., --directory /path/to/worktree, --model, etc.)
for arg in args {
cmd.arg(arg);
}
cmd.arg("--output-format");
cmd.arg("stream-json");
cmd.arg("--verbose");
// Enable partial streaming so we receive thinking_delta and text_delta
// events in real-time, rather than only complete assistant events.
// Without this, thinking traces may not appear in the structured output
// and instead leak as unstructured PTY text.
cmd.arg("--include-partial-messages");
// Supervised agents don't need interactive permission prompts
cmd.arg("--permission-mode");
cmd.arg("bypassPermissions");
cmd.cwd(cwd);
cmd.env("NO_COLOR", "1");
// Allow spawning Claude Code from within a Claude Code session
cmd.env_remove("CLAUDECODE");
cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");
slog!("[agent:{story_id}:{agent_name}] Spawning {command} in {cwd} with args: {args:?}");
let mut child = pair
.slave
.spawn_command(cmd)
.map_err(|e| format!("Failed to spawn agent for {story_id}:{agent_name}: {e}"))?;
// Register the child killer so that kill_all_children() / stop_agent() can
// terminate this process on server shutdown, even if the blocking thread
// cannot be interrupted. The ChildKillerGuard deregisters on function exit.
let killer_key = composite_key(story_id, agent_name);
{
let killer = child.clone_killer();
if let Ok(mut killers) = child_killers.lock() {
killers.insert(killer_key.clone(), killer);
}
}
let _killer_guard = ChildKillerGuard {
killers: Arc::clone(child_killers),
key: killer_key,
};
drop(pair.slave);
let reader = pair
.master
.try_clone_reader()
.map_err(|e| format!("Failed to clone PTY reader: {e}"))?;
drop(pair.master);
// Spawn a reader thread to collect PTY output lines.
// We use a channel so the main thread can apply an inactivity deadline
// via recv_timeout: if no output arrives within the configured window
// the process is killed and the agent is marked Failed.
let (line_tx, line_rx) = std::sync::mpsc::channel::<std::io::Result<String>>();
std::thread::spawn(move || {
let buf_reader = BufReader::new(reader);
for line in buf_reader.lines() {
if line_tx.send(line).is_err() {
break;
}
}
});
let timeout_dur = if inactivity_timeout_secs > 0 {
Some(std::time::Duration::from_secs(inactivity_timeout_secs))
} else {
None
};
let mut session_id: Option<String> = None;
loop {
let recv_result = match timeout_dur {
Some(dur) => line_rx.recv_timeout(dur),
None => line_rx
.recv()
.map_err(|_| std::sync::mpsc::RecvTimeoutError::Disconnected),
};
let line = match recv_result {
Ok(Ok(l)) => l,
Ok(Err(_)) => {
// IO error reading from PTY — treat as EOF.
break;
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
// Reader thread exited (EOF from PTY).
break;
}
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
slog_warn!(
"[agent:{story_id}:{agent_name}] Inactivity timeout after \
{inactivity_timeout_secs}s with no output. Killing process."
);
let _ = child.kill();
let _ = child.wait();
return Err(format!(
"Agent inactivity timeout: no output received for {inactivity_timeout_secs}s"
));
}
};
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
// Try to parse as JSON
let json: serde_json::Value = match serde_json::from_str(trimmed) {
Ok(j) => j,
Err(_) => {
// Non-JSON output (terminal escapes etc.) — send as raw output
emit_event(
AgentEvent::Output {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
text: trimmed.to_string(),
},
tx,
event_log,
log_writer,
);
continue;
}
};
let event_type = json.get("type").and_then(|t| t.as_str()).unwrap_or("");
match event_type {
"system" => {
session_id = json
.get("session_id")
.and_then(|s| s.as_str())
.map(|s| s.to_string());
}
// With --include-partial-messages, thinking and text arrive
// incrementally via stream_event → content_block_delta. Handle
// them here for real-time streaming to the frontend.
"stream_event" => {
if let Some(event) = json.get("event") {
handle_agent_stream_event(
event,
story_id,
agent_name,
tx,
event_log,
log_writer,
);
}
}
// Complete assistant events are skipped for content extraction
// because thinking and text already arrived via stream_event.
// The raw JSON is still forwarded as AgentJson below.
"assistant" | "user" | "result" => {}
_ => {}
}
// Forward all JSON events
emit_event(
AgentEvent::AgentJson {
story_id: story_id.to_string(),
agent_name: agent_name.to_string(),
data: json,
},
tx,
event_log,
log_writer,
);
}
let _ = child.kill();
let _ = child.wait();
slog!(
"[agent:{story_id}:{agent_name}] Done. Session: {:?}",
session_id
);
Ok(session_id)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::agents::AgentEvent;
#[test]
fn test_emit_event_writes_to_log_writer() {
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path();
let log_writer =
AgentLogWriter::new(root, "42_story_foo", "coder-1", "sess-emit").unwrap();
let log_mutex = Mutex::new(log_writer);
let (tx, _rx) = broadcast::channel::<AgentEvent>(64);
let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());
let event = AgentEvent::Status {
story_id: "42_story_foo".to_string(),
agent_name: "coder-1".to_string(),
status: "running".to_string(),
};
emit_event(event, &tx, &event_log, Some(&log_mutex));
// Verify event was added to in-memory log
let mem_events = event_log.lock().unwrap();
assert_eq!(mem_events.len(), 1);
drop(mem_events);
// Verify event was written to the log file
let log_path =
crate::agent_log::log_file_path(root, "42_story_foo", "coder-1", "sess-emit");
let entries = crate::agent_log::read_log(&log_path).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].event["type"], "status");
assert_eq!(entries[0].event["status"], "running");
}
// ── bug 167: handle_agent_stream_event routes thinking/text correctly ───
#[test]
fn stream_event_thinking_delta_emits_thinking_event() {
let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());
let event = serde_json::json!({
"type": "content_block_delta",
"delta": {"type": "thinking_delta", "thinking": "Let me analyze this..."}
});
handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None);
let received = rx.try_recv().unwrap();
match received {
AgentEvent::Thinking {
story_id,
agent_name,
text,
} => {
assert_eq!(story_id, "s1");
assert_eq!(agent_name, "coder-1");
assert_eq!(text, "Let me analyze this...");
}
other => panic!("Expected Thinking event, got: {other:?}"),
}
}
#[test]
fn stream_event_text_delta_emits_output_event() {
let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());
let event = serde_json::json!({
"type": "content_block_delta",
"delta": {"type": "text_delta", "text": "Here is the result."}
});
handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None);
let received = rx.try_recv().unwrap();
match received {
AgentEvent::Output {
story_id,
agent_name,
text,
} => {
assert_eq!(story_id, "s1");
assert_eq!(agent_name, "coder-1");
assert_eq!(text, "Here is the result.");
}
other => panic!("Expected Output event, got: {other:?}"),
}
}
#[test]
fn stream_event_input_json_delta_ignored() {
let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());
let event = serde_json::json!({
"type": "content_block_delta",
"delta": {"type": "input_json_delta", "partial_json": "{\"file\":"}
});
handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None);
// No event should be emitted for tool argument deltas
assert!(rx.try_recv().is_err());
}
#[test]
fn stream_event_non_delta_type_ignored() {
let (tx, mut rx) = broadcast::channel::<AgentEvent>(64);
let event_log: Mutex<Vec<AgentEvent>> = Mutex::new(Vec::new());
let event = serde_json::json!({
"type": "message_start",
"message": {"role": "assistant"}
});
handle_agent_stream_event(&event, "s1", "coder-1", &tx, &event_log, None);
assert!(rx.try_recv().is_err());
}
}