2026-04-29 18:35:32 +00:00
|
|
|
//! WebSocket transport adapter — accept connection, serialise/deserialise frames,
|
|
|
|
|
//! invoke service methods. No business logic, no inline state transitions.
|
|
|
|
|
|
|
|
|
|
use crate::config::ProjectConfig;
|
|
|
|
|
use crate::http::context::AppContext;
|
|
|
|
|
use crate::llm::chat;
|
|
|
|
|
use crate::service::ws::{self, WsResponse};
|
|
|
|
|
use futures::{SinkExt, StreamExt};
|
|
|
|
|
use poem::handler;
|
|
|
|
|
use poem::web::Data;
|
|
|
|
|
use poem::web::websocket::{Message as WsMessage, WebSocket};
|
|
|
|
|
use std::collections::HashMap;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use tokio::sync::{mpsc, oneshot};
|
|
|
|
|
|
|
|
|
|
use crate::http::context::PermissionDecision;
|
|
|
|
|
|
|
|
|
|
// Re-export WizardStepInfo for any downstream code that imports it from here.
|
|
|
|
|
#[allow(unused_imports)]
|
|
|
|
|
pub use crate::service::ws::WizardStepInfo;
|
|
|
|
|
|
|
|
|
|
#[handler]
|
|
|
|
|
/// WebSocket endpoint for streaming chat responses, cancellation, and
|
|
|
|
|
/// filesystem watcher notifications.
|
|
|
|
|
///
|
|
|
|
|
/// Accepts JSON `WsRequest` messages and streams `WsResponse` messages.
|
|
|
|
|
pub async fn ws_handler(ws: WebSocket, ctx: Data<&Arc<AppContext>>) -> impl poem::IntoResponse {
|
|
|
|
|
let ctx = ctx.0.clone();
|
|
|
|
|
ws.on_upgrade(move |socket| async move {
|
|
|
|
|
let (mut sink, mut stream) = socket.split();
|
|
|
|
|
let (tx, mut rx) = mpsc::unbounded_channel::<WsResponse>();
|
|
|
|
|
// Separate channel for pre-serialized messages (e.g. RPC responses).
|
|
|
|
|
let (raw_tx, mut raw_rx) = mpsc::unbounded_channel::<String>();
|
|
|
|
|
|
|
|
|
|
let forward = tokio::spawn(async move {
|
|
|
|
|
loop {
|
|
|
|
|
tokio::select! {
|
|
|
|
|
msg = rx.recv() => match msg {
|
|
|
|
|
Some(msg) => {
|
|
|
|
|
if let Ok(text) = serde_json::to_string(&msg)
|
|
|
|
|
&& sink.send(WsMessage::Text(text)).await.is_err()
|
|
|
|
|
{
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
None => break,
|
|
|
|
|
},
|
|
|
|
|
raw = raw_rx.recv() => match raw {
|
|
|
|
|
Some(text) => {
|
|
|
|
|
if sink.send(WsMessage::Text(text)).await.is_err() {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
None => break,
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// ── Initial state burst ────────────────────────────────────────
|
|
|
|
|
if let Some(state) = ws::load_initial_pipeline_state(ctx.as_ref()) {
|
|
|
|
|
let _ = tx.send(state);
|
|
|
|
|
}
|
|
|
|
|
let _ = tx.send(ws::check_onboarding(ctx.as_ref()));
|
|
|
|
|
if let Some(wiz) = ws::load_wizard_state(ctx.as_ref()) {
|
|
|
|
|
let _ = tx.send(wiz);
|
|
|
|
|
}
|
|
|
|
|
for log in ws::load_recent_logs(100) {
|
|
|
|
|
let _ = tx.send(log);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ── Background subscriptions ───────────────────────────────────
|
|
|
|
|
ws::subscribe_logs(tx.clone());
|
|
|
|
|
ws::subscribe_watcher(tx.clone(), ctx.clone(), ctx.watcher_tx.subscribe());
|
|
|
|
|
ws::subscribe_reconciliation(tx.clone(), ctx.reconciliation_tx.subscribe());
|
|
|
|
|
|
|
|
|
|
// Subscribe to the status broadcaster if web UI consumer is enabled (default: true).
|
|
|
|
|
let status_enabled = ctx
|
|
|
|
|
.state
|
|
|
|
|
.get_project_root()
|
|
|
|
|
.ok()
|
|
|
|
|
.and_then(|root| ProjectConfig::load(&root).ok())
|
|
|
|
|
.map(|c| c.web_ui_status_consumer)
|
|
|
|
|
.unwrap_or(true);
|
|
|
|
|
if status_enabled {
|
|
|
|
|
ws::subscribe_status(tx.clone(), ctx.services.status.subscribe());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Map of pending permission request_id -> oneshot responder.
|
|
|
|
|
let mut pending_perms: HashMap<String, oneshot::Sender<PermissionDecision>> =
|
|
|
|
|
HashMap::new();
|
|
|
|
|
|
2026-05-14 14:48:49 +01:00
|
|
|
// Outer loop: wait for the next WebSocket message.
|
|
|
|
|
while let Some(Ok(WsMessage::Text(text))) = stream.next().await {
|
2026-04-29 18:35:32 +00:00
|
|
|
|
|
|
|
|
// Handle read-RPC frames (discriminated by "kind", not "type").
|
2026-05-13 04:43:48 +00:00
|
|
|
if let Some(rpc_resp) = crate::crdt_sync::try_handle_rpc_text(&text).await {
|
2026-04-29 18:35:32 +00:00
|
|
|
if let Ok(resp_text) = serde_json::to_string(&rpc_resp) {
|
|
|
|
|
let _ = raw_tx.send(resp_text);
|
|
|
|
|
}
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
match ws::dispatch_outer(&text) {
|
|
|
|
|
ws::DispatchResult::StartChat { messages, config } => {
|
|
|
|
|
let tx_updates = tx.clone();
|
|
|
|
|
let tx_tokens = tx.clone();
|
|
|
|
|
let tx_thinking = tx.clone();
|
|
|
|
|
let tx_activity = tx.clone();
|
|
|
|
|
let ctx_clone = ctx.clone();
|
|
|
|
|
|
|
|
|
|
let chat_fut = chat::chat(
|
|
|
|
|
messages,
|
|
|
|
|
config,
|
|
|
|
|
&ctx_clone.state,
|
|
|
|
|
ctx_clone.store.as_ref(),
|
|
|
|
|
move |history| {
|
|
|
|
|
let _ = tx_updates.send(WsResponse::Update {
|
|
|
|
|
messages: history.to_vec(),
|
|
|
|
|
});
|
|
|
|
|
},
|
|
|
|
|
move |token| {
|
|
|
|
|
let _ = tx_tokens.send(WsResponse::Token {
|
|
|
|
|
content: token.to_string(),
|
|
|
|
|
});
|
|
|
|
|
},
|
|
|
|
|
move |thinking: &str| {
|
|
|
|
|
let _ = tx_thinking.send(WsResponse::ThinkingToken {
|
|
|
|
|
content: thinking.to_string(),
|
|
|
|
|
});
|
|
|
|
|
},
|
|
|
|
|
move |tool_name: &str| {
|
|
|
|
|
let _ = tx_activity.send(WsResponse::ToolActivity {
|
|
|
|
|
tool_name: tool_name.to_string(),
|
|
|
|
|
});
|
|
|
|
|
},
|
|
|
|
|
);
|
|
|
|
|
tokio::pin!(chat_fut);
|
|
|
|
|
|
|
|
|
|
let mut perm_rx = ctx.services.perm_rx.lock().await;
|
|
|
|
|
|
|
|
|
|
let chat_result = loop {
|
|
|
|
|
tokio::select! {
|
|
|
|
|
result = &mut chat_fut => break result,
|
|
|
|
|
|
|
|
|
|
Some(perm_fwd) = perm_rx.recv() => {
|
|
|
|
|
let _ = tx.send(ws::permission_request_response(
|
|
|
|
|
&perm_fwd.request_id,
|
|
|
|
|
&perm_fwd.tool_name,
|
|
|
|
|
&perm_fwd.tool_input,
|
|
|
|
|
));
|
|
|
|
|
pending_perms.insert(
|
|
|
|
|
perm_fwd.request_id,
|
|
|
|
|
perm_fwd.response_tx,
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Some(Ok(WsMessage::Text(inner_text))) = stream.next() => {
|
|
|
|
|
// Handle read-RPC frames during active chat.
|
2026-05-13 04:43:48 +00:00
|
|
|
if let Some(rpc_resp) = crate::crdt_sync::try_handle_rpc_text(&inner_text).await {
|
2026-04-29 18:35:32 +00:00
|
|
|
if let Ok(resp_text) = serde_json::to_string(&rpc_resp) {
|
|
|
|
|
let _ = raw_tx.send(resp_text);
|
|
|
|
|
}
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
match ws::dispatch_inner(&inner_text, &mut pending_perms) {
|
|
|
|
|
ws::InnerDispatchResult::CancelChat => {
|
|
|
|
|
let _ = chat::cancel_chat(&ctx.state);
|
|
|
|
|
}
|
|
|
|
|
ws::InnerDispatchResult::Pong => {
|
|
|
|
|
let _ = tx.send(WsResponse::Pong);
|
|
|
|
|
}
|
|
|
|
|
ws::InnerDispatchResult::StartSideQuestion { question, context_messages, config } => {
|
|
|
|
|
let tx_side = tx.clone();
|
|
|
|
|
let store = ctx.store.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
let result = chat::side_question(
|
|
|
|
|
context_messages,
|
|
|
|
|
question,
|
|
|
|
|
config,
|
|
|
|
|
store.as_ref(),
|
|
|
|
|
|token| {
|
|
|
|
|
let _ = tx_side.send(WsResponse::SideQuestionToken {
|
|
|
|
|
content: token.to_string(),
|
|
|
|
|
});
|
|
|
|
|
},
|
|
|
|
|
).await;
|
|
|
|
|
match result {
|
|
|
|
|
Ok(response) => {
|
|
|
|
|
let _ = tx_side.send(WsResponse::SideQuestionDone { response });
|
|
|
|
|
}
|
|
|
|
|
Err(err) => {
|
|
|
|
|
let _ = tx_side.send(WsResponse::SideQuestionDone {
|
|
|
|
|
response: format!("Error: {err}"),
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
ws::InnerDispatchResult::PermissionResolved
|
|
|
|
|
| ws::InnerDispatchResult::Ignored => {}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
match chat_result {
|
|
|
|
|
Ok(chat_result) => {
|
|
|
|
|
if let Some(sid) = chat_result.session_id {
|
|
|
|
|
let _ = tx.send(WsResponse::SessionId { session_id: sid });
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Err(err) => {
|
|
|
|
|
let _ = tx.send(ws::error_response(err));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
ws::DispatchResult::CancelChat => {
|
|
|
|
|
let _ = chat::cancel_chat(&ctx.state);
|
|
|
|
|
}
|
|
|
|
|
ws::DispatchResult::Pong => {
|
|
|
|
|
let _ = tx.send(WsResponse::Pong);
|
|
|
|
|
}
|
|
|
|
|
ws::DispatchResult::IgnoredPermission => {
|
|
|
|
|
// Permission responses outside an active chat are ignored.
|
|
|
|
|
}
|
|
|
|
|
ws::DispatchResult::StartSideQuestion {
|
|
|
|
|
question,
|
|
|
|
|
context_messages,
|
|
|
|
|
config,
|
|
|
|
|
} => {
|
|
|
|
|
let tx_side = tx.clone();
|
|
|
|
|
let store = ctx.store.clone();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
let result = chat::side_question(
|
|
|
|
|
context_messages,
|
|
|
|
|
question,
|
|
|
|
|
config,
|
|
|
|
|
store.as_ref(),
|
|
|
|
|
|token| {
|
|
|
|
|
let _ = tx_side.send(WsResponse::SideQuestionToken {
|
|
|
|
|
content: token.to_string(),
|
|
|
|
|
});
|
|
|
|
|
},
|
|
|
|
|
)
|
|
|
|
|
.await;
|
|
|
|
|
match result {
|
|
|
|
|
Ok(response) => {
|
|
|
|
|
let _ =
|
|
|
|
|
tx_side.send(WsResponse::SideQuestionDone { response });
|
|
|
|
|
}
|
|
|
|
|
Err(err) => {
|
|
|
|
|
let _ = tx_side.send(WsResponse::SideQuestionDone {
|
|
|
|
|
response: format!("Error: {err}"),
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
ws::DispatchResult::ParseError(msg) => {
|
|
|
|
|
let _ = tx.send(ws::error_response(msg));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
drop(tx);
|
|
|
|
|
let _ = forward.await;
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
mod tests;
|