use crate::http::context::{AppContext, PermissionDecision}; use crate::http::workflow::{PipelineState, load_pipeline_state}; use crate::io::onboarding; use crate::io::watcher::WatcherEvent; use crate::llm::chat; use crate::llm::types::Message; use futures::{SinkExt, StreamExt}; use poem::handler; use poem::web::Data; use poem::web::websocket::{Message as WsMessage, WebSocket}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; #[derive(Deserialize)] #[serde(tag = "type", rename_all = "snake_case")] /// WebSocket request messages sent by the client. /// /// - `chat` starts a streaming chat session. /// - `cancel` stops the active session. /// - `permission_response` approves or denies a pending permission request. enum WsRequest { Chat { messages: Vec, config: chat::ProviderConfig, }, Cancel, PermissionResponse { request_id: String, approved: bool, #[serde(default)] always_allow: bool, }, /// Heartbeat ping from the client. The server responds with `Pong` so the /// client can detect stale (half-closed) connections. Ping, /// A quick side question answered from current conversation context. /// The question and response are NOT added to the conversation history /// and no tool calls are made. SideQuestion { question: String, context_messages: Vec, config: chat::ProviderConfig, }, } #[derive(Serialize)] #[serde(tag = "type", rename_all = "snake_case")] /// WebSocket response messages sent by the server. /// /// - `token` streams partial model output. /// - `update` pushes the updated message history. /// - `error` reports a request or processing failure. /// - `work_item_changed` notifies that a `.story_kit/work/` file changed. /// - `agent_config_changed` notifies that `.story_kit/project.toml` was modified. enum WsResponse { Token { content: String, }, Update { messages: Vec, }, /// Session ID for Claude Code conversation resumption. SessionId { session_id: String, }, Error { message: String, }, /// Filesystem watcher notification: a work-pipeline file was created or /// modified and auto-committed. The frontend can use this to refresh its /// story/bug list without polling. WorkItemChanged { stage: String, item_id: String, action: String, commit_msg: String, }, /// Full pipeline state pushed on connect and after every work-item watcher event. PipelineState { backlog: Vec, current: Vec, qa: Vec, merge: Vec, done: Vec, }, /// `.story_kit/project.toml` was modified; the frontend should re-fetch the /// agent roster. Does NOT trigger a pipeline state refresh. AgentConfigChanged, /// An agent's state changed (started, stopped, completed, etc.). /// Triggers a pipeline state refresh and tells the frontend to re-fetch /// the agent list. AgentStateChanged, /// Claude Code is requesting user approval before executing a tool. PermissionRequest { request_id: String, tool_name: String, tool_input: serde_json::Value, }, /// The agent started assembling a tool call; shows live status in the UI. ToolActivity { tool_name: String, }, /// Real-time progress from the server startup reconciliation pass. /// `status` is one of: "checking", "gates_running", "advanced", "skipped", /// "failed", "done". `story_id` is empty for the overall "done" event. ReconciliationProgress { story_id: String, status: String, message: String, }, /// Heartbeat response to a client `Ping`. Lets the client confirm the /// connection is alive and cancel any stale-connection timeout. Pong, /// Streaming thinking token from an extended-thinking block. /// Sent separately from `Token` so the frontend can render them in /// a constrained, scrollable ThinkingBlock rather than inline. ThinkingToken { content: String, }, /// Sent on connect when the project's spec files still contain scaffold /// placeholder content and the user needs to go through onboarding. OnboardingStatus { needs_onboarding: bool, }, /// Streaming token from a `/btw` side question response. SideQuestionToken { content: String, }, /// Final signal that the `/btw` side question has been fully answered. SideQuestionDone { response: String, }, } impl From for Option { fn from(e: WatcherEvent) -> Self { match e { WatcherEvent::WorkItem { stage, item_id, action, commit_msg, } => Some(WsResponse::WorkItemChanged { stage, item_id, action, commit_msg, }), WatcherEvent::ConfigChanged => Some(WsResponse::AgentConfigChanged), WatcherEvent::AgentStateChanged => Some(WsResponse::AgentStateChanged), // MergeFailure is handled by the Matrix notification listener only; // no WebSocket message is needed for the frontend. WatcherEvent::MergeFailure { .. } => None, } } } impl From for WsResponse { fn from(s: PipelineState) -> Self { WsResponse::PipelineState { backlog: s.backlog, current: s.current, qa: s.qa, merge: s.merge, done: s.done, } } } #[handler] /// WebSocket endpoint for streaming chat responses, cancellation, and /// filesystem watcher notifications. /// /// Accepts JSON `WsRequest` messages and streams `WsResponse` messages. pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem::IntoResponse { let ctx = ctx.0.clone(); ws.on_upgrade(move |socket| async move { let (mut sink, mut stream) = socket.split(); let (tx, mut rx) = mpsc::unbounded_channel::(); let forward = tokio::spawn(async move { while let Some(msg) = rx.recv().await { if let Ok(text) = serde_json::to_string(&msg) && sink.send(WsMessage::Text(text)).await.is_err() { break; } } }); // Push initial pipeline state to the client on connect. if let Ok(state) = load_pipeline_state(ctx.as_ref()) { let _ = tx.send(state.into()); } // Push onboarding status so the frontend knows whether to show // the onboarding welcome flow. { let needs = ctx .state .get_project_root() .map(|root| onboarding::check_onboarding_status(&root).needs_onboarding()) .unwrap_or(false); let _ = tx.send(WsResponse::OnboardingStatus { needs_onboarding: needs, }); } // Subscribe to filesystem watcher events and forward them to the client. // After each work-item event, also push the updated pipeline state. // Config-changed events are forwarded as-is without a pipeline refresh. let tx_watcher = tx.clone(); let ctx_watcher = ctx.clone(); let mut watcher_rx = ctx.watcher_tx.subscribe(); tokio::spawn(async move { loop { match watcher_rx.recv().await { Ok(evt) => { let needs_pipeline_refresh = matches!( evt, crate::io::watcher::WatcherEvent::WorkItem { .. } | crate::io::watcher::WatcherEvent::AgentStateChanged ); let ws_msg: Option = evt.into(); if let Some(msg) = ws_msg && tx_watcher.send(msg).is_err() { break; } // Push refreshed pipeline state after work-item changes and // agent state changes (so the board updates agent lozenges). if needs_pipeline_refresh && let Ok(state) = load_pipeline_state(ctx_watcher.as_ref()) && tx_watcher.send(state.into()).is_err() { break; } } // Lagged: skip missed events, keep going. Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, Err(tokio::sync::broadcast::error::RecvError::Closed) => break, } } }); // Subscribe to startup reconciliation events and forward them to the client. let tx_reconcile = tx.clone(); let mut reconcile_rx = ctx.reconciliation_tx.subscribe(); tokio::spawn(async move { loop { match reconcile_rx.recv().await { Ok(evt) => { if tx_reconcile .send(WsResponse::ReconciliationProgress { story_id: evt.story_id, status: evt.status, message: evt.message, }) .is_err() { break; } } Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, Err(tokio::sync::broadcast::error::RecvError::Closed) => break, } } }); // Map of pending permission request_id → oneshot responder. // Permission requests arrive from the MCP `prompt_permission` tool via // `ctx.perm_rx` and are forwarded to the client as `PermissionRequest`. // When the client responds, we resolve the corresponding oneshot. let mut pending_perms: HashMap> = HashMap::new(); loop { // Outer loop: wait for the next WebSocket message. let Some(Ok(WsMessage::Text(text))) = stream.next().await else { break; }; let parsed: Result = serde_json::from_str(&text); match parsed { Ok(WsRequest::Chat { messages, config }) => { let tx_updates = tx.clone(); let tx_tokens = tx.clone(); let tx_thinking = tx.clone(); let tx_activity = tx.clone(); let ctx_clone = ctx.clone(); // Build the chat future without driving it yet so we can // interleave it with permission-request forwarding. let chat_fut = chat::chat( messages, config, &ctx_clone.state, ctx_clone.store.as_ref(), move |history| { let _ = tx_updates.send(WsResponse::Update { messages: history.to_vec(), }); }, move |token| { let _ = tx_tokens.send(WsResponse::Token { content: token.to_string(), }); }, move |thinking: &str| { let _ = tx_thinking.send(WsResponse::ThinkingToken { content: thinking.to_string(), }); }, move |tool_name: &str| { let _ = tx_activity.send(WsResponse::ToolActivity { tool_name: tool_name.to_string(), }); }, ); tokio::pin!(chat_fut); // Lock the permission receiver for the duration of this chat // session. Permission requests from the MCP tool arrive here. let mut perm_rx = ctx.perm_rx.lock().await; // Inner loop: drive the chat while concurrently handling // permission requests (from MCP) and WebSocket messages. let chat_result = loop { tokio::select! { result = &mut chat_fut => break result, // Forward permission requests from MCP tool to the client. Some(perm_fwd) = perm_rx.recv() => { let _ = tx.send(WsResponse::PermissionRequest { request_id: perm_fwd.request_id.clone(), tool_name: perm_fwd.tool_name.clone(), tool_input: perm_fwd.tool_input.clone(), }); pending_perms.insert( perm_fwd.request_id, perm_fwd.response_tx, ); } // Handle WebSocket messages during an active chat // (permission responses and cancellations). Some(Ok(WsMessage::Text(inner_text))) = stream.next() => { match serde_json::from_str::(&inner_text) { Ok(WsRequest::PermissionResponse { request_id, approved, always_allow }) => { if let Some(resp_tx) = pending_perms.remove(&request_id) { let decision = if always_allow { PermissionDecision::AlwaysAllow } else if approved { PermissionDecision::Approve } else { PermissionDecision::Deny }; let _ = resp_tx.send(decision); } } Ok(WsRequest::Cancel) => { let _ = chat::cancel_chat(&ctx.state); } Ok(WsRequest::Ping) => { let _ = tx.send(WsResponse::Pong); } Ok(WsRequest::SideQuestion { question, context_messages, config }) => { let tx_side = tx.clone(); let store = ctx.store.clone(); tokio::spawn(async move { let result = chat::side_question( context_messages, question, config, store.as_ref(), |token| { let _ = tx_side.send(WsResponse::SideQuestionToken { content: token.to_string(), }); }, ).await; match result { Ok(response) => { let _ = tx_side.send(WsResponse::SideQuestionDone { response }); } Err(err) => { let _ = tx_side.send(WsResponse::SideQuestionDone { response: format!("Error: {err}"), }); } } }); } _ => {} } } } }; match chat_result { Ok(chat_result) => { if let Some(sid) = chat_result.session_id { let _ = tx.send(WsResponse::SessionId { session_id: sid }); } } Err(err) => { let _ = tx.send(WsResponse::Error { message: err }); } } } Ok(WsRequest::Cancel) => { let _ = chat::cancel_chat(&ctx.state); } Ok(WsRequest::Ping) => { let _ = tx.send(WsResponse::Pong); } Ok(WsRequest::PermissionResponse { .. }) => { // Permission responses outside an active chat are ignored. } Ok(WsRequest::SideQuestion { question, context_messages, config, }) => { let tx_side = tx.clone(); let store = ctx.store.clone(); tokio::spawn(async move { let result = chat::side_question( context_messages, question, config, store.as_ref(), |token| { let _ = tx_side.send(WsResponse::SideQuestionToken { content: token.to_string(), }); }, ) .await; match result { Ok(response) => { let _ = tx_side .send(WsResponse::SideQuestionDone { response }); } Err(err) => { let _ = tx_side.send(WsResponse::SideQuestionDone { response: format!("Error: {err}"), }); } } }); } Err(err) => { let _ = tx.send(WsResponse::Error { message: format!("Invalid request: {err}"), }); } } } drop(tx); let _ = forward.await; }) } #[cfg(test)] mod tests { use super::*; use crate::http::workflow::{PipelineState, UpcomingStory}; use crate::io::watcher::WatcherEvent; // ── WsRequest deserialization ──────────────────────────────────── #[test] fn deserialize_chat_request() { let json = r#"{ "type": "chat", "messages": [ {"role": "user", "content": "hello"} ], "config": { "provider": "ollama", "model": "llama3" } }"#; let req: WsRequest = serde_json::from_str(json).unwrap(); match req { WsRequest::Chat { messages, config } => { assert_eq!(messages.len(), 1); assert_eq!(messages[0].content, "hello"); assert_eq!(config.provider, "ollama"); assert_eq!(config.model, "llama3"); } _ => panic!("expected Chat variant"), } } #[test] fn deserialize_chat_request_with_optional_fields() { let json = r#"{ "type": "chat", "messages": [], "config": { "provider": "anthropic", "model": "claude-3-5-sonnet", "base_url": "https://api.anthropic.com", "enable_tools": true, "session_id": "sess-123" } }"#; let req: WsRequest = serde_json::from_str(json).unwrap(); match req { WsRequest::Chat { messages, config } => { assert!(messages.is_empty()); assert_eq!(config.base_url.as_deref(), Some("https://api.anthropic.com")); assert_eq!(config.enable_tools, Some(true)); assert_eq!(config.session_id.as_deref(), Some("sess-123")); } _ => panic!("expected Chat variant"), } } #[test] fn deserialize_cancel_request() { let json = r#"{"type": "cancel"}"#; let req: WsRequest = serde_json::from_str(json).unwrap(); assert!(matches!(req, WsRequest::Cancel)); } #[test] fn deserialize_ping_request() { let json = r#"{"type": "ping"}"#; let req: WsRequest = serde_json::from_str(json).unwrap(); assert!(matches!(req, WsRequest::Ping)); } #[test] fn deserialize_permission_response_approved() { let json = r#"{ "type": "permission_response", "request_id": "req-42", "approved": true }"#; let req: WsRequest = serde_json::from_str(json).unwrap(); match req { WsRequest::PermissionResponse { request_id, approved, always_allow, } => { assert_eq!(request_id, "req-42"); assert!(approved); assert!(!always_allow); } _ => panic!("expected PermissionResponse variant"), } } #[test] fn deserialize_permission_response_denied() { let json = r#"{ "type": "permission_response", "request_id": "req-99", "approved": false }"#; let req: WsRequest = serde_json::from_str(json).unwrap(); match req { WsRequest::PermissionResponse { request_id, approved, always_allow, } => { assert_eq!(request_id, "req-99"); assert!(!approved); assert!(!always_allow); } _ => panic!("expected PermissionResponse variant"), } } #[test] fn deserialize_permission_response_always_allow() { let json = r#"{ "type": "permission_response", "request_id": "req-100", "approved": true, "always_allow": true }"#; let req: WsRequest = serde_json::from_str(json).unwrap(); match req { WsRequest::PermissionResponse { request_id, approved, always_allow, } => { assert_eq!(request_id, "req-100"); assert!(approved); assert!(always_allow); } _ => panic!("expected PermissionResponse variant"), } } #[test] fn deserialize_unknown_type_fails() { let json = r#"{"type": "unknown_type"}"#; let result: Result = serde_json::from_str(json); assert!(result.is_err()); } #[test] fn deserialize_invalid_json_fails() { let result: Result = serde_json::from_str("not json"); assert!(result.is_err()); } #[test] fn deserialize_missing_type_tag_fails() { let json = r#"{"messages": [], "config": {"provider": "x", "model": "y"}}"#; let result: Result = serde_json::from_str(json); assert!(result.is_err()); } // ── WsResponse serialization ──────────────────────────────────── #[test] fn serialize_token_response() { let resp = WsResponse::Token { content: "hello world".to_string(), }; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "token"); assert_eq!(json["content"], "hello world"); } #[test] fn serialize_update_response() { let msg = Message { role: crate::llm::types::Role::Assistant, content: "response".to_string(), tool_calls: None, tool_call_id: None, }; let resp = WsResponse::Update { messages: vec![msg], }; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "update"); assert_eq!(json["messages"].as_array().unwrap().len(), 1); assert_eq!(json["messages"][0]["content"], "response"); } #[test] fn serialize_session_id_response() { let resp = WsResponse::SessionId { session_id: "sess-abc".to_string(), }; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "session_id"); assert_eq!(json["session_id"], "sess-abc"); } #[test] fn serialize_error_response() { let resp = WsResponse::Error { message: "something broke".to_string(), }; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "error"); assert_eq!(json["message"], "something broke"); } #[test] fn serialize_work_item_changed_response() { let resp = WsResponse::WorkItemChanged { stage: "2_current".to_string(), item_id: "42_story_foo".to_string(), action: "start".to_string(), commit_msg: "story-kit: start 42_story_foo".to_string(), }; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "work_item_changed"); assert_eq!(json["stage"], "2_current"); assert_eq!(json["item_id"], "42_story_foo"); assert_eq!(json["action"], "start"); assert_eq!(json["commit_msg"], "story-kit: start 42_story_foo"); } #[test] fn serialize_pipeline_state_response() { let story = crate::http::workflow::UpcomingStory { story_id: "10_story_test".to_string(), name: Some("Test".to_string()), error: None, merge_failure: None, agent: None, review_hold: None, manual_qa: None, }; let resp = WsResponse::PipelineState { backlog: vec![story], current: vec![], qa: vec![], merge: vec![], done: vec![], }; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "pipeline_state"); assert_eq!(json["backlog"].as_array().unwrap().len(), 1); assert_eq!(json["backlog"][0]["story_id"], "10_story_test"); assert!(json["current"].as_array().unwrap().is_empty()); assert!(json["done"].as_array().unwrap().is_empty()); } #[test] fn serialize_agent_config_changed_response() { let resp = WsResponse::AgentConfigChanged; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "agent_config_changed"); } #[test] fn serialize_pong_response() { let resp = WsResponse::Pong; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "pong"); } #[test] fn serialize_thinking_token_response() { let resp = WsResponse::ThinkingToken { content: "I need to think about this...".to_string(), }; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "thinking_token"); assert_eq!(json["content"], "I need to think about this..."); } #[test] fn serialize_onboarding_status_true() { let resp = WsResponse::OnboardingStatus { needs_onboarding: true, }; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "onboarding_status"); assert_eq!(json["needs_onboarding"], true); } #[test] fn serialize_onboarding_status_false() { let resp = WsResponse::OnboardingStatus { needs_onboarding: false, }; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "onboarding_status"); assert_eq!(json["needs_onboarding"], false); } #[test] fn serialize_permission_request_response() { let resp = WsResponse::PermissionRequest { request_id: "perm-1".to_string(), tool_name: "Bash".to_string(), tool_input: serde_json::json!({"command": "ls"}), }; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "permission_request"); assert_eq!(json["request_id"], "perm-1"); assert_eq!(json["tool_name"], "Bash"); assert_eq!(json["tool_input"]["command"], "ls"); } #[test] fn serialize_tool_activity_response() { let resp = WsResponse::ToolActivity { tool_name: "Read".to_string(), }; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "tool_activity"); assert_eq!(json["tool_name"], "Read"); } #[test] fn serialize_reconciliation_progress_response() { let resp = WsResponse::ReconciliationProgress { story_id: "50_story_x".to_string(), status: "gates_running".to_string(), message: "Running clippy...".to_string(), }; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "reconciliation_progress"); assert_eq!(json["story_id"], "50_story_x"); assert_eq!(json["status"], "gates_running"); assert_eq!(json["message"], "Running clippy..."); } // ── From for Option ─────────────────── #[test] fn watcher_work_item_converts_to_ws_response() { let evt = WatcherEvent::WorkItem { stage: "2_current".to_string(), item_id: "42_story_foo".to_string(), action: "start".to_string(), commit_msg: "story-kit: start 42_story_foo".to_string(), }; let ws_msg: Option = evt.into(); let ws_msg = ws_msg.expect("WorkItem should produce Some"); let json = serde_json::to_value(&ws_msg).unwrap(); assert_eq!(json["type"], "work_item_changed"); assert_eq!(json["stage"], "2_current"); assert_eq!(json["item_id"], "42_story_foo"); assert_eq!(json["action"], "start"); } #[test] fn watcher_config_changed_converts_to_ws_response() { let evt = WatcherEvent::ConfigChanged; let ws_msg: Option = evt.into(); let ws_msg = ws_msg.expect("ConfigChanged should produce Some"); let json = serde_json::to_value(&ws_msg).unwrap(); assert_eq!(json["type"], "agent_config_changed"); } // ── From for WsResponse ────────────────────────── #[test] fn pipeline_state_converts_to_ws_response() { let state = PipelineState { backlog: vec![UpcomingStory { story_id: "1_story_a".to_string(), name: Some("Story A".to_string()), error: None, merge_failure: None, agent: None, review_hold: None, manual_qa: None, }], current: vec![UpcomingStory { story_id: "2_story_b".to_string(), name: Some("Story B".to_string()), error: None, merge_failure: None, agent: None, review_hold: None, manual_qa: None, }], qa: vec![], merge: vec![], done: vec![UpcomingStory { story_id: "50_story_done".to_string(), name: Some("Done Story".to_string()), error: None, merge_failure: None, agent: None, review_hold: None, manual_qa: None, }], }; let resp: WsResponse = state.into(); let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "pipeline_state"); assert_eq!(json["backlog"].as_array().unwrap().len(), 1); assert_eq!(json["backlog"][0]["story_id"], "1_story_a"); assert_eq!(json["current"].as_array().unwrap().len(), 1); assert_eq!(json["current"][0]["story_id"], "2_story_b"); assert!(json["qa"].as_array().unwrap().is_empty()); assert!(json["merge"].as_array().unwrap().is_empty()); assert_eq!(json["done"].as_array().unwrap().len(), 1); assert_eq!(json["done"][0]["story_id"], "50_story_done"); } #[test] fn empty_pipeline_state_converts_to_ws_response() { let state = PipelineState { backlog: vec![], current: vec![], qa: vec![], merge: vec![], done: vec![], }; let resp: WsResponse = state.into(); let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "pipeline_state"); assert!(json["backlog"].as_array().unwrap().is_empty()); assert!(json["current"].as_array().unwrap().is_empty()); assert!(json["qa"].as_array().unwrap().is_empty()); assert!(json["merge"].as_array().unwrap().is_empty()); assert!(json["done"].as_array().unwrap().is_empty()); } // ── WsResponse JSON round-trip (string form) ──────────────────── #[test] fn ws_response_serializes_to_parseable_json_string() { let resp = WsResponse::Error { message: "test error".to_string(), }; let text = serde_json::to_string(&resp).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&text).unwrap(); assert_eq!(parsed["type"], "error"); assert_eq!(parsed["message"], "test error"); } #[test] fn ws_response_update_with_empty_messages() { let resp = WsResponse::Update { messages: vec![] }; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "update"); assert!(json["messages"].as_array().unwrap().is_empty()); } #[test] fn ws_response_token_with_empty_content() { let resp = WsResponse::Token { content: String::new(), }; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["type"], "token"); assert_eq!(json["content"], ""); } #[test] fn ws_response_error_with_special_characters() { let resp = WsResponse::Error { message: "error: \"quoted\" & ".to_string(), }; let text = serde_json::to_string(&resp).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&text).unwrap(); assert_eq!(parsed["message"], "error: \"quoted\" & "); } // ── WsRequest edge cases ──────────────────────────────────────── #[test] fn deserialize_chat_with_multiple_messages() { let json = r#"{ "type": "chat", "messages": [ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "Hello"}, {"role": "assistant", "content": "Hi there!"}, {"role": "user", "content": "How are you?"} ], "config": { "provider": "ollama", "model": "llama3" } }"#; let req: WsRequest = serde_json::from_str(json).unwrap(); match req { WsRequest::Chat { messages, .. } => { assert_eq!(messages.len(), 4); assert_eq!(messages[0].role, crate::llm::types::Role::System); assert_eq!(messages[3].role, crate::llm::types::Role::User); } _ => panic!("expected Chat variant"), } } #[test] fn deserialize_chat_with_tool_call_message() { let json = r#"{ "type": "chat", "messages": [ { "role": "assistant", "content": "", "tool_calls": [ { "id": "call_1", "type": "function", "function": { "name": "read_file", "arguments": "{\"path\": \"/tmp/test.rs\"}" } } ] } ], "config": { "provider": "anthropic", "model": "claude-3-5-sonnet" } }"#; let req: WsRequest = serde_json::from_str(json).unwrap(); match req { WsRequest::Chat { messages, .. } => { assert_eq!(messages.len(), 1); let tc = messages[0].tool_calls.as_ref().unwrap(); assert_eq!(tc.len(), 1); assert_eq!(tc[0].function.name, "read_file"); } _ => panic!("expected Chat variant"), } } // ── Pipeline state with agent assignment ──────────────────────── #[test] fn pipeline_state_with_agent_converts_correctly() { let state = PipelineState { backlog: vec![], current: vec![UpcomingStory { story_id: "10_story_x".to_string(), name: Some("Story X".to_string()), error: None, merge_failure: None, agent: Some(crate::http::workflow::AgentAssignment { agent_name: "coder-1".to_string(), model: Some("claude-3-5-sonnet".to_string()), status: "running".to_string(), }), review_hold: None, manual_qa: None, }], qa: vec![], merge: vec![], done: vec![], }; let resp: WsResponse = state.into(); let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["current"][0]["agent"]["agent_name"], "coder-1"); assert_eq!(json["current"][0]["agent"]["model"], "claude-3-5-sonnet"); assert_eq!(json["current"][0]["agent"]["status"], "running"); } // ── Reconciliation progress done event ────────────────────────── #[test] fn reconciliation_done_event_has_empty_story_id() { let resp = WsResponse::ReconciliationProgress { story_id: String::new(), status: "done".to_string(), message: "Reconciliation complete".to_string(), }; let json = serde_json::to_value(&resp).unwrap(); assert_eq!(json["story_id"], ""); assert_eq!(json["status"], "done"); } // ── ws_handler integration tests (real WebSocket connection) ───── use futures::stream::SplitSink; use poem::EndpointExt; use tokio_tungstenite::tungstenite; /// Helper: construct a tungstenite text message from a string. fn ws_text(s: &str) -> tungstenite::Message { tungstenite::Message::Text(s.into()) } /// Helper: start a poem server with ws_handler on an ephemeral port /// and return the WebSocket URL. async fn start_test_server() -> (String, Arc) { let tmp = tempfile::tempdir().unwrap(); let root = tmp.path().to_path_buf(); // Create minimal pipeline dirs so load_pipeline_state succeeds. for stage in &["1_backlog", "2_current", "3_qa", "4_merge"] { std::fs::create_dir_all(root.join(".story_kit").join("work").join(stage)).unwrap(); } let ctx = Arc::new(AppContext::new_test(root)); let ctx_data = ctx.clone(); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); let app = poem::Route::new() .at("/ws", poem::get(ws_handler)) .data(ctx_data); tokio::spawn(async move { let acceptor = poem::listener::TcpAcceptor::from_tokio(listener).unwrap(); let _ = poem::Server::new_with_acceptor(acceptor) .run(app) .await; }); // Small delay to let the server start. tokio::time::sleep(std::time::Duration::from_millis(50)).await; let url = format!("ws://127.0.0.1:{}/ws", addr.port()); (url, ctx) } type WsSink = SplitSink< tokio_tungstenite::WebSocketStream< tokio_tungstenite::MaybeTlsStream, >, tungstenite::Message, >; /// Helper: connect and return (sink, stream) plus read the initial /// pipeline_state and onboarding_status messages that are always sent /// on connect. async fn connect_ws( url: &str, ) -> ( WsSink, futures::stream::SplitStream< tokio_tungstenite::WebSocketStream< tokio_tungstenite::MaybeTlsStream, >, >, serde_json::Value, ) { let (ws, _resp) = tokio_tungstenite::connect_async(url).await.unwrap(); let (sink, mut stream) = futures::StreamExt::split(ws); // The first message should be the initial pipeline_state. let first = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next()) .await .expect("timeout waiting for initial message") .expect("stream ended") .expect("ws error"); let initial: serde_json::Value = match first { tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(), other => panic!("expected text message, got: {other:?}"), }; // The second message is the onboarding_status — consume it so // callers only see application-level messages. let second = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next()) .await .expect("timeout waiting for onboarding_status") .expect("stream ended") .expect("ws error"); let onboarding: serde_json::Value = match second { tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(), other => panic!("expected text message, got: {other:?}"), }; assert_eq!( onboarding["type"], "onboarding_status", "expected onboarding_status, got: {onboarding}" ); (sink, stream, initial) } /// Read next text message from the stream with a timeout. async fn next_msg( stream: &mut futures::stream::SplitStream< tokio_tungstenite::WebSocketStream< tokio_tungstenite::MaybeTlsStream, >, >, ) -> serde_json::Value { let msg = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next()) .await .expect("timeout waiting for message") .expect("stream ended") .expect("ws error"); match msg { tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(), other => panic!("expected text message, got: {other:?}"), } } #[tokio::test] async fn ws_handler_sends_initial_pipeline_state_on_connect() { let (url, _ctx) = start_test_server().await; let (_sink, _stream, initial) = connect_ws(&url).await; assert_eq!(initial["type"], "pipeline_state"); // All stages should be empty arrays since no .md files were created. assert!(initial["backlog"].as_array().unwrap().is_empty()); assert!(initial["current"].as_array().unwrap().is_empty()); assert!(initial["qa"].as_array().unwrap().is_empty()); assert!(initial["merge"].as_array().unwrap().is_empty()); } #[tokio::test] async fn ws_handler_returns_error_for_invalid_json() { let (url, _ctx) = start_test_server().await; let (mut sink, mut stream, _initial) = connect_ws(&url).await; // Send invalid JSON. sink.send(ws_text("not valid json")) .await .unwrap(); let msg = next_msg(&mut stream).await; assert_eq!(msg["type"], "error"); assert!( msg["message"] .as_str() .unwrap() .contains("Invalid request"), "error message should indicate invalid request, got: {}", msg["message"] ); } #[tokio::test] async fn ws_handler_returns_error_for_unknown_type() { let (url, _ctx) = start_test_server().await; let (mut sink, mut stream, _initial) = connect_ws(&url).await; // Send a message with an unknown type. sink.send(ws_text(r#"{"type": "bogus"}"#)) .await .unwrap(); let msg = next_msg(&mut stream).await; assert_eq!(msg["type"], "error"); assert!(msg["message"].as_str().unwrap().contains("Invalid request")); } #[tokio::test] async fn ws_handler_cancel_outside_chat_does_not_error() { let (url, _ctx) = start_test_server().await; let (mut sink, mut stream, _initial) = connect_ws(&url).await; // Send cancel when no chat is active — should not produce an error. sink.send(ws_text(r#"{"type": "cancel"}"#)) .await .unwrap(); // Send another invalid message to check the connection is still alive. sink.send(ws_text("{}")) .await .unwrap(); let msg = next_msg(&mut stream).await; // The invalid JSON message should produce an error, confirming // the cancel didn't break the connection. assert_eq!(msg["type"], "error"); } #[tokio::test] async fn ws_handler_permission_response_outside_chat_is_ignored() { let (url, _ctx) = start_test_server().await; let (mut sink, mut stream, _initial) = connect_ws(&url).await; // Send permission response outside an active chat. sink.send(ws_text( r#"{"type": "permission_response", "request_id": "x", "approved": true}"#, )) .await .unwrap(); // Send a probe message to check the connection is still alive. sink.send(ws_text("bad")) .await .unwrap(); let msg = next_msg(&mut stream).await; assert_eq!(msg["type"], "error"); assert!(msg["message"].as_str().unwrap().contains("Invalid request")); } #[tokio::test] async fn ws_handler_forwards_watcher_events() { let (url, ctx) = start_test_server().await; let (_sink, mut stream, _initial) = connect_ws(&url).await; // Broadcast a watcher event. ctx.watcher_tx .send(WatcherEvent::WorkItem { stage: "2_current".to_string(), item_id: "99_story_test".to_string(), action: "start".to_string(), commit_msg: "story-kit: start 99_story_test".to_string(), }) .unwrap(); let msg = next_msg(&mut stream).await; assert_eq!(msg["type"], "work_item_changed"); assert_eq!(msg["item_id"], "99_story_test"); assert_eq!(msg["stage"], "2_current"); // After a work-item event, a pipeline_state refresh is pushed. let state_msg = next_msg(&mut stream).await; assert_eq!(state_msg["type"], "pipeline_state"); } #[tokio::test] async fn ws_handler_forwards_config_changed_without_pipeline_refresh() { let (url, ctx) = start_test_server().await; let (_sink, mut stream, _initial) = connect_ws(&url).await; // Broadcast a config-changed event. ctx.watcher_tx.send(WatcherEvent::ConfigChanged).unwrap(); let msg = next_msg(&mut stream).await; assert_eq!(msg["type"], "agent_config_changed"); // Config-changed should NOT be followed by a pipeline_state refresh. // Send a probe to check no extra message is queued. ctx.watcher_tx.send(WatcherEvent::ConfigChanged).unwrap(); let msg2 = next_msg(&mut stream).await; assert_eq!(msg2["type"], "agent_config_changed"); } #[tokio::test] async fn ws_handler_forwards_reconciliation_events() { let (url, ctx) = start_test_server().await; let (_sink, mut stream, _initial) = connect_ws(&url).await; // Broadcast a reconciliation event. ctx.reconciliation_tx .send(crate::agents::ReconciliationEvent { story_id: "50_story_recon".to_string(), status: "checking".to_string(), message: "Checking story...".to_string(), }) .unwrap(); let msg = next_msg(&mut stream).await; assert_eq!(msg["type"], "reconciliation_progress"); assert_eq!(msg["story_id"], "50_story_recon"); assert_eq!(msg["status"], "checking"); assert_eq!(msg["message"], "Checking story..."); } #[tokio::test] async fn ws_handler_handles_client_disconnect_gracefully() { let (url, _ctx) = start_test_server().await; let (mut sink, _stream, _initial) = connect_ws(&url).await; // Close the connection — should not panic the server. sink.close().await.unwrap(); // Give the server a moment to process the close. tokio::time::sleep(std::time::Duration::from_millis(100)).await; // Connect again to verify server is still alive. let (_sink2, _stream2, initial2) = connect_ws(&url).await; assert_eq!(initial2["type"], "pipeline_state"); } }