From dc631d1933fde715641cb133b76259640a237066 Mon Sep 17 00:00:00 2001 From: Dave Date: Tue, 24 Feb 2026 23:09:13 +0000 Subject: [PATCH] story-kit: merge 149_bug_web_ui_does_not_update_when_agents_are_started_or_stopped --- frontend/src/api/client.ts | 6 ++ frontend/src/components/AgentPanel.tsx | 112 +++++++++++--------- frontend/src/components/Chat.tsx | 9 +- server/src/agents.rs | 135 ++++++++++++++++--------- server/src/http/context.rs | 2 +- server/src/http/ws.rs | 18 +++- server/src/io/watcher.rs | 4 + server/src/main.rs | 9 +- 8 files changed, 187 insertions(+), 108 deletions(-) diff --git a/frontend/src/api/client.ts b/frontend/src/api/client.ts index be14d2a..7ec2745 100644 --- a/frontend/src/api/client.ts +++ b/frontend/src/api/client.ts @@ -61,6 +61,8 @@ export type WsResponse = } /** `.story_kit/project.toml` was modified; re-fetch the agent roster. */ | { type: "agent_config_changed" } + /** An agent started, stopped, or changed state; re-fetch agent list. */ + | { type: "agent_state_changed" } | { type: "tool_activity"; tool_name: string } /** Heartbeat response confirming the connection is alive. */ | { type: "pong" } @@ -282,6 +284,7 @@ export class ChatWebSocket { message: string, ) => void; private onAgentConfigChanged?: () => void; + private onAgentStateChanged?: () => void; private onOnboardingStatus?: (needsOnboarding: boolean) => void; private connected = false; private closeTimer?: number; @@ -358,6 +361,7 @@ export class ChatWebSocket { data.message, ); if (data.type === "agent_config_changed") this.onAgentConfigChanged?.(); + if (data.type === "agent_state_changed") this.onAgentStateChanged?.(); if (data.type === "onboarding_status") this.onOnboardingStatus?.(data.needs_onboarding); if (data.type === "pong") { @@ -410,6 +414,7 @@ export class ChatWebSocket { message: string, ) => void; onAgentConfigChanged?: () => void; + onAgentStateChanged?: () => void; onOnboardingStatus?: (needsOnboarding: boolean) => void; }, wsPath = DEFAULT_WS_PATH, @@ -423,6 +428,7 @@ export class ChatWebSocket { this.onActivity = handlers.onActivity; this.onReconciliationProgress = handlers.onReconciliationProgress; this.onAgentConfigChanged = handlers.onAgentConfigChanged; + this.onAgentStateChanged = handlers.onAgentStateChanged; this.onOnboardingStatus = handlers.onOnboardingStatus; this.wsPath = wsPath; this.shouldReconnect = true; diff --git a/frontend/src/components/AgentPanel.tsx b/frontend/src/components/AgentPanel.tsx index 6db2a29..29fe437 100644 --- a/frontend/src/components/AgentPanel.tsx +++ b/frontend/src/components/AgentPanel.tsx @@ -136,9 +136,14 @@ function agentKey(storyId: string, agentName: string): string { interface AgentPanelProps { /** Increment this to trigger a re-fetch of the agent roster. */ configVersion?: number; + /** Increment this to trigger a re-fetch of the agent list (agent state changed). */ + stateVersion?: number; } -export function AgentPanel({ configVersion = 0 }: AgentPanelProps) { +export function AgentPanel({ + configVersion = 0, + stateVersion = 0, +}: AgentPanelProps) { const { hiddenRosterAgents } = useLozengeFly(); const [agents, setAgents] = useState>({}); const [roster, setRoster] = useState([]); @@ -157,52 +162,6 @@ export function AgentPanel({ configVersion = 0 }: AgentPanelProps) { .catch((err) => console.error("Failed to load agent config:", err)); }, [configVersion]); - // Load existing agents and editor preference on mount - useEffect(() => { - agentsApi - .listAgents() - .then((agentList) => { - const agentMap: Record = {}; - const now = Date.now(); - for (const a of agentList) { - const key = agentKey(a.story_id, a.agent_name); - const isTerminal = a.status === "completed" || a.status === "failed"; - agentMap[key] = { - agentName: a.agent_name, - status: a.status, - log: [], - thinking: "", - thinkingDone: false, - sessionId: a.session_id, - worktreePath: a.worktree_path, - baseBranch: a.base_branch, - terminalAt: isTerminal ? now : null, - }; - if (a.status === "running" || a.status === "pending") { - subscribeToAgent(a.story_id, a.agent_name); - } - } - setAgents(agentMap); - setLastRefresh(new Date()); - }) - .catch((err) => console.error("Failed to load agents:", err)); - - settingsApi - .getEditorCommand() - .then((s) => { - setEditorCommand(s.editor_command); - setEditorInput(s.editor_command ?? ""); - }) - .catch((err) => console.error("Failed to load editor command:", err)); - - return () => { - for (const cleanup of Object.values(cleanupRefs.current)) { - cleanup(); - } - }; - // eslint-disable-next-line react-hooks/exhaustive-deps - }, []); - const subscribeToAgent = useCallback((storyId: string, agentName: string) => { const key = agentKey(storyId, agentName); cleanupRefs.current[key]?.(); @@ -296,6 +255,65 @@ export function AgentPanel({ configVersion = 0 }: AgentPanelProps) { cleanupRefs.current[key] = cleanup; }, []); + /** Shared helper: fetch the agent list and update state + SSE subscriptions. */ + const refreshAgents = useCallback(() => { + agentsApi + .listAgents() + .then((agentList) => { + const agentMap: Record = {}; + const now = Date.now(); + for (const a of agentList) { + const key = agentKey(a.story_id, a.agent_name); + const isTerminal = a.status === "completed" || a.status === "failed"; + agentMap[key] = { + agentName: a.agent_name, + status: a.status, + log: [], + thinking: "", + thinkingDone: false, + sessionId: a.session_id, + worktreePath: a.worktree_path, + baseBranch: a.base_branch, + terminalAt: isTerminal ? now : null, + }; + if (a.status === "running" || a.status === "pending") { + subscribeToAgent(a.story_id, a.agent_name); + } + } + setAgents(agentMap); + setLastRefresh(new Date()); + }) + .catch((err) => console.error("Failed to load agents:", err)); + }, [subscribeToAgent]); + + // Load existing agents and editor preference on mount + useEffect(() => { + refreshAgents(); + + settingsApi + .getEditorCommand() + .then((s) => { + setEditorCommand(s.editor_command); + setEditorInput(s.editor_command ?? ""); + }) + .catch((err) => console.error("Failed to load editor command:", err)); + + return () => { + for (const cleanup of Object.values(cleanupRefs.current)) { + cleanup(); + } + }; + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []); + + // Re-fetch agent list when agent state changes (via WebSocket notification). + // Skip the initial render (stateVersion=0) since the mount effect handles that. + useEffect(() => { + if (stateVersion > 0) { + refreshAgents(); + } + }, [stateVersion, refreshAgents]); + const handleSaveEditor = async () => { try { const trimmed = editorInput.trim() || null; diff --git a/frontend/src/components/Chat.tsx b/frontend/src/components/Chat.tsx index a72033f..382fc0b 100644 --- a/frontend/src/components/Chat.tsx +++ b/frontend/src/components/Chat.tsx @@ -89,6 +89,7 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { >([]); const reconciliationEventIdRef = useRef(0); const [agentConfigVersion, setAgentConfigVersion] = useState(0); + const [agentStateVersion, setAgentStateVersion] = useState(0); const [needsOnboarding, setNeedsOnboarding] = useState(false); const onboardingTriggeredRef = useRef(false); const [queuedMessages, setQueuedMessages] = useState< @@ -258,6 +259,9 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { onAgentConfigChanged: () => { setAgentConfigVersion((v) => v + 1); }, + onAgentStateChanged: () => { + setAgentStateVersion((v) => v + 1); + }, onOnboardingStatus: (onboarding: boolean) => { setNeedsOnboarding(onboarding); }, @@ -1065,7 +1069,10 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { }} > - + diff --git a/server/src/agents.rs b/server/src/agents.rs index f96b90b..b67054f 100644 --- a/server/src/agents.rs +++ b/server/src/agents.rs @@ -1,4 +1,5 @@ use crate::agent_log::AgentLogWriter; +use crate::io::watcher::WatcherEvent; use crate::slog; use crate::slog_error; use crate::slog_warn; @@ -253,6 +254,11 @@ pub struct AgentPool { /// Used to terminate child processes on server shutdown or agent stop, preventing /// orphaned Claude Code processes from running after the server exits. child_killers: Arc>>>, + /// Broadcast channel for notifying WebSocket clients of agent state changes. + /// When an agent transitions state (Pending, Running, Completed, Failed, Stopped), + /// an `AgentStateChanged` event is emitted so the frontend can refresh the + /// pipeline board without waiting for a filesystem event. + watcher_tx: broadcast::Sender, } /// RAII guard that removes a child killer from the registry on drop. @@ -273,14 +279,28 @@ impl Drop for ChildKillerGuard { } impl AgentPool { - pub fn new(port: u16) -> Self { + pub fn new(port: u16, watcher_tx: broadcast::Sender) -> Self { Self { agents: Arc::new(Mutex::new(HashMap::new())), port, child_killers: Arc::new(Mutex::new(HashMap::new())), + watcher_tx, } } + /// Create a pool with a dummy watcher channel for unit tests. + #[cfg(test)] + pub fn new_test(port: u16) -> Self { + let (watcher_tx, _) = broadcast::channel(16); + Self::new(port, watcher_tx) + } + + /// Notify WebSocket clients that agent state has changed, so the pipeline + /// board and agent panel can refresh. + fn notify_agent_state_changed(watcher_tx: &broadcast::Sender) { + let _ = watcher_tx.send(WatcherEvent::AgentStateChanged); + } + /// Kill all active PTY child processes. /// /// Called on server shutdown to prevent orphaned Claude Code processes from @@ -423,6 +443,9 @@ impl AgentPool { } let mut pending_guard = PendingGuard::new(self.agents.clone(), key.clone()); + // Notify WebSocket clients that a new agent is pending. + Self::notify_agent_state_changed(&self.watcher_tx); + let _ = tx.send(AgentEvent::Status { story_id: story_id.to_string(), agent_name: resolved_name.clone(), @@ -451,6 +474,7 @@ impl AgentPool { let port_for_task = self.port; let log_writer_clone = log_writer.clone(); let child_killers_clone = self.child_killers.clone(); + let watcher_tx_clone = self.watcher_tx.clone(); // Spawn the background task. Worktree creation and agent launch happen here // so `start_agent` returns immediately after registering the agent as @@ -479,6 +503,7 @@ impl AgentPool { agent_name: aname.clone(), message: format!("Failed to create worktree: {e}"), }); + Self::notify_agent_state_changed(&watcher_tx_clone); return; } }; @@ -512,6 +537,7 @@ impl AgentPool { agent_name: aname.clone(), message: format!("Failed to render agent args: {e}"), }); + Self::notify_agent_state_changed(&watcher_tx_clone); return; } }; @@ -534,6 +560,7 @@ impl AgentPool { agent_name: aname.clone(), status: "running".to_string(), }); + Self::notify_agent_state_changed(&watcher_tx_clone); // Step 4: launch the agent process. match run_agent_pty_streaming( @@ -562,6 +589,7 @@ impl AgentPool { session_id, ) .await; + Self::notify_agent_state_changed(&watcher_tx_clone); } Err(e) => { if let Ok(mut agents) = agents_ref.lock() @@ -575,6 +603,7 @@ impl AgentPool { agent_name: aname.clone(), message: e, }); + Self::notify_agent_state_changed(&watcher_tx_clone); } } }); @@ -653,6 +682,9 @@ impl AgentPool { agents.remove(&key); } + // Notify WebSocket clients so pipeline board and agent panel update. + Self::notify_agent_state_changed(&self.watcher_tx); + Ok(()) } @@ -1138,6 +1170,7 @@ impl AgentPool { agents: Arc::clone(&self.agents), port: self.port, child_killers: Arc::clone(&self.child_killers), + watcher_tx: self.watcher_tx.clone(), }; let sid = story_id.to_string(); let aname = agent_name.to_string(); @@ -2077,10 +2110,12 @@ fn spawn_pipeline_advance( let sid = story_id.to_string(); let aname = agent_name.to_string(); tokio::spawn(async move { + let (watcher_tx, _) = broadcast::channel(16); let pool = AgentPool { agents, port, child_killers: Arc::new(Mutex::new(HashMap::new())), + watcher_tx, }; pool.run_pipeline_advance_for_completed_agent(&sid, &aname) .await; @@ -3469,7 +3504,7 @@ mod tests { #[tokio::test] async fn wait_for_agent_returns_immediately_if_completed() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent("s1", "bot", AgentStatus::Completed); let info = pool.wait_for_agent("s1", "bot", 1000).await.unwrap(); @@ -3480,7 +3515,7 @@ mod tests { #[tokio::test] async fn wait_for_agent_returns_immediately_if_failed() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent("s2", "bot", AgentStatus::Failed); let info = pool.wait_for_agent("s2", "bot", 1000).await.unwrap(); @@ -3489,7 +3524,7 @@ mod tests { #[tokio::test] async fn wait_for_agent_completes_on_done_event() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); let tx = pool.inject_test_agent("s3", "bot", AgentStatus::Running); // Send Done event after a short delay @@ -3514,7 +3549,7 @@ mod tests { #[tokio::test] async fn wait_for_agent_times_out() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent("s4", "bot", AgentStatus::Running); let result = pool.wait_for_agent("s4", "bot", 50).await; @@ -3525,14 +3560,14 @@ mod tests { #[tokio::test] async fn wait_for_agent_errors_for_nonexistent() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); let result = pool.wait_for_agent("no_story", "no_bot", 100).await; assert!(result.is_err()); } #[tokio::test] async fn wait_for_agent_completes_on_stopped_status_event() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); let tx = pool.inject_test_agent("s5", "bot", AgentStatus::Running); let tx_clone = tx.clone(); @@ -3553,7 +3588,7 @@ mod tests { #[tokio::test] async fn report_completion_rejects_nonexistent_agent() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); let result = pool .report_completion("no_story", "no_bot", "done") .await; @@ -3564,7 +3599,7 @@ mod tests { #[tokio::test] async fn report_completion_rejects_non_running_agent() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent("s6", "bot", AgentStatus::Completed); let result = pool.report_completion("s6", "bot", "done").await; @@ -3599,7 +3634,7 @@ mod tests { // Write an uncommitted file fs::write(repo.join("dirty.txt"), "not committed").unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent_with_path("s7", "bot", AgentStatus::Running, repo.to_path_buf()); let result = pool.report_completion("s7", "bot", "done").await; @@ -3615,7 +3650,7 @@ mod tests { #[tokio::test] async fn server_owned_completion_skips_when_already_completed() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); let report = CompletionReport { summary: "Already done".to_string(), gates_passed: true, @@ -3662,7 +3697,7 @@ mod tests { let repo = tmp.path(); init_git_repo(repo); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent_with_path( "s11", "coder-1", @@ -3717,7 +3752,7 @@ mod tests { // Create an uncommitted file. fs::write(repo.join("dirty.txt"), "not committed").unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent_with_path( "s12", "coder-1", @@ -3747,7 +3782,7 @@ mod tests { #[tokio::test] async fn server_owned_completion_nonexistent_agent_is_noop() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); // Should not panic or error — just silently return. run_server_owned_completion(&pool.agents, pool.port, "nonexistent", "bot", None) .await; @@ -3909,7 +3944,7 @@ mod tests { fs::create_dir_all(¤t).unwrap(); fs::write(current.join("50_story_test.md"), "test").unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent_with_completion( "50_story_test", "coder-1", @@ -3949,7 +3984,7 @@ mod tests { fs::create_dir_all(&qa_dir).unwrap(); fs::write(qa_dir.join("51_story_test.md"), "test").unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent_with_completion( "51_story_test", "qa", @@ -3986,7 +4021,7 @@ mod tests { fs::create_dir_all(¤t).unwrap(); fs::write(current.join("52_story_test.md"), "test").unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent_with_completion( "52_story_test", "supervisor", @@ -4132,7 +4167,7 @@ mod tests { let repo = tmp.path(); init_git_repo(repo); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); // branch feature/story-99_nonexistent does not exist let result = pool .merge_agent_work(repo, "99_nonexistent") @@ -4192,7 +4227,7 @@ mod tests { .output() .unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); let report = pool.merge_agent_work(repo, "23_test").await.unwrap(); // Merge should succeed (gates will run but cargo/pnpm results will depend on env) @@ -4460,7 +4495,7 @@ mod tests { fn is_story_assigned_returns_true_for_running_coder() { use crate::config::ProjectConfig; let config = ProjectConfig::default(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Running); let agents = pool.agents.lock().unwrap(); @@ -4490,7 +4525,7 @@ mod tests { fn is_story_assigned_returns_false_for_completed_agent() { use crate::config::ProjectConfig; let config = ProjectConfig::default(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent("42_story_foo", "coder-1", AgentStatus::Completed); let agents = pool.agents.lock().unwrap(); @@ -4516,7 +4551,7 @@ name = "coder-2" ) .unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent("s1", "coder-1", AgentStatus::Running); pool.inject_test_agent("s2", "coder-2", AgentStatus::Running); @@ -4540,7 +4575,7 @@ name = "coder-3" ) .unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); // coder-1 is busy, coder-2 is free pool.inject_test_agent("s1", "coder-1", AgentStatus::Running); @@ -4560,7 +4595,7 @@ name = "coder-1" ) .unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); // coder-1 completed its previous story — it's free for a new one pool.inject_test_agent("s1", "coder-1", AgentStatus::Completed); @@ -4640,7 +4675,7 @@ stage = "qa" ) .unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent("42_story_foo", "qa-2", AgentStatus::Running); let agents = pool.agents.lock().unwrap(); @@ -4734,7 +4769,7 @@ stage = "qa" ) .unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); // Simulate qa already running on story-a. pool.inject_test_agent("story-a", "qa", AgentStatus::Running); @@ -4771,7 +4806,7 @@ stage = "qa" ) .unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); // Previous run completed — should NOT block a new story. pool.inject_test_agent("story-a", "qa", AgentStatus::Completed); @@ -4859,7 +4894,7 @@ stage = "qa" #[tokio::test] async fn reconcile_on_startup_noop_when_no_worktrees() { let tmp = tempfile::tempdir().unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); let (tx, _rx) = broadcast::channel(16); // Should not panic; no worktrees to reconcile. pool.reconcile_on_startup(tmp.path(), &tx).await; @@ -4868,7 +4903,7 @@ stage = "qa" #[tokio::test] async fn reconcile_on_startup_emits_done_event() { let tmp = tempfile::tempdir().unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); let (tx, mut rx) = broadcast::channel::(16); pool.reconcile_on_startup(tmp.path(), &tx).await; @@ -4901,7 +4936,7 @@ stage = "qa" fs::create_dir_all(&wt_dir).unwrap(); init_git_repo(&wt_dir); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); let (tx, _rx) = broadcast::channel(16); pool.reconcile_on_startup(root, &tx).await; @@ -4985,7 +5020,7 @@ stage = "qa" "test setup: worktree should have committed work" ); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); let (tx, _rx) = broadcast::channel(16); pool.reconcile_on_startup(root, &tx).await; @@ -5160,7 +5195,7 @@ stage = "qa" ) .unwrap(); - let pool = AgentPool::new(3099); + let pool = AgentPool::new_test(3099); let result = pool .start_agent(root, "50_story_test", Some("qa"), None) @@ -5222,7 +5257,7 @@ stage = "qa" ) .unwrap(); - let pool = AgentPool::new(3099); + let pool = AgentPool::new_test(3099); // Manually inject a Running agent (simulates successful start). pool.inject_test_agent("story-x", "qa", AgentStatus::Running); @@ -5258,7 +5293,7 @@ stage = "qa" fs::create_dir_all(&sk_dir).unwrap(); fs::write(sk_dir.join("project.toml"), "[[agent]]\nname = \"coder-1\"\n").unwrap(); - let pool = AgentPool::new(3099); + let pool = AgentPool::new_test(3099); // Simulate what the winning concurrent call would have done: insert a // Pending entry for coder-1 on story-86. @@ -5308,7 +5343,7 @@ stage = "qa" ) .unwrap(); - let pool = Arc::new(AgentPool::new(3099)); + let pool = Arc::new(AgentPool::new_test(3099)); let pool1 = pool.clone(); let root1 = root.clone(); @@ -5379,7 +5414,7 @@ stage = "qa" ) .unwrap(); - let pool = Arc::new(AgentPool::new(3099)); + let pool = Arc::new(AgentPool::new_test(3099)); // Run two concurrent auto_assign calls. let pool1 = pool.clone(); @@ -5748,7 +5783,7 @@ theirs .output() .unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); let report = pool.merge_agent_work(repo, "42_story_foo").await.unwrap(); // Master should NEVER have conflict markers, regardless of merge outcome. @@ -5815,7 +5850,7 @@ theirs /// as Failed, emitting an Error event so that `wait_for_agent` unblocks. #[tokio::test] async fn watchdog_detects_orphaned_running_agent() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); let handle = tokio::spawn(async {}); tokio::time::sleep(std::time::Duration::from_millis(20)).await; @@ -5849,7 +5884,7 @@ theirs #[test] fn remove_agents_for_story_removes_all_entries() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent("story_a", "coder-1", AgentStatus::Completed); pool.inject_test_agent("story_a", "qa", AgentStatus::Failed); pool.inject_test_agent("story_b", "coder-1", AgentStatus::Running); @@ -5864,7 +5899,7 @@ theirs #[test] fn remove_agents_for_story_returns_zero_when_no_match() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent("story_a", "coder-1", AgentStatus::Running); let removed = pool.remove_agents_for_story("nonexistent"); @@ -5878,7 +5913,7 @@ theirs #[test] fn reap_expired_agents_removes_old_completed_entries() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); // Inject a completed agent with an artificial old completed_at. { @@ -5922,7 +5957,7 @@ theirs #[test] fn reap_expired_agents_removes_old_failed_entries() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); // Inject a failed agent with an old completed_at. { @@ -5954,7 +5989,7 @@ theirs #[test] fn reap_expired_agents_skips_running_entries() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent("running_story", "coder-1", AgentStatus::Running); let reaped = pool.reap_expired_agents(std::time::Duration::from_secs(0)); @@ -5975,7 +6010,7 @@ theirs fs::create_dir_all(¤t).unwrap(); fs::write(current.join("60_story_cleanup.md"), "test").unwrap(); - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); pool.inject_test_agent("60_story_cleanup", "coder-1", AgentStatus::Completed); pool.inject_test_agent("60_story_cleanup", "qa", AgentStatus::Completed); pool.inject_test_agent("61_story_other", "coder-1", AgentStatus::Running); @@ -6176,7 +6211,7 @@ theirs #[tokio::test] async fn check_orphaned_agents_returns_count_of_orphaned_agents() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); // Spawn two tasks that finish immediately. let h1 = tokio::spawn(async {}); @@ -6194,7 +6229,7 @@ theirs #[test] fn check_orphaned_agents_returns_zero_when_no_orphans() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); // Inject agents in terminal states — not orphaned. pool.inject_test_agent("story_a", "coder", AgentStatus::Completed); pool.inject_test_agent("story_b", "qa", AgentStatus::Failed); @@ -6208,7 +6243,7 @@ theirs // This test verifies the contract that `check_orphaned_agents` returns // a non-zero count when orphans exist, which the watchdog uses to // decide whether to trigger auto-assign (bug 161). - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); let handle = tokio::spawn(async {}); tokio::time::sleep(std::time::Duration::from_millis(20)).await; @@ -6262,7 +6297,7 @@ theirs #[test] fn kill_all_children_is_safe_on_empty_pool() { - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); // Should not panic or deadlock on an empty registry. pool.kill_all_children(); assert_eq!(pool.child_killer_count(), 0); @@ -6271,7 +6306,7 @@ theirs #[test] fn kill_all_children_kills_real_process() { // GIVEN: a real PTY child process (sleep 100) with its killer registered. - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); let pty_system = native_pty_system(); let pair = pty_system @@ -6315,7 +6350,7 @@ theirs #[test] fn kill_all_children_clears_registry() { // GIVEN: a pool with one registered killer. - let pool = AgentPool::new(3001); + let pool = AgentPool::new_test(3001); let pty_system = native_pty_system(); let pair = pty_system diff --git a/server/src/http/context.rs b/server/src/http/context.rs index 868b233..9e2726a 100644 --- a/server/src/http/context.rs +++ b/server/src/http/context.rs @@ -52,7 +52,7 @@ impl AppContext { state: Arc::new(state), store: Arc::new(JsonFileStore::new(store_path).unwrap()), workflow: Arc::new(std::sync::Mutex::new(WorkflowState::default())), - agents: Arc::new(AgentPool::new(3001)), + agents: Arc::new(AgentPool::new(3001, watcher_tx.clone())), watcher_tx, reconciliation_tx, perm_tx, diff --git a/server/src/http/ws.rs b/server/src/http/ws.rs index 444360e..561f5b8 100644 --- a/server/src/http/ws.rs +++ b/server/src/http/ws.rs @@ -77,6 +77,10 @@ enum WsResponse { /// `.story_kit/project.toml` was modified; the frontend should re-fetch the /// agent roster. Does NOT trigger a pipeline state refresh. AgentConfigChanged, + /// An agent's state changed (started, stopped, completed, etc.). + /// Triggers a pipeline state refresh and tells the frontend to re-fetch + /// the agent list. + AgentStateChanged, /// Claude Code is requesting user approval before executing a tool. PermissionRequest { request_id: String, @@ -120,6 +124,7 @@ impl From for Option { commit_msg, }), WatcherEvent::ConfigChanged => Some(WsResponse::AgentConfigChanged), + WatcherEvent::AgentStateChanged => Some(WsResponse::AgentStateChanged), } } } @@ -184,15 +189,18 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem loop { match watcher_rx.recv().await { Ok(evt) => { - let is_work_item = - matches!(evt, crate::io::watcher::WatcherEvent::WorkItem { .. }); + let needs_pipeline_refresh = matches!( + evt, + crate::io::watcher::WatcherEvent::WorkItem { .. } + | crate::io::watcher::WatcherEvent::AgentStateChanged + ); let ws_msg: Option = evt.into(); if let Some(msg) = ws_msg && tx_watcher.send(msg).is_err() { break; } - // Only push refreshed pipeline state after work-item changes, - // not after config-file changes. - if is_work_item + // Push refreshed pipeline state after work-item changes and + // agent state changes (so the board updates agent lozenges). + if needs_pipeline_refresh && let Ok(state) = load_pipeline_state(ctx_watcher.as_ref()) && tx_watcher.send(state.into()).is_err() { diff --git a/server/src/io/watcher.rs b/server/src/io/watcher.rs index b85992b..23df218 100644 --- a/server/src/io/watcher.rs +++ b/server/src/io/watcher.rs @@ -45,6 +45,10 @@ pub enum WatcherEvent { }, /// `.story_kit/project.toml` was modified at the project root (not inside a worktree). ConfigChanged, + /// An agent's state changed (started, stopped, completed, etc.). + /// Triggers a pipeline state refresh so the frontend can update agent + /// assignments without waiting for a filesystem event. + AgentStateChanged, } /// Return `true` if `path` is the root-level `.story_kit/project.toml`, i.e. diff --git a/server/src/main.rs b/server/src/main.rs index bd7ee23..d83868b 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -55,15 +55,16 @@ async fn main() -> Result<(), std::io::Error> { let workflow = Arc::new(std::sync::Mutex::new(WorkflowState::default())); let port = resolve_port(); - let agents = Arc::new(AgentPool::new(port)); + + // Filesystem watcher: broadcast channel for work/ pipeline changes. + // Created before AgentPool so the pool can emit AgentStateChanged events. + let (watcher_tx, _) = broadcast::channel::(1024); + let agents = Arc::new(AgentPool::new(port, watcher_tx.clone())); // Start the background watchdog that detects and cleans up orphaned Running agents. // When orphans are found, auto-assign is triggered to reassign free agents. let watchdog_root: Option = app_state.project_root.lock().unwrap().clone(); AgentPool::spawn_watchdog(Arc::clone(&agents), watchdog_root); - - // Filesystem watcher: broadcast channel for work/ pipeline changes. - let (watcher_tx, _) = broadcast::channel::(1024); if let Some(ref root) = *app_state.project_root.lock().unwrap() { let work_dir = root.join(".story_kit").join("work"); if work_dir.is_dir() {