Refactoring the structure a bit
This commit is contained in:
92
server/src/http/ws.rs
Normal file
92
server/src/http/ws.rs
Normal file
@@ -0,0 +1,92 @@
|
||||
use crate::http::context::AppContext;
|
||||
use crate::llm::chat;
|
||||
use crate::llm::types::Message;
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use poem::handler;
|
||||
use poem::web::Data;
|
||||
use poem::web::websocket::{Message as WsMessage, WebSocket};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
enum WsRequest {
|
||||
Chat {
|
||||
messages: Vec<Message>,
|
||||
config: chat::ProviderConfig,
|
||||
},
|
||||
Cancel,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
enum WsResponse {
|
||||
Token { content: String },
|
||||
Update { messages: Vec<Message> },
|
||||
Error { message: String },
|
||||
}
|
||||
|
||||
#[handler]
|
||||
pub async fn ws_handler(ws: WebSocket, ctx: Data<&AppContext>) -> impl poem::IntoResponse {
|
||||
let ctx = ctx.0.clone();
|
||||
ws.on_upgrade(move |socket| async move {
|
||||
let (mut sink, mut stream) = socket.split();
|
||||
let (tx, mut rx) = mpsc::unbounded_channel::<WsResponse>();
|
||||
|
||||
let forward = tokio::spawn(async move {
|
||||
while let Some(msg) = rx.recv().await {
|
||||
if let Ok(text) = serde_json::to_string(&msg)
|
||||
&& sink.send(WsMessage::Text(text)).await.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
while let Some(Ok(msg)) = stream.next().await {
|
||||
if let WsMessage::Text(text) = msg {
|
||||
let parsed: Result<WsRequest, _> = serde_json::from_str(&text);
|
||||
match parsed {
|
||||
Ok(WsRequest::Chat { messages, config }) => {
|
||||
let tx_updates = tx.clone();
|
||||
let tx_tokens = tx.clone();
|
||||
let ctx_clone = ctx.clone();
|
||||
|
||||
let result = chat::chat(
|
||||
messages,
|
||||
config,
|
||||
&ctx_clone.state,
|
||||
ctx_clone.store.as_ref(),
|
||||
|history| {
|
||||
let _ = tx_updates.send(WsResponse::Update {
|
||||
messages: history.to_vec(),
|
||||
});
|
||||
},
|
||||
|token| {
|
||||
let _ = tx_tokens.send(WsResponse::Token {
|
||||
content: token.to_string(),
|
||||
});
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Err(err) = result {
|
||||
let _ = tx.send(WsResponse::Error { message: err });
|
||||
}
|
||||
}
|
||||
Ok(WsRequest::Cancel) => {
|
||||
let _ = chat::cancel_chat(&ctx.state);
|
||||
}
|
||||
Err(err) => {
|
||||
let _ = tx.send(WsResponse::Error {
|
||||
message: format!("Invalid request: {err}"),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
drop(tx);
|
||||
let _ = forward.await;
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user