Files
bft-crdt-experiment/side-node/src/websocket.rs

69 lines
2.2 KiB
Rust
Raw Normal View History

2024-06-06 19:32:29 +01:00
use async_trait::async_trait;
use bft_json_crdt::json_crdt::SignedOp;
2024-06-06 19:32:29 +01:00
use ezsockets::ClientConfig;
use tokio::sync::mpsc;
2024-06-06 19:32:29 +01:00
use crate::utils;
pub struct WebSocketClient {
incoming_sender: mpsc::Sender<SignedOp>,
2024-06-11 18:13:51 +01:00
handle: ezsockets::Client<WebSocketClient>,
}
impl WebSocketClient {
/// Start the websocket client
pub async fn new(
2024-06-10 16:43:45 +01:00
incoming_sender: mpsc::Sender<SignedOp>,
) -> ezsockets::Client<WebSocketClient> {
let config = ClientConfig::new("ws://localhost:8080/websocket");
2024-06-11 18:13:51 +01:00
let (handle, future) = ezsockets::connect(
|client| WebSocketClient {
incoming_sender,
handle: client,
},
config,
)
.await;
tokio::spawn(async move {
future.await.unwrap();
});
handle
}
}
2024-06-06 19:32:29 +01:00
#[async_trait]
impl ezsockets::ClientExt for WebSocketClient {
2024-06-11 18:33:08 +01:00
// Right now we're only using the Call type for sending signed ops
// change this to an enum if we need to send other types of calls, and
// match on it.
2024-06-11 18:13:51 +01:00
type Call = String;
2024-06-06 19:32:29 +01:00
2024-06-11 18:42:13 +01:00
/// When we receive a text message, apply the crdt operation contained in it to our
/// local crdt.
2024-06-06 19:32:29 +01:00
async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> {
let string_sha = utils::shassy(text.clone());
println!("received text, sha: {string_sha}");
let incoming: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&text).unwrap();
let object_sha = utils::shappy(incoming.clone());
println!("deserialized: {}", object_sha);
if string_sha != object_sha {
2024-06-18 16:00:02 +01:00
panic!("sha mismatch: {string_sha} != {object_sha}, bft-crdt has failed");
}
self.incoming_sender.send(incoming).await?;
2024-06-06 19:32:29 +01:00
Ok(())
}
2024-06-11 18:42:13 +01:00
/// When we receive a binary message, log the bytes. Currently unused.
2024-06-06 19:32:29 +01:00
async fn on_binary(&mut self, bytes: Vec<u8>) -> Result<(), ezsockets::Error> {
tracing::info!("received bytes: {bytes:?}");
Ok(())
}
2024-06-11 18:42:13 +01:00
/// Call this with the `Call` type to send application data to the websocket client
/// (and from there, to the server).
2024-06-06 19:32:29 +01:00
async fn on_call(&mut self, call: Self::Call) -> Result<(), ezsockets::Error> {
2024-06-11 18:33:08 +01:00
self.handle.text(call)?;
2024-06-06 19:32:29 +01:00
Ok(())
}
}