story-kit: merge 149_bug_web_ui_does_not_update_when_agents_are_started_or_stopped

This commit is contained in:
Dave
2026-02-24 23:09:13 +00:00
parent 51303c07d8
commit dc631d1933
8 changed files with 187 additions and 108 deletions

View File

@@ -1,4 +1,5 @@
use crate::agent_log::AgentLogWriter;
use crate::io::watcher::WatcherEvent;
use crate::slog;
use crate::slog_error;
use crate::slog_warn;
@@ -253,6 +254,11 @@ pub struct AgentPool {
/// Used to terminate child processes on server shutdown or agent stop, preventing
/// orphaned Claude Code processes from running after the server exits.
child_killers: Arc<Mutex<HashMap<String, Box<dyn ChildKiller + Send + Sync>>>>,
/// Broadcast channel for notifying WebSocket clients of agent state changes.
/// When an agent transitions state (Pending, Running, Completed, Failed, Stopped),
/// an `AgentStateChanged` event is emitted so the frontend can refresh the
/// pipeline board without waiting for a filesystem event.
watcher_tx: broadcast::Sender<WatcherEvent>,
}
/// RAII guard that removes a child killer from the registry on drop.
@@ -273,14 +279,28 @@ impl Drop for ChildKillerGuard {
}
impl AgentPool {
/// Construct an `AgentPool` with an empty agent map and an empty child-killer registry.
///
/// `port` is stored on the pool unchanged; `watcher_tx` is the broadcast sender the
/// pool keeps for emitting `WatcherEvent`s.
// NOTE(review): this diff view shows both the pre-change signature (first line below)
// and its replacement (second line); the body's `watcher_tx` field initializer only
// matches the two-argument form.
pub fn new(port: u16) -> Self {
pub fn new(port: u16, watcher_tx: broadcast::Sender<WatcherEvent>) -> Self {
Self {
// Both registries start empty and are shared behind `Arc<Mutex<..>>`.
agents: Arc::new(Mutex::new(HashMap::new())),
port,
child_killers: Arc::new(Mutex::new(HashMap::new())),
watcher_tx,
}
}
/// Build an `AgentPool` for unit tests, backed by a throwaway watcher channel.
///
/// A fresh broadcast channel with capacity 16 is created and its receiving half
/// is discarded immediately, so tests do not need to supply a real channel.
#[cfg(test)]
pub fn new_test(port: u16) -> Self {
    // Only the sender is kept; the receiver is never bound.
    let (dummy_tx, _) = broadcast::channel::<WatcherEvent>(16);
    Self::new(port, dummy_tx)
}
/// Broadcast an `AgentStateChanged` event on the given watcher channel so that
/// WebSocket clients can refresh the pipeline board and agent panel.
///
/// Takes the sender by reference (rather than `&self`) so it can also be called
/// with a cloned sender from inside spawned tasks.
fn notify_agent_state_changed(watcher_tx: &broadcast::Sender<WatcherEvent>) {
    let event = WatcherEvent::AgentStateChanged;
    // Best-effort: the send result is deliberately ignored.
    // NOTE(review): presumably the only failure is "no active receivers" — confirm
    // against the broadcast channel implementation in use.
    let _ = watcher_tx.send(event);
}
/// Kill all active PTY child processes.
///
/// Called on server shutdown to prevent orphaned Claude Code processes from
@@ -423,6 +443,9 @@ impl AgentPool {
}
let mut pending_guard = PendingGuard::new(self.agents.clone(), key.clone());
// Notify WebSocket clients that a new agent is pending.
Self::notify_agent_state_changed(&self.watcher_tx);
let _ = tx.send(AgentEvent::Status {
story_id: story_id.to_string(),
agent_name: resolved_name.clone(),
@@ -451,6 +474,7 @@ impl AgentPool {
let port_for_task = self.port;
let log_writer_clone = log_writer.clone();
let child_killers_clone = self.child_killers.clone();
let watcher_tx_clone = self.watcher_tx.clone();
// Spawn the background task. Worktree creation and agent launch happen here
// so `start_agent` returns immediately after registering the agent as
@@ -479,6 +503,7 @@ impl AgentPool {
agent_name: aname.clone(),
message: format!("Failed to create worktree: {e}"),
});
Self::notify_agent_state_changed(&watcher_tx_clone);
return;
}
};
@@ -512,6 +537,7 @@ impl AgentPool {
agent_name: aname.clone(),
message: format!("Failed to render agent args: {e}"),
});
Self::notify_agent_state_changed(&watcher_tx_clone);
return;
}
};
@@ -534,6 +560,7 @@ impl AgentPool {
agent_name: aname.clone(),
status: "running".to_string(),
});
Self::notify_agent_state_changed(&watcher_tx_clone);
// Step 4: launch the agent process.
match run_agent_pty_streaming(
@@ -562,6 +589,7 @@ impl AgentPool {
session_id,
)
.await;
Self::notify_agent_state_changed(&watcher_tx_clone);
}
Err(e) => {
if let Ok(mut agents) = agents_ref.lock()
@@ -575,6 +603,7 @@ impl AgentPool {
agent_name: aname.clone(),
message: e,
});
Self::notify_agent_state_changed(&watcher_tx_clone);
}
}
});
@@ -653,6 +682,9 @@ impl AgentPool {
agents.remove(&key);
}
// Notify WebSocket clients so pipeline board and agent panel update.
Self::notify_agent_state_changed(&self.watcher_tx);
Ok(())
}
@@ -1138,6 +1170,7 @@ impl AgentPool {
agents: Arc::clone(&self.agents),
port: self.port,
child_killers: Arc::clone(&self.child_killers),
watcher_tx: self.watcher_tx.clone(),
};
let sid = story_id.to_string();
let aname = agent_name.to_string();
@@ -2077,10 +2110,12 @@ fn spawn_pipeline_advance(
let sid = story_id.to_string();
let aname = agent_name.to_string();
tokio::spawn(async move {
let (watcher_tx, _) = broadcast::channel(16);
let pool = AgentPool {
agents,
port,
child_killers: Arc::new(Mutex::new(HashMap::new())),
watcher_tx,
};
pool.run_pipeline_advance_for_completed_agent(&sid, &aname)
.await;
@@ -3469,7 +3504,7 @@ mod tests {
#[tokio::test]
async fn wait_for_agent_returns_immediately_if_completed() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("s1", "bot", AgentStatus::Completed);
let info = pool.wait_for_agent("s1", "bot", 1000).await.unwrap();
@@ -3480,7 +3515,7 @@ mod tests {
#[tokio::test]
async fn wait_for_agent_returns_immediately_if_failed() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("s2", "bot", AgentStatus::Failed);
let info = pool.wait_for_agent("s2", "bot", 1000).await.unwrap();
@@ -3489,7 +3524,7 @@ mod tests {
#[tokio::test]
async fn wait_for_agent_completes_on_done_event() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running);
// Send Done event after a short delay
@@ -3514,7 +3549,7 @@ mod tests {
#[tokio::test]
async fn wait_for_agent_times_out() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("s4", "bot", AgentStatus::Running);
let result = pool.wait_for_agent("s4", "bot", 50).await;
@@ -3525,14 +3560,14 @@ mod tests {
/// Waiting on a story/agent pair that was never injected into the pool must
/// return an error.
#[tokio::test]
async fn wait_for_agent_errors_for_nonexistent() {
// NOTE(review): the diff view shows the old constructor call followed by its
// replacement; the second binding shadows the first.
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
// No agent is injected, so the lookup for this key has nothing to find.
let result = pool.wait_for_agent("no_story", "no_bot", 100).await;
assert!(result.is_err());
}
#[tokio::test]
async fn wait_for_agent_completes_on_stopped_status_event() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running);
let tx_clone = tx.clone();
@@ -3553,7 +3588,7 @@ mod tests {
#[tokio::test]
async fn report_completion_rejects_nonexistent_agent() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
let result = pool
.report_completion("no_story", "no_bot", "done")
.await;
@@ -3564,7 +3599,7 @@ mod tests {
#[tokio::test]
async fn report_completion_rejects_non_running_agent() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("s6", "bot", AgentStatus::Completed);
let result = pool.report_completion("s6", "bot", "done").await;
@@ -3599,7 +3634,7 @@ mod tests {
// Write an uncommitted file
fs::write(repo.join("dirty.txt"), "not committed").unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent_with_path("s7", "bot", AgentStatus::Running, repo.to_path_buf());
let result = pool.report_completion("s7", "bot", "done").await;
@@ -3615,7 +3650,7 @@ mod tests {
#[tokio::test]
async fn server_owned_completion_skips_when_already_completed() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
let report = CompletionReport {
summary: "Already done".to_string(),
gates_passed: true,
@@ -3662,7 +3697,7 @@ mod tests {
let repo = tmp.path();
init_git_repo(repo);
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent_with_path(
"s11",
"coder-1",
@@ -3717,7 +3752,7 @@ mod tests {
// Create an uncommitted file.
fs::write(repo.join("dirty.txt"), "not committed").unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent_with_path(
"s12",
"coder-1",
@@ -3747,7 +3782,7 @@ mod tests {
#[tokio::test]
async fn server_owned_completion_nonexistent_agent_is_noop() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
// Should not panic or error — just silently return.
run_server_owned_completion(&pool.agents, pool.port, "nonexistent", "bot", None)
.await;
@@ -3909,7 +3944,7 @@ mod tests {
fs::create_dir_all(&current).unwrap();
fs::write(current.join("50_story_test.md"), "test").unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent_with_completion(
"50_story_test",
"coder-1",
@@ -3949,7 +3984,7 @@ mod tests {
fs::create_dir_all(&qa_dir).unwrap();
fs::write(qa_dir.join("51_story_test.md"), "test").unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent_with_completion(
"51_story_test",
"qa",
@@ -3986,7 +4021,7 @@ mod tests {
fs::create_dir_all(&current).unwrap();
fs::write(current.join("52_story_test.md"), "test").unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent_with_completion(
"52_story_test",
"supervisor",
@@ -4132,7 +4167,7 @@ mod tests {
let repo = tmp.path();
init_git_repo(repo);
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
// branch feature/story-99_nonexistent does not exist
let result = pool
.merge_agent_work(repo, "99_nonexistent")
@@ -4192,7 +4227,7 @@ mod tests {
.output()
.unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
let report = pool.merge_agent_work(repo, "23_test").await.unwrap();
// Merge should succeed (gates will run but cargo/pnpm results will depend on env)
@@ -4460,7 +4495,7 @@ mod tests {
fn is_story_assigned_returns_true_for_running_coder() {
use crate::config::ProjectConfig;
let config = ProjectConfig::default();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running);
let agents = pool.agents.lock().unwrap();
@@ -4490,7 +4525,7 @@ mod tests {
fn is_story_assigned_returns_false_for_completed_agent() {
use crate::config::ProjectConfig;
let config = ProjectConfig::default();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Completed);
let agents = pool.agents.lock().unwrap();
@@ -4516,7 +4551,7 @@ name = "coder-2"
)
.unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("s1", "coder-1", AgentStatus::Running);
pool.inject_test_agent("s2", "coder-2", AgentStatus::Running);
@@ -4540,7 +4575,7 @@ name = "coder-3"
)
.unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
// coder-1 is busy, coder-2 is free
pool.inject_test_agent("s1", "coder-1", AgentStatus::Running);
@@ -4560,7 +4595,7 @@ name = "coder-1"
)
.unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
// coder-1 completed its previous story — it's free for a new one
pool.inject_test_agent("s1", "coder-1", AgentStatus::Completed);
@@ -4640,7 +4675,7 @@ stage = "qa"
)
.unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("42_story_foo", "qa-2", AgentStatus::Running);
let agents = pool.agents.lock().unwrap();
@@ -4734,7 +4769,7 @@ stage = "qa"
)
.unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
// Simulate qa already running on story-a.
pool.inject_test_agent("story-a", "qa", AgentStatus::Running);
@@ -4771,7 +4806,7 @@ stage = "qa"
)
.unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
// Previous run completed — should NOT block a new story.
pool.inject_test_agent("story-a", "qa", AgentStatus::Completed);
@@ -4859,7 +4894,7 @@ stage = "qa"
#[tokio::test]
async fn reconcile_on_startup_noop_when_no_worktrees() {
let tmp = tempfile::tempdir().unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
let (tx, _rx) = broadcast::channel(16);
// Should not panic; no worktrees to reconcile.
pool.reconcile_on_startup(tmp.path(), &tx).await;
@@ -4868,7 +4903,7 @@ stage = "qa"
#[tokio::test]
async fn reconcile_on_startup_emits_done_event() {
let tmp = tempfile::tempdir().unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
let (tx, mut rx) = broadcast::channel::<ReconciliationEvent>(16);
pool.reconcile_on_startup(tmp.path(), &tx).await;
@@ -4901,7 +4936,7 @@ stage = "qa"
fs::create_dir_all(&wt_dir).unwrap();
init_git_repo(&wt_dir);
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
let (tx, _rx) = broadcast::channel(16);
pool.reconcile_on_startup(root, &tx).await;
@@ -4985,7 +5020,7 @@ stage = "qa"
"test setup: worktree should have committed work"
);
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
let (tx, _rx) = broadcast::channel(16);
pool.reconcile_on_startup(root, &tx).await;
@@ -5160,7 +5195,7 @@ stage = "qa"
)
.unwrap();
let pool = AgentPool::new(3099);
let pool = AgentPool::new_test(3099);
let result = pool
.start_agent(root, "50_story_test", Some("qa"), None)
@@ -5222,7 +5257,7 @@ stage = "qa"
)
.unwrap();
let pool = AgentPool::new(3099);
let pool = AgentPool::new_test(3099);
// Manually inject a Running agent (simulates successful start).
pool.inject_test_agent("story-x", "qa", AgentStatus::Running);
@@ -5258,7 +5293,7 @@ stage = "qa"
fs::create_dir_all(&sk_dir).unwrap();
fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"coder-1\"\n").unwrap();
let pool = AgentPool::new(3099);
let pool = AgentPool::new_test(3099);
// Simulate what the winning concurrent call would have done: insert a
// Pending entry for coder-1 on story-86.
@@ -5308,7 +5343,7 @@ stage = "qa"
)
.unwrap();
let pool = Arc::new(AgentPool::new(3099));
let pool = Arc::new(AgentPool::new_test(3099));
let pool1 = pool.clone();
let root1 = root.clone();
@@ -5379,7 +5414,7 @@ stage = "qa"
)
.unwrap();
let pool = Arc::new(AgentPool::new(3099));
let pool = Arc::new(AgentPool::new_test(3099));
// Run two concurrent auto_assign calls.
let pool1 = pool.clone();
@@ -5748,7 +5783,7 @@ theirs
.output()
.unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
let report = pool.merge_agent_work(repo, "42_story_foo").await.unwrap();
// Master should NEVER have conflict markers, regardless of merge outcome.
@@ -5815,7 +5850,7 @@ theirs
/// as Failed, emitting an Error event so that `wait_for_agent` unblocks.
#[tokio::test]
async fn watchdog_detects_orphaned_running_agent() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
let handle = tokio::spawn(async {});
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
@@ -5849,7 +5884,7 @@ theirs
#[test]
fn remove_agents_for_story_removes_all_entries() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("story_a", "coder-1", AgentStatus::Completed);
pool.inject_test_agent("story_a", "qa", AgentStatus::Failed);
pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running);
@@ -5864,7 +5899,7 @@ theirs
#[test]
fn remove_agents_for_story_returns_zero_when_no_match() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running);
let removed = pool.remove_agents_for_story("nonexistent");
@@ -5878,7 +5913,7 @@ theirs
#[test]
fn reap_expired_agents_removes_old_completed_entries() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
// Inject a completed agent with an artificial old completed_at.
{
@@ -5922,7 +5957,7 @@ theirs
#[test]
fn reap_expired_agents_removes_old_failed_entries() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
// Inject a failed agent with an old completed_at.
{
@@ -5954,7 +5989,7 @@ theirs
#[test]
fn reap_expired_agents_skips_running_entries() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("running_story", "coder-1", AgentStatus::Running);
let reaped = pool.reap_expired_agents(std::time::Duration::from_secs(0));
@@ -5975,7 +6010,7 @@ theirs
fs::create_dir_all(&current).unwrap();
fs::write(current.join("60_story_cleanup.md"), "test").unwrap();
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
pool.inject_test_agent("60_story_cleanup", "coder-1", AgentStatus::Completed);
pool.inject_test_agent("60_story_cleanup", "qa", AgentStatus::Completed);
pool.inject_test_agent("61_story_other", "coder-1", AgentStatus::Running);
@@ -6176,7 +6211,7 @@ theirs
#[tokio::test]
async fn check_orphaned_agents_returns_count_of_orphaned_agents() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
// Spawn two tasks that finish immediately.
let h1 = tokio::spawn(async {});
@@ -6194,7 +6229,7 @@ theirs
#[test]
fn check_orphaned_agents_returns_zero_when_no_orphans() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
// Inject agents in terminal states — not orphaned.
pool.inject_test_agent("story_a", "coder", AgentStatus::Completed);
pool.inject_test_agent("story_b", "qa", AgentStatus::Failed);
@@ -6208,7 +6243,7 @@ theirs
// This test verifies the contract that `check_orphaned_agents` returns
// a non-zero count when orphans exist, which the watchdog uses to
// decide whether to trigger auto-assign (bug 161).
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
let handle = tokio::spawn(async {});
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
@@ -6262,7 +6297,7 @@ theirs
#[test]
fn kill_all_children_is_safe_on_empty_pool() {
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
// Should not panic or deadlock on an empty registry.
pool.kill_all_children();
assert_eq!(pool.child_killer_count(), 0);
@@ -6271,7 +6306,7 @@ theirs
#[test]
fn kill_all_children_kills_real_process() {
// GIVEN: a real PTY child process (sleep 100) with its killer registered.
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
let pty_system = native_pty_system();
let pair = pty_system
@@ -6315,7 +6350,7 @@ theirs
#[test]
fn kill_all_children_clears_registry() {
// GIVEN: a pool with one registered killer.
let pool = AgentPool::new(3001);
let pool = AgentPool::new_test(3001);
let pty_system = native_pty_system();
let pair = pty_system