From d9e883c21ddb9a9fc3cbbf3de232b2dab59504a6 Mon Sep 17 00:00:00 2001 From: dave Date: Fri, 24 Apr 2026 14:32:36 +0000 Subject: [PATCH] huskies: merge 612_story_extract_ws_service --- server/src/http/ws.rs | 1020 ++--------------------------- server/src/service/mod.rs | 1 + server/src/service/ws/dispatch.rs | 342 ++++++++++ server/src/service/ws/error.rs | 98 +++ server/src/service/ws/io.rs | 144 ++++ server/src/service/ws/message.rs | 993 ++++++++++++++++++++++++++++ server/src/service/ws/mod.rs | 25 + 7 files changed, 1652 insertions(+), 971 deletions(-) create mode 100644 server/src/service/ws/dispatch.rs create mode 100644 server/src/service/ws/error.rs create mode 100644 server/src/service/ws/io.rs create mode 100644 server/src/service/ws/message.rs create mode 100644 server/src/service/ws/mod.rs diff --git a/server/src/http/ws.rs b/server/src/http/ws.rs index fb02f407..f44f91e1 100644 --- a/server/src/http/ws.rs +++ b/server/src/http/ws.rs @@ -1,204 +1,22 @@ -//! WebSocket endpoint — real-time pipeline updates, chat, and permission prompts. -use crate::http::context::{AppContext, PermissionDecision}; -use crate::http::workflow::{PipelineState, load_pipeline_state}; -use crate::io::onboarding; -use crate::io::watcher::WatcherEvent; -use crate::io::wizard; +//! WebSocket transport adapter — accept connection, serialise/deserialise frames, +//! invoke service methods. No business logic, no inline state transitions. + +use crate::http::context::AppContext; use crate::llm::chat; -use crate::llm::types::Message; -use crate::log_buffer; +use crate::service::ws::{self, WsResponse}; use futures::{SinkExt, StreamExt}; use poem::handler; use poem::web::Data; use poem::web::websocket::{Message as WsMessage, WebSocket}; -use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; -#[derive(Deserialize)] -#[serde(tag = "type", rename_all = "snake_case")] -/// WebSocket request messages sent by the client. -/// -/// - `chat` starts a streaming chat session. -/// - `cancel` stops the active session. -/// - `permission_response` approves or denies a pending permission request. -enum WsRequest { - Chat { - messages: Vec, - config: chat::ProviderConfig, - }, - Cancel, - PermissionResponse { - request_id: String, - approved: bool, - #[serde(default)] - always_allow: bool, - }, - /// Heartbeat ping from the client. The server responds with `Pong` so the - /// client can detect stale (half-closed) connections. - Ping, - /// A quick side question answered from current conversation context. - /// The question and response are NOT added to the conversation history - /// and no tool calls are made. - SideQuestion { - question: String, - context_messages: Vec, - config: chat::ProviderConfig, - }, -} +use crate::http::context::PermissionDecision; -/// Serialisable summary of a single wizard step for WebSocket broadcast. -#[derive(Serialize, Clone)] -pub struct WizardStepInfo { - pub step: String, - pub label: String, - pub status: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub content: Option, -} - -#[derive(Serialize)] -#[serde(tag = "type", rename_all = "snake_case")] -/// WebSocket response messages sent by the server. -/// -/// - `token` streams partial model output. -/// - `update` pushes the updated message history. -/// - `error` reports a request or processing failure. -/// - `work_item_changed` notifies that a `.huskies/work/` file changed. -/// - `agent_config_changed` notifies that `.huskies/project.toml` was modified. -enum WsResponse { - Token { - content: String, - }, - Update { - messages: Vec, - }, - /// Session ID for Claude Code conversation resumption. - SessionId { - session_id: String, - }, - Error { - message: String, - }, - /// Filesystem watcher notification: a work-pipeline file was created or - /// modified and auto-committed. The frontend can use this to refresh its - /// story/bug list without polling. - WorkItemChanged { - stage: String, - item_id: String, - action: String, - commit_msg: String, - }, - /// Full pipeline state pushed on connect and after every work-item watcher event. - PipelineState { - backlog: Vec, - current: Vec, - qa: Vec, - merge: Vec, - done: Vec, - }, - /// `.huskies/project.toml` was modified; the frontend should re-fetch the - /// agent roster. Does NOT trigger a pipeline state refresh. - AgentConfigChanged, - /// An agent's state changed (started, stopped, completed, etc.). - /// Triggers a pipeline state refresh and tells the frontend to re-fetch - /// the agent list. - AgentStateChanged, - /// Claude Code is requesting user approval before executing a tool. - PermissionRequest { - request_id: String, - tool_name: String, - tool_input: serde_json::Value, - }, - /// The agent started assembling a tool call; shows live status in the UI. - ToolActivity { - tool_name: String, - }, - /// Real-time progress from the server startup reconciliation pass. - /// `status` is one of: "checking", "gates_running", "advanced", "skipped", - /// "failed", "done". `story_id` is empty for the overall "done" event. - ReconciliationProgress { - story_id: String, - status: String, - message: String, - }, - /// Heartbeat response to a client `Ping`. Lets the client confirm the - /// connection is alive and cancel any stale-connection timeout. - Pong, - /// Streaming thinking token from an extended-thinking block. - /// Sent separately from `Token` so the frontend can render them in - /// a constrained, scrollable ThinkingBlock rather than inline. - ThinkingToken { - content: String, - }, - /// Sent on connect when the project's spec files still contain scaffold - /// placeholder content and the user needs to go through onboarding. - OnboardingStatus { - needs_onboarding: bool, - }, - /// Sent on connect when a setup wizard is active. Contains the full - /// wizard state so the frontend can render the step-by-step UI. - WizardState { - steps: Vec, - current_step_index: usize, - completed: bool, - }, - /// Streaming token from a `/btw` side question response. - SideQuestionToken { - content: String, - }, - /// Final signal that the `/btw` side question has been fully answered. - SideQuestionDone { - response: String, - }, - /// A single server log entry. Sent in bulk on connect (recent history), - /// then streamed live as new entries arrive. - LogEntry { - timestamp: String, - level: String, - message: String, - }, -} - -impl From for Option { - fn from(e: WatcherEvent) -> Self { - match e { - WatcherEvent::WorkItem { - stage, - item_id, - action, - commit_msg, - .. - } => Some(WsResponse::WorkItemChanged { - stage, - item_id, - action, - commit_msg, - }), - WatcherEvent::ConfigChanged => Some(WsResponse::AgentConfigChanged), - WatcherEvent::AgentStateChanged => Some(WsResponse::AgentStateChanged), - // MergeFailure, RateLimitWarning, StoryBlocked, and RateLimitHardBlock are handled - // by the chat notification listener only; no WebSocket message is needed for the frontend. - WatcherEvent::MergeFailure { .. } => None, - WatcherEvent::RateLimitWarning { .. } => None, - WatcherEvent::StoryBlocked { .. } => None, - WatcherEvent::RateLimitHardBlock { .. } => None, - } - } -} - -impl From for WsResponse { - fn from(s: PipelineState) -> Self { - WsResponse::PipelineState { - backlog: s.backlog, - current: s.current, - qa: s.qa, - merge: s.merge, - done: s.done, - } - } -} +// Re-export WizardStepInfo for any downstream code that imports it from here. +#[allow(unused_imports)] +pub use crate::service::ws::WizardStepInfo; #[handler] /// WebSocket endpoint for streaming chat responses, cancellation, and @@ -221,153 +39,26 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem } }); - // Push initial pipeline state to the client on connect. - if let Ok(state) = load_pipeline_state(ctx.as_ref()) { - let _ = tx.send(state.into()); + // ── Initial state burst ───────���───────────────────────────── + if let Some(state) = ws::load_initial_pipeline_state(ctx.as_ref()) { + let _ = tx.send(state); + } + let _ = tx.send(ws::check_onboarding(ctx.as_ref())); + if let Some(wiz) = ws::load_wizard_state(ctx.as_ref()) { + let _ = tx.send(wiz); + } + for log in ws::load_recent_logs(100) { + let _ = tx.send(log); } - // Push onboarding status so the frontend knows whether to show - // the onboarding welcome flow. - { - let needs = ctx - .state - .get_project_root() - .map(|root| onboarding::check_onboarding_status(&root).needs_onboarding()) - .unwrap_or(false); - let _ = tx.send(WsResponse::OnboardingStatus { - needs_onboarding: needs, - }); - } + // ── Background subscriptions ──────────────────────��───────── + ws::subscribe_logs(tx.clone()); + ws::subscribe_watcher(tx.clone(), ctx.clone(), ctx.watcher_tx.subscribe()); + ws::subscribe_reconciliation(tx.clone(), ctx.reconciliation_tx.subscribe()); - // Push wizard state if an active wizard exists. - { - if let Ok(root) = ctx.state.get_project_root() - && let Some(ws) = wizard::WizardState::load(&root) - { - let steps: Vec = ws - .steps - .iter() - .map(|s| WizardStepInfo { - step: serde_json::to_value(s.step) - .ok() - .and_then(|v| v.as_str().map(String::from)) - .unwrap_or_default(), - label: s.step.label().to_string(), - status: serde_json::to_value(&s.status) - .ok() - .and_then(|v| v.as_str().map(String::from)) - .unwrap_or_default(), - content: s.content.clone(), - }) - .collect(); - let _ = tx.send(WsResponse::WizardState { - steps, - current_step_index: ws.current_step_index(), - completed: ws.completed, - }); - } - } - - // Push recent server log entries so the client has history on connect. - { - let entries = log_buffer::global().get_recent_entries(100, None, None); - for entry in entries { - let _ = tx.send(WsResponse::LogEntry { - timestamp: entry.timestamp, - level: entry.level.as_str().to_string(), - message: entry.message, - }); - } - } - - // Subscribe to live log entries and forward them to the client. - let tx_logs = tx.clone(); - let mut log_rx = log_buffer::global().subscribe(); - tokio::spawn(async move { - loop { - match log_rx.recv().await { - Ok(entry) => { - if tx_logs - .send(WsResponse::LogEntry { - timestamp: entry.timestamp, - level: entry.level.as_str().to_string(), - message: entry.message, - }) - .is_err() - { - break; - } - } - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - } - } - }); - - // Subscribe to filesystem watcher events and forward them to the client. - // After each work-item event, also push the updated pipeline state. - // Config-changed events are forwarded as-is without a pipeline refresh. - let tx_watcher = tx.clone(); - let ctx_watcher = ctx.clone(); - let mut watcher_rx = ctx.watcher_tx.subscribe(); - tokio::spawn(async move { - loop { - match watcher_rx.recv().await { - Ok(evt) => { - let needs_pipeline_refresh = matches!( - evt, - crate::io::watcher::WatcherEvent::WorkItem { .. } - | crate::io::watcher::WatcherEvent::AgentStateChanged - ); - let ws_msg: Option = evt.into(); - if let Some(msg) = ws_msg && tx_watcher.send(msg).is_err() { - break; - } - // Push refreshed pipeline state after work-item changes and - // agent state changes (so the board updates agent lozenges). - if needs_pipeline_refresh - && let Ok(state) = load_pipeline_state(ctx_watcher.as_ref()) - && tx_watcher.send(state.into()).is_err() - { - break; - } - } - // Lagged: skip missed events, keep going. - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - } - } - }); - - // Subscribe to startup reconciliation events and forward them to the client. - let tx_reconcile = tx.clone(); - let mut reconcile_rx = ctx.reconciliation_tx.subscribe(); - tokio::spawn(async move { - loop { - match reconcile_rx.recv().await { - Ok(evt) => { - if tx_reconcile - .send(WsResponse::ReconciliationProgress { - story_id: evt.story_id, - status: evt.status, - message: evt.message, - }) - .is_err() - { - break; - } - } - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - } - } - }); - - // Map of pending permission request_id → oneshot responder. - // Permission requests arrive from the MCP `prompt_permission` tool via - // `ctx.perm_rx` and are forwarded to the client as `PermissionRequest`. - // When the client responds, we resolve the corresponding oneshot. - let mut pending_perms: HashMap> = HashMap::new(); + // Map of pending permission request_id -> oneshot responder. + let mut pending_perms: HashMap> = + HashMap::new(); loop { // Outer loop: wait for the next WebSocket message. @@ -375,17 +66,14 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem break; }; - let parsed: Result = serde_json::from_str(&text); - match parsed { - Ok(WsRequest::Chat { messages, config }) => { + match ws::dispatch_outer(&text) { + ws::DispatchResult::StartChat { messages, config } => { let tx_updates = tx.clone(); let tx_tokens = tx.clone(); let tx_thinking = tx.clone(); let tx_activity = tx.clone(); let ctx_clone = ctx.clone(); - // Build the chat future without driving it yet so we can - // interleave it with permission-request forwarding. let chat_fut = chat::chat( messages, config, @@ -414,52 +102,33 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem ); tokio::pin!(chat_fut); - // Lock the permission receiver for the duration of this chat - // session. Permission requests from the MCP tool arrive here. let mut perm_rx = ctx.perm_rx.lock().await; - // Inner loop: drive the chat while concurrently handling - // permission requests (from MCP) and WebSocket messages. let chat_result = loop { tokio::select! { result = &mut chat_fut => break result, - // Forward permission requests from MCP tool to the client. Some(perm_fwd) = perm_rx.recv() => { - let _ = tx.send(WsResponse::PermissionRequest { - request_id: perm_fwd.request_id.clone(), - tool_name: perm_fwd.tool_name.clone(), - tool_input: perm_fwd.tool_input.clone(), - }); + let _ = tx.send(ws::permission_request_response( + &perm_fwd.request_id, + &perm_fwd.tool_name, + &perm_fwd.tool_input, + )); pending_perms.insert( perm_fwd.request_id, perm_fwd.response_tx, ); } - // Handle WebSocket messages during an active chat - // (permission responses and cancellations). Some(Ok(WsMessage::Text(inner_text))) = stream.next() => { - match serde_json::from_str::(&inner_text) { - Ok(WsRequest::PermissionResponse { request_id, approved, always_allow }) => { - if let Some(resp_tx) = pending_perms.remove(&request_id) { - let decision = if always_allow { - PermissionDecision::AlwaysAllow - } else if approved { - PermissionDecision::Approve - } else { - PermissionDecision::Deny - }; - let _ = resp_tx.send(decision); - } - } - Ok(WsRequest::Cancel) => { + match ws::dispatch_inner(&inner_text, &mut pending_perms) { + ws::InnerDispatchResult::CancelChat => { let _ = chat::cancel_chat(&ctx.state); } - Ok(WsRequest::Ping) => { + ws::InnerDispatchResult::Pong => { let _ = tx.send(WsResponse::Pong); } - Ok(WsRequest::SideQuestion { question, context_messages, config }) => { + ws::InnerDispatchResult::StartSideQuestion { question, context_messages, config } => { let tx_side = tx.clone(); let store = ctx.store.clone(); tokio::spawn(async move { @@ -486,7 +155,8 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem } }); } - _ => {} + ws::InnerDispatchResult::PermissionResolved + | ws::InnerDispatchResult::Ignored => {} } } } @@ -499,24 +169,24 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem } } Err(err) => { - let _ = tx.send(WsResponse::Error { message: err }); + let _ = tx.send(ws::error_response(err)); } } } - Ok(WsRequest::Cancel) => { + ws::DispatchResult::CancelChat => { let _ = chat::cancel_chat(&ctx.state); } - Ok(WsRequest::Ping) => { + ws::DispatchResult::Pong => { let _ = tx.send(WsResponse::Pong); } - Ok(WsRequest::PermissionResponse { .. }) => { + ws::DispatchResult::IgnoredPermission => { // Permission responses outside an active chat are ignored. } - Ok(WsRequest::SideQuestion { + ws::DispatchResult::StartSideQuestion { question, context_messages, config, - }) => { + } => { let tx_side = tx.clone(); let store = ctx.store.clone(); tokio::spawn(async move { @@ -534,8 +204,8 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem .await; match result { Ok(response) => { - let _ = tx_side - .send(WsResponse::SideQuestionDone { response }); + let _ = + tx_side.send(WsResponse::SideQuestionDone { response }); } Err(err) => { let _ = tx_side.send(WsResponse::SideQuestionDone { @@ -545,10 +215,8 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem } }); } - Err(err) => { - let _ = tx.send(WsResponse::Error { - message: format!("Invalid request: {err}"), - }); + ws::DispatchResult::ParseError(msg) => { + let _ = tx.send(ws::error_response(msg)); } } } @@ -561,598 +229,8 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem #[cfg(test)] mod tests { use super::*; - use crate::http::workflow::{PipelineState, UpcomingStory}; use crate::io::watcher::WatcherEvent; - // ── WsRequest deserialization ──────────────────────────────────── - - #[test] - fn deserialize_chat_request() { - let json = r#"{ - "type": "chat", - "messages": [ - {"role": "user", "content": "hello"} - ], - "config": { - "provider": "ollama", - "model": "llama3" - } - }"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - match req { - WsRequest::Chat { messages, config } => { - assert_eq!(messages.len(), 1); - assert_eq!(messages[0].content, "hello"); - assert_eq!(config.provider, "ollama"); - assert_eq!(config.model, "llama3"); - } - _ => panic!("expected Chat variant"), - } - } - - #[test] - fn deserialize_chat_request_with_optional_fields() { - let json = r#"{ - "type": "chat", - "messages": [], - "config": { - "provider": "anthropic", - "model": "claude-3-5-sonnet", - "base_url": "https://api.anthropic.com", - "enable_tools": true, - "session_id": "sess-123" - } - }"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - match req { - WsRequest::Chat { messages, config } => { - assert!(messages.is_empty()); - assert_eq!( - config.base_url.as_deref(), - Some("https://api.anthropic.com") - ); - assert_eq!(config.enable_tools, Some(true)); - assert_eq!(config.session_id.as_deref(), Some("sess-123")); - } - _ => panic!("expected Chat variant"), - } - } - - #[test] - fn deserialize_cancel_request() { - let json = r#"{"type": "cancel"}"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - assert!(matches!(req, WsRequest::Cancel)); - } - - #[test] - fn deserialize_ping_request() { - let json = r#"{"type": "ping"}"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - assert!(matches!(req, WsRequest::Ping)); - } - - #[test] - fn deserialize_permission_response_approved() { - let json = r#"{ - "type": "permission_response", - "request_id": "req-42", - "approved": true - }"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - match req { - WsRequest::PermissionResponse { - request_id, - approved, - always_allow, - } => { - assert_eq!(request_id, "req-42"); - assert!(approved); - assert!(!always_allow); - } - _ => panic!("expected PermissionResponse variant"), - } - } - - #[test] - fn deserialize_permission_response_denied() { - let json = r#"{ - "type": "permission_response", - "request_id": "req-99", - "approved": false - }"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - match req { - WsRequest::PermissionResponse { - request_id, - approved, - always_allow, - } => { - assert_eq!(request_id, "req-99"); - assert!(!approved); - assert!(!always_allow); - } - _ => panic!("expected PermissionResponse variant"), - } - } - - #[test] - fn deserialize_permission_response_always_allow() { - let json = r#"{ - "type": "permission_response", - "request_id": "req-100", - "approved": true, - "always_allow": true - }"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - match req { - WsRequest::PermissionResponse { - request_id, - approved, - always_allow, - } => { - assert_eq!(request_id, "req-100"); - assert!(approved); - assert!(always_allow); - } - _ => panic!("expected PermissionResponse variant"), - } - } - - #[test] - fn deserialize_unknown_type_fails() { - let json = r#"{"type": "unknown_type"}"#; - let result: Result = serde_json::from_str(json); - assert!(result.is_err()); - } - - #[test] - fn deserialize_invalid_json_fails() { - let result: Result = serde_json::from_str("not json"); - assert!(result.is_err()); - } - - #[test] - fn deserialize_missing_type_tag_fails() { - let json = r#"{"messages": [], "config": {"provider": "x", "model": "y"}}"#; - let result: Result = serde_json::from_str(json); - assert!(result.is_err()); - } - - // ── WsResponse serialization ──────────────────────────────────── - - #[test] - fn serialize_token_response() { - let resp = WsResponse::Token { - content: "hello world".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "token"); - assert_eq!(json["content"], "hello world"); - } - - #[test] - fn serialize_update_response() { - let msg = Message { - role: crate::llm::types::Role::Assistant, - content: "response".to_string(), - tool_calls: None, - tool_call_id: None, - }; - let resp = WsResponse::Update { - messages: vec![msg], - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "update"); - assert_eq!(json["messages"].as_array().unwrap().len(), 1); - assert_eq!(json["messages"][0]["content"], "response"); - } - - #[test] - fn serialize_session_id_response() { - let resp = WsResponse::SessionId { - session_id: "sess-abc".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "session_id"); - assert_eq!(json["session_id"], "sess-abc"); - } - - #[test] - fn serialize_error_response() { - let resp = WsResponse::Error { - message: "something broke".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "error"); - assert_eq!(json["message"], "something broke"); - } - - #[test] - fn serialize_work_item_changed_response() { - let resp = WsResponse::WorkItemChanged { - stage: "2_current".to_string(), - item_id: "42_story_foo".to_string(), - action: "start".to_string(), - commit_msg: "huskies: start 42_story_foo".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "work_item_changed"); - assert_eq!(json["stage"], "2_current"); - assert_eq!(json["item_id"], "42_story_foo"); - assert_eq!(json["action"], "start"); - assert_eq!(json["commit_msg"], "huskies: start 42_story_foo"); - } - - #[test] - fn serialize_pipeline_state_response() { - let story = crate::http::workflow::UpcomingStory { - story_id: "10_story_test".to_string(), - name: Some("Test".to_string()), - error: None, - merge_failure: None, - agent: None, - review_hold: None, - qa: None, - retry_count: None, - blocked: None, - depends_on: None, - }; - let resp = WsResponse::PipelineState { - backlog: vec![story], - current: vec![], - qa: vec![], - merge: vec![], - done: vec![], - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "pipeline_state"); - assert_eq!(json["backlog"].as_array().unwrap().len(), 1); - assert_eq!(json["backlog"][0]["story_id"], "10_story_test"); - assert!(json["current"].as_array().unwrap().is_empty()); - assert!(json["done"].as_array().unwrap().is_empty()); - } - - #[test] - fn serialize_agent_config_changed_response() { - let resp = WsResponse::AgentConfigChanged; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "agent_config_changed"); - } - - #[test] - fn serialize_pong_response() { - let resp = WsResponse::Pong; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "pong"); - } - - #[test] - fn serialize_thinking_token_response() { - let resp = WsResponse::ThinkingToken { - content: "I need to think about this...".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "thinking_token"); - assert_eq!(json["content"], "I need to think about this..."); - } - - #[test] - fn serialize_onboarding_status_true() { - let resp = WsResponse::OnboardingStatus { - needs_onboarding: true, - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "onboarding_status"); - assert_eq!(json["needs_onboarding"], true); - } - - #[test] - fn serialize_onboarding_status_false() { - let resp = WsResponse::OnboardingStatus { - needs_onboarding: false, - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "onboarding_status"); - assert_eq!(json["needs_onboarding"], false); - } - - #[test] - fn serialize_permission_request_response() { - let resp = WsResponse::PermissionRequest { - request_id: "perm-1".to_string(), - tool_name: "Bash".to_string(), - tool_input: serde_json::json!({"command": "ls"}), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "permission_request"); - assert_eq!(json["request_id"], "perm-1"); - assert_eq!(json["tool_name"], "Bash"); - assert_eq!(json["tool_input"]["command"], "ls"); - } - - #[test] - fn serialize_tool_activity_response() { - let resp = WsResponse::ToolActivity { - tool_name: "Read".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "tool_activity"); - assert_eq!(json["tool_name"], "Read"); - } - - #[test] - fn serialize_reconciliation_progress_response() { - let resp = WsResponse::ReconciliationProgress { - story_id: "50_story_x".to_string(), - status: "gates_running".to_string(), - message: "Running clippy...".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "reconciliation_progress"); - assert_eq!(json["story_id"], "50_story_x"); - assert_eq!(json["status"], "gates_running"); - assert_eq!(json["message"], "Running clippy..."); - } - - // ── From for Option ─────────────────── - - #[test] - fn watcher_work_item_converts_to_ws_response() { - let evt = WatcherEvent::WorkItem { - stage: "2_current".to_string(), - item_id: "42_story_foo".to_string(), - action: "start".to_string(), - commit_msg: "huskies: start 42_story_foo".to_string(), - from_stage: None, - }; - let ws_msg: Option = evt.into(); - let ws_msg = ws_msg.expect("WorkItem should produce Some"); - let json = serde_json::to_value(&ws_msg).unwrap(); - assert_eq!(json["type"], "work_item_changed"); - assert_eq!(json["stage"], "2_current"); - assert_eq!(json["item_id"], "42_story_foo"); - assert_eq!(json["action"], "start"); - } - - #[test] - fn watcher_config_changed_converts_to_ws_response() { - let evt = WatcherEvent::ConfigChanged; - let ws_msg: Option = evt.into(); - let ws_msg = ws_msg.expect("ConfigChanged should produce Some"); - let json = serde_json::to_value(&ws_msg).unwrap(); - assert_eq!(json["type"], "agent_config_changed"); - } - - // ── From for WsResponse ────────────────────────── - - #[test] - fn pipeline_state_converts_to_ws_response() { - let state = PipelineState { - backlog: vec![UpcomingStory { - story_id: "1_story_a".to_string(), - name: Some("Story A".to_string()), - error: None, - merge_failure: None, - agent: None, - review_hold: None, - qa: None, - retry_count: None, - blocked: None, - depends_on: None, - }], - current: vec![UpcomingStory { - story_id: "2_story_b".to_string(), - name: Some("Story B".to_string()), - error: None, - merge_failure: None, - agent: None, - review_hold: None, - qa: None, - retry_count: None, - blocked: None, - depends_on: None, - }], - qa: vec![], - merge: vec![], - done: vec![UpcomingStory { - story_id: "50_story_done".to_string(), - name: Some("Done Story".to_string()), - error: None, - merge_failure: None, - agent: None, - review_hold: None, - qa: None, - retry_count: None, - blocked: None, - depends_on: None, - }], - }; - let resp: WsResponse = state.into(); - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "pipeline_state"); - assert_eq!(json["backlog"].as_array().unwrap().len(), 1); - assert_eq!(json["backlog"][0]["story_id"], "1_story_a"); - assert_eq!(json["current"].as_array().unwrap().len(), 1); - assert_eq!(json["current"][0]["story_id"], "2_story_b"); - assert!(json["qa"].as_array().unwrap().is_empty()); - assert!(json["merge"].as_array().unwrap().is_empty()); - assert_eq!(json["done"].as_array().unwrap().len(), 1); - assert_eq!(json["done"][0]["story_id"], "50_story_done"); - } - - #[test] - fn empty_pipeline_state_converts_to_ws_response() { - let state = PipelineState { - backlog: vec![], - current: vec![], - qa: vec![], - merge: vec![], - done: vec![], - }; - let resp: WsResponse = state.into(); - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "pipeline_state"); - assert!(json["backlog"].as_array().unwrap().is_empty()); - assert!(json["current"].as_array().unwrap().is_empty()); - assert!(json["qa"].as_array().unwrap().is_empty()); - assert!(json["merge"].as_array().unwrap().is_empty()); - assert!(json["done"].as_array().unwrap().is_empty()); - } - - // ── WsResponse JSON round-trip (string form) ──────────────────── - - #[test] - fn ws_response_serializes_to_parseable_json_string() { - let resp = WsResponse::Error { - message: "test error".to_string(), - }; - let text = serde_json::to_string(&resp).unwrap(); - let parsed: serde_json::Value = serde_json::from_str(&text).unwrap(); - assert_eq!(parsed["type"], "error"); - assert_eq!(parsed["message"], "test error"); - } - - #[test] - fn ws_response_update_with_empty_messages() { - let resp = WsResponse::Update { messages: vec![] }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "update"); - assert!(json["messages"].as_array().unwrap().is_empty()); - } - - #[test] - fn ws_response_token_with_empty_content() { - let resp = WsResponse::Token { - content: String::new(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "token"); - assert_eq!(json["content"], ""); - } - - #[test] - fn ws_response_error_with_special_characters() { - let resp = WsResponse::Error { - message: "error: \"quoted\" & ".to_string(), - }; - let text = serde_json::to_string(&resp).unwrap(); - let parsed: serde_json::Value = serde_json::from_str(&text).unwrap(); - assert_eq!(parsed["message"], "error: \"quoted\" & "); - } - - // ── WsRequest edge cases ──────────────────────────────────────── - - #[test] - fn deserialize_chat_with_multiple_messages() { - let json = r#"{ - "type": "chat", - "messages": [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": "Hello"}, - {"role": "assistant", "content": "Hi there!"}, - {"role": "user", "content": "How are you?"} - ], - "config": { - "provider": "ollama", - "model": "llama3" - } - }"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - match req { - WsRequest::Chat { messages, .. } => { - assert_eq!(messages.len(), 4); - assert_eq!(messages[0].role, crate::llm::types::Role::System); - assert_eq!(messages[3].role, crate::llm::types::Role::User); - } - _ => panic!("expected Chat variant"), - } - } - - #[test] - fn deserialize_chat_with_tool_call_message() { - let json = r#"{ - "type": "chat", - "messages": [ - { - "role": "assistant", - "content": "", - "tool_calls": [ - { - "id": "call_1", - "type": "function", - "function": { - "name": "read_file", - "arguments": "{\"path\": \"/tmp/test.rs\"}" - } - } - ] - } - ], - "config": { - "provider": "anthropic", - "model": "claude-3-5-sonnet" - } - }"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - match req { - WsRequest::Chat { messages, .. } => { - assert_eq!(messages.len(), 1); - let tc = messages[0].tool_calls.as_ref().unwrap(); - assert_eq!(tc.len(), 1); - assert_eq!(tc[0].function.name, "read_file"); - } - _ => panic!("expected Chat variant"), - } - } - - // ── Pipeline state with agent assignment ──────────────────────── - - #[test] - fn pipeline_state_with_agent_converts_correctly() { - let state = PipelineState { - backlog: vec![], - current: vec![UpcomingStory { - story_id: "10_story_x".to_string(), - name: Some("Story X".to_string()), - error: None, - merge_failure: None, - agent: Some(crate::http::workflow::AgentAssignment { - agent_name: "coder-1".to_string(), - model: Some("claude-3-5-sonnet".to_string()), - status: "running".to_string(), - }), - review_hold: None, - qa: None, - retry_count: None, - blocked: None, - depends_on: None, - }], - qa: vec![], - merge: vec![], - done: vec![], - }; - let resp: WsResponse = state.into(); - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["current"][0]["agent"]["agent_name"], "coder-1"); - assert_eq!(json["current"][0]["agent"]["model"], "claude-3-5-sonnet"); - assert_eq!(json["current"][0]["agent"]["status"], "running"); - } - - // ── Reconciliation progress done event ────────────────────────── - - #[test] - fn reconciliation_done_event_has_empty_story_id() { - let resp = WsResponse::ReconciliationProgress { - story_id: String::new(), - status: "done".to_string(), - message: "Reconciliation complete".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["story_id"], ""); - assert_eq!(json["status"], "done"); - } - // ── ws_handler integration tests (real WebSocket connection) ───── use futures::stream::SplitSink; diff --git a/server/src/service/mod.rs b/server/src/service/mod.rs index b4ea26c6..c80411b6 100644 --- a/server/src/service/mod.rs +++ b/server/src/service/mod.rs @@ -8,3 +8,4 @@ pub mod agents; pub mod events; pub mod health; +pub mod ws; diff --git a/server/src/service/ws/dispatch.rs b/server/src/service/ws/dispatch.rs new file mode 100644 index 00000000..35fdd460 --- /dev/null +++ b/server/src/service/ws/dispatch.rs @@ -0,0 +1,342 @@ +//! Pure request dispatch logic — no side effects. +//! +//! Contains the branching logic for resolving permission responses and +//! classifying incoming requests. All functions are pure data transformations; +//! I/O (socket reads, spawning tasks) lives in `io.rs`. + +use crate::http::context::PermissionDecision; +use std::collections::HashMap; +use tokio::sync::oneshot; + +use super::message::{WsRequest, WsResponse}; + +/// The result of dispatching a single `WsRequest` outside an active chat session. +pub enum DispatchResult { + /// Start a chat session with the given messages and config. + StartChat { + messages: Vec, + config: crate::llm::chat::ProviderConfig, + }, + /// Cancel the current chat session. + CancelChat, + /// Respond with a pong. + Pong, + /// Permission response outside an active chat — silently ignored. + IgnoredPermission, + /// Start a side question. + StartSideQuestion { + question: String, + context_messages: Vec, + config: crate::llm::chat::ProviderConfig, + }, + /// The request could not be parsed. + ParseError(String), +} + +/// Parse a raw JSON text into a [`DispatchResult`]. +/// +/// This is the outer-loop dispatch: determines what action to take for a +/// message received when no chat session is active. +pub fn dispatch_outer(text: &str) -> DispatchResult { + match serde_json::from_str::(text) { + Ok(WsRequest::Chat { messages, config }) => DispatchResult::StartChat { messages, config }, + Ok(WsRequest::Cancel) => DispatchResult::CancelChat, + Ok(WsRequest::Ping) => DispatchResult::Pong, + Ok(WsRequest::PermissionResponse { .. }) => DispatchResult::IgnoredPermission, + Ok(WsRequest::SideQuestion { + question, + context_messages, + config, + }) => DispatchResult::StartSideQuestion { + question, + context_messages, + config, + }, + Err(err) => DispatchResult::ParseError(format!("Invalid request: {err}")), + } +} + +/// The result of dispatching a message during an active chat session. +pub enum InnerDispatchResult { + /// A permission response was successfully resolved. + PermissionResolved, + /// Cancel the current chat session. + CancelChat, + /// Respond with a pong. + Pong, + /// Start a side question (can run concurrently with the chat). + StartSideQuestion { + question: String, + context_messages: Vec, + config: crate::llm::chat::ProviderConfig, + }, + /// The message was not actionable during a chat (unknown type, etc.). + Ignored, +} + +/// Parse a raw JSON text and dispatch it within an active chat session. +/// +/// Permission responses are resolved against the `pending_perms` map. +/// Returns what action the caller should take. +pub fn dispatch_inner( + text: &str, + pending_perms: &mut HashMap>, +) -> InnerDispatchResult { + match serde_json::from_str::(text) { + Ok(WsRequest::PermissionResponse { + request_id, + approved, + always_allow, + }) => { + if let Some(resp_tx) = pending_perms.remove(&request_id) { + let decision = resolve_permission(approved, always_allow); + let _ = resp_tx.send(decision); + } + InnerDispatchResult::PermissionResolved + } + Ok(WsRequest::Cancel) => InnerDispatchResult::CancelChat, + Ok(WsRequest::Ping) => InnerDispatchResult::Pong, + Ok(WsRequest::SideQuestion { + question, + context_messages, + config, + }) => InnerDispatchResult::StartSideQuestion { + question, + context_messages, + config, + }, + _ => InnerDispatchResult::Ignored, + } +} + +/// Map the `approved` and `always_allow` flags to a [`PermissionDecision`]. +pub fn resolve_permission(approved: bool, always_allow: bool) -> PermissionDecision { + if always_allow { + PermissionDecision::AlwaysAllow + } else if approved { + PermissionDecision::Approve + } else { + PermissionDecision::Deny + } +} + +/// Build a [`WsResponse::Error`] from an error message. +pub fn error_response(message: String) -> WsResponse { + WsResponse::Error { message } +} + +/// Build the permission request forward message for the client. +pub fn permission_request_response( + request_id: &str, + tool_name: &str, + tool_input: &serde_json::Value, +) -> WsResponse { + WsResponse::PermissionRequest { + request_id: request_id.to_string(), + tool_name: tool_name.to_string(), + tool_input: tool_input.clone(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // ── dispatch_outer ────────────────────────────────────────────── + + #[test] + fn dispatch_outer_chat() { + let json = r#"{"type":"chat","messages":[{"role":"user","content":"hi"}],"config":{"provider":"ollama","model":"m"}}"#; + let result = dispatch_outer(json); + assert!(matches!(result, DispatchResult::StartChat { .. })); + } + + #[test] + fn dispatch_outer_cancel() { + let result = dispatch_outer(r#"{"type":"cancel"}"#); + assert!(matches!(result, DispatchResult::CancelChat)); + } + + #[test] + fn dispatch_outer_ping() { + let result = dispatch_outer(r#"{"type":"ping"}"#); + assert!(matches!(result, DispatchResult::Pong)); + } + + #[test] + fn dispatch_outer_permission_response_ignored() { + let json = r#"{"type":"permission_response","request_id":"x","approved":true}"#; + let result = dispatch_outer(json); + assert!(matches!(result, DispatchResult::IgnoredPermission)); + } + + #[test] + fn dispatch_outer_side_question() { + let json = r#"{"type":"side_question","question":"what?","context_messages":[],"config":{"provider":"ollama","model":"m"}}"#; + let result = dispatch_outer(json); + assert!(matches!(result, DispatchResult::StartSideQuestion { .. })); + } + + #[test] + fn dispatch_outer_invalid_json() { + let result = dispatch_outer("not json"); + match result { + DispatchResult::ParseError(msg) => { + assert!(msg.contains("Invalid request")); + } + _ => panic!("expected ParseError"), + } + } + + #[test] + fn dispatch_outer_unknown_type() { + let result = dispatch_outer(r#"{"type":"bogus"}"#); + assert!(matches!(result, DispatchResult::ParseError(_))); + } + + #[test] + fn dispatch_outer_missing_type() { + let result = dispatch_outer(r#"{"messages":[]}"#); + assert!(matches!(result, DispatchResult::ParseError(_))); + } + + // ── dispatch_inner ────────────────────────────────────────────── + + #[test] + fn dispatch_inner_permission_response_resolves() { + let (tx, rx) = oneshot::channel(); + let mut perms = HashMap::new(); + perms.insert("req-1".to_string(), tx); + + let json = r#"{"type":"permission_response","request_id":"req-1","approved":true,"always_allow":false}"#; + let result = dispatch_inner(json, &mut perms); + assert!(matches!(result, InnerDispatchResult::PermissionResolved)); + assert!(perms.is_empty()); + assert_eq!(rx.blocking_recv().unwrap(), PermissionDecision::Approve); + } + + #[test] + fn dispatch_inner_permission_response_always_allow() { + let (tx, rx) = oneshot::channel(); + let mut perms = HashMap::new(); + perms.insert("req-2".to_string(), tx); + + let json = r#"{"type":"permission_response","request_id":"req-2","approved":true,"always_allow":true}"#; + let result = dispatch_inner(json, &mut perms); + assert!(matches!(result, InnerDispatchResult::PermissionResolved)); + assert_eq!(rx.blocking_recv().unwrap(), PermissionDecision::AlwaysAllow); + } + + #[test] + fn dispatch_inner_permission_response_deny() { + let (tx, rx) = oneshot::channel(); + let mut perms = HashMap::new(); + perms.insert("req-3".to_string(), tx); + + let json = r#"{"type":"permission_response","request_id":"req-3","approved":false}"#; + let result = dispatch_inner(json, &mut perms); + assert!(matches!(result, InnerDispatchResult::PermissionResolved)); + assert_eq!(rx.blocking_recv().unwrap(), PermissionDecision::Deny); + } + + #[test] + fn dispatch_inner_permission_unknown_request_id() { + let mut perms: HashMap> = HashMap::new(); + let json = r#"{"type":"permission_response","request_id":"unknown","approved":true}"#; + let result = dispatch_inner(json, &mut perms); + // Still returns PermissionResolved — the unknown ID is silently ignored. + assert!(matches!(result, InnerDispatchResult::PermissionResolved)); + } + + #[test] + fn dispatch_inner_cancel() { + let mut perms = HashMap::new(); + let result = dispatch_inner(r#"{"type":"cancel"}"#, &mut perms); + assert!(matches!(result, InnerDispatchResult::CancelChat)); + } + + #[test] + fn dispatch_inner_ping() { + let mut perms = HashMap::new(); + let result = dispatch_inner(r#"{"type":"ping"}"#, &mut perms); + assert!(matches!(result, InnerDispatchResult::Pong)); + } + + #[test] + fn dispatch_inner_side_question() { + let mut perms = HashMap::new(); + let json = r#"{"type":"side_question","question":"what?","context_messages":[],"config":{"provider":"ollama","model":"m"}}"#; + let result = dispatch_inner(json, &mut perms); + assert!(matches!( + result, + InnerDispatchResult::StartSideQuestion { .. } + )); + } + + #[test] + fn dispatch_inner_chat_during_chat_ignored() { + let mut perms = HashMap::new(); + let json = r#"{"type":"chat","messages":[],"config":{"provider":"ollama","model":"m"}}"#; + let result = dispatch_inner(json, &mut perms); + assert!(matches!(result, InnerDispatchResult::Ignored)); + } + + #[test] + fn dispatch_inner_invalid_json_ignored() { + let mut perms = HashMap::new(); + let result = dispatch_inner("not json", &mut perms); + assert!(matches!(result, InnerDispatchResult::Ignored)); + } + + // ── resolve_permission ────────────────────────────────────────── + + #[test] + fn resolve_approve() { + assert_eq!(resolve_permission(true, false), PermissionDecision::Approve); + } + + #[test] + fn resolve_deny() { + assert_eq!(resolve_permission(false, false), PermissionDecision::Deny); + } + + #[test] + fn resolve_always_allow() { + assert_eq!( + resolve_permission(true, true), + PermissionDecision::AlwaysAllow + ); + } + + #[test] + fn resolve_always_allow_overrides_denied() { + // always_allow=true should win even if approved=false + assert_eq!( + resolve_permission(false, true), + PermissionDecision::AlwaysAllow + ); + } + + // ── error_response ────────────────────────────────────────────── + + #[test] + fn error_response_creates_error_variant() { + let resp = error_response("oops".to_string()); + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "error"); + assert_eq!(json["message"], "oops"); + } + + // ── permission_request_response ───────────────────────────────── + + #[test] + fn permission_request_response_creates_correct_variant() { + let input = serde_json::json!({"command": "rm -rf /"}); + let resp = permission_request_response("req-1", "Bash", &input); + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "permission_request"); + assert_eq!(json["request_id"], "req-1"); + assert_eq!(json["tool_name"], "Bash"); + assert_eq!(json["tool_input"]["command"], "rm -rf /"); + } +} diff --git a/server/src/service/ws/error.rs b/server/src/service/ws/error.rs new file mode 100644 index 00000000..c380890c --- /dev/null +++ b/server/src/service/ws/error.rs @@ -0,0 +1,98 @@ +//! Typed error enum for the WebSocket service layer. +//! +//! Every distinct failure mode the WS layer can produce is represented here. +//! The HTTP/WS adapter maps these to close codes or error frames. + +/// Errors produced by the WebSocket service layer. +/// +/// Each variant maps to a distinct failure mode; the transport adapter +/// translates these into WS close codes or error-frame messages. +#[derive(Debug)] +#[allow(dead_code)] +pub enum Error { + /// Client sent a message that could not be parsed as a valid `WsRequest`. + /// Maps to an `error` frame with the parse failure detail. + InvalidMessage(String), + /// The chat subsystem returned an error (e.g. LLM provider failure). + /// Maps to an `error` frame with the provider message. + Chat(String), + /// A permission response referenced an unknown `request_id`. + /// Silently ignored in practice (no frame sent), but tracked for observability. + UnknownPermissionRequest(String), + /// Failed to load initial state (pipeline, onboarding, wizard). + /// Maps to an `error` frame. + Init(String), + /// The send channel to the client is closed (client disconnected). + /// Triggers connection teardown — no frame is sent. + ClientGone, +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::InvalidMessage(msg) => write!(f, "Invalid request: {msg}"), + Self::Chat(msg) => write!(f, "Chat error: {msg}"), + Self::UnknownPermissionRequest(id) => { + write!(f, "Unknown permission request: {id}") + } + Self::Init(msg) => write!(f, "Initialisation error: {msg}"), + Self::ClientGone => write!(f, "Client disconnected"), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn display_invalid_message() { + let err = Error::InvalidMessage("bad json".to_string()); + assert_eq!(err.to_string(), "Invalid request: bad json"); + } + + #[test] + fn display_chat_error() { + let err = Error::Chat("timeout".to_string()); + assert_eq!(err.to_string(), "Chat error: timeout"); + } + + #[test] + fn display_unknown_permission_request() { + let err = Error::UnknownPermissionRequest("req-99".to_string()); + assert_eq!(err.to_string(), "Unknown permission request: req-99"); + } + + #[test] + fn display_init_error() { + let err = Error::Init("CRDT not ready".to_string()); + assert_eq!(err.to_string(), "Initialisation error: CRDT not ready"); + } + + #[test] + fn display_client_gone() { + let err = Error::ClientGone; + assert_eq!(err.to_string(), "Client disconnected"); + } + + #[test] + fn error_is_debug() { + let err = Error::InvalidMessage("test".to_string()); + let debug = format!("{err:?}"); + assert!(debug.contains("InvalidMessage")); + } + + #[test] + fn all_variants_display_without_panic() { + let variants: Vec = vec![ + Error::InvalidMessage("a".to_string()), + Error::Chat("b".to_string()), + Error::UnknownPermissionRequest("c".to_string()), + Error::Init("d".to_string()), + Error::ClientGone, + ]; + for v in &variants { + let _ = v.to_string(); + } + } +} diff --git a/server/src/service/ws/io.rs b/server/src/service/ws/io.rs new file mode 100644 index 00000000..938e48ad --- /dev/null +++ b/server/src/service/ws/io.rs @@ -0,0 +1,144 @@ +//! WebSocket I/O wrappers — the ONLY place in `service/ws/` that may perform +//! side effects such as spawning async tasks, subscribing to broadcast channels, +//! reading state from disk, or interacting with the system clock. + +use crate::agents::ReconciliationEvent; +use crate::http::context::AppContext; +use crate::http::workflow::load_pipeline_state; +use crate::io::onboarding; +use crate::io::watcher::WatcherEvent; +use crate::io::wizard; +use crate::log_buffer; +use std::sync::Arc; +use tokio::sync::{broadcast, mpsc}; + +use super::message::{self, WizardStepInfo, WsResponse}; + +/// Load the initial pipeline state from the CRDT and convert to a WsResponse. +pub fn load_initial_pipeline_state(ctx: &AppContext) -> Option { + load_pipeline_state(ctx).ok().map(|s| s.into()) +} + +/// Check whether the project needs onboarding and return the response. +pub fn check_onboarding(ctx: &AppContext) -> WsResponse { + let needs = ctx + .state + .get_project_root() + .map(|root| onboarding::check_onboarding_status(&root).needs_onboarding()) + .unwrap_or(false); + WsResponse::OnboardingStatus { + needs_onboarding: needs, + } +} + +/// Load the active wizard state (if any) and convert to a WsResponse. +pub fn load_wizard_state(ctx: &AppContext) -> Option { + let root = ctx.state.get_project_root().ok()?; + let ws = wizard::WizardState::load(&root)?; + let steps: Vec = message::wizard_steps_to_info(&ws.steps); + Some(WsResponse::WizardState { + steps, + current_step_index: ws.current_step_index(), + completed: ws.completed, + }) +} + +/// Load recent log entries and convert them to WsResponse messages. +pub fn load_recent_logs(count: usize) -> Vec { + log_buffer::global() + .get_recent_entries(count, None, None) + .into_iter() + .map(|entry| WsResponse::LogEntry { + timestamp: entry.timestamp, + level: entry.level.as_str().to_string(), + message: entry.message, + }) + .collect() +} + +/// Spawn a background task that forwards live log entries to the client. +/// +/// Returns when the client's send channel closes or the log broadcast ends. +pub fn subscribe_logs(tx: mpsc::UnboundedSender) { + let mut log_rx = log_buffer::global().subscribe(); + tokio::spawn(async move { + loop { + match log_rx.recv().await { + Ok(entry) => { + if tx + .send(WsResponse::LogEntry { + timestamp: entry.timestamp, + level: entry.level.as_str().to_string(), + message: entry.message, + }) + .is_err() + { + break; + } + } + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => break, + } + } + }); +} + +/// Spawn a background task that forwards filesystem watcher events to the client. +/// +/// After work-item and agent-state events, also pushes a refreshed pipeline state. +pub fn subscribe_watcher( + tx: mpsc::UnboundedSender, + ctx: Arc, + mut watcher_rx: broadcast::Receiver, +) { + tokio::spawn(async move { + loop { + match watcher_rx.recv().await { + Ok(evt) => { + let refresh = message::needs_pipeline_refresh(&evt); + let ws_msg: Option = evt.into(); + if let Some(msg) = ws_msg + && tx.send(msg).is_err() + { + break; + } + if refresh + && let Ok(state) = load_pipeline_state(ctx.as_ref()) + && tx.send(state.into()).is_err() + { + break; + } + } + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => break, + } + } + }); +} + +/// Spawn a background task that forwards reconciliation events to the client. +pub fn subscribe_reconciliation( + tx: mpsc::UnboundedSender, + mut reconcile_rx: broadcast::Receiver, +) { + tokio::spawn(async move { + loop { + match reconcile_rx.recv().await { + Ok(evt) => { + if tx + .send(WsResponse::ReconciliationProgress { + story_id: evt.story_id, + status: evt.status, + message: evt.message, + }) + .is_err() + { + break; + } + } + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => break, + } + } + }); +} diff --git a/server/src/service/ws/message.rs b/server/src/service/ws/message.rs new file mode 100644 index 00000000..1811abc4 --- /dev/null +++ b/server/src/service/ws/message.rs @@ -0,0 +1,993 @@ +//! Pure WebSocket message types — no side effects. +//! +//! `WsRequest` and `WsResponse` define the client/server protocol. +//! Conversions from domain events to WsResponse live here too. +//! All logic is pure data transformation; I/O lives in `io.rs`. + +use crate::http::workflow::{PipelineState, UpcomingStory}; +use crate::io::watcher::WatcherEvent; +use crate::llm::chat; +use crate::llm::types::Message; +use serde::{Deserialize, Serialize}; + +/// WebSocket request messages sent by the client. +/// +/// - `chat` starts a streaming chat session. +/// - `cancel` stops the active session. +/// - `permission_response` approves or denies a pending permission request. +#[derive(Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum WsRequest { + Chat { + messages: Vec, + config: chat::ProviderConfig, + }, + Cancel, + PermissionResponse { + request_id: String, + approved: bool, + #[serde(default)] + always_allow: bool, + }, + /// Heartbeat ping from the client. The server responds with `Pong` so the + /// client can detect stale (half-closed) connections. + Ping, + /// A quick side question answered from current conversation context. + /// The question and response are NOT added to the conversation history + /// and no tool calls are made. + SideQuestion { + question: String, + context_messages: Vec, + config: chat::ProviderConfig, + }, +} + +/// Serialisable summary of a single wizard step for WebSocket broadcast. +#[derive(Serialize, Clone, Debug, PartialEq)] +pub struct WizardStepInfo { + pub step: String, + pub label: String, + pub status: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub content: Option, +} + +/// WebSocket response messages sent by the server. +/// +/// - `token` streams partial model output. +/// - `update` pushes the updated message history. +/// - `error` reports a request or processing failure. +/// - `work_item_changed` notifies that a `.huskies/work/` file changed. +/// - `agent_config_changed` notifies that `.huskies/project.toml` was modified. +#[derive(Serialize, Debug)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum WsResponse { + Token { + content: String, + }, + Update { + messages: Vec, + }, + /// Session ID for Claude Code conversation resumption. + SessionId { + session_id: String, + }, + Error { + message: String, + }, + /// Filesystem watcher notification: a work-pipeline file was created or + /// modified and auto-committed. The frontend can use this to refresh its + /// story/bug list without polling. + WorkItemChanged { + stage: String, + item_id: String, + action: String, + commit_msg: String, + }, + /// Full pipeline state pushed on connect and after every work-item watcher event. + PipelineState { + backlog: Vec, + current: Vec, + qa: Vec, + merge: Vec, + done: Vec, + }, + /// `.huskies/project.toml` was modified; the frontend should re-fetch the + /// agent roster. Does NOT trigger a pipeline state refresh. + AgentConfigChanged, + /// An agent's state changed (started, stopped, completed, etc.). + /// Triggers a pipeline state refresh and tells the frontend to re-fetch + /// the agent list. + AgentStateChanged, + /// Claude Code is requesting user approval before executing a tool. + PermissionRequest { + request_id: String, + tool_name: String, + tool_input: serde_json::Value, + }, + /// The agent started assembling a tool call; shows live status in the UI. + ToolActivity { + tool_name: String, + }, + /// Real-time progress from the server startup reconciliation pass. + /// `status` is one of: "checking", "gates_running", "advanced", "skipped", + /// "failed", "done". `story_id` is empty for the overall "done" event. + ReconciliationProgress { + story_id: String, + status: String, + message: String, + }, + /// Heartbeat response to a client `Ping`. Lets the client confirm the + /// connection is alive and cancel any stale-connection timeout. + Pong, + /// Streaming thinking token from an extended-thinking block. + /// Sent separately from `Token` so the frontend can render them in + /// a constrained, scrollable ThinkingBlock rather than inline. + ThinkingToken { + content: String, + }, + /// Sent on connect when the project's spec files still contain scaffold + /// placeholder content and the user needs to go through onboarding. + OnboardingStatus { + needs_onboarding: bool, + }, + /// Sent on connect when a setup wizard is active. Contains the full + /// wizard state so the frontend can render the step-by-step UI. + WizardState { + steps: Vec, + current_step_index: usize, + completed: bool, + }, + /// Streaming token from a `/btw` side question response. + SideQuestionToken { + content: String, + }, + /// Final signal that the `/btw` side question has been fully answered. + SideQuestionDone { + response: String, + }, + /// A single server log entry. Sent in bulk on connect (recent history), + /// then streamed live as new entries arrive. + LogEntry { + timestamp: String, + level: String, + message: String, + }, +} + +// ── Domain event conversions ──────────────────────────────────────────────── + +/// Convert a [`WatcherEvent`] to an optional [`WsResponse`]. +/// +/// Returns `None` for events that have no WebSocket representation +/// (e.g. `MergeFailure`, `StoryBlocked` — handled elsewhere). +pub fn watcher_event_to_response(e: WatcherEvent) -> Option { + match e { + WatcherEvent::WorkItem { + stage, + item_id, + action, + commit_msg, + .. + } => Some(WsResponse::WorkItemChanged { + stage, + item_id, + action, + commit_msg, + }), + WatcherEvent::ConfigChanged => Some(WsResponse::AgentConfigChanged), + WatcherEvent::AgentStateChanged => Some(WsResponse::AgentStateChanged), + // MergeFailure, RateLimitWarning, StoryBlocked, and RateLimitHardBlock are handled + // by the chat notification listener only; no WebSocket message is needed for the frontend. + WatcherEvent::MergeFailure { .. } => None, + WatcherEvent::RateLimitWarning { .. } => None, + WatcherEvent::StoryBlocked { .. } => None, + WatcherEvent::RateLimitHardBlock { .. } => None, + } +} + +/// Returns `true` if this watcher event should trigger a pipeline state refresh. +pub fn needs_pipeline_refresh(evt: &WatcherEvent) -> bool { + matches!( + evt, + WatcherEvent::WorkItem { .. } | WatcherEvent::AgentStateChanged + ) +} + +/// Convert a [`PipelineState`] to a [`WsResponse::PipelineState`]. +pub fn pipeline_state_to_response(s: PipelineState) -> WsResponse { + WsResponse::PipelineState { + backlog: s.backlog, + current: s.current, + qa: s.qa, + merge: s.merge, + done: s.done, + } +} + +/// Build a [`WizardStepInfo`] list from wizard step states. +/// +/// Pure conversion — reads no filesystem, just transforms data. +pub fn wizard_steps_to_info(steps: &[crate::io::wizard::StepState]) -> Vec { + steps + .iter() + .map(|s| WizardStepInfo { + step: serde_json::to_value(s.step) + .ok() + .and_then(|v| v.as_str().map(String::from)) + .unwrap_or_default(), + label: s.step.label().to_string(), + status: serde_json::to_value(&s.status) + .ok() + .and_then(|v| v.as_str().map(String::from)) + .unwrap_or_default(), + content: s.content.clone(), + }) + .collect() +} + +// Keep backward-compatible From impls so existing code compiles during migration. +impl From for Option { + fn from(e: WatcherEvent) -> Self { + watcher_event_to_response(e) + } +} + +impl From for WsResponse { + fn from(s: PipelineState) -> Self { + pipeline_state_to_response(s) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::http::workflow::UpcomingStory; + use crate::io::watcher::WatcherEvent; + + // ── WsRequest deserialization ──────────────────────────────────── + + #[test] + fn deserialize_chat_request() { + let json = r#"{ + "type": "chat", + "messages": [ + {"role": "user", "content": "hello"} + ], + "config": { + "provider": "ollama", + "model": "llama3" + } + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::Chat { messages, config } => { + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].content, "hello"); + assert_eq!(config.provider, "ollama"); + assert_eq!(config.model, "llama3"); + } + _ => panic!("expected Chat variant"), + } + } + + #[test] + fn deserialize_chat_request_with_optional_fields() { + let json = r#"{ + "type": "chat", + "messages": [], + "config": { + "provider": "anthropic", + "model": "claude-3-5-sonnet", + "base_url": "https://api.anthropic.com", + "enable_tools": true, + "session_id": "sess-123" + } + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::Chat { messages, config } => { + assert!(messages.is_empty()); + assert_eq!( + config.base_url.as_deref(), + Some("https://api.anthropic.com") + ); + assert_eq!(config.enable_tools, Some(true)); + assert_eq!(config.session_id.as_deref(), Some("sess-123")); + } + _ => panic!("expected Chat variant"), + } + } + + #[test] + fn deserialize_cancel_request() { + let json = r#"{"type": "cancel"}"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + assert!(matches!(req, WsRequest::Cancel)); + } + + #[test] + fn deserialize_ping_request() { + let json = r#"{"type": "ping"}"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + assert!(matches!(req, WsRequest::Ping)); + } + + #[test] + fn deserialize_permission_response_approved() { + let json = r#"{ + "type": "permission_response", + "request_id": "req-42", + "approved": true + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::PermissionResponse { + request_id, + approved, + always_allow, + } => { + assert_eq!(request_id, "req-42"); + assert!(approved); + assert!(!always_allow); + } + _ => panic!("expected PermissionResponse variant"), + } + } + + #[test] + fn deserialize_permission_response_denied() { + let json = r#"{ + "type": "permission_response", + "request_id": "req-99", + "approved": false + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::PermissionResponse { + request_id, + approved, + always_allow, + } => { + assert_eq!(request_id, "req-99"); + assert!(!approved); + assert!(!always_allow); + } + _ => panic!("expected PermissionResponse variant"), + } + } + + #[test] + fn deserialize_permission_response_always_allow() { + let json = r#"{ + "type": "permission_response", + "request_id": "req-100", + "approved": true, + "always_allow": true + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::PermissionResponse { + request_id, + approved, + always_allow, + } => { + assert_eq!(request_id, "req-100"); + assert!(approved); + assert!(always_allow); + } + _ => panic!("expected PermissionResponse variant"), + } + } + + #[test] + fn deserialize_unknown_type_fails() { + let json = r#"{"type": "unknown_type"}"#; + let result: Result = serde_json::from_str(json); + assert!(result.is_err()); + } + + #[test] + fn deserialize_invalid_json_fails() { + let result: Result = serde_json::from_str("not json"); + assert!(result.is_err()); + } + + #[test] + fn deserialize_missing_type_tag_fails() { + let json = r#"{"messages": [], "config": {"provider": "x", "model": "y"}}"#; + let result: Result = serde_json::from_str(json); + assert!(result.is_err()); + } + + #[test] + fn deserialize_side_question() { + let json = r#"{ + "type": "side_question", + "question": "what is this?", + "context_messages": [{"role": "user", "content": "hi"}], + "config": {"provider": "ollama", "model": "llama3"} + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::SideQuestion { + question, + context_messages, + config, + } => { + assert_eq!(question, "what is this?"); + assert_eq!(context_messages.len(), 1); + assert_eq!(config.model, "llama3"); + } + _ => panic!("expected SideQuestion variant"), + } + } + + // ── WsResponse serialization ──────────────────────────────────── + + #[test] + fn serialize_token_response() { + let resp = WsResponse::Token { + content: "hello world".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "token"); + assert_eq!(json["content"], "hello world"); + } + + #[test] + fn serialize_update_response() { + let msg = Message { + role: crate::llm::types::Role::Assistant, + content: "response".to_string(), + tool_calls: None, + tool_call_id: None, + }; + let resp = WsResponse::Update { + messages: vec![msg], + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "update"); + assert_eq!(json["messages"].as_array().unwrap().len(), 1); + assert_eq!(json["messages"][0]["content"], "response"); + } + + #[test] + fn serialize_session_id_response() { + let resp = WsResponse::SessionId { + session_id: "sess-abc".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "session_id"); + assert_eq!(json["session_id"], "sess-abc"); + } + + #[test] + fn serialize_error_response() { + let resp = WsResponse::Error { + message: "something broke".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "error"); + assert_eq!(json["message"], "something broke"); + } + + #[test] + fn serialize_work_item_changed_response() { + let resp = WsResponse::WorkItemChanged { + stage: "2_current".to_string(), + item_id: "42_story_foo".to_string(), + action: "start".to_string(), + commit_msg: "huskies: start 42_story_foo".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "work_item_changed"); + assert_eq!(json["stage"], "2_current"); + assert_eq!(json["item_id"], "42_story_foo"); + assert_eq!(json["action"], "start"); + assert_eq!(json["commit_msg"], "huskies: start 42_story_foo"); + } + + #[test] + fn serialize_pipeline_state_response() { + let story = UpcomingStory { + story_id: "10_story_test".to_string(), + name: Some("Test".to_string()), + error: None, + merge_failure: None, + agent: None, + review_hold: None, + qa: None, + retry_count: None, + blocked: None, + depends_on: None, + }; + let resp = WsResponse::PipelineState { + backlog: vec![story], + current: vec![], + qa: vec![], + merge: vec![], + done: vec![], + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "pipeline_state"); + assert_eq!(json["backlog"].as_array().unwrap().len(), 1); + assert_eq!(json["backlog"][0]["story_id"], "10_story_test"); + assert!(json["current"].as_array().unwrap().is_empty()); + assert!(json["done"].as_array().unwrap().is_empty()); + } + + #[test] + fn serialize_agent_config_changed_response() { + let resp = WsResponse::AgentConfigChanged; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "agent_config_changed"); + } + + #[test] + fn serialize_pong_response() { + let resp = WsResponse::Pong; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "pong"); + } + + #[test] + fn serialize_thinking_token_response() { + let resp = WsResponse::ThinkingToken { + content: "I need to think about this...".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "thinking_token"); + assert_eq!(json["content"], "I need to think about this..."); + } + + #[test] + fn serialize_onboarding_status_true() { + let resp = WsResponse::OnboardingStatus { + needs_onboarding: true, + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "onboarding_status"); + assert_eq!(json["needs_onboarding"], true); + } + + #[test] + fn serialize_onboarding_status_false() { + let resp = WsResponse::OnboardingStatus { + needs_onboarding: false, + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "onboarding_status"); + assert_eq!(json["needs_onboarding"], false); + } + + #[test] + fn serialize_permission_request_response() { + let resp = WsResponse::PermissionRequest { + request_id: "perm-1".to_string(), + tool_name: "Bash".to_string(), + tool_input: serde_json::json!({"command": "ls"}), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "permission_request"); + assert_eq!(json["request_id"], "perm-1"); + assert_eq!(json["tool_name"], "Bash"); + assert_eq!(json["tool_input"]["command"], "ls"); + } + + #[test] + fn serialize_tool_activity_response() { + let resp = WsResponse::ToolActivity { + tool_name: "Read".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "tool_activity"); + assert_eq!(json["tool_name"], "Read"); + } + + #[test] + fn serialize_reconciliation_progress_response() { + let resp = WsResponse::ReconciliationProgress { + story_id: "50_story_x".to_string(), + status: "gates_running".to_string(), + message: "Running clippy...".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "reconciliation_progress"); + assert_eq!(json["story_id"], "50_story_x"); + assert_eq!(json["status"], "gates_running"); + assert_eq!(json["message"], "Running clippy..."); + } + + #[test] + fn serialize_wizard_state_response() { + let resp = WsResponse::WizardState { + steps: vec![WizardStepInfo { + step: "scaffold".to_string(), + label: "Scaffold directory structure".to_string(), + status: "pending".to_string(), + content: None, + }], + current_step_index: 0, + completed: false, + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "wizard_state"); + assert_eq!(json["steps"][0]["step"], "scaffold"); + assert_eq!(json["current_step_index"], 0); + assert_eq!(json["completed"], false); + } + + #[test] + fn serialize_side_question_token() { + let resp = WsResponse::SideQuestionToken { + content: "partial answer".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "side_question_token"); + assert_eq!(json["content"], "partial answer"); + } + + #[test] + fn serialize_side_question_done() { + let resp = WsResponse::SideQuestionDone { + response: "full answer".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "side_question_done"); + assert_eq!(json["response"], "full answer"); + } + + #[test] + fn serialize_log_entry() { + let resp = WsResponse::LogEntry { + timestamp: "2026-01-01T00:00:00Z".to_string(), + level: "INFO".to_string(), + message: "server started".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "log_entry"); + assert_eq!(json["level"], "INFO"); + assert_eq!(json["message"], "server started"); + } + + // ── watcher_event_to_response ─────────────────────────────────── + + #[test] + fn watcher_work_item_converts_to_ws_response() { + let evt = WatcherEvent::WorkItem { + stage: "2_current".to_string(), + item_id: "42_story_foo".to_string(), + action: "start".to_string(), + commit_msg: "huskies: start 42_story_foo".to_string(), + from_stage: None, + }; + let ws_msg = watcher_event_to_response(evt).expect("WorkItem should produce Some"); + let json = serde_json::to_value(&ws_msg).unwrap(); + assert_eq!(json["type"], "work_item_changed"); + assert_eq!(json["stage"], "2_current"); + assert_eq!(json["item_id"], "42_story_foo"); + assert_eq!(json["action"], "start"); + } + + #[test] + fn watcher_config_changed_converts_to_ws_response() { + let evt = WatcherEvent::ConfigChanged; + let ws_msg = watcher_event_to_response(evt).expect("ConfigChanged should produce Some"); + let json = serde_json::to_value(&ws_msg).unwrap(); + assert_eq!(json["type"], "agent_config_changed"); + } + + #[test] + fn watcher_agent_state_changed_converts_to_ws_response() { + let evt = WatcherEvent::AgentStateChanged; + let ws_msg = watcher_event_to_response(evt).expect("AgentStateChanged should produce Some"); + let json = serde_json::to_value(&ws_msg).unwrap(); + assert_eq!(json["type"], "agent_state_changed"); + } + + #[test] + fn watcher_merge_failure_produces_none() { + let evt = WatcherEvent::MergeFailure { + story_id: "x".to_string(), + reason: "conflict".to_string(), + }; + assert!(watcher_event_to_response(evt).is_none()); + } + + #[test] + fn watcher_rate_limit_warning_produces_none() { + let evt = WatcherEvent::RateLimitWarning { + story_id: "x".to_string(), + agent_name: "coder".to_string(), + }; + assert!(watcher_event_to_response(evt).is_none()); + } + + #[test] + fn watcher_story_blocked_produces_none() { + let evt = WatcherEvent::StoryBlocked { + story_id: "x".to_string(), + reason: "retries exhausted".to_string(), + }; + assert!(watcher_event_to_response(evt).is_none()); + } + + #[test] + fn watcher_rate_limit_hard_block_produces_none() { + let evt = WatcherEvent::RateLimitHardBlock { + story_id: "x".to_string(), + agent_name: "coder".to_string(), + reset_at: chrono::Utc::now(), + }; + assert!(watcher_event_to_response(evt).is_none()); + } + + // ── needs_pipeline_refresh ────────────────────────────────────── + + #[test] + fn work_item_needs_pipeline_refresh() { + let evt = WatcherEvent::WorkItem { + stage: "2_current".to_string(), + item_id: "x".to_string(), + action: "start".to_string(), + commit_msg: "msg".to_string(), + from_stage: None, + }; + assert!(needs_pipeline_refresh(&evt)); + } + + #[test] + fn agent_state_changed_needs_pipeline_refresh() { + assert!(needs_pipeline_refresh(&WatcherEvent::AgentStateChanged)); + } + + #[test] + fn config_changed_does_not_need_pipeline_refresh() { + assert!(!needs_pipeline_refresh(&WatcherEvent::ConfigChanged)); + } + + #[test] + fn merge_failure_does_not_need_pipeline_refresh() { + let evt = WatcherEvent::MergeFailure { + story_id: "x".to_string(), + reason: "y".to_string(), + }; + assert!(!needs_pipeline_refresh(&evt)); + } + + // ── pipeline_state_to_response ────────────────────────────────── + + #[test] + fn pipeline_state_converts_to_ws_response() { + let state = PipelineState { + backlog: vec![UpcomingStory { + story_id: "1_story_a".to_string(), + name: Some("Story A".to_string()), + error: None, + merge_failure: None, + agent: None, + review_hold: None, + qa: None, + retry_count: None, + blocked: None, + depends_on: None, + }], + current: vec![UpcomingStory { + story_id: "2_story_b".to_string(), + name: Some("Story B".to_string()), + error: None, + merge_failure: None, + agent: None, + review_hold: None, + qa: None, + retry_count: None, + blocked: None, + depends_on: None, + }], + qa: vec![], + merge: vec![], + done: vec![UpcomingStory { + story_id: "50_story_done".to_string(), + name: Some("Done Story".to_string()), + error: None, + merge_failure: None, + agent: None, + review_hold: None, + qa: None, + retry_count: None, + blocked: None, + depends_on: None, + }], + }; + let resp = pipeline_state_to_response(state); + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "pipeline_state"); + assert_eq!(json["backlog"].as_array().unwrap().len(), 1); + assert_eq!(json["backlog"][0]["story_id"], "1_story_a"); + assert_eq!(json["current"].as_array().unwrap().len(), 1); + assert_eq!(json["current"][0]["story_id"], "2_story_b"); + assert!(json["qa"].as_array().unwrap().is_empty()); + assert!(json["merge"].as_array().unwrap().is_empty()); + assert_eq!(json["done"].as_array().unwrap().len(), 1); + assert_eq!(json["done"][0]["story_id"], "50_story_done"); + } + + #[test] + fn empty_pipeline_state_converts_to_ws_response() { + let state = PipelineState { + backlog: vec![], + current: vec![], + qa: vec![], + merge: vec![], + done: vec![], + }; + let resp = pipeline_state_to_response(state); + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "pipeline_state"); + assert!(json["backlog"].as_array().unwrap().is_empty()); + assert!(json["current"].as_array().unwrap().is_empty()); + assert!(json["qa"].as_array().unwrap().is_empty()); + assert!(json["merge"].as_array().unwrap().is_empty()); + assert!(json["done"].as_array().unwrap().is_empty()); + } + + #[test] + fn pipeline_state_with_agent_converts_correctly() { + let state = PipelineState { + backlog: vec![], + current: vec![UpcomingStory { + story_id: "10_story_x".to_string(), + name: Some("Story X".to_string()), + error: None, + merge_failure: None, + agent: Some(crate::http::workflow::AgentAssignment { + agent_name: "coder-1".to_string(), + model: Some("claude-3-5-sonnet".to_string()), + status: "running".to_string(), + }), + review_hold: None, + qa: None, + retry_count: None, + blocked: None, + depends_on: None, + }], + qa: vec![], + merge: vec![], + done: vec![], + }; + let resp: WsResponse = state.into(); + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["current"][0]["agent"]["agent_name"], "coder-1"); + assert_eq!(json["current"][0]["agent"]["model"], "claude-3-5-sonnet"); + assert_eq!(json["current"][0]["agent"]["status"], "running"); + } + + // ── WsResponse JSON round-trip (string form) ──────────────────── + + #[test] + fn ws_response_serializes_to_parseable_json_string() { + let resp = WsResponse::Error { + message: "test error".to_string(), + }; + let text = serde_json::to_string(&resp).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&text).unwrap(); + assert_eq!(parsed["type"], "error"); + assert_eq!(parsed["message"], "test error"); + } + + #[test] + fn ws_response_update_with_empty_messages() { + let resp = WsResponse::Update { messages: vec![] }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "update"); + assert!(json["messages"].as_array().unwrap().is_empty()); + } + + #[test] + fn ws_response_token_with_empty_content() { + let resp = WsResponse::Token { + content: String::new(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "token"); + assert_eq!(json["content"], ""); + } + + #[test] + fn ws_response_error_with_special_characters() { + let resp = WsResponse::Error { + message: "error: \"quoted\" & ".to_string(), + }; + let text = serde_json::to_string(&resp).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&text).unwrap(); + assert_eq!(parsed["message"], "error: \"quoted\" & "); + } + + // ── WsRequest edge cases ──────────────────────────────────────── + + #[test] + fn deserialize_chat_with_multiple_messages() { + let json = r#"{ + "type": "chat", + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there!"}, + {"role": "user", "content": "How are you?"} + ], + "config": { + "provider": "ollama", + "model": "llama3" + } + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::Chat { messages, .. } => { + assert_eq!(messages.len(), 4); + assert_eq!(messages[0].role, crate::llm::types::Role::System); + assert_eq!(messages[3].role, crate::llm::types::Role::User); + } + _ => panic!("expected Chat variant"), + } + } + + #[test] + fn deserialize_chat_with_tool_call_message() { + let json = r#"{ + "type": "chat", + "messages": [ + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_1", + "type": "function", + "function": { + "name": "read_file", + "arguments": "{\"path\": \"/tmp/test.rs\"}" + } + } + ] + } + ], + "config": { + "provider": "anthropic", + "model": "claude-3-5-sonnet" + } + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::Chat { messages, .. } => { + assert_eq!(messages.len(), 1); + let tc = messages[0].tool_calls.as_ref().unwrap(); + assert_eq!(tc.len(), 1); + assert_eq!(tc[0].function.name, "read_file"); + } + _ => panic!("expected Chat variant"), + } + } + + // ── wizard_steps_to_info ──────────────────────────────────────── + + #[test] + fn wizard_steps_to_info_empty() { + let result = wizard_steps_to_info(&[]); + assert!(result.is_empty()); + } + + // ── Reconciliation progress done event ────────────────────────── + + #[test] + fn reconciliation_done_event_has_empty_story_id() { + let resp = WsResponse::ReconciliationProgress { + story_id: String::new(), + status: "done".to_string(), + message: "Reconciliation complete".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["story_id"], ""); + assert_eq!(json["status"], "done"); + } +} diff --git a/server/src/service/ws/mod.rs b/server/src/service/ws/mod.rs new file mode 100644 index 00000000..e3cbf0d3 --- /dev/null +++ b/server/src/service/ws/mod.rs @@ -0,0 +1,25 @@ +//! WebSocket service — domain logic for real-time pipeline updates, chat, and +//! permission prompts. +//! +//! This module extracts the business logic from `http/ws.rs` into the service +//! layer following the conventions in `docs/architecture/service-modules.md`: +//! - `mod.rs` — public API, typed `Error`, orchestration +//! - `io.rs` — the only file that may perform side effects +//! - `message.rs` — pure message types and conversions +//! - `dispatch.rs` — pure request routing and permission resolution +//! - `error.rs` — typed error enum + +pub mod dispatch; +pub mod error; +pub(super) mod io; +pub mod message; + +pub use dispatch::{ + DispatchResult, InnerDispatchResult, dispatch_inner, dispatch_outer, error_response, + permission_request_response, +}; +pub use io::{ + check_onboarding, load_initial_pipeline_state, load_recent_logs, load_wizard_state, + subscribe_logs, subscribe_reconciliation, subscribe_watcher, +}; +pub use message::{WizardStepInfo, WsResponse};