1413 lines
53 KiB
Rust
1413 lines
53 KiB
Rust
use crate::http::context::{AppContext, PermissionDecision};
|
|
use crate::http::workflow::{PipelineState, load_pipeline_state};
|
|
use crate::io::onboarding;
|
|
use crate::io::watcher::WatcherEvent;
|
|
use crate::llm::chat;
|
|
use crate::llm::types::Message;
|
|
use crate::log_buffer;
|
|
use futures::{SinkExt, StreamExt};
|
|
use poem::handler;
|
|
use poem::web::Data;
|
|
use poem::web::websocket::{Message as WsMessage, WebSocket};
|
|
use serde::{Deserialize, Serialize};
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
use tokio::sync::{mpsc, oneshot};
|
|
|
|
#[derive(Deserialize)]
|
|
#[serde(tag = "type", rename_all = "snake_case")]
|
|
/// WebSocket request messages sent by the client.
|
|
///
|
|
/// - `chat` starts a streaming chat session.
|
|
/// - `cancel` stops the active session.
|
|
/// - `permission_response` approves or denies a pending permission request.
|
|
enum WsRequest {
|
|
Chat {
|
|
messages: Vec<Message>,
|
|
config: chat::ProviderConfig,
|
|
},
|
|
Cancel,
|
|
PermissionResponse {
|
|
request_id: String,
|
|
approved: bool,
|
|
#[serde(default)]
|
|
always_allow: bool,
|
|
},
|
|
/// Heartbeat ping from the client. The server responds with `Pong` so the
|
|
/// client can detect stale (half-closed) connections.
|
|
Ping,
|
|
/// A quick side question answered from current conversation context.
|
|
/// The question and response are NOT added to the conversation history
|
|
/// and no tool calls are made.
|
|
SideQuestion {
|
|
question: String,
|
|
context_messages: Vec<Message>,
|
|
config: chat::ProviderConfig,
|
|
},
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
#[serde(tag = "type", rename_all = "snake_case")]
|
|
/// WebSocket response messages sent by the server.
|
|
///
|
|
/// - `token` streams partial model output.
|
|
/// - `update` pushes the updated message history.
|
|
/// - `error` reports a request or processing failure.
|
|
/// - `work_item_changed` notifies that a `.story_kit/work/` file changed.
|
|
/// - `agent_config_changed` notifies that `.story_kit/project.toml` was modified.
|
|
enum WsResponse {
|
|
Token {
|
|
content: String,
|
|
},
|
|
Update {
|
|
messages: Vec<Message>,
|
|
},
|
|
/// Session ID for Claude Code conversation resumption.
|
|
SessionId {
|
|
session_id: String,
|
|
},
|
|
Error {
|
|
message: String,
|
|
},
|
|
/// Filesystem watcher notification: a work-pipeline file was created or
|
|
/// modified and auto-committed. The frontend can use this to refresh its
|
|
/// story/bug list without polling.
|
|
WorkItemChanged {
|
|
stage: String,
|
|
item_id: String,
|
|
action: String,
|
|
commit_msg: String,
|
|
},
|
|
/// Full pipeline state pushed on connect and after every work-item watcher event.
|
|
PipelineState {
|
|
backlog: Vec<crate::http::workflow::UpcomingStory>,
|
|
current: Vec<crate::http::workflow::UpcomingStory>,
|
|
qa: Vec<crate::http::workflow::UpcomingStory>,
|
|
merge: Vec<crate::http::workflow::UpcomingStory>,
|
|
done: Vec<crate::http::workflow::UpcomingStory>,
|
|
},
|
|
/// `.story_kit/project.toml` was modified; the frontend should re-fetch the
|
|
/// agent roster. Does NOT trigger a pipeline state refresh.
|
|
AgentConfigChanged,
|
|
/// An agent's state changed (started, stopped, completed, etc.).
|
|
/// Triggers a pipeline state refresh and tells the frontend to re-fetch
|
|
/// the agent list.
|
|
AgentStateChanged,
|
|
/// Claude Code is requesting user approval before executing a tool.
|
|
PermissionRequest {
|
|
request_id: String,
|
|
tool_name: String,
|
|
tool_input: serde_json::Value,
|
|
},
|
|
/// The agent started assembling a tool call; shows live status in the UI.
|
|
ToolActivity {
|
|
tool_name: String,
|
|
},
|
|
/// Real-time progress from the server startup reconciliation pass.
|
|
/// `status` is one of: "checking", "gates_running", "advanced", "skipped",
|
|
/// "failed", "done". `story_id` is empty for the overall "done" event.
|
|
ReconciliationProgress {
|
|
story_id: String,
|
|
status: String,
|
|
message: String,
|
|
},
|
|
/// Heartbeat response to a client `Ping`. Lets the client confirm the
|
|
/// connection is alive and cancel any stale-connection timeout.
|
|
Pong,
|
|
/// Streaming thinking token from an extended-thinking block.
|
|
/// Sent separately from `Token` so the frontend can render them in
|
|
/// a constrained, scrollable ThinkingBlock rather than inline.
|
|
ThinkingToken {
|
|
content: String,
|
|
},
|
|
/// Sent on connect when the project's spec files still contain scaffold
|
|
/// placeholder content and the user needs to go through onboarding.
|
|
OnboardingStatus {
|
|
needs_onboarding: bool,
|
|
},
|
|
/// Streaming token from a `/btw` side question response.
|
|
SideQuestionToken {
|
|
content: String,
|
|
},
|
|
/// Final signal that the `/btw` side question has been fully answered.
|
|
SideQuestionDone {
|
|
response: String,
|
|
},
|
|
/// A single server log entry. Sent in bulk on connect (recent history),
|
|
/// then streamed live as new entries arrive.
|
|
LogEntry {
|
|
timestamp: String,
|
|
level: String,
|
|
message: String,
|
|
},
|
|
}
|
|
|
|
impl From<WatcherEvent> for Option<WsResponse> {
|
|
fn from(e: WatcherEvent) -> Self {
|
|
match e {
|
|
WatcherEvent::WorkItem {
|
|
stage,
|
|
item_id,
|
|
action,
|
|
commit_msg,
|
|
} => Some(WsResponse::WorkItemChanged {
|
|
stage,
|
|
item_id,
|
|
action,
|
|
commit_msg,
|
|
}),
|
|
WatcherEvent::ConfigChanged => Some(WsResponse::AgentConfigChanged),
|
|
WatcherEvent::AgentStateChanged => Some(WsResponse::AgentStateChanged),
|
|
// MergeFailure is handled by the Matrix notification listener only;
|
|
// no WebSocket message is needed for the frontend.
|
|
WatcherEvent::MergeFailure { .. } => None,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl From<PipelineState> for WsResponse {
|
|
fn from(s: PipelineState) -> Self {
|
|
WsResponse::PipelineState {
|
|
backlog: s.backlog,
|
|
current: s.current,
|
|
qa: s.qa,
|
|
merge: s.merge,
|
|
done: s.done,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[handler]
|
|
/// WebSocket endpoint for streaming chat responses, cancellation, and
|
|
/// filesystem watcher notifications.
|
|
///
|
|
/// Accepts JSON `WsRequest` messages and streams `WsResponse` messages.
|
|
pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem::IntoResponse {
|
|
let ctx = ctx.0.clone();
|
|
ws.on_upgrade(move |socket| async move {
|
|
let (mut sink, mut stream) = socket.split();
|
|
let (tx, mut rx) = mpsc::unbounded_channel::<WsResponse>();
|
|
|
|
let forward = tokio::spawn(async move {
|
|
while let Some(msg) = rx.recv().await {
|
|
if let Ok(text) = serde_json::to_string(&msg)
|
|
&& sink.send(WsMessage::Text(text)).await.is_err()
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
// Push initial pipeline state to the client on connect.
|
|
if let Ok(state) = load_pipeline_state(ctx.as_ref()) {
|
|
let _ = tx.send(state.into());
|
|
}
|
|
|
|
// Push onboarding status so the frontend knows whether to show
|
|
// the onboarding welcome flow.
|
|
{
|
|
let needs = ctx
|
|
.state
|
|
.get_project_root()
|
|
.map(|root| onboarding::check_onboarding_status(&root).needs_onboarding())
|
|
.unwrap_or(false);
|
|
let _ = tx.send(WsResponse::OnboardingStatus {
|
|
needs_onboarding: needs,
|
|
});
|
|
}
|
|
|
|
// Push recent server log entries so the client has history on connect.
|
|
{
|
|
let entries = log_buffer::global().get_recent_entries(100, None, None);
|
|
for entry in entries {
|
|
let _ = tx.send(WsResponse::LogEntry {
|
|
timestamp: entry.timestamp,
|
|
level: entry.level.as_str().to_string(),
|
|
message: entry.message,
|
|
});
|
|
}
|
|
}
|
|
|
|
// Subscribe to live log entries and forward them to the client.
|
|
let tx_logs = tx.clone();
|
|
let mut log_rx = log_buffer::global().subscribe();
|
|
tokio::spawn(async move {
|
|
loop {
|
|
match log_rx.recv().await {
|
|
Ok(entry) => {
|
|
if tx_logs
|
|
.send(WsResponse::LogEntry {
|
|
timestamp: entry.timestamp,
|
|
level: entry.level.as_str().to_string(),
|
|
message: entry.message,
|
|
})
|
|
.is_err()
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
|
}
|
|
}
|
|
});
|
|
|
|
// Subscribe to filesystem watcher events and forward them to the client.
|
|
// After each work-item event, also push the updated pipeline state.
|
|
// Config-changed events are forwarded as-is without a pipeline refresh.
|
|
let tx_watcher = tx.clone();
|
|
let ctx_watcher = ctx.clone();
|
|
let mut watcher_rx = ctx.watcher_tx.subscribe();
|
|
tokio::spawn(async move {
|
|
loop {
|
|
match watcher_rx.recv().await {
|
|
Ok(evt) => {
|
|
let needs_pipeline_refresh = matches!(
|
|
evt,
|
|
crate::io::watcher::WatcherEvent::WorkItem { .. }
|
|
| crate::io::watcher::WatcherEvent::AgentStateChanged
|
|
);
|
|
let ws_msg: Option<WsResponse> = evt.into();
|
|
if let Some(msg) = ws_msg && tx_watcher.send(msg).is_err() {
|
|
break;
|
|
}
|
|
// Push refreshed pipeline state after work-item changes and
|
|
// agent state changes (so the board updates agent lozenges).
|
|
if needs_pipeline_refresh
|
|
&& let Ok(state) = load_pipeline_state(ctx_watcher.as_ref())
|
|
&& tx_watcher.send(state.into()).is_err()
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
// Lagged: skip missed events, keep going.
|
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
|
}
|
|
}
|
|
});
|
|
|
|
// Subscribe to startup reconciliation events and forward them to the client.
|
|
let tx_reconcile = tx.clone();
|
|
let mut reconcile_rx = ctx.reconciliation_tx.subscribe();
|
|
tokio::spawn(async move {
|
|
loop {
|
|
match reconcile_rx.recv().await {
|
|
Ok(evt) => {
|
|
if tx_reconcile
|
|
.send(WsResponse::ReconciliationProgress {
|
|
story_id: evt.story_id,
|
|
status: evt.status,
|
|
message: evt.message,
|
|
})
|
|
.is_err()
|
|
{
|
|
break;
|
|
}
|
|
}
|
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
|
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
|
|
}
|
|
}
|
|
});
|
|
|
|
// Map of pending permission request_id → oneshot responder.
|
|
// Permission requests arrive from the MCP `prompt_permission` tool via
|
|
// `ctx.perm_rx` and are forwarded to the client as `PermissionRequest`.
|
|
// When the client responds, we resolve the corresponding oneshot.
|
|
let mut pending_perms: HashMap<String, oneshot::Sender<PermissionDecision>> = HashMap::new();
|
|
|
|
loop {
|
|
// Outer loop: wait for the next WebSocket message.
|
|
let Some(Ok(WsMessage::Text(text))) = stream.next().await else {
|
|
break;
|
|
};
|
|
|
|
let parsed: Result<WsRequest, _> = serde_json::from_str(&text);
|
|
match parsed {
|
|
Ok(WsRequest::Chat { messages, config }) => {
|
|
let tx_updates = tx.clone();
|
|
let tx_tokens = tx.clone();
|
|
let tx_thinking = tx.clone();
|
|
let tx_activity = tx.clone();
|
|
let ctx_clone = ctx.clone();
|
|
|
|
// Build the chat future without driving it yet so we can
|
|
// interleave it with permission-request forwarding.
|
|
let chat_fut = chat::chat(
|
|
messages,
|
|
config,
|
|
&ctx_clone.state,
|
|
ctx_clone.store.as_ref(),
|
|
move |history| {
|
|
let _ = tx_updates.send(WsResponse::Update {
|
|
messages: history.to_vec(),
|
|
});
|
|
},
|
|
move |token| {
|
|
let _ = tx_tokens.send(WsResponse::Token {
|
|
content: token.to_string(),
|
|
});
|
|
},
|
|
move |thinking: &str| {
|
|
let _ = tx_thinking.send(WsResponse::ThinkingToken {
|
|
content: thinking.to_string(),
|
|
});
|
|
},
|
|
move |tool_name: &str| {
|
|
let _ = tx_activity.send(WsResponse::ToolActivity {
|
|
tool_name: tool_name.to_string(),
|
|
});
|
|
},
|
|
);
|
|
tokio::pin!(chat_fut);
|
|
|
|
// Lock the permission receiver for the duration of this chat
|
|
// session. Permission requests from the MCP tool arrive here.
|
|
let mut perm_rx = ctx.perm_rx.lock().await;
|
|
|
|
// Inner loop: drive the chat while concurrently handling
|
|
// permission requests (from MCP) and WebSocket messages.
|
|
let chat_result = loop {
|
|
tokio::select! {
|
|
result = &mut chat_fut => break result,
|
|
|
|
// Forward permission requests from MCP tool to the client.
|
|
Some(perm_fwd) = perm_rx.recv() => {
|
|
let _ = tx.send(WsResponse::PermissionRequest {
|
|
request_id: perm_fwd.request_id.clone(),
|
|
tool_name: perm_fwd.tool_name.clone(),
|
|
tool_input: perm_fwd.tool_input.clone(),
|
|
});
|
|
pending_perms.insert(
|
|
perm_fwd.request_id,
|
|
perm_fwd.response_tx,
|
|
);
|
|
}
|
|
|
|
// Handle WebSocket messages during an active chat
|
|
// (permission responses and cancellations).
|
|
Some(Ok(WsMessage::Text(inner_text))) = stream.next() => {
|
|
match serde_json::from_str::<WsRequest>(&inner_text) {
|
|
Ok(WsRequest::PermissionResponse { request_id, approved, always_allow }) => {
|
|
if let Some(resp_tx) = pending_perms.remove(&request_id) {
|
|
let decision = if always_allow {
|
|
PermissionDecision::AlwaysAllow
|
|
} else if approved {
|
|
PermissionDecision::Approve
|
|
} else {
|
|
PermissionDecision::Deny
|
|
};
|
|
let _ = resp_tx.send(decision);
|
|
}
|
|
}
|
|
Ok(WsRequest::Cancel) => {
|
|
let _ = chat::cancel_chat(&ctx.state);
|
|
}
|
|
Ok(WsRequest::Ping) => {
|
|
let _ = tx.send(WsResponse::Pong);
|
|
}
|
|
Ok(WsRequest::SideQuestion { question, context_messages, config }) => {
|
|
let tx_side = tx.clone();
|
|
let store = ctx.store.clone();
|
|
tokio::spawn(async move {
|
|
let result = chat::side_question(
|
|
context_messages,
|
|
question,
|
|
config,
|
|
store.as_ref(),
|
|
|token| {
|
|
let _ = tx_side.send(WsResponse::SideQuestionToken {
|
|
content: token.to_string(),
|
|
});
|
|
},
|
|
).await;
|
|
match result {
|
|
Ok(response) => {
|
|
let _ = tx_side.send(WsResponse::SideQuestionDone { response });
|
|
}
|
|
Err(err) => {
|
|
let _ = tx_side.send(WsResponse::SideQuestionDone {
|
|
response: format!("Error: {err}"),
|
|
});
|
|
}
|
|
}
|
|
});
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
}
|
|
};
|
|
|
|
match chat_result {
|
|
Ok(chat_result) => {
|
|
if let Some(sid) = chat_result.session_id {
|
|
let _ = tx.send(WsResponse::SessionId { session_id: sid });
|
|
}
|
|
}
|
|
Err(err) => {
|
|
let _ = tx.send(WsResponse::Error { message: err });
|
|
}
|
|
}
|
|
}
|
|
Ok(WsRequest::Cancel) => {
|
|
let _ = chat::cancel_chat(&ctx.state);
|
|
}
|
|
Ok(WsRequest::Ping) => {
|
|
let _ = tx.send(WsResponse::Pong);
|
|
}
|
|
Ok(WsRequest::PermissionResponse { .. }) => {
|
|
// Permission responses outside an active chat are ignored.
|
|
}
|
|
Ok(WsRequest::SideQuestion {
|
|
question,
|
|
context_messages,
|
|
config,
|
|
}) => {
|
|
let tx_side = tx.clone();
|
|
let store = ctx.store.clone();
|
|
tokio::spawn(async move {
|
|
let result = chat::side_question(
|
|
context_messages,
|
|
question,
|
|
config,
|
|
store.as_ref(),
|
|
|token| {
|
|
let _ = tx_side.send(WsResponse::SideQuestionToken {
|
|
content: token.to_string(),
|
|
});
|
|
},
|
|
)
|
|
.await;
|
|
match result {
|
|
Ok(response) => {
|
|
let _ = tx_side
|
|
.send(WsResponse::SideQuestionDone { response });
|
|
}
|
|
Err(err) => {
|
|
let _ = tx_side.send(WsResponse::SideQuestionDone {
|
|
response: format!("Error: {err}"),
|
|
});
|
|
}
|
|
}
|
|
});
|
|
}
|
|
Err(err) => {
|
|
let _ = tx.send(WsResponse::Error {
|
|
message: format!("Invalid request: {err}"),
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
drop(tx);
|
|
let _ = forward.await;
|
|
})
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use crate::http::workflow::{PipelineState, UpcomingStory};
|
|
use crate::io::watcher::WatcherEvent;
|
|
|
|
// ── WsRequest deserialization ────────────────────────────────────
|
|
|
|
#[test]
|
|
fn deserialize_chat_request() {
|
|
let json = r#"{
|
|
"type": "chat",
|
|
"messages": [
|
|
{"role": "user", "content": "hello"}
|
|
],
|
|
"config": {
|
|
"provider": "ollama",
|
|
"model": "llama3"
|
|
}
|
|
}"#;
|
|
let req: WsRequest = serde_json::from_str(json).unwrap();
|
|
match req {
|
|
WsRequest::Chat { messages, config } => {
|
|
assert_eq!(messages.len(), 1);
|
|
assert_eq!(messages[0].content, "hello");
|
|
assert_eq!(config.provider, "ollama");
|
|
assert_eq!(config.model, "llama3");
|
|
}
|
|
_ => panic!("expected Chat variant"),
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn deserialize_chat_request_with_optional_fields() {
|
|
let json = r#"{
|
|
"type": "chat",
|
|
"messages": [],
|
|
"config": {
|
|
"provider": "anthropic",
|
|
"model": "claude-3-5-sonnet",
|
|
"base_url": "https://api.anthropic.com",
|
|
"enable_tools": true,
|
|
"session_id": "sess-123"
|
|
}
|
|
}"#;
|
|
let req: WsRequest = serde_json::from_str(json).unwrap();
|
|
match req {
|
|
WsRequest::Chat { messages, config } => {
|
|
assert!(messages.is_empty());
|
|
assert_eq!(config.base_url.as_deref(), Some("https://api.anthropic.com"));
|
|
assert_eq!(config.enable_tools, Some(true));
|
|
assert_eq!(config.session_id.as_deref(), Some("sess-123"));
|
|
}
|
|
_ => panic!("expected Chat variant"),
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn deserialize_cancel_request() {
|
|
let json = r#"{"type": "cancel"}"#;
|
|
let req: WsRequest = serde_json::from_str(json).unwrap();
|
|
assert!(matches!(req, WsRequest::Cancel));
|
|
}
|
|
|
|
#[test]
|
|
fn deserialize_ping_request() {
|
|
let json = r#"{"type": "ping"}"#;
|
|
let req: WsRequest = serde_json::from_str(json).unwrap();
|
|
assert!(matches!(req, WsRequest::Ping));
|
|
}
|
|
|
|
#[test]
|
|
fn deserialize_permission_response_approved() {
|
|
let json = r#"{
|
|
"type": "permission_response",
|
|
"request_id": "req-42",
|
|
"approved": true
|
|
}"#;
|
|
let req: WsRequest = serde_json::from_str(json).unwrap();
|
|
match req {
|
|
WsRequest::PermissionResponse {
|
|
request_id,
|
|
approved,
|
|
always_allow,
|
|
} => {
|
|
assert_eq!(request_id, "req-42");
|
|
assert!(approved);
|
|
assert!(!always_allow);
|
|
}
|
|
_ => panic!("expected PermissionResponse variant"),
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn deserialize_permission_response_denied() {
|
|
let json = r#"{
|
|
"type": "permission_response",
|
|
"request_id": "req-99",
|
|
"approved": false
|
|
}"#;
|
|
let req: WsRequest = serde_json::from_str(json).unwrap();
|
|
match req {
|
|
WsRequest::PermissionResponse {
|
|
request_id,
|
|
approved,
|
|
always_allow,
|
|
} => {
|
|
assert_eq!(request_id, "req-99");
|
|
assert!(!approved);
|
|
assert!(!always_allow);
|
|
}
|
|
_ => panic!("expected PermissionResponse variant"),
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn deserialize_permission_response_always_allow() {
|
|
let json = r#"{
|
|
"type": "permission_response",
|
|
"request_id": "req-100",
|
|
"approved": true,
|
|
"always_allow": true
|
|
}"#;
|
|
let req: WsRequest = serde_json::from_str(json).unwrap();
|
|
match req {
|
|
WsRequest::PermissionResponse {
|
|
request_id,
|
|
approved,
|
|
always_allow,
|
|
} => {
|
|
assert_eq!(request_id, "req-100");
|
|
assert!(approved);
|
|
assert!(always_allow);
|
|
}
|
|
_ => panic!("expected PermissionResponse variant"),
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn deserialize_unknown_type_fails() {
|
|
let json = r#"{"type": "unknown_type"}"#;
|
|
let result: Result<WsRequest, _> = serde_json::from_str(json);
|
|
assert!(result.is_err());
|
|
}
|
|
|
|
#[test]
|
|
fn deserialize_invalid_json_fails() {
|
|
let result: Result<WsRequest, _> = serde_json::from_str("not json");
|
|
assert!(result.is_err());
|
|
}
|
|
|
|
#[test]
|
|
fn deserialize_missing_type_tag_fails() {
|
|
let json = r#"{"messages": [], "config": {"provider": "x", "model": "y"}}"#;
|
|
let result: Result<WsRequest, _> = serde_json::from_str(json);
|
|
assert!(result.is_err());
|
|
}
|
|
|
|
// ── WsResponse serialization ────────────────────────────────────
|
|
|
|
#[test]
|
|
fn serialize_token_response() {
|
|
let resp = WsResponse::Token {
|
|
content: "hello world".to_string(),
|
|
};
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "token");
|
|
assert_eq!(json["content"], "hello world");
|
|
}
|
|
|
|
#[test]
|
|
fn serialize_update_response() {
|
|
let msg = Message {
|
|
role: crate::llm::types::Role::Assistant,
|
|
content: "response".to_string(),
|
|
tool_calls: None,
|
|
tool_call_id: None,
|
|
};
|
|
let resp = WsResponse::Update {
|
|
messages: vec![msg],
|
|
};
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "update");
|
|
assert_eq!(json["messages"].as_array().unwrap().len(), 1);
|
|
assert_eq!(json["messages"][0]["content"], "response");
|
|
}
|
|
|
|
#[test]
|
|
fn serialize_session_id_response() {
|
|
let resp = WsResponse::SessionId {
|
|
session_id: "sess-abc".to_string(),
|
|
};
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "session_id");
|
|
assert_eq!(json["session_id"], "sess-abc");
|
|
}
|
|
|
|
#[test]
|
|
fn serialize_error_response() {
|
|
let resp = WsResponse::Error {
|
|
message: "something broke".to_string(),
|
|
};
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "error");
|
|
assert_eq!(json["message"], "something broke");
|
|
}
|
|
|
|
#[test]
|
|
fn serialize_work_item_changed_response() {
|
|
let resp = WsResponse::WorkItemChanged {
|
|
stage: "2_current".to_string(),
|
|
item_id: "42_story_foo".to_string(),
|
|
action: "start".to_string(),
|
|
commit_msg: "story-kit: start 42_story_foo".to_string(),
|
|
};
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "work_item_changed");
|
|
assert_eq!(json["stage"], "2_current");
|
|
assert_eq!(json["item_id"], "42_story_foo");
|
|
assert_eq!(json["action"], "start");
|
|
assert_eq!(json["commit_msg"], "story-kit: start 42_story_foo");
|
|
}
|
|
|
|
#[test]
|
|
fn serialize_pipeline_state_response() {
|
|
let story = crate::http::workflow::UpcomingStory {
|
|
story_id: "10_story_test".to_string(),
|
|
name: Some("Test".to_string()),
|
|
error: None,
|
|
merge_failure: None,
|
|
agent: None,
|
|
review_hold: None,
|
|
qa: None,
|
|
retry_count: None,
|
|
blocked: None,
|
|
};
|
|
let resp = WsResponse::PipelineState {
|
|
backlog: vec![story],
|
|
current: vec![],
|
|
qa: vec![],
|
|
merge: vec![],
|
|
done: vec![],
|
|
};
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "pipeline_state");
|
|
assert_eq!(json["backlog"].as_array().unwrap().len(), 1);
|
|
assert_eq!(json["backlog"][0]["story_id"], "10_story_test");
|
|
assert!(json["current"].as_array().unwrap().is_empty());
|
|
assert!(json["done"].as_array().unwrap().is_empty());
|
|
}
|
|
|
|
#[test]
|
|
fn serialize_agent_config_changed_response() {
|
|
let resp = WsResponse::AgentConfigChanged;
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "agent_config_changed");
|
|
}
|
|
|
|
#[test]
|
|
fn serialize_pong_response() {
|
|
let resp = WsResponse::Pong;
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "pong");
|
|
}
|
|
|
|
#[test]
|
|
fn serialize_thinking_token_response() {
|
|
let resp = WsResponse::ThinkingToken {
|
|
content: "I need to think about this...".to_string(),
|
|
};
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "thinking_token");
|
|
assert_eq!(json["content"], "I need to think about this...");
|
|
}
|
|
|
|
#[test]
|
|
fn serialize_onboarding_status_true() {
|
|
let resp = WsResponse::OnboardingStatus {
|
|
needs_onboarding: true,
|
|
};
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "onboarding_status");
|
|
assert_eq!(json["needs_onboarding"], true);
|
|
}
|
|
|
|
#[test]
|
|
fn serialize_onboarding_status_false() {
|
|
let resp = WsResponse::OnboardingStatus {
|
|
needs_onboarding: false,
|
|
};
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "onboarding_status");
|
|
assert_eq!(json["needs_onboarding"], false);
|
|
}
|
|
|
|
#[test]
|
|
fn serialize_permission_request_response() {
|
|
let resp = WsResponse::PermissionRequest {
|
|
request_id: "perm-1".to_string(),
|
|
tool_name: "Bash".to_string(),
|
|
tool_input: serde_json::json!({"command": "ls"}),
|
|
};
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "permission_request");
|
|
assert_eq!(json["request_id"], "perm-1");
|
|
assert_eq!(json["tool_name"], "Bash");
|
|
assert_eq!(json["tool_input"]["command"], "ls");
|
|
}
|
|
|
|
#[test]
|
|
fn serialize_tool_activity_response() {
|
|
let resp = WsResponse::ToolActivity {
|
|
tool_name: "Read".to_string(),
|
|
};
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "tool_activity");
|
|
assert_eq!(json["tool_name"], "Read");
|
|
}
|
|
|
|
#[test]
|
|
fn serialize_reconciliation_progress_response() {
|
|
let resp = WsResponse::ReconciliationProgress {
|
|
story_id: "50_story_x".to_string(),
|
|
status: "gates_running".to_string(),
|
|
message: "Running clippy...".to_string(),
|
|
};
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "reconciliation_progress");
|
|
assert_eq!(json["story_id"], "50_story_x");
|
|
assert_eq!(json["status"], "gates_running");
|
|
assert_eq!(json["message"], "Running clippy...");
|
|
}
|
|
|
|
// ── From<WatcherEvent> for Option<WsResponse> ───────────────────
|
|
|
|
#[test]
|
|
fn watcher_work_item_converts_to_ws_response() {
|
|
let evt = WatcherEvent::WorkItem {
|
|
stage: "2_current".to_string(),
|
|
item_id: "42_story_foo".to_string(),
|
|
action: "start".to_string(),
|
|
commit_msg: "story-kit: start 42_story_foo".to_string(),
|
|
};
|
|
let ws_msg: Option<WsResponse> = evt.into();
|
|
let ws_msg = ws_msg.expect("WorkItem should produce Some");
|
|
let json = serde_json::to_value(&ws_msg).unwrap();
|
|
assert_eq!(json["type"], "work_item_changed");
|
|
assert_eq!(json["stage"], "2_current");
|
|
assert_eq!(json["item_id"], "42_story_foo");
|
|
assert_eq!(json["action"], "start");
|
|
}
|
|
|
|
#[test]
|
|
fn watcher_config_changed_converts_to_ws_response() {
|
|
let evt = WatcherEvent::ConfigChanged;
|
|
let ws_msg: Option<WsResponse> = evt.into();
|
|
let ws_msg = ws_msg.expect("ConfigChanged should produce Some");
|
|
let json = serde_json::to_value(&ws_msg).unwrap();
|
|
assert_eq!(json["type"], "agent_config_changed");
|
|
}
|
|
|
|
// ── From<PipelineState> for WsResponse ──────────────────────────
|
|
|
|
#[test]
|
|
fn pipeline_state_converts_to_ws_response() {
|
|
let state = PipelineState {
|
|
backlog: vec![UpcomingStory {
|
|
story_id: "1_story_a".to_string(),
|
|
name: Some("Story A".to_string()),
|
|
error: None,
|
|
merge_failure: None,
|
|
agent: None,
|
|
review_hold: None,
|
|
qa: None,
|
|
retry_count: None,
|
|
blocked: None,
|
|
}],
|
|
current: vec![UpcomingStory {
|
|
story_id: "2_story_b".to_string(),
|
|
name: Some("Story B".to_string()),
|
|
error: None,
|
|
merge_failure: None,
|
|
agent: None,
|
|
review_hold: None,
|
|
qa: None,
|
|
retry_count: None,
|
|
blocked: None,
|
|
}],
|
|
qa: vec![],
|
|
merge: vec![],
|
|
done: vec![UpcomingStory {
|
|
story_id: "50_story_done".to_string(),
|
|
name: Some("Done Story".to_string()),
|
|
error: None,
|
|
merge_failure: None,
|
|
agent: None,
|
|
review_hold: None,
|
|
qa: None,
|
|
retry_count: None,
|
|
blocked: None,
|
|
}],
|
|
};
|
|
let resp: WsResponse = state.into();
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "pipeline_state");
|
|
assert_eq!(json["backlog"].as_array().unwrap().len(), 1);
|
|
assert_eq!(json["backlog"][0]["story_id"], "1_story_a");
|
|
assert_eq!(json["current"].as_array().unwrap().len(), 1);
|
|
assert_eq!(json["current"][0]["story_id"], "2_story_b");
|
|
assert!(json["qa"].as_array().unwrap().is_empty());
|
|
assert!(json["merge"].as_array().unwrap().is_empty());
|
|
assert_eq!(json["done"].as_array().unwrap().len(), 1);
|
|
assert_eq!(json["done"][0]["story_id"], "50_story_done");
|
|
}
|
|
|
|
#[test]
|
|
fn empty_pipeline_state_converts_to_ws_response() {
|
|
let state = PipelineState {
|
|
backlog: vec![],
|
|
current: vec![],
|
|
qa: vec![],
|
|
merge: vec![],
|
|
done: vec![],
|
|
};
|
|
let resp: WsResponse = state.into();
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "pipeline_state");
|
|
assert!(json["backlog"].as_array().unwrap().is_empty());
|
|
assert!(json["current"].as_array().unwrap().is_empty());
|
|
assert!(json["qa"].as_array().unwrap().is_empty());
|
|
assert!(json["merge"].as_array().unwrap().is_empty());
|
|
assert!(json["done"].as_array().unwrap().is_empty());
|
|
}
|
|
|
|
// ── WsResponse JSON round-trip (string form) ────────────────────
|
|
|
|
#[test]
|
|
fn ws_response_serializes_to_parseable_json_string() {
|
|
let resp = WsResponse::Error {
|
|
message: "test error".to_string(),
|
|
};
|
|
let text = serde_json::to_string(&resp).unwrap();
|
|
let parsed: serde_json::Value = serde_json::from_str(&text).unwrap();
|
|
assert_eq!(parsed["type"], "error");
|
|
assert_eq!(parsed["message"], "test error");
|
|
}
|
|
|
|
#[test]
|
|
fn ws_response_update_with_empty_messages() {
|
|
let resp = WsResponse::Update { messages: vec![] };
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "update");
|
|
assert!(json["messages"].as_array().unwrap().is_empty());
|
|
}
|
|
|
|
#[test]
|
|
fn ws_response_token_with_empty_content() {
|
|
let resp = WsResponse::Token {
|
|
content: String::new(),
|
|
};
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["type"], "token");
|
|
assert_eq!(json["content"], "");
|
|
}
|
|
|
|
#[test]
|
|
fn ws_response_error_with_special_characters() {
|
|
let resp = WsResponse::Error {
|
|
message: "error: \"quoted\" & <tagged>".to_string(),
|
|
};
|
|
let text = serde_json::to_string(&resp).unwrap();
|
|
let parsed: serde_json::Value = serde_json::from_str(&text).unwrap();
|
|
assert_eq!(parsed["message"], "error: \"quoted\" & <tagged>");
|
|
}
|
|
|
|
// ── WsRequest edge cases ────────────────────────────────────────
|
|
|
|
#[test]
|
|
fn deserialize_chat_with_multiple_messages() {
|
|
let json = r#"{
|
|
"type": "chat",
|
|
"messages": [
|
|
{"role": "system", "content": "You are a helpful assistant."},
|
|
{"role": "user", "content": "Hello"},
|
|
{"role": "assistant", "content": "Hi there!"},
|
|
{"role": "user", "content": "How are you?"}
|
|
],
|
|
"config": {
|
|
"provider": "ollama",
|
|
"model": "llama3"
|
|
}
|
|
}"#;
|
|
let req: WsRequest = serde_json::from_str(json).unwrap();
|
|
match req {
|
|
WsRequest::Chat { messages, .. } => {
|
|
assert_eq!(messages.len(), 4);
|
|
assert_eq!(messages[0].role, crate::llm::types::Role::System);
|
|
assert_eq!(messages[3].role, crate::llm::types::Role::User);
|
|
}
|
|
_ => panic!("expected Chat variant"),
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn deserialize_chat_with_tool_call_message() {
|
|
let json = r#"{
|
|
"type": "chat",
|
|
"messages": [
|
|
{
|
|
"role": "assistant",
|
|
"content": "",
|
|
"tool_calls": [
|
|
{
|
|
"id": "call_1",
|
|
"type": "function",
|
|
"function": {
|
|
"name": "read_file",
|
|
"arguments": "{\"path\": \"/tmp/test.rs\"}"
|
|
}
|
|
}
|
|
]
|
|
}
|
|
],
|
|
"config": {
|
|
"provider": "anthropic",
|
|
"model": "claude-3-5-sonnet"
|
|
}
|
|
}"#;
|
|
let req: WsRequest = serde_json::from_str(json).unwrap();
|
|
match req {
|
|
WsRequest::Chat { messages, .. } => {
|
|
assert_eq!(messages.len(), 1);
|
|
let tc = messages[0].tool_calls.as_ref().unwrap();
|
|
assert_eq!(tc.len(), 1);
|
|
assert_eq!(tc[0].function.name, "read_file");
|
|
}
|
|
_ => panic!("expected Chat variant"),
|
|
}
|
|
}
|
|
|
|
// ── Pipeline state with agent assignment ────────────────────────
|
|
|
|
#[test]
|
|
fn pipeline_state_with_agent_converts_correctly() {
|
|
let state = PipelineState {
|
|
backlog: vec![],
|
|
current: vec![UpcomingStory {
|
|
story_id: "10_story_x".to_string(),
|
|
name: Some("Story X".to_string()),
|
|
error: None,
|
|
merge_failure: None,
|
|
agent: Some(crate::http::workflow::AgentAssignment {
|
|
agent_name: "coder-1".to_string(),
|
|
model: Some("claude-3-5-sonnet".to_string()),
|
|
status: "running".to_string(),
|
|
}),
|
|
review_hold: None,
|
|
qa: None,
|
|
retry_count: None,
|
|
blocked: None,
|
|
}],
|
|
qa: vec![],
|
|
merge: vec![],
|
|
done: vec![],
|
|
};
|
|
let resp: WsResponse = state.into();
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["current"][0]["agent"]["agent_name"], "coder-1");
|
|
assert_eq!(json["current"][0]["agent"]["model"], "claude-3-5-sonnet");
|
|
assert_eq!(json["current"][0]["agent"]["status"], "running");
|
|
}
|
|
|
|
// ── Reconciliation progress done event ──────────────────────────
|
|
|
|
#[test]
|
|
fn reconciliation_done_event_has_empty_story_id() {
|
|
let resp = WsResponse::ReconciliationProgress {
|
|
story_id: String::new(),
|
|
status: "done".to_string(),
|
|
message: "Reconciliation complete".to_string(),
|
|
};
|
|
let json = serde_json::to_value(&resp).unwrap();
|
|
assert_eq!(json["story_id"], "");
|
|
assert_eq!(json["status"], "done");
|
|
}
|
|
|
|
// ── ws_handler integration tests (real WebSocket connection) ─────
|
|
|
|
use futures::stream::SplitSink;
|
|
use poem::EndpointExt;
|
|
use tokio_tungstenite::tungstenite;
|
|
|
|
/// Helper: construct a tungstenite text message from a string.
|
|
fn ws_text(s: &str) -> tungstenite::Message {
|
|
tungstenite::Message::Text(s.into())
|
|
}
|
|
|
|
/// Helper: start a poem server with ws_handler on an ephemeral port
|
|
/// and return the WebSocket URL.
|
|
async fn start_test_server() -> (String, Arc<AppContext>) {
|
|
let tmp = tempfile::tempdir().unwrap();
|
|
let root = tmp.path().to_path_buf();
|
|
|
|
// Create minimal pipeline dirs so load_pipeline_state succeeds.
|
|
for stage in &["1_backlog", "2_current", "3_qa", "4_merge"] {
|
|
std::fs::create_dir_all(root.join(".story_kit").join("work").join(stage)).unwrap();
|
|
}
|
|
|
|
let ctx = Arc::new(AppContext::new_test(root));
|
|
let ctx_data = ctx.clone();
|
|
|
|
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
|
let addr = listener.local_addr().unwrap();
|
|
|
|
let app = poem::Route::new()
|
|
.at("/ws", poem::get(ws_handler))
|
|
.data(ctx_data);
|
|
|
|
tokio::spawn(async move {
|
|
let acceptor = poem::listener::TcpAcceptor::from_tokio(listener).unwrap();
|
|
let _ = poem::Server::new_with_acceptor(acceptor)
|
|
.run(app)
|
|
.await;
|
|
});
|
|
|
|
// Small delay to let the server start.
|
|
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
|
|
|
let url = format!("ws://127.0.0.1:{}/ws", addr.port());
|
|
(url, ctx)
|
|
}
|
|
|
|
type WsSink = SplitSink<
|
|
tokio_tungstenite::WebSocketStream<
|
|
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
|
|
>,
|
|
tungstenite::Message,
|
|
>;
|
|
|
|
/// Helper: connect and return (sink, stream) plus read the initial
|
|
/// pipeline_state and onboarding_status messages that are always sent
|
|
/// on connect.
|
|
async fn connect_ws(
|
|
url: &str,
|
|
) -> (
|
|
WsSink,
|
|
futures::stream::SplitStream<
|
|
tokio_tungstenite::WebSocketStream<
|
|
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
|
|
>,
|
|
>,
|
|
serde_json::Value,
|
|
) {
|
|
let (ws, _resp) = tokio_tungstenite::connect_async(url).await.unwrap();
|
|
let (sink, mut stream) = futures::StreamExt::split(ws);
|
|
|
|
// The first message should be the initial pipeline_state.
|
|
let first = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next())
|
|
.await
|
|
.expect("timeout waiting for initial message")
|
|
.expect("stream ended")
|
|
.expect("ws error");
|
|
|
|
let initial: serde_json::Value = match first {
|
|
tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(),
|
|
other => panic!("expected text message, got: {other:?}"),
|
|
};
|
|
|
|
// The second message is the onboarding_status — consume it so
|
|
// callers only see application-level messages.
|
|
let second = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next())
|
|
.await
|
|
.expect("timeout waiting for onboarding_status")
|
|
.expect("stream ended")
|
|
.expect("ws error");
|
|
let onboarding: serde_json::Value = match second {
|
|
tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(),
|
|
other => panic!("expected text message, got: {other:?}"),
|
|
};
|
|
assert_eq!(
|
|
onboarding["type"], "onboarding_status",
|
|
"expected onboarding_status, got: {onboarding}"
|
|
);
|
|
|
|
// Drain any log_entry messages sent as initial history on connect.
|
|
// These are buffered before tests send their own requests.
|
|
loop {
|
|
// Use a very short timeout: if nothing arrives quickly, the burst is done.
|
|
let Ok(Some(Ok(msg))) =
|
|
tokio::time::timeout(std::time::Duration::from_millis(200), stream.next()).await
|
|
else {
|
|
break;
|
|
};
|
|
let val: serde_json::Value = match msg {
|
|
tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(),
|
|
_ => break,
|
|
};
|
|
if val["type"] != "log_entry" {
|
|
// Unexpected non-log message during drain — this shouldn't happen.
|
|
panic!("unexpected message during log drain: {val}");
|
|
}
|
|
}
|
|
|
|
(sink, stream, initial)
|
|
}
|
|
|
|
/// Read next non-log_entry text message from the stream with a timeout.
|
|
/// Skips any `log_entry` messages that arrive between events.
|
|
async fn next_msg(
|
|
stream: &mut futures::stream::SplitStream<
|
|
tokio_tungstenite::WebSocketStream<
|
|
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
|
|
>,
|
|
>,
|
|
) -> serde_json::Value {
|
|
loop {
|
|
let msg = tokio::time::timeout(std::time::Duration::from_secs(2), stream.next())
|
|
.await
|
|
.expect("timeout waiting for message")
|
|
.expect("stream ended")
|
|
.expect("ws error");
|
|
let val: serde_json::Value = match msg {
|
|
tungstenite::Message::Text(t) => serde_json::from_str(t.as_ref()).unwrap(),
|
|
other => panic!("expected text message, got: {other:?}"),
|
|
};
|
|
if val["type"] != "log_entry" {
|
|
return val;
|
|
}
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn ws_handler_sends_initial_pipeline_state_on_connect() {
|
|
let (url, _ctx) = start_test_server().await;
|
|
let (_sink, _stream, initial) = connect_ws(&url).await;
|
|
|
|
assert_eq!(initial["type"], "pipeline_state");
|
|
// All stages should be empty arrays since no .md files were created.
|
|
assert!(initial["backlog"].as_array().unwrap().is_empty());
|
|
assert!(initial["current"].as_array().unwrap().is_empty());
|
|
assert!(initial["qa"].as_array().unwrap().is_empty());
|
|
assert!(initial["merge"].as_array().unwrap().is_empty());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn ws_handler_returns_error_for_invalid_json() {
|
|
let (url, _ctx) = start_test_server().await;
|
|
let (mut sink, mut stream, _initial) = connect_ws(&url).await;
|
|
|
|
// Send invalid JSON.
|
|
sink.send(ws_text("not valid json"))
|
|
.await
|
|
.unwrap();
|
|
|
|
let msg = next_msg(&mut stream).await;
|
|
assert_eq!(msg["type"], "error");
|
|
assert!(
|
|
msg["message"]
|
|
.as_str()
|
|
.unwrap()
|
|
.contains("Invalid request"),
|
|
"error message should indicate invalid request, got: {}",
|
|
msg["message"]
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn ws_handler_returns_error_for_unknown_type() {
|
|
let (url, _ctx) = start_test_server().await;
|
|
let (mut sink, mut stream, _initial) = connect_ws(&url).await;
|
|
|
|
// Send a message with an unknown type.
|
|
sink.send(ws_text(r#"{"type": "bogus"}"#))
|
|
.await
|
|
.unwrap();
|
|
|
|
let msg = next_msg(&mut stream).await;
|
|
assert_eq!(msg["type"], "error");
|
|
assert!(msg["message"].as_str().unwrap().contains("Invalid request"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn ws_handler_cancel_outside_chat_does_not_error() {
|
|
let (url, _ctx) = start_test_server().await;
|
|
let (mut sink, mut stream, _initial) = connect_ws(&url).await;
|
|
|
|
// Send cancel when no chat is active — should not produce an error.
|
|
sink.send(ws_text(r#"{"type": "cancel"}"#))
|
|
.await
|
|
.unwrap();
|
|
|
|
// Send another invalid message to check the connection is still alive.
|
|
sink.send(ws_text("{}"))
|
|
.await
|
|
.unwrap();
|
|
|
|
let msg = next_msg(&mut stream).await;
|
|
// The invalid JSON message should produce an error, confirming
|
|
// the cancel didn't break the connection.
|
|
assert_eq!(msg["type"], "error");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn ws_handler_permission_response_outside_chat_is_ignored() {
|
|
let (url, _ctx) = start_test_server().await;
|
|
let (mut sink, mut stream, _initial) = connect_ws(&url).await;
|
|
|
|
// Send permission response outside an active chat.
|
|
sink.send(ws_text(
|
|
r#"{"type": "permission_response", "request_id": "x", "approved": true}"#,
|
|
))
|
|
.await
|
|
.unwrap();
|
|
|
|
// Send a probe message to check the connection is still alive.
|
|
sink.send(ws_text("bad"))
|
|
.await
|
|
.unwrap();
|
|
|
|
let msg = next_msg(&mut stream).await;
|
|
assert_eq!(msg["type"], "error");
|
|
assert!(msg["message"].as_str().unwrap().contains("Invalid request"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn ws_handler_forwards_watcher_events() {
|
|
let (url, ctx) = start_test_server().await;
|
|
let (_sink, mut stream, _initial) = connect_ws(&url).await;
|
|
|
|
// Broadcast a watcher event.
|
|
ctx.watcher_tx
|
|
.send(WatcherEvent::WorkItem {
|
|
stage: "2_current".to_string(),
|
|
item_id: "99_story_test".to_string(),
|
|
action: "start".to_string(),
|
|
commit_msg: "story-kit: start 99_story_test".to_string(),
|
|
})
|
|
.unwrap();
|
|
|
|
let msg = next_msg(&mut stream).await;
|
|
assert_eq!(msg["type"], "work_item_changed");
|
|
assert_eq!(msg["item_id"], "99_story_test");
|
|
assert_eq!(msg["stage"], "2_current");
|
|
|
|
// After a work-item event, a pipeline_state refresh is pushed.
|
|
let state_msg = next_msg(&mut stream).await;
|
|
assert_eq!(state_msg["type"], "pipeline_state");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn ws_handler_forwards_config_changed_without_pipeline_refresh() {
|
|
let (url, ctx) = start_test_server().await;
|
|
let (_sink, mut stream, _initial) = connect_ws(&url).await;
|
|
|
|
// Broadcast a config-changed event.
|
|
ctx.watcher_tx.send(WatcherEvent::ConfigChanged).unwrap();
|
|
|
|
let msg = next_msg(&mut stream).await;
|
|
assert_eq!(msg["type"], "agent_config_changed");
|
|
|
|
// Config-changed should NOT be followed by a pipeline_state refresh.
|
|
// Send a probe to check no extra message is queued.
|
|
ctx.watcher_tx.send(WatcherEvent::ConfigChanged).unwrap();
|
|
let msg2 = next_msg(&mut stream).await;
|
|
assert_eq!(msg2["type"], "agent_config_changed");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn ws_handler_forwards_reconciliation_events() {
|
|
let (url, ctx) = start_test_server().await;
|
|
let (_sink, mut stream, _initial) = connect_ws(&url).await;
|
|
|
|
// Broadcast a reconciliation event.
|
|
ctx.reconciliation_tx
|
|
.send(crate::agents::ReconciliationEvent {
|
|
story_id: "50_story_recon".to_string(),
|
|
status: "checking".to_string(),
|
|
message: "Checking story...".to_string(),
|
|
})
|
|
.unwrap();
|
|
|
|
let msg = next_msg(&mut stream).await;
|
|
assert_eq!(msg["type"], "reconciliation_progress");
|
|
assert_eq!(msg["story_id"], "50_story_recon");
|
|
assert_eq!(msg["status"], "checking");
|
|
assert_eq!(msg["message"], "Checking story...");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn ws_handler_handles_client_disconnect_gracefully() {
|
|
let (url, _ctx) = start_test_server().await;
|
|
let (mut sink, _stream, _initial) = connect_ws(&url).await;
|
|
|
|
// Close the connection — should not panic the server.
|
|
sink.close().await.unwrap();
|
|
|
|
// Give the server a moment to process the close.
|
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
|
|
|
// Connect again to verify server is still alive.
|
|
let (_sink2, _stream2, initial2) = connect_ws(&url).await;
|
|
assert_eq!(initial2["type"], "pipeline_state");
|
|
}
|
|
}
|