story-kit: merge 122_story_test_coverage_http_ws_rs

This commit is contained in:
Dave
2026-02-24 00:07:20 +00:00
parent 8a10443a7a
commit ca2097d8e4
4 changed files with 757 additions and 0 deletions

1
Cargo.lock generated
View File

@@ -2156,6 +2156,7 @@ dependencies = [
"strip-ansi-escapes", "strip-ansi-escapes",
"tempfile", "tempfile",
"tokio", "tokio",
"tokio-tungstenite",
"toml", "toml",
"uuid", "uuid",
"walkdir", "walkdir",

View File

@@ -26,4 +26,5 @@ tempfile = "3"
tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync"] }
toml = "1.0.3+spec-1.1.0" toml = "1.0.3+spec-1.1.0"
uuid = { version = "1.21.0", features = ["v4", "serde"] } uuid = { version = "1.21.0", features = ["v4", "serde"] }
tokio-tungstenite = "0.27"
walkdir = "2.5.0" walkdir = "2.5.0"

View File

@@ -31,3 +31,4 @@ walkdir = { workspace = true }
[dev-dependencies] [dev-dependencies]
tempfile = { workspace = true } tempfile = { workspace = true }
tokio-tungstenite = { workspace = true }

View File

@@ -320,3 +320,757 @@ pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem
let _ = forward.await; let _ = forward.await;
}) })
} }
#[cfg(test)]
mod tests {
use super::*;
use crate::http::workflow::{PipelineState, UpcomingStory};
use crate::io::watcher::WatcherEvent;
// ── WsRequest deserialization ────────────────────────────────────
#[test]
fn deserialize_chat_request() {
let json = r#"{
"type": "chat",
"messages": [
{"role": "user", "content": "hello"}
],
"config": {
"provider": "ollama",
"model": "llama3"
}
}"#;
let req: WsRequest = serde_json::from_str(json).unwrap();
match req {
WsRequest::Chat { messages, config } => {
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].content, "hello");
assert_eq!(config.provider, "ollama");
assert_eq!(config.model, "llama3");
}
_ => panic!("expected Chat variant"),
}
}
#[test]
fn deserialize_chat_request_with_optional_fields() {
let json = r#"{
"type": "chat",
"messages": [],
"config": {
"provider": "anthropic",
"model": "claude-3-5-sonnet",
"base_url": "https://api.anthropic.com",
"enable_tools": true,
"session_id": "sess-123"
}
}"#;
let req: WsRequest = serde_json::from_str(json).unwrap();
match req {
WsRequest::Chat { messages, config } => {
assert!(messages.is_empty());
assert_eq!(config.base_url.as_deref(), Some("https://api.anthropic.com"));
assert_eq!(config.enable_tools, Some(true));
assert_eq!(config.session_id.as_deref(), Some("sess-123"));
}
_ => panic!("expected Chat variant"),
}
}
#[test]
fn deserialize_cancel_request() {
let json = r#"{"type": "cancel"}"#;
let req: WsRequest = serde_json::from_str(json).unwrap();
assert!(matches!(req, WsRequest::Cancel));
}
#[test]
fn deserialize_permission_response_approved() {
let json = r#"{
"type": "permission_response",
"request_id": "req-42",
"approved": true
}"#;
let req: WsRequest = serde_json::from_str(json).unwrap();
match req {
WsRequest::PermissionResponse {
request_id,
approved,
} => {
assert_eq!(request_id, "req-42");
assert!(approved);
}
_ => panic!("expected PermissionResponse variant"),
}
}
#[test]
fn deserialize_permission_response_denied() {
let json = r#"{
"type": "permission_response",
"request_id": "req-99",
"approved": false
}"#;
let req: WsRequest = serde_json::from_str(json).unwrap();
match req {
WsRequest::PermissionResponse {
request_id,
approved,
} => {
assert_eq!(request_id, "req-99");
assert!(!approved);
}
_ => panic!("expected PermissionResponse variant"),
}
}
#[test]
fn deserialize_unknown_type_fails() {
let json = r#"{"type": "unknown_type"}"#;
let result: Result<WsRequest, _> = serde_json::from_str(json);
assert!(result.is_err());
}
#[test]
fn deserialize_invalid_json_fails() {
let result: Result<WsRequest, _> = serde_json::from_str("not json");
assert!(result.is_err());
}
#[test]
fn deserialize_missing_type_tag_fails() {
let json = r#"{"messages": [], "config": {"provider": "x", "model": "y"}}"#;
let result: Result<WsRequest, _> = serde_json::from_str(json);
assert!(result.is_err());
}
// ── WsResponse serialization ────────────────────────────────────
#[test]
fn serialize_token_response() {
let resp = WsResponse::Token {
content: "hello world".to_string(),
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["type"], "token");
assert_eq!(json["content"], "hello world");
}
#[test]
fn serialize_update_response() {
let msg = Message {
role: crate::llm::types::Role::Assistant,
content: "response".to_string(),
tool_calls: None,
tool_call_id: None,
};
let resp = WsResponse::Update {
messages: vec![msg],
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["type"], "update");
assert_eq!(json["messages"].as_array().unwrap().len(), 1);
assert_eq!(json["messages"][0]["content"], "response");
}
#[test]
fn serialize_session_id_response() {
let resp = WsResponse::SessionId {
session_id: "sess-abc".to_string(),
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["type"], "session_id");
assert_eq!(json["session_id"], "sess-abc");
}
#[test]
fn serialize_error_response() {
let resp = WsResponse::Error {
message: "something broke".to_string(),
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["type"], "error");
assert_eq!(json["message"], "something broke");
}
#[test]
fn serialize_work_item_changed_response() {
let resp = WsResponse::WorkItemChanged {
stage: "2_current".to_string(),
item_id: "42_story_foo".to_string(),
action: "start".to_string(),
commit_msg: "story-kit: start 42_story_foo".to_string(),
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["type"], "work_item_changed");
assert_eq!(json["stage"], "2_current");
assert_eq!(json["item_id"], "42_story_foo");
assert_eq!(json["action"], "start");
assert_eq!(json["commit_msg"], "story-kit: start 42_story_foo");
}
#[test]
fn serialize_pipeline_state_response() {
let story = crate::http::workflow::UpcomingStory {
story_id: "10_story_test".to_string(),
name: Some("Test".to_string()),
error: None,
agent: None,
};
let resp = WsResponse::PipelineState {
upcoming: vec![story],
current: vec![],
qa: vec![],
merge: vec![],
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["type"], "pipeline_state");
assert_eq!(json["upcoming"].as_array().unwrap().len(), 1);
assert_eq!(json["upcoming"][0]["story_id"], "10_story_test");
assert!(json["current"].as_array().unwrap().is_empty());
}
#[test]
fn serialize_agent_config_changed_response() {
let resp = WsResponse::AgentConfigChanged;
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["type"], "agent_config_changed");
}
#[test]
fn serialize_permission_request_response() {
let resp = WsResponse::PermissionRequest {
request_id: "perm-1".to_string(),
tool_name: "Bash".to_string(),
tool_input: serde_json::json!({"command": "ls"}),
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["type"], "permission_request");
assert_eq!(json["request_id"], "perm-1");
assert_eq!(json["tool_name"], "Bash");
assert_eq!(json["tool_input"]["command"], "ls");
}
#[test]
fn serialize_tool_activity_response() {
let resp = WsResponse::ToolActivity {
tool_name: "Read".to_string(),
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["type"], "tool_activity");
assert_eq!(json["tool_name"], "Read");
}
#[test]
fn serialize_reconciliation_progress_response() {
let resp = WsResponse::ReconciliationProgress {
story_id: "50_story_x".to_string(),
status: "gates_running".to_string(),
message: "Running clippy...".to_string(),
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["type"], "reconciliation_progress");
assert_eq!(json["story_id"], "50_story_x");
assert_eq!(json["status"], "gates_running");
assert_eq!(json["message"], "Running clippy...");
}
// ── From<WatcherEvent> for Option<WsResponse> ───────────────────
#[test]
fn watcher_work_item_converts_to_ws_response() {
let evt = WatcherEvent::WorkItem {
stage: "2_current".to_string(),
item_id: "42_story_foo".to_string(),
action: "start".to_string(),
commit_msg: "story-kit: start 42_story_foo".to_string(),
};
let ws_msg: Option<WsResponse> = evt.into();
let ws_msg = ws_msg.expect("WorkItem should produce Some");
let json = serde_json::to_value(&ws_msg).unwrap();
assert_eq!(json["type"], "work_item_changed");
assert_eq!(json["stage"], "2_current");
assert_eq!(json["item_id"], "42_story_foo");
assert_eq!(json["action"], "start");
}
#[test]
fn watcher_config_changed_converts_to_ws_response() {
let evt = WatcherEvent::ConfigChanged;
let ws_msg: Option<WsResponse> = evt.into();
let ws_msg = ws_msg.expect("ConfigChanged should produce Some");
let json = serde_json::to_value(&ws_msg).unwrap();
assert_eq!(json["type"], "agent_config_changed");
}
// ── From<PipelineState> for WsResponse ──────────────────────────
#[test]
fn pipeline_state_converts_to_ws_response() {
let state = PipelineState {
upcoming: vec![UpcomingStory {
story_id: "1_story_a".to_string(),
name: Some("Story A".to_string()),
error: None,
agent: None,
}],
current: vec![UpcomingStory {
story_id: "2_story_b".to_string(),
name: Some("Story B".to_string()),
error: None,
agent: None,
}],
qa: vec![],
merge: vec![],
};
let resp: WsResponse = state.into();
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["type"], "pipeline_state");
assert_eq!(json["upcoming"].as_array().unwrap().len(), 1);
assert_eq!(json["upcoming"][0]["story_id"], "1_story_a");
assert_eq!(json["current"].as_array().unwrap().len(), 1);
assert_eq!(json["current"][0]["story_id"], "2_story_b");
assert!(json["qa"].as_array().unwrap().is_empty());
assert!(json["merge"].as_array().unwrap().is_empty());
}
#[test]
fn empty_pipeline_state_converts_to_ws_response() {
let state = PipelineState {
upcoming: vec![],
current: vec![],
qa: vec![],
merge: vec![],
};
let resp: WsResponse = state.into();
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["type"], "pipeline_state");
assert!(json["upcoming"].as_array().unwrap().is_empty());
assert!(json["current"].as_array().unwrap().is_empty());
assert!(json["qa"].as_array().unwrap().is_empty());
assert!(json["merge"].as_array().unwrap().is_empty());
}
// ── WsResponse JSON round-trip (string form) ────────────────────
#[test]
fn ws_response_serializes_to_parseable_json_string() {
let resp = WsResponse::Error {
message: "test error".to_string(),
};
let text = serde_json::to_string(&resp).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&text).unwrap();
assert_eq!(parsed["type"], "error");
assert_eq!(parsed["message"], "test error");
}
#[test]
fn ws_response_update_with_empty_messages() {
let resp = WsResponse::Update { messages: vec![] };
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["type"], "update");
assert!(json["messages"].as_array().unwrap().is_empty());
}
#[test]
fn ws_response_token_with_empty_content() {
let resp = WsResponse::Token {
content: String::new(),
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["type"], "token");
assert_eq!(json["content"], "");
}
#[test]
fn ws_response_error_with_special_characters() {
let resp = WsResponse::Error {
message: "error: \"quoted\" & <tagged>".to_string(),
};
let text = serde_json::to_string(&resp).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&text).unwrap();
assert_eq!(parsed["message"], "error: \"quoted\" & <tagged>");
}
// ── WsRequest edge cases ────────────────────────────────────────
#[test]
fn deserialize_chat_with_multiple_messages() {
let json = r#"{
"type": "chat",
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"},
{"role": "user", "content": "How are you?"}
],
"config": {
"provider": "ollama",
"model": "llama3"
}
}"#;
let req: WsRequest = serde_json::from_str(json).unwrap();
match req {
WsRequest::Chat { messages, .. } => {
assert_eq!(messages.len(), 4);
assert_eq!(messages[0].role, crate::llm::types::Role::System);
assert_eq!(messages[3].role, crate::llm::types::Role::User);
}
_ => panic!("expected Chat variant"),
}
}
#[test]
fn deserialize_chat_with_tool_call_message() {
let json = r#"{
"type": "chat",
"messages": [
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "read_file",
"arguments": "{\"path\": \"/tmp/test.rs\"}"
}
}
]
}
],
"config": {
"provider": "anthropic",
"model": "claude-3-5-sonnet"
}
}"#;
let req: WsRequest = serde_json::from_str(json).unwrap();
match req {
WsRequest::Chat { messages, .. } => {
assert_eq!(messages.len(), 1);
let tc = messages[0].tool_calls.as_ref().unwrap();
assert_eq!(tc.len(), 1);
assert_eq!(tc[0].function.name, "read_file");
}
_ => panic!("expected Chat variant"),
}
}
// ── Pipeline state with agent assignment ────────────────────────
#[test]
fn pipeline_state_with_agent_converts_correctly() {
let state = PipelineState {
upcoming: vec![],
current: vec![UpcomingStory {
story_id: "10_story_x".to_string(),
name: Some("Story X".to_string()),
error: None,
agent: Some(crate::http::workflow::AgentAssignment {
agent_name: "coder-1".to_string(),
model: Some("claude-3-5-sonnet".to_string()),
status: "running".to_string(),
}),
}],
qa: vec![],
merge: vec![],
};
let resp: WsResponse = state.into();
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["current"][0]["agent"]["agent_name"], "coder-1");
assert_eq!(json["current"][0]["agent"]["model"], "claude-3-5-sonnet");
assert_eq!(json["current"][0]["agent"]["status"], "running");
}
// ── Reconciliation progress done event ──────────────────────────
#[test]
fn reconciliation_done_event_has_empty_story_id() {
let resp = WsResponse::ReconciliationProgress {
story_id: String::new(),
status: "done".to_string(),
message: "Reconciliation complete".to_string(),
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["story_id"], "");
assert_eq!(json["status"], "done");
}
// ── ws_handler integration tests (real WebSocket connection) ─────
use futures::stream::SplitSink;
use poem::EndpointExt;
use tokio_tungstenite::tungstenite;
/// Helper: construct a tungstenite text message from a string.
fn ws_text(s: &str) -> tungstenite::Message {
tungstenite::Message::Text(s.into())
}
/// Helper: start a poem server with ws_handler on an ephemeral port
/// and return the WebSocket URL.
async fn start_test_server() -> (String, Arc<AppContext>) {
let tmp = tempfile::tempdir().unwrap();
let root = tmp.path().to_path_buf();
// Create minimal pipeline dirs so load_pipeline_state succeeds.
for stage in &["1_upcoming", "2_current", "3_qa", "4_merge"] {
std::fs::create_dir_all(root.join(".story_kit").join("work").join(stage)).unwrap();
}
let ctx = Arc::new(AppContext::new_test(root));
let ctx_data = ctx.clone();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let app = poem::Route::new()
.at("/ws", poem::get(ws_handler))
.data(ctx_data);
tokio::spawn(async move {
let acceptor = poem::listener::TcpAcceptor::from_tokio(listener).unwrap();
let _ = poem::Server::new_with_acceptor(acceptor)
.run(app)
.await;
});
// Small delay to let the server start.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let url = format!("ws://127.0.0.1:{}/ws", addr.port());
(url, ctx)
}
type WsSink = SplitSink<
tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
tungstenite::Message,
>;
/// Helper: connect and return (sink, stream) plus read the initial
/// pipeline_state message that is always sent on connect.
async fn connect_ws(
url: &str,
) -> (
WsSink,
futures::stream::SplitStream<
tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
>,
serde_json::Value,
) {
let (ws, _resp) = tokio_tungstenite::connect_async(url).await.unwrap();
let (sink, mut stream) = futures::StreamExt::split(ws);
// The first message should be the initial pipeline_state.
let first = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next())
.await
.expect("timeout waiting for initial message")
.expect("stream ended")
.expect("ws error");
let initial: serde_json::Value = match first {
tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(),
other => panic!("expected text message, got: {other:?}"),
};
(sink, stream, initial)
}
/// Read next text message from the stream with a timeout.
async fn next_msg(
stream: &mut futures::stream::SplitStream<
tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
>,
) -> serde_json::Value {
let msg = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next())
.await
.expect("timeout waiting for message")
.expect("stream ended")
.expect("ws error");
match msg {
tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(),
other => panic!("expected text message, got: {other:?}"),
}
}
#[tokio::test]
async fn ws_handler_sends_initial_pipeline_state_on_connect() {
let (url, _ctx) = start_test_server().await;
let (_sink, _stream, initial) = connect_ws(&url).await;
assert_eq!(initial["type"], "pipeline_state");
// All stages should be empty arrays since no .md files were created.
assert!(initial["upcoming"].as_array().unwrap().is_empty());
assert!(initial["current"].as_array().unwrap().is_empty());
assert!(initial["qa"].as_array().unwrap().is_empty());
assert!(initial["merge"].as_array().unwrap().is_empty());
}
#[tokio::test]
async fn ws_handler_returns_error_for_invalid_json() {
let (url, _ctx) = start_test_server().await;
let (mut sink, mut stream, _initial) = connect_ws(&url).await;
// Send invalid JSON.
sink.send(ws_text("not valid json"))
.await
.unwrap();
let msg = next_msg(&mut stream).await;
assert_eq!(msg["type"], "error");
assert!(
msg["message"]
.as_str()
.unwrap()
.contains("Invalid request"),
"error message should indicate invalid request, got: {}",
msg["message"]
);
}
#[tokio::test]
async fn ws_handler_returns_error_for_unknown_type() {
let (url, _ctx) = start_test_server().await;
let (mut sink, mut stream, _initial) = connect_ws(&url).await;
// Send a message with an unknown type.
sink.send(ws_text(r#"{"type": "bogus"}"#))
.await
.unwrap();
let msg = next_msg(&mut stream).await;
assert_eq!(msg["type"], "error");
assert!(msg["message"].as_str().unwrap().contains("Invalid request"));
}
#[tokio::test]
async fn ws_handler_cancel_outside_chat_does_not_error() {
let (url, _ctx) = start_test_server().await;
let (mut sink, mut stream, _initial) = connect_ws(&url).await;
// Send cancel when no chat is active — should not produce an error.
sink.send(ws_text(r#"{"type": "cancel"}"#))
.await
.unwrap();
// Send another invalid message to check the connection is still alive.
sink.send(ws_text("{}"))
.await
.unwrap();
let msg = next_msg(&mut stream).await;
// The invalid JSON message should produce an error, confirming
// the cancel didn't break the connection.
assert_eq!(msg["type"], "error");
}
#[tokio::test]
async fn ws_handler_permission_response_outside_chat_is_ignored() {
let (url, _ctx) = start_test_server().await;
let (mut sink, mut stream, _initial) = connect_ws(&url).await;
// Send permission response outside an active chat.
sink.send(ws_text(
r#"{"type": "permission_response", "request_id": "x", "approved": true}"#,
))
.await
.unwrap();
// Send a probe message to check the connection is still alive.
sink.send(ws_text("bad"))
.await
.unwrap();
let msg = next_msg(&mut stream).await;
assert_eq!(msg["type"], "error");
assert!(msg["message"].as_str().unwrap().contains("Invalid request"));
}
#[tokio::test]
async fn ws_handler_forwards_watcher_events() {
let (url, ctx) = start_test_server().await;
let (_sink, mut stream, _initial) = connect_ws(&url).await;
// Broadcast a watcher event.
ctx.watcher_tx
.send(WatcherEvent::WorkItem {
stage: "2_current".to_string(),
item_id: "99_story_test".to_string(),
action: "start".to_string(),
commit_msg: "story-kit: start 99_story_test".to_string(),
})
.unwrap();
let msg = next_msg(&mut stream).await;
assert_eq!(msg["type"], "work_item_changed");
assert_eq!(msg["item_id"], "99_story_test");
assert_eq!(msg["stage"], "2_current");
// After a work-item event, a pipeline_state refresh is pushed.
let state_msg = next_msg(&mut stream).await;
assert_eq!(state_msg["type"], "pipeline_state");
}
#[tokio::test]
async fn ws_handler_forwards_config_changed_without_pipeline_refresh() {
let (url, ctx) = start_test_server().await;
let (_sink, mut stream, _initial) = connect_ws(&url).await;
// Broadcast a config-changed event.
ctx.watcher_tx.send(WatcherEvent::ConfigChanged).unwrap();
let msg = next_msg(&mut stream).await;
assert_eq!(msg["type"], "agent_config_changed");
// Config-changed should NOT be followed by a pipeline_state refresh.
// Send a probe to check no extra message is queued.
ctx.watcher_tx.send(WatcherEvent::ConfigChanged).unwrap();
let msg2 = next_msg(&mut stream).await;
assert_eq!(msg2["type"], "agent_config_changed");
}
#[tokio::test]
async fn ws_handler_forwards_reconciliation_events() {
let (url, ctx) = start_test_server().await;
let (_sink, mut stream, _initial) = connect_ws(&url).await;
// Broadcast a reconciliation event.
ctx.reconciliation_tx
.send(crate::agents::ReconciliationEvent {
story_id: "50_story_recon".to_string(),
status: "checking".to_string(),
message: "Checking story...".to_string(),
})
.unwrap();
let msg = next_msg(&mut stream).await;
assert_eq!(msg["type"], "reconciliation_progress");
assert_eq!(msg["story_id"], "50_story_recon");
assert_eq!(msg["status"], "checking");
assert_eq!(msg["message"], "Checking story...");
}
#[tokio::test]
async fn ws_handler_handles_client_disconnect_gracefully() {
let (url, _ctx) = start_test_server().await;
let (mut sink, _stream, _initial) = connect_ws(&url).await;
// Close the connection — should not panic the server.
sink.close().await.unwrap();
// Give the server a moment to process the close.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Connect again to verify server is still alive.
let (_sink2, _stream2, initial2) = connect_ws(&url).await;
assert_eq!(initial2["type"], "pipeline_state");
}
}