use crate::http::context::AppContext; use crate::http::workflow::{PipelineState, load_pipeline_state}; use crate::io::watcher::WatcherEvent; use crate::llm::chat; use crate::llm::types::Message; use futures::{SinkExt, StreamExt}; use poem::handler; use poem::web::Data; use poem::web::websocket::{Message as WsMessage, WebSocket}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; #[derive(Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] /// WebSocket request messages sent by the client. /// /// - `chat` starts a streaming chat session. /// - `cancel` stops the active session. /// - `permission_response` approves or denies a pending permission request. enum WsRequest { Chat { messages: Vec, config: chat::ProviderConfig, }, Cancel, PermissionResponse { request_id: String, approved: bool, }, } #[derive(Serialize)] #[serde(tag = "type", rename_all = "snake_case")] /// WebSocket response messages sent by the server. /// /// - `token` streams partial model output. /// - `update` pushes the updated message history. /// - `error` reports a request or processing failure. /// - `work_item_changed` notifies that a `.story_kit/work/` file changed. /// - `agent_config_changed` notifies that `.story_kit/project.toml` was modified. enum WsResponse { Token { content: String, }, Update { messages: Vec, }, /// Session ID for Claude Code conversation resumption. SessionId { session_id: String, }, Error { message: String, }, /// Filesystem watcher notification: a work-pipeline file was created or /// modified and auto-committed. The frontend can use this to refresh its /// story/bug list without polling. WorkItemChanged { stage: String, item_id: String, action: String, commit_msg: String, }, /// Full pipeline state pushed on connect and after every work-item watcher event. PipelineState { upcoming: Vec, current: Vec, qa: Vec, merge: Vec, }, /// `.story_kit/project.toml` was modified; the frontend should re-fetch the /// agent roster. Does NOT trigger a pipeline state refresh. AgentConfigChanged, /// Claude Code is requesting user approval before executing a tool. PermissionRequest { request_id: String, tool_name: String, tool_input: serde_json::Value, }, /// The agent started assembling a tool call; shows live status in the UI. ToolActivity { tool_name: String, }, /// Real-time progress from the server startup reconciliation pass. /// `status` is one of: "checking", "gates_running", "advanced", "skipped", /// "failed", "done". `story_id` is empty for the overall "done" event. ReconciliationProgress { story_id: String, status: String, message: String, }, } impl From for Option { fn from(e: WatcherEvent) -> Self { match e { WatcherEvent::WorkItem { stage, item_id, action, commit_msg, } => Some(WsResponse::WorkItemChanged { stage, item_id, action, commit_msg, }), WatcherEvent::ConfigChanged => Some(WsResponse::AgentConfigChanged), } } } impl From for WsResponse { fn from(s: PipelineState) -> Self { WsResponse::PipelineState { upcoming: s.upcoming, current: s.current, qa: s.qa, merge: s.merge, } } } #[handler] /// WebSocket endpoint for streaming chat responses, cancellation, and /// filesystem watcher notifications. /// /// Accepts JSON `WsRequest` messages and streams `WsResponse` messages. pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem::IntoResponse { let ctx = ctx.0.clone(); ws.on_upgrade(move |socket| async move { let (mut sink, mut stream) = socket.split(); let (tx, mut rx) = mpsc::unbounded_channel::(); let forward = tokio::spawn(async move { while let Some(msg) = rx.recv().await { if let Ok(text) = serde_json::to_string(&msg) && sink.send(WsMessage::Text(text)).await.is_err() { break; } } }); // Push initial pipeline state to the client on connect. if let Ok(state) = load_pipeline_state(ctx.as_ref()) { let _ = tx.send(state.into()); } // Subscribe to filesystem watcher events and forward them to the client. // After each work-item event, also push the updated pipeline state. // Config-changed events are forwarded as-is without a pipeline refresh. let tx_watcher = tx.clone(); let ctx_watcher = ctx.clone(); let mut watcher_rx = ctx.watcher_tx.subscribe(); tokio::spawn(async move { loop { match watcher_rx.recv().await { Ok(evt) => { let is_work_item = matches!(evt, crate::io::watcher::WatcherEvent::WorkItem { .. }); let ws_msg: Option = evt.into(); if let Some(msg) = ws_msg && tx_watcher.send(msg).is_err() { break; } // Only push refreshed pipeline state after work-item changes, // not after config-file changes. if is_work_item && let Ok(state) = load_pipeline_state(ctx_watcher.as_ref()) && tx_watcher.send(state.into()).is_err() { break; } } // Lagged: skip missed events, keep going. Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, Err(tokio::sync::broadcast::error::RecvError::Closed) => break, } } }); // Subscribe to startup reconciliation events and forward them to the client. let tx_reconcile = tx.clone(); let mut reconcile_rx = ctx.reconciliation_tx.subscribe(); tokio::spawn(async move { loop { match reconcile_rx.recv().await { Ok(evt) => { if tx_reconcile .send(WsResponse::ReconciliationProgress { story_id: evt.story_id, status: evt.status, message: evt.message, }) .is_err() { break; } } Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, Err(tokio::sync::broadcast::error::RecvError::Closed) => break, } } }); // Map of pending permission request_id → oneshot responder. // Permission requests arrive from the MCP `prompt_permission` tool via // `ctx.perm_rx` and are forwarded to the client as `PermissionRequest`. // When the client responds, we resolve the corresponding oneshot. let mut pending_perms: HashMap> = HashMap::new(); loop { // Outer loop: wait for the next WebSocket message. let Some(Ok(WsMessage::Text(text))) = stream.next().await else { break; }; let parsed: Result = serde_json::from_str(&text); match parsed { Ok(WsRequest::Chat { messages, config }) => { let tx_updates = tx.clone(); let tx_tokens = tx.clone(); let tx_activity = tx.clone(); let ctx_clone = ctx.clone(); // Build the chat future without driving it yet so we can // interleave it with permission-request forwarding. let chat_fut = chat::chat( messages, config, &ctx_clone.state, ctx_clone.store.as_ref(), move |history| { let _ = tx_updates.send(WsResponse::Update { messages: history.to_vec(), }); }, move |token| { let _ = tx_tokens.send(WsResponse::Token { content: token.to_string(), }); }, move |tool_name: &str| { let _ = tx_activity.send(WsResponse::ToolActivity { tool_name: tool_name.to_string(), }); }, ); tokio::pin!(chat_fut); // Lock the permission receiver for the duration of this chat // session. Permission requests from the MCP tool arrive here. let mut perm_rx = ctx.perm_rx.lock().await; // Inner loop: drive the chat while concurrently handling // permission requests (from MCP) and WebSocket messages. let chat_result = loop { tokio::select! { result = &mut chat_fut => break result, // Forward permission requests from MCP tool to the client. Some(perm_fwd) = perm_rx.recv() => { let _ = tx.send(WsResponse::PermissionRequest { request_id: perm_fwd.request_id.clone(), tool_name: perm_fwd.tool_name.clone(), tool_input: perm_fwd.tool_input.clone(), }); pending_perms.insert( perm_fwd.request_id, perm_fwd.response_tx, ); } // Handle WebSocket messages during an active chat // (permission responses and cancellations). Some(Ok(WsMessage::Text(inner_text))) = stream.next() => { match serde_json::from_str::(&inner_text) { Ok(WsRequest::PermissionResponse { request_id, approved }) => { if let Some(resp_tx) = pending_perms.remove(&request_id) { let _ = resp_tx.send(approved); } } Ok(WsRequest::Cancel) => { let _ = chat::cancel_chat(&ctx.state); } _ => {} } } } }; match chat_result { Ok(chat_result) => { if let Some(sid) = chat_result.session_id { let _ = tx.send(WsResponse::SessionId { session_id: sid }); } } Err(err) => { let _ = tx.send(WsResponse::Error { message: err }); } } } Ok(WsRequest::Cancel) => { let _ = chat::cancel_chat(&ctx.state); } Ok(WsRequest::PermissionResponse { .. }) => { // Permission responses outside an active chat are ignored. } Err(err) => { let _ = tx.send(WsResponse::Error { message: format!("Invalid request: {err}"), }); } } } drop(tx); let _ = forward.await; }) }