diff --git a/Cargo.lock b/Cargo.lock index e133310..7183238 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2156,6 +2156,7 @@ dependencies = [ "strip-ansi-escapes", "tempfile", "tokio", + "tokio-tungstenite", "toml", "uuid", "walkdir", diff --git a/Cargo.toml b/Cargo.toml index 78c663c..374e2a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,4 +26,5 @@ tempfile = "3" tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync"] } toml = "1.0.3+spec-1.1.0" uuid = { version = "1.21.0", features = ["v4", "serde"] } +tokio-tungstenite = "0.27" walkdir = "2.5.0" diff --git a/server/Cargo.toml b/server/Cargo.toml index a87581a..16a1ea9 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -31,3 +31,4 @@ walkdir = { workspace = true } [dev-dependencies] tempfile = { workspace = true } +tokio-tungstenite = { workspace = true } diff --git a/server/src/http/ws.rs b/server/src/http/ws.rs index 6d8cdde..124a2c3 100644 --- a/server/src/http/ws.rs +++ b/server/src/http/ws.rs @@ -320,3 +320,757 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc>) -> impl poem let _ = forward.await; }) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::http::workflow::{PipelineState, UpcomingStory}; + use crate::io::watcher::WatcherEvent; + + // ── WsRequest deserialization ──────────────────────────────────── + + #[test] + fn deserialize_chat_request() { + let json = r#"{ + "type": "chat", + "messages": [ + {"role": "user", "content": "hello"} + ], + "config": { + "provider": "ollama", + "model": "llama3" + } + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::Chat { messages, config } => { + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].content, "hello"); + assert_eq!(config.provider, "ollama"); + assert_eq!(config.model, "llama3"); + } + _ => panic!("expected Chat variant"), + } + } + + #[test] + fn deserialize_chat_request_with_optional_fields() { + let json = r#"{ + "type": "chat", + "messages": [], + "config": { + "provider": "anthropic", + "model": "claude-3-5-sonnet", + "base_url": "https://api.anthropic.com", + "enable_tools": true, + "session_id": "sess-123" + } + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::Chat { messages, config } => { + assert!(messages.is_empty()); + assert_eq!(config.base_url.as_deref(), Some("https://api.anthropic.com")); + assert_eq!(config.enable_tools, Some(true)); + assert_eq!(config.session_id.as_deref(), Some("sess-123")); + } + _ => panic!("expected Chat variant"), + } + } + + #[test] + fn deserialize_cancel_request() { + let json = r#"{"type": "cancel"}"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + assert!(matches!(req, WsRequest::Cancel)); + } + + #[test] + fn deserialize_permission_response_approved() { + let json = r#"{ + "type": "permission_response", + "request_id": "req-42", + "approved": true + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::PermissionResponse { + request_id, + approved, + } => { + assert_eq!(request_id, "req-42"); + assert!(approved); + } + _ => panic!("expected PermissionResponse variant"), + } + } + + #[test] + fn deserialize_permission_response_denied() { + let json = r#"{ + "type": "permission_response", + "request_id": "req-99", + "approved": false + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::PermissionResponse { + request_id, + approved, + } => { + assert_eq!(request_id, "req-99"); + assert!(!approved); + } + _ => panic!("expected PermissionResponse variant"), + } + } + + #[test] + fn deserialize_unknown_type_fails() { + let json = r#"{"type": "unknown_type"}"#; + let result: Result = serde_json::from_str(json); + assert!(result.is_err()); + } + + #[test] + fn deserialize_invalid_json_fails() { + let result: Result = serde_json::from_str("not json"); + assert!(result.is_err()); + } + + #[test] + fn deserialize_missing_type_tag_fails() { + let json = r#"{"messages": [], "config": {"provider": "x", "model": "y"}}"#; + let result: Result = serde_json::from_str(json); + assert!(result.is_err()); + } + + // ── WsResponse serialization ──────────────────────────────────── + + #[test] + fn serialize_token_response() { + let resp = WsResponse::Token { + content: "hello world".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "token"); + assert_eq!(json["content"], "hello world"); + } + + #[test] + fn serialize_update_response() { + let msg = Message { + role: crate::llm::types::Role::Assistant, + content: "response".to_string(), + tool_calls: None, + tool_call_id: None, + }; + let resp = WsResponse::Update { + messages: vec![msg], + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "update"); + assert_eq!(json["messages"].as_array().unwrap().len(), 1); + assert_eq!(json["messages"][0]["content"], "response"); + } + + #[test] + fn serialize_session_id_response() { + let resp = WsResponse::SessionId { + session_id: "sess-abc".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "session_id"); + assert_eq!(json["session_id"], "sess-abc"); + } + + #[test] + fn serialize_error_response() { + let resp = WsResponse::Error { + message: "something broke".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "error"); + assert_eq!(json["message"], "something broke"); + } + + #[test] + fn serialize_work_item_changed_response() { + let resp = WsResponse::WorkItemChanged { + stage: "2_current".to_string(), + item_id: "42_story_foo".to_string(), + action: "start".to_string(), + commit_msg: "story-kit: start 42_story_foo".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "work_item_changed"); + assert_eq!(json["stage"], "2_current"); + assert_eq!(json["item_id"], "42_story_foo"); + assert_eq!(json["action"], "start"); + assert_eq!(json["commit_msg"], "story-kit: start 42_story_foo"); + } + + #[test] + fn serialize_pipeline_state_response() { + let story = crate::http::workflow::UpcomingStory { + story_id: "10_story_test".to_string(), + name: Some("Test".to_string()), + error: None, + agent: None, + }; + let resp = WsResponse::PipelineState { + upcoming: vec![story], + current: vec![], + qa: vec![], + merge: vec![], + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "pipeline_state"); + assert_eq!(json["upcoming"].as_array().unwrap().len(), 1); + assert_eq!(json["upcoming"][0]["story_id"], "10_story_test"); + assert!(json["current"].as_array().unwrap().is_empty()); + } + + #[test] + fn serialize_agent_config_changed_response() { + let resp = WsResponse::AgentConfigChanged; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "agent_config_changed"); + } + + #[test] + fn serialize_permission_request_response() { + let resp = WsResponse::PermissionRequest { + request_id: "perm-1".to_string(), + tool_name: "Bash".to_string(), + tool_input: serde_json::json!({"command": "ls"}), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "permission_request"); + assert_eq!(json["request_id"], "perm-1"); + assert_eq!(json["tool_name"], "Bash"); + assert_eq!(json["tool_input"]["command"], "ls"); + } + + #[test] + fn serialize_tool_activity_response() { + let resp = WsResponse::ToolActivity { + tool_name: "Read".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "tool_activity"); + assert_eq!(json["tool_name"], "Read"); + } + + #[test] + fn serialize_reconciliation_progress_response() { + let resp = WsResponse::ReconciliationProgress { + story_id: "50_story_x".to_string(), + status: "gates_running".to_string(), + message: "Running clippy...".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "reconciliation_progress"); + assert_eq!(json["story_id"], "50_story_x"); + assert_eq!(json["status"], "gates_running"); + assert_eq!(json["message"], "Running clippy..."); + } + + // ── From for Option ─────────────────── + + #[test] + fn watcher_work_item_converts_to_ws_response() { + let evt = WatcherEvent::WorkItem { + stage: "2_current".to_string(), + item_id: "42_story_foo".to_string(), + action: "start".to_string(), + commit_msg: "story-kit: start 42_story_foo".to_string(), + }; + let ws_msg: Option = evt.into(); + let ws_msg = ws_msg.expect("WorkItem should produce Some"); + let json = serde_json::to_value(&ws_msg).unwrap(); + assert_eq!(json["type"], "work_item_changed"); + assert_eq!(json["stage"], "2_current"); + assert_eq!(json["item_id"], "42_story_foo"); + assert_eq!(json["action"], "start"); + } + + #[test] + fn watcher_config_changed_converts_to_ws_response() { + let evt = WatcherEvent::ConfigChanged; + let ws_msg: Option = evt.into(); + let ws_msg = ws_msg.expect("ConfigChanged should produce Some"); + let json = serde_json::to_value(&ws_msg).unwrap(); + assert_eq!(json["type"], "agent_config_changed"); + } + + // ── From for WsResponse ────────────────────────── + + #[test] + fn pipeline_state_converts_to_ws_response() { + let state = PipelineState { + upcoming: vec![UpcomingStory { + story_id: "1_story_a".to_string(), + name: Some("Story A".to_string()), + error: None, + agent: None, + }], + current: vec![UpcomingStory { + story_id: "2_story_b".to_string(), + name: Some("Story B".to_string()), + error: None, + agent: None, + }], + qa: vec![], + merge: vec![], + }; + let resp: WsResponse = state.into(); + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "pipeline_state"); + assert_eq!(json["upcoming"].as_array().unwrap().len(), 1); + assert_eq!(json["upcoming"][0]["story_id"], "1_story_a"); + assert_eq!(json["current"].as_array().unwrap().len(), 1); + assert_eq!(json["current"][0]["story_id"], "2_story_b"); + assert!(json["qa"].as_array().unwrap().is_empty()); + assert!(json["merge"].as_array().unwrap().is_empty()); + } + + #[test] + fn empty_pipeline_state_converts_to_ws_response() { + let state = PipelineState { + upcoming: vec![], + current: vec![], + qa: vec![], + merge: vec![], + }; + let resp: WsResponse = state.into(); + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "pipeline_state"); + assert!(json["upcoming"].as_array().unwrap().is_empty()); + assert!(json["current"].as_array().unwrap().is_empty()); + assert!(json["qa"].as_array().unwrap().is_empty()); + assert!(json["merge"].as_array().unwrap().is_empty()); + } + + // ── WsResponse JSON round-trip (string form) ──────────────────── + + #[test] + fn ws_response_serializes_to_parseable_json_string() { + let resp = WsResponse::Error { + message: "test error".to_string(), + }; + let text = serde_json::to_string(&resp).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&text).unwrap(); + assert_eq!(parsed["type"], "error"); + assert_eq!(parsed["message"], "test error"); + } + + #[test] + fn ws_response_update_with_empty_messages() { + let resp = WsResponse::Update { messages: vec![] }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "update"); + assert!(json["messages"].as_array().unwrap().is_empty()); + } + + #[test] + fn ws_response_token_with_empty_content() { + let resp = WsResponse::Token { + content: String::new(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["type"], "token"); + assert_eq!(json["content"], ""); + } + + #[test] + fn ws_response_error_with_special_characters() { + let resp = WsResponse::Error { + message: "error: \"quoted\" & ".to_string(), + }; + let text = serde_json::to_string(&resp).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&text).unwrap(); + assert_eq!(parsed["message"], "error: \"quoted\" & "); + } + + // ── WsRequest edge cases ──────────────────────────────────────── + + #[test] + fn deserialize_chat_with_multiple_messages() { + let json = r#"{ + "type": "chat", + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there!"}, + {"role": "user", "content": "How are you?"} + ], + "config": { + "provider": "ollama", + "model": "llama3" + } + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::Chat { messages, .. } => { + assert_eq!(messages.len(), 4); + assert_eq!(messages[0].role, crate::llm::types::Role::System); + assert_eq!(messages[3].role, crate::llm::types::Role::User); + } + _ => panic!("expected Chat variant"), + } + } + + #[test] + fn deserialize_chat_with_tool_call_message() { + let json = r#"{ + "type": "chat", + "messages": [ + { + "role": "assistant", + "content": "", + "tool_calls": [ + { + "id": "call_1", + "type": "function", + "function": { + "name": "read_file", + "arguments": "{\"path\": \"/tmp/test.rs\"}" + } + } + ] + } + ], + "config": { + "provider": "anthropic", + "model": "claude-3-5-sonnet" + } + }"#; + let req: WsRequest = serde_json::from_str(json).unwrap(); + match req { + WsRequest::Chat { messages, .. } => { + assert_eq!(messages.len(), 1); + let tc = messages[0].tool_calls.as_ref().unwrap(); + assert_eq!(tc.len(), 1); + assert_eq!(tc[0].function.name, "read_file"); + } + _ => panic!("expected Chat variant"), + } + } + + // ── Pipeline state with agent assignment ──────────────────────── + + #[test] + fn pipeline_state_with_agent_converts_correctly() { + let state = PipelineState { + upcoming: vec![], + current: vec![UpcomingStory { + story_id: "10_story_x".to_string(), + name: Some("Story X".to_string()), + error: None, + agent: Some(crate::http::workflow::AgentAssignment { + agent_name: "coder-1".to_string(), + model: Some("claude-3-5-sonnet".to_string()), + status: "running".to_string(), + }), + }], + qa: vec![], + merge: vec![], + }; + let resp: WsResponse = state.into(); + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["current"][0]["agent"]["agent_name"], "coder-1"); + assert_eq!(json["current"][0]["agent"]["model"], "claude-3-5-sonnet"); + assert_eq!(json["current"][0]["agent"]["status"], "running"); + } + + // ── Reconciliation progress done event ────────────────────────── + + #[test] + fn reconciliation_done_event_has_empty_story_id() { + let resp = WsResponse::ReconciliationProgress { + story_id: String::new(), + status: "done".to_string(), + message: "Reconciliation complete".to_string(), + }; + let json = serde_json::to_value(&resp).unwrap(); + assert_eq!(json["story_id"], ""); + assert_eq!(json["status"], "done"); + } + + // ── ws_handler integration tests (real WebSocket connection) ───── + + use futures::stream::SplitSink; + use poem::EndpointExt; + use tokio_tungstenite::tungstenite; + + /// Helper: construct a tungstenite text message from a string. + fn ws_text(s: &str) -> tungstenite::Message { + tungstenite::Message::Text(s.into()) + } + + /// Helper: start a poem server with ws_handler on an ephemeral port + /// and return the WebSocket URL. + async fn start_test_server() -> (String, Arc) { + let tmp = tempfile::tempdir().unwrap(); + let root = tmp.path().to_path_buf(); + + // Create minimal pipeline dirs so load_pipeline_state succeeds. + for stage in &["1_upcoming", "2_current", "3_qa", "4_merge"] { + std::fs::create_dir_all(root.join(".story_kit").join("work").join(stage)).unwrap(); + } + + let ctx = Arc::new(AppContext::new_test(root)); + let ctx_data = ctx.clone(); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let app = poem::Route::new() + .at("/ws", poem::get(ws_handler)) + .data(ctx_data); + + tokio::spawn(async move { + let acceptor = poem::listener::TcpAcceptor::from_tokio(listener).unwrap(); + let _ = poem::Server::new_with_acceptor(acceptor) + .run(app) + .await; + }); + + // Small delay to let the server start. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + + let url = format!("ws://127.0.0.1:{}/ws", addr.port()); + (url, ctx) + } + + type WsSink = SplitSink< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + tungstenite::Message, + >; + + /// Helper: connect and return (sink, stream) plus read the initial + /// pipeline_state message that is always sent on connect. + async fn connect_ws( + url: &str, + ) -> ( + WsSink, + futures::stream::SplitStream< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + >, + serde_json::Value, + ) { + let (ws, _resp) = tokio_tungstenite::connect_async(url).await.unwrap(); + let (sink, mut stream) = futures::StreamExt::split(ws); + + // The first message should be the initial pipeline_state. + let first = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next()) + .await + .expect("timeout waiting for initial message") + .expect("stream ended") + .expect("ws error"); + + let initial: serde_json::Value = match first { + tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(), + other => panic!("expected text message, got: {other:?}"), + }; + + (sink, stream, initial) + } + + /// Read next text message from the stream with a timeout. + async fn next_msg( + stream: &mut futures::stream::SplitStream< + tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >, + >, + ) -> serde_json::Value { + let msg = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next()) + .await + .expect("timeout waiting for message") + .expect("stream ended") + .expect("ws error"); + match msg { + tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(), + other => panic!("expected text message, got: {other:?}"), + } + } + + #[tokio::test] + async fn ws_handler_sends_initial_pipeline_state_on_connect() { + let (url, _ctx) = start_test_server().await; + let (_sink, _stream, initial) = connect_ws(&url).await; + + assert_eq!(initial["type"], "pipeline_state"); + // All stages should be empty arrays since no .md files were created. + assert!(initial["upcoming"].as_array().unwrap().is_empty()); + assert!(initial["current"].as_array().unwrap().is_empty()); + assert!(initial["qa"].as_array().unwrap().is_empty()); + assert!(initial["merge"].as_array().unwrap().is_empty()); + } + + #[tokio::test] + async fn ws_handler_returns_error_for_invalid_json() { + let (url, _ctx) = start_test_server().await; + let (mut sink, mut stream, _initial) = connect_ws(&url).await; + + // Send invalid JSON. + sink.send(ws_text("not valid json")) + .await + .unwrap(); + + let msg = next_msg(&mut stream).await; + assert_eq!(msg["type"], "error"); + assert!( + msg["message"] + .as_str() + .unwrap() + .contains("Invalid request"), + "error message should indicate invalid request, got: {}", + msg["message"] + ); + } + + #[tokio::test] + async fn ws_handler_returns_error_for_unknown_type() { + let (url, _ctx) = start_test_server().await; + let (mut sink, mut stream, _initial) = connect_ws(&url).await; + + // Send a message with an unknown type. + sink.send(ws_text(r#"{"type": "bogus"}"#)) + .await + .unwrap(); + + let msg = next_msg(&mut stream).await; + assert_eq!(msg["type"], "error"); + assert!(msg["message"].as_str().unwrap().contains("Invalid request")); + } + + #[tokio::test] + async fn ws_handler_cancel_outside_chat_does_not_error() { + let (url, _ctx) = start_test_server().await; + let (mut sink, mut stream, _initial) = connect_ws(&url).await; + + // Send cancel when no chat is active — should not produce an error. + sink.send(ws_text(r#"{"type": "cancel"}"#)) + .await + .unwrap(); + + // Send another invalid message to check the connection is still alive. + sink.send(ws_text("{}")) + .await + .unwrap(); + + let msg = next_msg(&mut stream).await; + // The invalid JSON message should produce an error, confirming + // the cancel didn't break the connection. + assert_eq!(msg["type"], "error"); + } + + #[tokio::test] + async fn ws_handler_permission_response_outside_chat_is_ignored() { + let (url, _ctx) = start_test_server().await; + let (mut sink, mut stream, _initial) = connect_ws(&url).await; + + // Send permission response outside an active chat. + sink.send(ws_text( + r#"{"type": "permission_response", "request_id": "x", "approved": true}"#, + )) + .await + .unwrap(); + + // Send a probe message to check the connection is still alive. + sink.send(ws_text("bad")) + .await + .unwrap(); + + let msg = next_msg(&mut stream).await; + assert_eq!(msg["type"], "error"); + assert!(msg["message"].as_str().unwrap().contains("Invalid request")); + } + + #[tokio::test] + async fn ws_handler_forwards_watcher_events() { + let (url, ctx) = start_test_server().await; + let (_sink, mut stream, _initial) = connect_ws(&url).await; + + // Broadcast a watcher event. + ctx.watcher_tx + .send(WatcherEvent::WorkItem { + stage: "2_current".to_string(), + item_id: "99_story_test".to_string(), + action: "start".to_string(), + commit_msg: "story-kit: start 99_story_test".to_string(), + }) + .unwrap(); + + let msg = next_msg(&mut stream).await; + assert_eq!(msg["type"], "work_item_changed"); + assert_eq!(msg["item_id"], "99_story_test"); + assert_eq!(msg["stage"], "2_current"); + + // After a work-item event, a pipeline_state refresh is pushed. + let state_msg = next_msg(&mut stream).await; + assert_eq!(state_msg["type"], "pipeline_state"); + } + + #[tokio::test] + async fn ws_handler_forwards_config_changed_without_pipeline_refresh() { + let (url, ctx) = start_test_server().await; + let (_sink, mut stream, _initial) = connect_ws(&url).await; + + // Broadcast a config-changed event. + ctx.watcher_tx.send(WatcherEvent::ConfigChanged).unwrap(); + + let msg = next_msg(&mut stream).await; + assert_eq!(msg["type"], "agent_config_changed"); + + // Config-changed should NOT be followed by a pipeline_state refresh. + // Send a probe to check no extra message is queued. + ctx.watcher_tx.send(WatcherEvent::ConfigChanged).unwrap(); + let msg2 = next_msg(&mut stream).await; + assert_eq!(msg2["type"], "agent_config_changed"); + } + + #[tokio::test] + async fn ws_handler_forwards_reconciliation_events() { + let (url, ctx) = start_test_server().await; + let (_sink, mut stream, _initial) = connect_ws(&url).await; + + // Broadcast a reconciliation event. + ctx.reconciliation_tx + .send(crate::agents::ReconciliationEvent { + story_id: "50_story_recon".to_string(), + status: "checking".to_string(), + message: "Checking story...".to_string(), + }) + .unwrap(); + + let msg = next_msg(&mut stream).await; + assert_eq!(msg["type"], "reconciliation_progress"); + assert_eq!(msg["story_id"], "50_story_recon"); + assert_eq!(msg["status"], "checking"); + assert_eq!(msg["message"], "Checking story..."); + } + + #[tokio::test] + async fn ws_handler_handles_client_disconnect_gracefully() { + let (url, _ctx) = start_test_server().await; + let (mut sink, _stream, _initial) = connect_ws(&url).await; + + // Close the connection — should not panic the server. + sink.close().await.unwrap(); + + // Give the server a moment to process the close. + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Connect again to verify server is still alive. + let (_sink2, _stream2, initial2) = connect_ws(&url).await; + assert_eq!(initial2["type"], "pipeline_state"); + } +}