diff --git a/server/src/service/ws/message.rs b/server/src/service/ws/message.rs deleted file mode 100644 index 16f2db88..00000000 --- a/server/src/service/ws/message.rs +++ /dev/null @@ -1,1006 +0,0 @@ -//! Pure WebSocket message types — no side effects. -//! -//! `WsRequest` and `WsResponse` define the client/server protocol. -//! Conversions from domain events to WsResponse live here too. -//! All logic is pure data transformation; I/O lives in `io.rs`. - -use crate::http::workflow::{PipelineState, UpcomingStory}; -use crate::io::watcher::WatcherEvent; -use crate::llm::chat; -use crate::llm::types::Message; -use crate::service::status::StatusEvent; -use serde::{Deserialize, Serialize}; - -/// WebSocket request messages sent by the client. -/// -/// - `chat` starts a streaming chat session. -/// - `cancel` stops the active session. -/// - `permission_response` approves or denies a pending permission request. -#[derive(Deserialize)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum WsRequest { - Chat { - messages: Vec, - config: chat::ProviderConfig, - }, - Cancel, - PermissionResponse { - request_id: String, - approved: bool, - #[serde(default)] - always_allow: bool, - }, - /// Heartbeat ping from the client. The server responds with `Pong` so the - /// client can detect stale (half-closed) connections. - Ping, - /// A quick side question answered from current conversation context. - /// The question and response are NOT added to the conversation history - /// and no tool calls are made. - SideQuestion { - question: String, - context_messages: Vec, - config: chat::ProviderConfig, - }, -} - -/// Serialisable summary of a single wizard step for WebSocket broadcast. -#[derive(Serialize, Clone, Debug, PartialEq)] -pub struct WizardStepInfo { - pub step: String, - pub label: String, - pub status: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub content: Option, -} - -/// WebSocket response messages sent by the server. -/// -/// - `token` streams partial model output. -/// - `update` pushes the updated message history. -/// - `error` reports a request or processing failure. -/// - `work_item_changed` notifies that a `.huskies/work/` file changed. -/// - `agent_config_changed` notifies that `.huskies/project.toml` was modified. -#[derive(Serialize, Debug)] -#[serde(tag = "type", rename_all = "snake_case")] -pub enum WsResponse { - Token { - content: String, - }, - Update { - messages: Vec, - }, - /// Session ID for Claude Code conversation resumption. - SessionId { - session_id: String, - }, - Error { - message: String, - }, - /// Filesystem watcher notification: a work-pipeline file was created or - /// modified and auto-committed. The frontend can use this to refresh its - /// story/bug list without polling. - WorkItemChanged { - stage: String, - item_id: String, - action: String, - commit_msg: String, - }, - /// Full pipeline state pushed on connect and after every work-item watcher event. - PipelineState { - backlog: Vec, - current: Vec, - qa: Vec, - merge: Vec, - done: Vec, - }, - /// `.huskies/project.toml` was modified; the frontend should re-fetch the - /// agent roster. Does NOT trigger a pipeline state refresh. - AgentConfigChanged, - /// An agent's state changed (started, stopped, completed, etc.). - /// Triggers a pipeline state refresh and tells the frontend to re-fetch - /// the agent list. - AgentStateChanged, - /// Claude Code is requesting user approval before executing a tool. - PermissionRequest { - request_id: String, - tool_name: String, - tool_input: serde_json::Value, - }, - /// The agent started assembling a tool call; shows live status in the UI. - ToolActivity { - tool_name: String, - }, - /// Real-time progress from the server startup reconciliation pass. - /// `status` is one of: "checking", "gates_running", "advanced", "skipped", - /// "failed", "done". `story_id` is empty for the overall "done" event. - ReconciliationProgress { - story_id: String, - status: String, - message: String, - }, - /// Heartbeat response to a client `Ping`. Lets the client confirm the - /// connection is alive and cancel any stale-connection timeout. - Pong, - /// Streaming thinking token from an extended-thinking block. - /// Sent separately from `Token` so the frontend can render them in - /// a constrained, scrollable ThinkingBlock rather than inline. - ThinkingToken { - content: String, - }, - /// Sent on connect when the project's spec files still contain scaffold - /// placeholder content and the user needs to go through onboarding. - OnboardingStatus { - needs_onboarding: bool, - }, - /// Sent on connect when a setup wizard is active. Contains the full - /// wizard state so the frontend can render the step-by-step UI. - WizardState { - steps: Vec, - current_step_index: usize, - completed: bool, - }, - /// Streaming token from a `/btw` side question response. - SideQuestionToken { - content: String, - }, - /// Final signal that the `/btw` side question has been fully answered. - SideQuestionDone { - response: String, - }, - /// A single server log entry. Sent in bulk on connect (recent history), - /// then streamed live as new entries arrive. - LogEntry { - timestamp: String, - level: String, - message: String, - }, - /// A structured pipeline status event forwarded from the status broadcaster. - /// - /// The structured [`StatusEvent`] fields are preserved on the wire so - /// frontend consumers can do per-type presentation without parsing strings. - /// This frame intentionally does NOT call `format_status_event` — that - /// formatter is reserved for chat transports (story 644). - StatusUpdate { - event: StatusEvent, - }, -} - -// ── Domain event conversions ──────────────────────────────────────────────── - -/// Convert a [`WatcherEvent`] to an optional [`WsResponse`]. -/// -/// Returns `None` for events that have no WebSocket representation -/// (e.g. `MergeFailure`, `StoryBlocked` — handled elsewhere). -pub fn watcher_event_to_response(e: WatcherEvent) -> Option { - match e { - WatcherEvent::WorkItem { - stage, - item_id, - action, - commit_msg, - .. - } => Some(WsResponse::WorkItemChanged { - stage, - item_id, - action, - commit_msg, - }), - WatcherEvent::ConfigChanged => Some(WsResponse::AgentConfigChanged), - WatcherEvent::AgentStateChanged => Some(WsResponse::AgentStateChanged), - // MergeFailure, RateLimitWarning, StoryBlocked, and RateLimitHardBlock are handled - // by the chat notification listener only; no WebSocket message is needed for the frontend. - WatcherEvent::MergeFailure { .. } => None, - WatcherEvent::RateLimitWarning { .. } => None, - WatcherEvent::StoryBlocked { .. } => None, - WatcherEvent::RateLimitHardBlock { .. } => None, - // OAuth events are forwarded to chat transports only; no WebSocket message for the frontend. - WatcherEvent::OAuthAccountSwapped { .. } => None, - WatcherEvent::OAuthAccountsExhausted { .. } => None, - } -} - -/// Returns `true` if this watcher event should trigger a pipeline state refresh. -pub fn needs_pipeline_refresh(evt: &WatcherEvent) -> bool { - matches!( - evt, - WatcherEvent::WorkItem { .. } | WatcherEvent::AgentStateChanged - ) -} - -/// Convert a [`PipelineState`] to a [`WsResponse::PipelineState`]. -pub fn pipeline_state_to_response(s: PipelineState) -> WsResponse { - WsResponse::PipelineState { - backlog: s.backlog, - current: s.current, - qa: s.qa, - merge: s.merge, - done: s.done, - } -} - -/// Build a [`WizardStepInfo`] list from wizard step states. -/// -/// Pure conversion — reads no filesystem, just transforms data. -pub fn wizard_steps_to_info(steps: &[crate::io::wizard::StepState]) -> Vec { - steps - .iter() - .map(|s| WizardStepInfo { - step: serde_json::to_value(s.step) - .ok() - .and_then(|v| v.as_str().map(String::from)) - .unwrap_or_default(), - label: s.step.label().to_string(), - status: serde_json::to_value(&s.status) - .ok() - .and_then(|v| v.as_str().map(String::from)) - .unwrap_or_default(), - content: s.content.clone(), - }) - .collect() -} - -// Keep backward-compatible From impls so existing code compiles during migration. -impl From for Option { - fn from(e: WatcherEvent) -> Self { - watcher_event_to_response(e) - } -} - -impl From for WsResponse { - fn from(s: PipelineState) -> Self { - pipeline_state_to_response(s) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::http::workflow::UpcomingStory; - use crate::io::watcher::WatcherEvent; - - // ── WsRequest deserialization ──────────────────────────────────── - - #[test] - fn deserialize_chat_request() { - let json = r#"{ - "type": "chat", - "messages": [ - {"role": "user", "content": "hello"} - ], - "config": { - "provider": "ollama", - "model": "llama3" - } - }"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - match req { - WsRequest::Chat { messages, config } => { - assert_eq!(messages.len(), 1); - assert_eq!(messages[0].content, "hello"); - assert_eq!(config.provider, "ollama"); - assert_eq!(config.model, "llama3"); - } - _ => panic!("expected Chat variant"), - } - } - - #[test] - fn deserialize_chat_request_with_optional_fields() { - let json = r#"{ - "type": "chat", - "messages": [], - "config": { - "provider": "anthropic", - "model": "claude-3-5-sonnet", - "base_url": "https://api.anthropic.com", - "enable_tools": true, - "session_id": "sess-123" - } - }"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - match req { - WsRequest::Chat { messages, config } => { - assert!(messages.is_empty()); - assert_eq!( - config.base_url.as_deref(), - Some("https://api.anthropic.com") - ); - assert_eq!(config.enable_tools, Some(true)); - assert_eq!(config.session_id.as_deref(), Some("sess-123")); - } - _ => panic!("expected Chat variant"), - } - } - - #[test] - fn deserialize_cancel_request() { - let json = r#"{"type": "cancel"}"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - assert!(matches!(req, WsRequest::Cancel)); - } - - #[test] - fn deserialize_ping_request() { - let json = r#"{"type": "ping"}"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - assert!(matches!(req, WsRequest::Ping)); - } - - #[test] - fn deserialize_permission_response_approved() { - let json = r#"{ - "type": "permission_response", - "request_id": "req-42", - "approved": true - }"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - match req { - WsRequest::PermissionResponse { - request_id, - approved, - always_allow, - } => { - assert_eq!(request_id, "req-42"); - assert!(approved); - assert!(!always_allow); - } - _ => panic!("expected PermissionResponse variant"), - } - } - - #[test] - fn deserialize_permission_response_denied() { - let json = r#"{ - "type": "permission_response", - "request_id": "req-99", - "approved": false - }"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - match req { - WsRequest::PermissionResponse { - request_id, - approved, - always_allow, - } => { - assert_eq!(request_id, "req-99"); - assert!(!approved); - assert!(!always_allow); - } - _ => panic!("expected PermissionResponse variant"), - } - } - - #[test] - fn deserialize_permission_response_always_allow() { - let json = r#"{ - "type": "permission_response", - "request_id": "req-100", - "approved": true, - "always_allow": true - }"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - match req { - WsRequest::PermissionResponse { - request_id, - approved, - always_allow, - } => { - assert_eq!(request_id, "req-100"); - assert!(approved); - assert!(always_allow); - } - _ => panic!("expected PermissionResponse variant"), - } - } - - #[test] - fn deserialize_unknown_type_fails() { - let json = r#"{"type": "unknown_type"}"#; - let result: Result = serde_json::from_str(json); - assert!(result.is_err()); - } - - #[test] - fn deserialize_invalid_json_fails() { - let result: Result = serde_json::from_str("not json"); - assert!(result.is_err()); - } - - #[test] - fn deserialize_missing_type_tag_fails() { - let json = r#"{"messages": [], "config": {"provider": "x", "model": "y"}}"#; - let result: Result = serde_json::from_str(json); - assert!(result.is_err()); - } - - #[test] - fn deserialize_side_question() { - let json = r#"{ - "type": "side_question", - "question": "what is this?", - "context_messages": [{"role": "user", "content": "hi"}], - "config": {"provider": "ollama", "model": "llama3"} - }"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - match req { - WsRequest::SideQuestion { - question, - context_messages, - config, - } => { - assert_eq!(question, "what is this?"); - assert_eq!(context_messages.len(), 1); - assert_eq!(config.model, "llama3"); - } - _ => panic!("expected SideQuestion variant"), - } - } - - // ── WsResponse serialization ──────────────────────────────────── - - #[test] - fn serialize_token_response() { - let resp = WsResponse::Token { - content: "hello world".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "token"); - assert_eq!(json["content"], "hello world"); - } - - #[test] - fn serialize_update_response() { - let msg = Message { - role: crate::llm::types::Role::Assistant, - content: "response".to_string(), - tool_calls: None, - tool_call_id: None, - }; - let resp = WsResponse::Update { - messages: vec![msg], - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "update"); - assert_eq!(json["messages"].as_array().unwrap().len(), 1); - assert_eq!(json["messages"][0]["content"], "response"); - } - - #[test] - fn serialize_session_id_response() { - let resp = WsResponse::SessionId { - session_id: "sess-abc".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "session_id"); - assert_eq!(json["session_id"], "sess-abc"); - } - - #[test] - fn serialize_error_response() { - let resp = WsResponse::Error { - message: "something broke".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "error"); - assert_eq!(json["message"], "something broke"); - } - - #[test] - fn serialize_work_item_changed_response() { - let resp = WsResponse::WorkItemChanged { - stage: "2_current".to_string(), - item_id: "42_story_foo".to_string(), - action: "start".to_string(), - commit_msg: "huskies: start 42_story_foo".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "work_item_changed"); - assert_eq!(json["stage"], "2_current"); - assert_eq!(json["item_id"], "42_story_foo"); - assert_eq!(json["action"], "start"); - assert_eq!(json["commit_msg"], "huskies: start 42_story_foo"); - } - - #[test] - fn serialize_pipeline_state_response() { - let story = UpcomingStory { - story_id: "10_story_test".to_string(), - name: Some("Test".to_string()), - error: None, - merge_failure: None, - agent: None, - review_hold: None, - qa: None, - retry_count: None, - blocked: None, - depends_on: None, - }; - let resp = WsResponse::PipelineState { - backlog: vec![story], - current: vec![], - qa: vec![], - merge: vec![], - done: vec![], - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "pipeline_state"); - assert_eq!(json["backlog"].as_array().unwrap().len(), 1); - assert_eq!(json["backlog"][0]["story_id"], "10_story_test"); - assert!(json["current"].as_array().unwrap().is_empty()); - assert!(json["done"].as_array().unwrap().is_empty()); - } - - #[test] - fn serialize_agent_config_changed_response() { - let resp = WsResponse::AgentConfigChanged; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "agent_config_changed"); - } - - #[test] - fn serialize_pong_response() { - let resp = WsResponse::Pong; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "pong"); - } - - #[test] - fn serialize_thinking_token_response() { - let resp = WsResponse::ThinkingToken { - content: "I need to think about this...".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "thinking_token"); - assert_eq!(json["content"], "I need to think about this..."); - } - - #[test] - fn serialize_onboarding_status_true() { - let resp = WsResponse::OnboardingStatus { - needs_onboarding: true, - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "onboarding_status"); - assert_eq!(json["needs_onboarding"], true); - } - - #[test] - fn serialize_onboarding_status_false() { - let resp = WsResponse::OnboardingStatus { - needs_onboarding: false, - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "onboarding_status"); - assert_eq!(json["needs_onboarding"], false); - } - - #[test] - fn serialize_permission_request_response() { - let resp = WsResponse::PermissionRequest { - request_id: "perm-1".to_string(), - tool_name: "Bash".to_string(), - tool_input: serde_json::json!({"command": "ls"}), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "permission_request"); - assert_eq!(json["request_id"], "perm-1"); - assert_eq!(json["tool_name"], "Bash"); - assert_eq!(json["tool_input"]["command"], "ls"); - } - - #[test] - fn serialize_tool_activity_response() { - let resp = WsResponse::ToolActivity { - tool_name: "Read".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "tool_activity"); - assert_eq!(json["tool_name"], "Read"); - } - - #[test] - fn serialize_reconciliation_progress_response() { - let resp = WsResponse::ReconciliationProgress { - story_id: "50_story_x".to_string(), - status: "gates_running".to_string(), - message: "Running clippy...".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "reconciliation_progress"); - assert_eq!(json["story_id"], "50_story_x"); - assert_eq!(json["status"], "gates_running"); - assert_eq!(json["message"], "Running clippy..."); - } - - #[test] - fn serialize_wizard_state_response() { - let resp = WsResponse::WizardState { - steps: vec![WizardStepInfo { - step: "scaffold".to_string(), - label: "Scaffold directory structure".to_string(), - status: "pending".to_string(), - content: None, - }], - current_step_index: 0, - completed: false, - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "wizard_state"); - assert_eq!(json["steps"][0]["step"], "scaffold"); - assert_eq!(json["current_step_index"], 0); - assert_eq!(json["completed"], false); - } - - #[test] - fn serialize_side_question_token() { - let resp = WsResponse::SideQuestionToken { - content: "partial answer".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "side_question_token"); - assert_eq!(json["content"], "partial answer"); - } - - #[test] - fn serialize_side_question_done() { - let resp = WsResponse::SideQuestionDone { - response: "full answer".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "side_question_done"); - assert_eq!(json["response"], "full answer"); - } - - #[test] - fn serialize_log_entry() { - let resp = WsResponse::LogEntry { - timestamp: "2026-01-01T00:00:00Z".to_string(), - level: "INFO".to_string(), - message: "server started".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "log_entry"); - assert_eq!(json["level"], "INFO"); - assert_eq!(json["message"], "server started"); - } - - // ── watcher_event_to_response ─────────────────────────────────── - - #[test] - fn watcher_work_item_converts_to_ws_response() { - let evt = WatcherEvent::WorkItem { - stage: "2_current".to_string(), - item_id: "42_story_foo".to_string(), - action: "start".to_string(), - commit_msg: "huskies: start 42_story_foo".to_string(), - from_stage: None, - }; - let ws_msg = watcher_event_to_response(evt).expect("WorkItem should produce Some"); - let json = serde_json::to_value(&ws_msg).unwrap(); - assert_eq!(json["type"], "work_item_changed"); - assert_eq!(json["stage"], "2_current"); - assert_eq!(json["item_id"], "42_story_foo"); - assert_eq!(json["action"], "start"); - } - - #[test] - fn watcher_config_changed_converts_to_ws_response() { - let evt = WatcherEvent::ConfigChanged; - let ws_msg = watcher_event_to_response(evt).expect("ConfigChanged should produce Some"); - let json = serde_json::to_value(&ws_msg).unwrap(); - assert_eq!(json["type"], "agent_config_changed"); - } - - #[test] - fn watcher_agent_state_changed_converts_to_ws_response() { - let evt = WatcherEvent::AgentStateChanged; - let ws_msg = watcher_event_to_response(evt).expect("AgentStateChanged should produce Some"); - let json = serde_json::to_value(&ws_msg).unwrap(); - assert_eq!(json["type"], "agent_state_changed"); - } - - #[test] - fn watcher_merge_failure_produces_none() { - let evt = WatcherEvent::MergeFailure { - story_id: "x".to_string(), - reason: "conflict".to_string(), - }; - assert!(watcher_event_to_response(evt).is_none()); - } - - #[test] - fn watcher_rate_limit_warning_produces_none() { - let evt = WatcherEvent::RateLimitWarning { - story_id: "x".to_string(), - agent_name: "coder".to_string(), - }; - assert!(watcher_event_to_response(evt).is_none()); - } - - #[test] - fn watcher_story_blocked_produces_none() { - let evt = WatcherEvent::StoryBlocked { - story_id: "x".to_string(), - reason: "retries exhausted".to_string(), - }; - assert!(watcher_event_to_response(evt).is_none()); - } - - #[test] - fn watcher_rate_limit_hard_block_produces_none() { - let evt = WatcherEvent::RateLimitHardBlock { - story_id: "x".to_string(), - agent_name: "coder".to_string(), - reset_at: chrono::Utc::now(), - }; - assert!(watcher_event_to_response(evt).is_none()); - } - - // ── needs_pipeline_refresh ────────────────────────────────────── - - #[test] - fn work_item_needs_pipeline_refresh() { - let evt = WatcherEvent::WorkItem { - stage: "2_current".to_string(), - item_id: "x".to_string(), - action: "start".to_string(), - commit_msg: "msg".to_string(), - from_stage: None, - }; - assert!(needs_pipeline_refresh(&evt)); - } - - #[test] - fn agent_state_changed_needs_pipeline_refresh() { - assert!(needs_pipeline_refresh(&WatcherEvent::AgentStateChanged)); - } - - #[test] - fn config_changed_does_not_need_pipeline_refresh() { - assert!(!needs_pipeline_refresh(&WatcherEvent::ConfigChanged)); - } - - #[test] - fn merge_failure_does_not_need_pipeline_refresh() { - let evt = WatcherEvent::MergeFailure { - story_id: "x".to_string(), - reason: "y".to_string(), - }; - assert!(!needs_pipeline_refresh(&evt)); - } - - // ── pipeline_state_to_response ────────────────────────────────── - - #[test] - fn pipeline_state_converts_to_ws_response() { - let state = PipelineState { - backlog: vec![UpcomingStory { - story_id: "1_story_a".to_string(), - name: Some("Story A".to_string()), - error: None, - merge_failure: None, - agent: None, - review_hold: None, - qa: None, - retry_count: None, - blocked: None, - depends_on: None, - }], - current: vec![UpcomingStory { - story_id: "2_story_b".to_string(), - name: Some("Story B".to_string()), - error: None, - merge_failure: None, - agent: None, - review_hold: None, - qa: None, - retry_count: None, - blocked: None, - depends_on: None, - }], - qa: vec![], - merge: vec![], - done: vec![UpcomingStory { - story_id: "50_story_done".to_string(), - name: Some("Done Story".to_string()), - error: None, - merge_failure: None, - agent: None, - review_hold: None, - qa: None, - retry_count: None, - blocked: None, - depends_on: None, - }], - }; - let resp = pipeline_state_to_response(state); - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "pipeline_state"); - assert_eq!(json["backlog"].as_array().unwrap().len(), 1); - assert_eq!(json["backlog"][0]["story_id"], "1_story_a"); - assert_eq!(json["current"].as_array().unwrap().len(), 1); - assert_eq!(json["current"][0]["story_id"], "2_story_b"); - assert!(json["qa"].as_array().unwrap().is_empty()); - assert!(json["merge"].as_array().unwrap().is_empty()); - assert_eq!(json["done"].as_array().unwrap().len(), 1); - assert_eq!(json["done"][0]["story_id"], "50_story_done"); - } - - #[test] - fn empty_pipeline_state_converts_to_ws_response() { - let state = PipelineState { - backlog: vec![], - current: vec![], - qa: vec![], - merge: vec![], - done: vec![], - }; - let resp = pipeline_state_to_response(state); - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "pipeline_state"); - assert!(json["backlog"].as_array().unwrap().is_empty()); - assert!(json["current"].as_array().unwrap().is_empty()); - assert!(json["qa"].as_array().unwrap().is_empty()); - assert!(json["merge"].as_array().unwrap().is_empty()); - assert!(json["done"].as_array().unwrap().is_empty()); - } - - #[test] - fn pipeline_state_with_agent_converts_correctly() { - let state = PipelineState { - backlog: vec![], - current: vec![UpcomingStory { - story_id: "10_story_x".to_string(), - name: Some("Story X".to_string()), - error: None, - merge_failure: None, - agent: Some(crate::http::workflow::pipeline::AgentAssignment { - agent_name: "coder-1".to_string(), - model: Some("claude-3-5-sonnet".to_string()), - status: "running".to_string(), - }), - review_hold: None, - qa: None, - retry_count: None, - blocked: None, - depends_on: None, - }], - qa: vec![], - merge: vec![], - done: vec![], - }; - let resp: WsResponse = state.into(); - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["current"][0]["agent"]["agent_name"], "coder-1"); - assert_eq!(json["current"][0]["agent"]["model"], "claude-3-5-sonnet"); - assert_eq!(json["current"][0]["agent"]["status"], "running"); - } - - // ── WsResponse JSON round-trip (string form) ──────────────────── - - #[test] - fn ws_response_serializes_to_parseable_json_string() { - let resp = WsResponse::Error { - message: "test error".to_string(), - }; - let text = serde_json::to_string(&resp).unwrap(); - let parsed: serde_json::Value = serde_json::from_str(&text).unwrap(); - assert_eq!(parsed["type"], "error"); - assert_eq!(parsed["message"], "test error"); - } - - #[test] - fn ws_response_update_with_empty_messages() { - let resp = WsResponse::Update { messages: vec![] }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "update"); - assert!(json["messages"].as_array().unwrap().is_empty()); - } - - #[test] - fn ws_response_token_with_empty_content() { - let resp = WsResponse::Token { - content: String::new(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["type"], "token"); - assert_eq!(json["content"], ""); - } - - #[test] - fn ws_response_error_with_special_characters() { - let resp = WsResponse::Error { - message: "error: \"quoted\" & ".to_string(), - }; - let text = serde_json::to_string(&resp).unwrap(); - let parsed: serde_json::Value = serde_json::from_str(&text).unwrap(); - assert_eq!(parsed["message"], "error: \"quoted\" & "); - } - - // ── WsRequest edge cases ──────────────────────────────────────── - - #[test] - fn deserialize_chat_with_multiple_messages() { - let json = r#"{ - "type": "chat", - "messages": [ - {"role": "system", "content": "You are a helpful assistant."}, - {"role": "user", "content": "Hello"}, - {"role": "assistant", "content": "Hi there!"}, - {"role": "user", "content": "How are you?"} - ], - "config": { - "provider": "ollama", - "model": "llama3" - } - }"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - match req { - WsRequest::Chat { messages, .. } => { - assert_eq!(messages.len(), 4); - assert_eq!(messages[0].role, crate::llm::types::Role::System); - assert_eq!(messages[3].role, crate::llm::types::Role::User); - } - _ => panic!("expected Chat variant"), - } - } - - #[test] - fn deserialize_chat_with_tool_call_message() { - let json = r#"{ - "type": "chat", - "messages": [ - { - "role": "assistant", - "content": "", - "tool_calls": [ - { - "id": "call_1", - "type": "function", - "function": { - "name": "read_file", - "arguments": "{\"path\": \"/tmp/test.rs\"}" - } - } - ] - } - ], - "config": { - "provider": "anthropic", - "model": "claude-3-5-sonnet" - } - }"#; - let req: WsRequest = serde_json::from_str(json).unwrap(); - match req { - WsRequest::Chat { messages, .. } => { - assert_eq!(messages.len(), 1); - let tc = messages[0].tool_calls.as_ref().unwrap(); - assert_eq!(tc.len(), 1); - assert_eq!(tc[0].function.name, "read_file"); - } - _ => panic!("expected Chat variant"), - } - } - - // ── wizard_steps_to_info ──────────────────────────────────────── - - #[test] - fn wizard_steps_to_info_empty() { - let result = wizard_steps_to_info(&[]); - assert!(result.is_empty()); - } - - // ── Reconciliation progress done event ────────────────────────── - - #[test] - fn reconciliation_done_event_has_empty_story_id() { - let resp = WsResponse::ReconciliationProgress { - story_id: String::new(), - status: "done".to_string(), - message: "Reconciliation complete".to_string(), - }; - let json = serde_json::to_value(&resp).unwrap(); - assert_eq!(json["story_id"], ""); - assert_eq!(json["status"], "done"); - } -} diff --git a/server/src/service/ws/message/convert.rs b/server/src/service/ws/message/convert.rs new file mode 100644 index 00000000..f9bd60fd --- /dev/null +++ b/server/src/service/ws/message/convert.rs @@ -0,0 +1,310 @@ +//! Conversions from domain events to WebSocket response messages. + +use crate::http::workflow::PipelineState; +use crate::io::watcher::WatcherEvent; + +use super::response::{WizardStepInfo, WsResponse}; + +/// Convert a [`WatcherEvent`] to an optional [`WsResponse`]. +/// +/// Returns `None` for events that have no WebSocket representation +/// (e.g. `MergeFailure`, `StoryBlocked` — handled elsewhere). +pub fn watcher_event_to_response(e: WatcherEvent) -> Option { + match e { + WatcherEvent::WorkItem { + stage, + item_id, + action, + commit_msg, + .. + } => Some(WsResponse::WorkItemChanged { + stage, + item_id, + action, + commit_msg, + }), + WatcherEvent::ConfigChanged => Some(WsResponse::AgentConfigChanged), + WatcherEvent::AgentStateChanged => Some(WsResponse::AgentStateChanged), + // MergeFailure, RateLimitWarning, StoryBlocked, and RateLimitHardBlock are handled + // by the chat notification listener only; no WebSocket message is needed for the frontend. + WatcherEvent::MergeFailure { .. } => None, + WatcherEvent::RateLimitWarning { .. } => None, + WatcherEvent::StoryBlocked { .. } => None, + WatcherEvent::RateLimitHardBlock { .. } => None, + // OAuth events are forwarded to chat transports only; no WebSocket message for the frontend. + WatcherEvent::OAuthAccountSwapped { .. } => None, + WatcherEvent::OAuthAccountsExhausted { .. } => None, + } +} + +/// Returns `true` if this watcher event should trigger a pipeline state refresh. +pub fn needs_pipeline_refresh(evt: &WatcherEvent) -> bool { + matches!( + evt, + WatcherEvent::WorkItem { .. } | WatcherEvent::AgentStateChanged + ) +} + +/// Convert a [`PipelineState`] to a [`WsResponse::PipelineState`]. +pub fn pipeline_state_to_response(s: PipelineState) -> WsResponse { + WsResponse::PipelineState { + backlog: s.backlog, + current: s.current, + qa: s.qa, + merge: s.merge, + done: s.done, + } +} + +/// Build a [`WizardStepInfo`] list from wizard step states. +/// +/// Pure conversion — reads no filesystem, just transforms data. +pub fn wizard_steps_to_info(steps: &[crate::io::wizard::StepState]) -> Vec { + steps + .iter() + .map(|s| WizardStepInfo { + step: serde_json::to_value(s.step) + .ok() + .and_then(|v| v.as_str().map(String::from)) + .unwrap_or_default(), + label: s.step.label().to_string(), + status: serde_json::to_value(&s.status) + .ok() + .and_then(|v| v.as_str().map(String::from)) + .unwrap_or_default(), + content: s.content.clone(), + }) + .collect() +} + +// Keep backward-compatible From impls so existing code compiles during migration. +impl From for Option { + fn from(e: WatcherEvent) -> Self { + watcher_event_to_response(e) + } +} + +impl From for WsResponse { + fn from(s: PipelineState) -> Self { + pipeline_state_to_response(s) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::http::workflow::UpcomingStory; + use crate::io::watcher::WatcherEvent; + + #[test] + fn watcher_work_item_converts_to_ws_response() { + let evt = WatcherEvent::WorkItem { + stage: "2_current".to_string(), + item_id: "42_story_foo".to_string(), + action: "start".to_string(), + commit_msg: "huskies: start 42_story_foo".to_string(), + from_stage: None, + }; + let ws_msg = watcher_event_to_response(evt).expect("WorkItem should produce Some"); + let json = serde_json::to_value(&ws_msg).unwrap(); + assert_eq!(json["type"], "work_item_changed"); + assert_eq!(json["stage"], "2_current"); + assert_eq!(json["item_id"], "42_story_foo"); + assert_eq!(json["action"], "start"); + } + + #[test] + fn watcher_config_changed_converts_to_ws_response() { + let evt = WatcherEvent::ConfigChanged; + let ws_msg = watcher_event_to_response(evt).expect("ConfigChanged should produce Some"); + let json = serde_json::to_value(&ws_msg).unwrap(); + assert_eq!(json["type"], "agent_config_changed"); + } + + #[test] + fn watcher_agent_state_changed_converts_to_ws_response() { + let evt = WatcherEvent::AgentStateChanged; + let ws_msg = watcher_event_to_response(evt).expect("AgentStateChanged should produce Some"); + let json = serde_json::to_value(&ws_msg).unwrap(); + assert_eq!(json["type"], "agent_state_changed"); + } + + #[test] + fn watcher_merge_failure_produces_none() { + let evt = WatcherEvent::MergeFailure { + story_id: "x".to_string(), + reason: "conflict".to_string(), + }; + assert!(watcher_event_to_response(evt).is_none()); + } + + #[test] + fn watcher_rate_limit_warning_produces_none() { + let evt = WatcherEvent::RateLimitWarning { + story_id: "x".to_string(), + agent_name: "coder".to_string(), + }; + assert!(watcher_event_to_response(evt).is_none()); + } + + #[test] + fn watcher_story_blocked_produces_none() { + let evt = WatcherEvent::StoryBlocked { + story_id: "x".to_string(), + reason: "retries exhausted".to_string(), + }; + assert!(watcher_event_to_response(evt).is_none()); + } + + #[test] + fn watcher_rate_limit_hard_block_produces_none() { + let evt = WatcherEvent::RateLimitHardBlock { + story_id: "x".to_string(), + agent_name: "coder".to_string(), + reset_at: chrono::Utc::now(), + }; + assert!(watcher_event_to_response(evt).is_none()); + } + + #[test] + fn work_item_needs_pipeline_refresh() { + let evt = WatcherEvent::WorkItem { + stage: "2_current".to_string(), + item_id: "x".to_string(), + action: "start".to_string(), + commit_msg: "msg".to_string(), + from_stage: None, + }; + assert!(needs_pipeline_refresh(&evt)); + } + + #[test] + fn agent_state_changed_needs_pipeline_refresh() { + assert!(needs_pipeline_refresh(&WatcherEvent::AgentStateChanged)); + } + + #[test] + fn config_changed_does_not_need_pipeline_refresh() { + assert!(!needs_pipeline_refresh(&WatcherEvent::ConfigChanged)); + } + + #[test] + fn merge_failure_does_not_need_pipeline_refresh() { + let evt = WatcherEvent::MergeFailure { + story_id: "x".to_string(), + reason: "y".to_string(), + }; + assert!(!needs_pipeline_refresh(&evt)); + } + + #[test] + fn pipeline_state_converts_to_ws_response() { + let state = PipelineState { + backlog: vec![UpcomingStory { + story_id: "1_story_a".to_string(), + name: Some("Story A".to_string()), + error: None, + merge_failure: None, + agent: None, + review_hold: None, + qa: None, + retry_count: None, + blocked: None, + depends_on: None, + }], + current: vec![UpcomingStory { + story_id: "2_story_b".to_string(), + name: Some("Story B".to_string()), + error: None, + merge_failure: None, + agent: None, + review_hold: None, + qa: None, + retry_count: None, + blocked: None, + depends_on: None, + }], + qa: vec![], + merge: vec![], + done: vec![UpcomingStory { + story_id: "50_story_done".to_string(), + name: Some("Done Story".to_string()), + error: None, + merge_failure: None, + agent: None, + review_hold: None, + qa: None, + retry_count: None, + blocked: None, + depends_on: None, + }], + }; + let resp = pipeline_state_to_response(state); + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "pipeline_state"); + assert_eq!(json["backlog"].as_array().unwrap().len(), 1); + assert_eq!(json["backlog"][0]["story_id"], "1_story_a"); + assert_eq!(json["current"].as_array().unwrap().len(), 1); + assert_eq!(json["current"][0]["story_id"], "2_story_b"); + assert!(json["qa"].as_array().unwrap().is_empty()); + assert!(json["merge"].as_array().unwrap().is_empty()); + assert_eq!(json["done"].as_array().unwrap().len(), 1); + assert_eq!(json["done"][0]["story_id"], "50_story_done"); + } + + #[test] + fn empty_pipeline_state_converts_to_ws_response() { + let state = PipelineState { + backlog: vec![], + current: vec![], + qa: vec![], + merge: vec![], + done: vec![], + }; + let resp = pipeline_state_to_response(state); + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "pipeline_state"); + assert!(json["backlog"].as_array().unwrap().is_empty()); + assert!(json["current"].as_array().unwrap().is_empty()); + assert!(json["qa"].as_array().unwrap().is_empty()); + assert!(json["merge"].as_array().unwrap().is_empty()); + assert!(json["done"].as_array().unwrap().is_empty()); + } + + #[test] + fn pipeline_state_with_agent_converts_correctly() { + let state = PipelineState { + backlog: vec![], + current: vec![UpcomingStory { + story_id: "10_story_x".to_string(), + name: Some("Story X".to_string()), + error: None, + merge_failure: None, + agent: Some(crate::http::workflow::pipeline::AgentAssignment { + agent_name: "coder-1".to_string(), + model: Some("claude-3-5-sonnet".to_string()), + status: "running".to_string(), + }), + review_hold: None, + qa: None, + retry_count: None, + blocked: None, + depends_on: None, + }], + qa: vec![], + merge: vec![], + done: vec![], + }; + let resp: WsResponse = state.into(); + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["current"][0]["agent"]["agent_name"], "coder-1"); + assert_eq!(json["current"][0]["agent"]["model"], "claude-3-5-sonnet"); + assert_eq!(json["current"][0]["agent"]["status"], "running"); + } + + #[test] + fn wizard_steps_to_info_empty() { + let result = wizard_steps_to_info(&[]); + assert!(result.is_empty()); + } +} diff --git a/server/src/service/ws/message/mod.rs b/server/src/service/ws/message/mod.rs new file mode 100644 index 00000000..0bb06ed1 --- /dev/null +++ b/server/src/service/ws/message/mod.rs @@ -0,0 +1,13 @@ +//! Pure WebSocket message types — no side effects. +//! +//! `WsRequest` and `WsResponse` define the client/server protocol. +//! Conversions from domain events to WsResponse live here too. +//! All logic is pure data transformation; I/O lives in `io.rs`. + +pub mod convert; +pub mod request; +pub mod response; + +pub use convert::{needs_pipeline_refresh, wizard_steps_to_info}; +pub use request::WsRequest; +pub use response::{WizardStepInfo, WsResponse}; diff --git a/server/src/service/ws/message/request.rs b/server/src/service/ws/message/request.rs new file mode 100644 index 00000000..64572307 --- /dev/null +++ b/server/src/service/ws/message/request.rs @@ -0,0 +1,281 @@ +//! WebSocket request messages sent by the client. + +use crate::llm::chat; +use crate::llm::types::Message; +use serde::Deserialize; + +/// WebSocket request messages sent by the client. +/// +/// - `chat` starts a streaming chat session. +/// - `cancel` stops the active session. +/// - `permission_response` approves or denies a pending permission request. +#[derive(Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum WsRequest { + Chat { + messages: Vec, + config: chat::ProviderConfig, + }, + Cancel, + PermissionResponse { + request_id: String, + approved: bool, + #[serde(default)] + always_allow: bool, + }, + /// Heartbeat ping from the client. The server responds with `Pong` so the + /// client can detect stale (half-closed) connections. + Ping, + /// A quick side question answered from current conversation context. + /// The question and response are NOT added to the conversation history + /// and no tool calls are made. + SideQuestion { + question: String, + context_messages: Vec, + config: chat::ProviderConfig, + }, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn deserialize_chat_request() { + let json = r#"{ + "type": "chat", + "messages": [ + {"role": "user", "content": "hello"} + ], + "config": { + "provider": "ollama", + "model": "llama3" + } + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::Chat { messages, config } => { + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].content, "hello"); + assert_eq!(config.provider, "ollama"); + assert_eq!(config.model, "llama3"); + } + _ => panic!("expected Chat variant"), + } + } + + #[test] + fn deserialize_chat_request_with_optional_fields() { + let json = r#"{ + "type": "chat", + "messages": [], + "config": { + "provider": "anthropic", + "model": "claude-3-5-sonnet", + "base_url": "https://api.anthropic.com", + "enable_tools": true, + "session_id": "sess-123" + } + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::Chat { messages, config } => { + assert!(messages.is_empty()); + assert_eq!( + config.base_url.as_deref(), + Some("https://api.anthropic.com") + ); + assert_eq!(config.enable_tools, Some(true)); + assert_eq!(config.session_id.as_deref(), Some("sess-123")); + } + _ => panic!("expected Chat variant"), + } + } + + #[test] + fn deserialize_cancel_request() { + let json = r#"{"type": "cancel"}"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + assert!(matches!(req, WsRequest::Cancel)); + } + + #[test] + fn deserialize_ping_request() { + let json = r#"{"type": "ping"}"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + assert!(matches!(req, WsRequest::Ping)); + } + + #[test] + fn deserialize_permission_response_approved() { + let json = r#"{ + "type": "permission_response", + "request_id": "req-42", + "approved": true + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::PermissionResponse { + request_id, + approved, + always_allow, + } => { + assert_eq!(request_id, "req-42"); + assert!(approved); + assert!(!always_allow); + } + _ => panic!("expected PermissionResponse variant"), + } + } + + #[test] + fn deserialize_permission_response_denied() { + let json = r#"{ + "type": "permission_response", + "request_id": "req-99", + "approved": false + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::PermissionResponse { + request_id, + approved, + always_allow, + } => { + assert_eq!(request_id, "req-99"); + assert!(!approved); + assert!(!always_allow); + } + _ => panic!("expected PermissionResponse variant"), + } + } + + #[test] + fn deserialize_permission_response_always_allow() { + let json = r#"{ + "type": "permission_response", + "request_id": "req-100", + "approved": true, + "always_allow": true + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::PermissionResponse { + request_id, + approved, + always_allow, + } => { + assert_eq!(request_id, "req-100"); + assert!(approved); + assert!(always_allow); + } + _ => panic!("expected PermissionResponse variant"), + } + } + + #[test] + fn deserialize_unknown_type_fails() { + let json = r#"{"type": "unknown_type"}"#; + let result: Result = serde_json::from_str(json); + assert!(result.is_err()); + } + + #[test] + fn deserialize_invalid_json_fails() { + let result: Result = serde_json::from_str("not json"); + assert!(result.is_err()); + } + + #[test] + fn deserialize_missing_type_tag_fails() { + let json = r#"{"messages": [], "config": {"provider": "x", "model": "y"}}"#; + let result: Result = serde_json::from_str(json); + assert!(result.is_err()); + } + + #[test] + fn deserialize_side_question() { + let json = r#"{ + "type": "side_question", + "question": "what is this?", + "context_messages": [{"role": "user", "content": "hi"}], + "config": {"provider": "ollama", "model": "llama3"} + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::SideQuestion { + question, + context_messages, + config, + } => { + assert_eq!(question, "what is this?"); + assert_eq!(context_messages.len(), 1); + assert_eq!(config.model, "llama3"); + } + _ => panic!("expected SideQuestion variant"), + } + } + + #[test] + fn deserialize_chat_with_multiple_messages() { + let json = r#"{ + "type": "chat", + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there!"}, + {"role": "user", "content": "How are you?"} + ], + "config": { + "provider": "ollama", + "model": "llama3" + } + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::Chat { messages, .. } => { + assert_eq!(messages.len(), 4); + assert_eq!(messages[0].role, crate::llm::types::Role::System); + assert_eq!(messages[3].role, crate::llm::types::Role::User); + } + _ => panic!("expected Chat variant"), + } + } + + #[test] + fn deserialize_chat_with_tool_call_message() { + let json = r#"{ + "type": "chat", + "messages": [ + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_1", + "type": "function", + "function": { + "name": "read_file", + "arguments": "{\"path\": \"/tmp/test.rs\"}" + } + } + ] + } + ], + "config": { + "provider": "anthropic", + "model": "claude-3-5-sonnet" + } + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::Chat { messages, .. } => { + assert_eq!(messages.len(), 1); + let tc = messages[0].tool_calls.as_ref().unwrap(); + assert_eq!(tc.len(), 1); + assert_eq!(tc[0].function.name, "read_file"); + } + _ => panic!("expected Chat variant"), + } + } +} diff --git a/server/src/service/ws/message/response.rs b/server/src/service/ws/message/response.rs new file mode 100644 index 00000000..259b3b5b --- /dev/null +++ b/server/src/service/ws/message/response.rs @@ -0,0 +1,411 @@ +//! WebSocket response messages sent by the server. + +use crate::http::workflow::UpcomingStory; +use crate::llm::types::Message; +use crate::service::status::StatusEvent; +use serde::Serialize; + +/// Serialisable summary of a single wizard step for WebSocket broadcast. +#[derive(Serialize, Clone, Debug, PartialEq)] +pub struct WizardStepInfo { + pub step: String, + pub label: String, + pub status: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub content: Option, +} + +/// WebSocket response messages sent by the server. +/// +/// - `token` streams partial model output. +/// - `update` pushes the updated message history. +/// - `error` reports a request or processing failure. +/// - `work_item_changed` notifies that a `.huskies/work/` file changed. +/// - `agent_config_changed` notifies that `.huskies/project.toml` was modified. +#[derive(Serialize, Debug)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum WsResponse { + Token { + content: String, + }, + Update { + messages: Vec, + }, + /// Session ID for Claude Code conversation resumption. + SessionId { + session_id: String, + }, + Error { + message: String, + }, + /// Filesystem watcher notification: a work-pipeline file was created or + /// modified and auto-committed. The frontend can use this to refresh its + /// story/bug list without polling. + WorkItemChanged { + stage: String, + item_id: String, + action: String, + commit_msg: String, + }, + /// Full pipeline state pushed on connect and after every work-item watcher event. + PipelineState { + backlog: Vec, + current: Vec, + qa: Vec, + merge: Vec, + done: Vec, + }, + /// `.huskies/project.toml` was modified; the frontend should re-fetch the + /// agent roster. Does NOT trigger a pipeline state refresh. + AgentConfigChanged, + /// An agent's state changed (started, stopped, completed, etc.). + /// Triggers a pipeline state refresh and tells the frontend to re-fetch + /// the agent list. + AgentStateChanged, + /// Claude Code is requesting user approval before executing a tool. + PermissionRequest { + request_id: String, + tool_name: String, + tool_input: serde_json::Value, + }, + /// The agent started assembling a tool call; shows live status in the UI. + ToolActivity { + tool_name: String, + }, + /// Real-time progress from the server startup reconciliation pass. + /// `status` is one of: "checking", "gates_running", "advanced", "skipped", + /// "failed", "done". `story_id` is empty for the overall "done" event. + ReconciliationProgress { + story_id: String, + status: String, + message: String, + }, + /// Heartbeat response to a client `Ping`. Lets the client confirm the + /// connection is alive and cancel any stale-connection timeout. + Pong, + /// Streaming thinking token from an extended-thinking block. + /// Sent separately from `Token` so the frontend can render them in + /// a constrained, scrollable ThinkingBlock rather than inline. + ThinkingToken { + content: String, + }, + /// Sent on connect when the project's spec files still contain scaffold + /// placeholder content and the user needs to go through onboarding. + OnboardingStatus { + needs_onboarding: bool, + }, + /// Sent on connect when a setup wizard is active. Contains the full + /// wizard state so the frontend can render the step-by-step UI. + WizardState { + steps: Vec, + current_step_index: usize, + completed: bool, + }, + /// Streaming token from a `/btw` side question response. + SideQuestionToken { + content: String, + }, + /// Final signal that the `/btw` side question has been fully answered. + SideQuestionDone { + response: String, + }, + /// A single server log entry. Sent in bulk on connect (recent history), + /// then streamed live as new entries arrive. + LogEntry { + timestamp: String, + level: String, + message: String, + }, + /// A structured pipeline status event forwarded from the status broadcaster. + /// + /// The structured [`StatusEvent`] fields are preserved on the wire so + /// frontend consumers can do per-type presentation without parsing strings. + /// This frame intentionally does NOT call `format_status_event` — that + /// formatter is reserved for chat transports (story 644). + StatusUpdate { + event: StatusEvent, + }, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::http::workflow::UpcomingStory; + + #[test] + fn serialize_token_response() { + let resp = WsResponse::Token { + content: "hello world".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "token"); + assert_eq!(json["content"], "hello world"); + } + + #[test] + fn serialize_update_response() { + let msg = Message { + role: crate::llm::types::Role::Assistant, + content: "response".to_string(), + tool_calls: None, + tool_call_id: None, + }; + let resp = WsResponse::Update { + messages: vec![msg], + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "update"); + assert_eq!(json["messages"].as_array().unwrap().len(), 1); + assert_eq!(json["messages"][0]["content"], "response"); + } + + #[test] + fn serialize_session_id_response() { + let resp = WsResponse::SessionId { + session_id: "sess-abc".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "session_id"); + assert_eq!(json["session_id"], "sess-abc"); + } + + #[test] + fn serialize_error_response() { + let resp = WsResponse::Error { + message: "something broke".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "error"); + assert_eq!(json["message"], "something broke"); + } + + #[test] + fn serialize_work_item_changed_response() { + let resp = WsResponse::WorkItemChanged { + stage: "2_current".to_string(), + item_id: "42_story_foo".to_string(), + action: "start".to_string(), + commit_msg: "huskies: start 42_story_foo".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "work_item_changed"); + assert_eq!(json["stage"], "2_current"); + assert_eq!(json["item_id"], "42_story_foo"); + assert_eq!(json["action"], "start"); + assert_eq!(json["commit_msg"], "huskies: start 42_story_foo"); + } + + #[test] + fn serialize_pipeline_state_response() { + let story = UpcomingStory { + story_id: "10_story_test".to_string(), + name: Some("Test".to_string()), + error: None, + merge_failure: None, + agent: None, + review_hold: None, + qa: None, + retry_count: None, + blocked: None, + depends_on: None, + }; + let resp = WsResponse::PipelineState { + backlog: vec![story], + current: vec![], + qa: vec![], + merge: vec![], + done: vec![], + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "pipeline_state"); + assert_eq!(json["backlog"].as_array().unwrap().len(), 1); + assert_eq!(json["backlog"][0]["story_id"], "10_story_test"); + assert!(json["current"].as_array().unwrap().is_empty()); + assert!(json["done"].as_array().unwrap().is_empty()); + } + + #[test] + fn serialize_agent_config_changed_response() { + let resp = WsResponse::AgentConfigChanged; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "agent_config_changed"); + } + + #[test] + fn serialize_pong_response() { + let resp = WsResponse::Pong; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "pong"); + } + + #[test] + fn serialize_thinking_token_response() { + let resp = WsResponse::ThinkingToken { + content: "I need to think about this...".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "thinking_token"); + assert_eq!(json["content"], "I need to think about this..."); + } + + #[test] + fn serialize_onboarding_status_true() { + let resp = WsResponse::OnboardingStatus { + needs_onboarding: true, + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "onboarding_status"); + assert_eq!(json["needs_onboarding"], true); + } + + #[test] + fn serialize_onboarding_status_false() { + let resp = WsResponse::OnboardingStatus { + needs_onboarding: false, + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "onboarding_status"); + assert_eq!(json["needs_onboarding"], false); + } + + #[test] + fn serialize_permission_request_response() { + let resp = WsResponse::PermissionRequest { + request_id: "perm-1".to_string(), + tool_name: "Bash".to_string(), + tool_input: serde_json::json!({"command": "ls"}), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "permission_request"); + assert_eq!(json["request_id"], "perm-1"); + assert_eq!(json["tool_name"], "Bash"); + assert_eq!(json["tool_input"]["command"], "ls"); + } + + #[test] + fn serialize_tool_activity_response() { + let resp = WsResponse::ToolActivity { + tool_name: "Read".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "tool_activity"); + assert_eq!(json["tool_name"], "Read"); + } + + #[test] + fn serialize_reconciliation_progress_response() { + let resp = WsResponse::ReconciliationProgress { + story_id: "50_story_x".to_string(), + status: "gates_running".to_string(), + message: "Running clippy...".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "reconciliation_progress"); + assert_eq!(json["story_id"], "50_story_x"); + assert_eq!(json["status"], "gates_running"); + assert_eq!(json["message"], "Running clippy..."); + } + + #[test] + fn serialize_wizard_state_response() { + let resp = WsResponse::WizardState { + steps: vec![WizardStepInfo { + step: "scaffold".to_string(), + label: "Scaffold directory structure".to_string(), + status: "pending".to_string(), + content: None, + }], + current_step_index: 0, + completed: false, + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "wizard_state"); + assert_eq!(json["steps"][0]["step"], "scaffold"); + assert_eq!(json["current_step_index"], 0); + assert_eq!(json["completed"], false); + } + + #[test] + fn serialize_side_question_token() { + let resp = WsResponse::SideQuestionToken { + content: "partial answer".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "side_question_token"); + assert_eq!(json["content"], "partial answer"); + } + + #[test] + fn serialize_side_question_done() { + let resp = WsResponse::SideQuestionDone { + response: "full answer".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "side_question_done"); + assert_eq!(json["response"], "full answer"); + } + + #[test] + fn serialize_log_entry() { + let resp = WsResponse::LogEntry { + timestamp: "2026-01-01T00:00:00Z".to_string(), + level: "INFO".to_string(), + message: "server started".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "log_entry"); + assert_eq!(json["level"], "INFO"); + assert_eq!(json["message"], "server started"); + } + + #[test] + fn ws_response_serializes_to_parseable_json_string() { + let resp = WsResponse::Error { + message: "test error".to_string(), + }; + let text = serde_json::to_string(&resp).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&text).unwrap(); + assert_eq!(parsed["type"], "error"); + assert_eq!(parsed["message"], "test error"); + } + + #[test] + fn ws_response_update_with_empty_messages() { + let resp = WsResponse::Update { messages: vec![] }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "update"); + assert!(json["messages"].as_array().unwrap().is_empty()); + } + + #[test] + fn ws_response_token_with_empty_content() { + let resp = WsResponse::Token { + content: String::new(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "token"); + assert_eq!(json["content"], ""); + } + + #[test] + fn ws_response_error_with_special_characters() { + let resp = WsResponse::Error { + message: "error: \"quoted\" & ".to_string(), + }; + let text = serde_json::to_string(&resp).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&text).unwrap(); + assert_eq!(parsed["message"], "error: \"quoted\" & "); + } + + #[test] + fn reconciliation_done_event_has_empty_story_id() { + let resp = WsResponse::ReconciliationProgress { + story_id: String::new(), + status: "done".to_string(), + message: "Reconciliation complete".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["story_id"], ""); + assert_eq!(json["status"], "done"); + } +}