diff --git a/frontend/src/api/client.ts b/frontend/src/api/client.ts index 89fa946..ba155eb 100644 --- a/frontend/src/api/client.ts +++ b/frontend/src/api/client.ts @@ -51,7 +51,13 @@ export type WsResponse = tool_name: string; tool_input: Record; } - | { type: "tool_activity"; tool_name: string }; + | { type: "tool_activity"; tool_name: string } + | { + type: "reconciliation_progress"; + story_id: string; + status: string; + message: string; + }; export interface ProviderConfig { provider: string; @@ -262,6 +268,11 @@ export class ChatWebSocket { toolInput: Record, ) => void; private onActivity?: (toolName: string) => void; + private onReconciliationProgress?: ( + storyId: string, + status: string, + message: string, + ) => void; private connected = false; private closeTimer?: number; private wsPath = DEFAULT_WS_PATH; @@ -305,6 +316,12 @@ export class ChatWebSocket { data.tool_input, ); if (data.type === "tool_activity") this.onActivity?.(data.tool_name); + if (data.type === "reconciliation_progress") + this.onReconciliationProgress?.( + data.story_id, + data.status, + data.message, + ); } catch (err) { this.onError?.(String(err)); } @@ -345,6 +362,11 @@ export class ChatWebSocket { toolInput: Record, ) => void; onActivity?: (toolName: string) => void; + onReconciliationProgress?: ( + storyId: string, + status: string, + message: string, + ) => void; }, wsPath = DEFAULT_WS_PATH, ) { @@ -355,6 +377,7 @@ export class ChatWebSocket { this.onPipelineState = handlers.onPipelineState; this.onPermissionRequest = handlers.onPermissionRequest; this.onActivity = handlers.onActivity; + this.onReconciliationProgress = handlers.onReconciliationProgress; this.wsPath = wsPath; this.shouldReconnect = true; diff --git a/frontend/src/components/Chat.test.tsx b/frontend/src/components/Chat.test.tsx index d36ea54..d732d00 100644 --- a/frontend/src/components/Chat.test.tsx +++ b/frontend/src/components/Chat.test.tsx @@ -18,6 +18,11 @@ type WsHandlers = { onUpdate: (history: Message[]) => void; onSessionId: (sessionId: string) => void; onError: (message: string) => void; + onReconciliationProgress: ( + storyId: string, + status: string, + message: string, + ) => void; }; let capturedWsHandlers: WsHandlers | null = null; @@ -310,3 +315,81 @@ describe("Chat input Shift+Enter behavior", () => { expect((input as HTMLTextAreaElement).value).toBe("Hello"); }); }); + +describe("Chat reconciliation banner", () => { + beforeEach(() => { + capturedWsHandlers = null; + setupMocks(); + }); + + it("shows banner when a non-done reconciliation event is received", async () => { + render(); + + await waitFor(() => expect(capturedWsHandlers).not.toBeNull()); + + act(() => { + capturedWsHandlers?.onReconciliationProgress( + "42_story_test", + "checking", + "Checking for committed work in 2_current/", + ); + }); + + expect( + await screen.findByTestId("reconciliation-banner"), + ).toBeInTheDocument(); + expect( + await screen.findByText("Reconciling startup state..."), + ).toBeInTheDocument(); + }); + + it("shows event message in the banner", async () => { + render(); + + await waitFor(() => expect(capturedWsHandlers).not.toBeNull()); + + act(() => { + capturedWsHandlers?.onReconciliationProgress( + "42_story_test", + "gates_running", + "Running acceptance gates…", + ); + }); + + expect( + await screen.findByText(/Running acceptance gates/), + ).toBeInTheDocument(); + }); + + it("dismisses banner when done event is received", async () => { + render(); + + await waitFor(() => expect(capturedWsHandlers).not.toBeNull()); + + act(() => { + capturedWsHandlers?.onReconciliationProgress( + "42_story_test", + "checking", + "Checking for committed work", + ); + }); + + expect( + await screen.findByTestId("reconciliation-banner"), + ).toBeInTheDocument(); + + act(() => { + capturedWsHandlers?.onReconciliationProgress( + "", + "done", + "Startup reconciliation complete.", + ); + }); + + await waitFor(() => { + expect( + screen.queryByTestId("reconciliation-banner"), + ).not.toBeInTheDocument(); + }); + }); +}); diff --git a/frontend/src/components/Chat.tsx b/frontend/src/components/Chat.tsx index b51105d..710501b 100644 --- a/frontend/src/components/Chat.tsx +++ b/frontend/src/components/Chat.tsx @@ -64,6 +64,11 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { const [isNarrowScreen, setIsNarrowScreen] = useState( window.innerWidth < NARROW_BREAKPOINT, ); + const [reconciliationActive, setReconciliationActive] = useState(false); + const [reconciliationEvents, setReconciliationEvents] = useState< + { id: string; storyId: string; status: string; message: string }[] + >([]); + const reconciliationEventIdRef = useRef(0); const wsRef = useRef(null); const messagesEndRef = useRef(null); @@ -197,6 +202,19 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { onActivity: (toolName) => { setActivityStatus(formatToolActivity(toolName)); }, + onReconciliationProgress: (storyId, status, message) => { + if (status === "done") { + setReconciliationActive(false); + } else { + setReconciliationActive(true); + setReconciliationEvents((prev) => { + const id = String(reconciliationEventIdRef.current++); + const next = [...prev, { id, storyId, status, message }]; + // Keep only the last 8 events to avoid the banner growing too tall. + return next.slice(-8); + }); + } + }, }); return () => { @@ -679,6 +697,52 @@ export function Chat({ projectPath, onCloseProject }: ChatProps) { + {/* Startup reconciliation progress banner */} + {reconciliationActive && ( +
+
+ Reconciling startup state... +
+ {reconciliationEvents.map((evt) => ( +
+ {evt.storyId ? `[${evt.storyId}] ` : ""} + {evt.message} +
+ ))} +
+ )} + {/* Chat input pinned at bottom of left column */}
String { format!("{story_id}:{agent_name}") @@ -1198,11 +1210,20 @@ impl AgentPool { /// start a fresh agent to retry. /// 4. Stories in `4_merge/` are left for `auto_assign_available_work` to handle via a /// fresh mergemaster (squash-merge must be re-executed by the mergemaster agent). - pub async fn reconcile_on_startup(&self, project_root: &Path) { + pub async fn reconcile_on_startup( + &self, + project_root: &Path, + progress_tx: &broadcast::Sender, + ) { let worktrees = match worktree::list_worktrees(project_root) { Ok(wt) => wt, Err(e) => { eprintln!("[startup:reconcile] Failed to list worktrees: {e}"); + let _ = progress_tx.send(ReconciliationEvent { + story_id: String::new(), + status: "done".to_string(), + message: format!("Reconciliation failed: {e}"), + }); return; } }; @@ -1222,6 +1243,12 @@ impl AgentPool { continue; } + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "checking".to_string(), + message: format!("Checking for committed work in {stage_dir}/"), + }); + // Check whether the worktree has commits ahead of the base branch. let wt_path_for_check = wt_path.clone(); let has_work = tokio::task::spawn_blocking(move || { @@ -1234,12 +1261,22 @@ impl AgentPool { eprintln!( "[startup:reconcile] No committed work for '{story_id}' in {stage_dir}/; skipping." ); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "skipped".to_string(), + message: "No committed work found; skipping.".to_string(), + }); continue; } eprintln!( "[startup:reconcile] Found committed work for '{story_id}' in {stage_dir}/. Running acceptance gates." ); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "gates_running".to_string(), + message: "Running acceptance gates…".to_string(), + }); // Run acceptance gates on the worktree. let wt_path_for_gates = wt_path.clone(); @@ -1253,12 +1290,22 @@ impl AgentPool { Ok(Ok(pair)) => pair, Ok(Err(e)) => { eprintln!("[startup:reconcile] Gate check error for '{story_id}': {e}"); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: format!("Gate error: {e}"), + }); continue; } Err(e) => { eprintln!( "[startup:reconcile] Gate check task panicked for '{story_id}': {e}" ); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: format!("Gate task panicked: {e}"), + }); continue; } }; @@ -1268,6 +1315,11 @@ impl AgentPool { "[startup:reconcile] Gates failed for '{story_id}': {gate_output}\n\ Leaving in {stage_dir}/ for auto-assign to restart the agent." ); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: "Gates failed; will be retried by auto-assign.".to_string(), + }); continue; } @@ -1279,8 +1331,18 @@ impl AgentPool { // Coder stage → advance to QA. if let Err(e) = move_story_to_qa(project_root, story_id) { eprintln!("[startup:reconcile] Failed to move '{story_id}' to 3_qa/: {e}"); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: format!("Failed to advance to QA: {e}"), + }); } else { eprintln!("[startup:reconcile] Moved '{story_id}' → 3_qa/."); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "advanced".to_string(), + message: "Gates passed — moved to QA.".to_string(), + }); } } else if stage_dir == "3_qa" { // QA stage → run coverage gate before advancing to merge. @@ -1295,12 +1357,22 @@ impl AgentPool { eprintln!( "[startup:reconcile] Coverage gate error for '{story_id}': {e}" ); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: format!("Coverage gate error: {e}"), + }); continue; } Err(e) => { eprintln!( "[startup:reconcile] Coverage gate panicked for '{story_id}': {e}" ); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: format!("Coverage gate panicked: {e}"), + }); continue; } }; @@ -1310,17 +1382,39 @@ impl AgentPool { eprintln!( "[startup:reconcile] Failed to move '{story_id}' to 4_merge/: {e}" ); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: format!("Failed to advance to merge: {e}"), + }); } else { eprintln!("[startup:reconcile] Moved '{story_id}' → 4_merge/."); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "advanced".to_string(), + message: "Gates passed — moved to merge.".to_string(), + }); } } else { eprintln!( "[startup:reconcile] Coverage gate failed for '{story_id}': {coverage_output}\n\ Leaving in 3_qa/ for auto-assign to restart the QA agent." ); + let _ = progress_tx.send(ReconciliationEvent { + story_id: story_id.clone(), + status: "failed".to_string(), + message: "Coverage gate failed; will be retried.".to_string(), + }); } } } + + // Signal that reconciliation is complete. + let _ = progress_tx.send(ReconciliationEvent { + story_id: String::new(), + status: "done".to_string(), + message: "Startup reconciliation complete.".to_string(), + }); } /// Test helper: inject an agent with a completion report and project_root @@ -3566,8 +3660,28 @@ name = "qa" async fn reconcile_on_startup_noop_when_no_worktrees() { let tmp = tempfile::tempdir().unwrap(); let pool = AgentPool::new(3001); + let (tx, _rx) = broadcast::channel(16); // Should not panic; no worktrees to reconcile. - pool.reconcile_on_startup(tmp.path()).await; + pool.reconcile_on_startup(tmp.path(), &tx).await; + } + + #[tokio::test] + async fn reconcile_on_startup_emits_done_event() { + let tmp = tempfile::tempdir().unwrap(); + let pool = AgentPool::new(3001); + let (tx, mut rx) = broadcast::channel::(16); + pool.reconcile_on_startup(tmp.path(), &tx).await; + + // Collect all events; the last must be "done". + let mut events: Vec = Vec::new(); + while let Ok(evt) = rx.try_recv() { + events.push(evt); + } + assert!( + events.iter().any(|e| e.status == "done"), + "reconcile_on_startup must emit a 'done' event; got: {:?}", + events.iter().map(|e| &e.status).collect::>() + ); } #[tokio::test] @@ -3588,7 +3702,8 @@ name = "qa" init_git_repo(&wt_dir); let pool = AgentPool::new(3001); - pool.reconcile_on_startup(root).await; + let (tx, _rx) = broadcast::channel(16); + pool.reconcile_on_startup(root, &tx).await; // Story should still be in 2_current/ — nothing was reconciled. assert!( @@ -3671,7 +3786,8 @@ name = "qa" ); let pool = AgentPool::new(3001); - pool.reconcile_on_startup(root).await; + let (tx, _rx) = broadcast::channel(16); + pool.reconcile_on_startup(root, &tx).await; // In the test env, cargo clippy will fail (no Cargo.toml) so gates fail // and the story stays in 2_current/. The important assertion is that diff --git a/server/src/http/context.rs b/server/src/http/context.rs index 0e7c6dd..868b233 100644 --- a/server/src/http/context.rs +++ b/server/src/http/context.rs @@ -1,4 +1,4 @@ -use crate::agents::AgentPool; +use crate::agents::{AgentPool, ReconciliationEvent}; use crate::io::watcher::WatcherEvent; use crate::state::SessionState; use crate::store::JsonFileStore; @@ -26,6 +26,10 @@ pub struct AppContext { /// Broadcast channel for filesystem watcher events. WebSocket handlers /// subscribe to this to push lifecycle notifications to connected clients. pub watcher_tx: broadcast::Sender, + /// Broadcast channel for startup reconciliation progress events. + /// WebSocket handlers subscribe to this to push real-time reconciliation + /// updates to connected clients. + pub reconciliation_tx: broadcast::Sender, /// Sender for permission requests originating from the MCP /// `prompt_permission` tool. The MCP handler sends a [`PermissionForward`] /// and awaits the oneshot response. @@ -42,6 +46,7 @@ impl AppContext { *state.project_root.lock().unwrap() = Some(project_root.clone()); let store_path = project_root.join(".story_kit_store.json"); let (watcher_tx, _) = broadcast::channel(64); + let (reconciliation_tx, _) = broadcast::channel(64); let (perm_tx, perm_rx) = mpsc::unbounded_channel(); Self { state: Arc::new(state), @@ -49,6 +54,7 @@ impl AppContext { workflow: Arc::new(std::sync::Mutex::new(WorkflowState::default())), agents: Arc::new(AgentPool::new(3001)), watcher_tx, + reconciliation_tx, perm_tx, perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), } diff --git a/server/src/http/ws.rs b/server/src/http/ws.rs index 6a0a20d..6668acd 100644 --- a/server/src/http/ws.rs +++ b/server/src/http/ws.rs @@ -79,6 +79,14 @@ enum WsResponse { ToolActivity { tool_name: String, }, + /// Real-time progress from the server startup reconciliation pass. + /// `status` is one of: "checking", "gates_running", "advanced", "skipped", + /// "failed", "done". `story_id` is empty for the overall "done" event. + ReconciliationProgress { + story_id: String, + status: String, + message: String, + }, } impl From for WsResponse { @@ -155,6 +163,30 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem } }); + // Subscribe to startup reconciliation events and forward them to the client. + let tx_reconcile = tx.clone(); + let mut reconcile_rx = ctx.reconciliation_tx.subscribe(); + tokio::spawn(async move { + loop { + match reconcile_rx.recv().await { + Ok(evt) => { + if tx_reconcile + .send(WsResponse::ReconciliationProgress { + story_id: evt.story_id, + status: evt.status, + message: evt.message, + }) + .is_err() + { + break; + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); + // Map of pending permission request_id → oneshot responder. // Permission requests arrive from the MCP `prompt_permission` tool via // `ctx.perm_rx` and are forwarded to the client as `PermissionRequest`. diff --git a/server/src/main.rs b/server/src/main.rs index 5715d47..824d9af 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -66,12 +66,18 @@ async fn main() -> Result<(), std::io::Error> { } } + // Reconciliation progress channel: startup reconciliation → WebSocket clients. + let (reconciliation_tx, _) = + broadcast::channel::(64); + // Permission channel: MCP prompt_permission → WebSocket handler. let (perm_tx, perm_rx) = tokio::sync::mpsc::unbounded_channel(); - // Capture project root and agents Arc before ctx is consumed by build_routes. + // Capture project root, agents Arc, and reconciliation sender before ctx + // is consumed by build_routes. let startup_root: Option = app_state.project_root.lock().unwrap().clone(); let startup_agents = Arc::clone(&agents); + let startup_reconciliation_tx = reconciliation_tx.clone(); let ctx = AppContext { state: app_state, @@ -79,6 +85,7 @@ async fn main() -> Result<(), std::io::Error> { workflow, agents, watcher_tx, + reconciliation_tx, perm_tx, perm_rx: Arc::new(tokio::sync::Mutex::new(perm_rx)), }; @@ -94,7 +101,9 @@ async fn main() -> Result<(), std::io::Error> { slog!( "[startup] Reconciling completed worktrees from previous session." ); - startup_agents.reconcile_on_startup(&root).await; + startup_agents + .reconcile_on_startup(&root, &startup_reconciliation_tx) + .await; slog!( "[auto-assign] Scanning pipeline stages for unassigned work." );