diff --git a/side-node/src/main.rs b/side-node/src/main.rs index e7c973c..86c3c3a 100644 --- a/side-node/src/main.rs +++ b/side-node/src/main.rs @@ -37,30 +37,21 @@ async fn main() { /// Wire everything up outside the application so we can test more easily later async fn setup(name: &String) -> SideNode { + // First, load up the keys and create a bft-crdt let side_dir = utils::home(name); let keys = keys::load_from_file(side_dir); + let crdt = BaseCrdt::::new(&keys); + + // Channels for internal communication, and a tokio task for stdin input let (incoming_sender, incoming_receiver) = mpsc::channel::(32); - let (stdin_sender, stdin_receiver) = std::sync::mpsc::channel(); - - let (network_sender, network_receiver) = mpsc::channel::(32); - task::spawn(async move { stdin_input(stdin_sender); }); - let crdt = BaseCrdt::::new(&keys); - let node = SideNode::new( - crdt, - keys, - incoming_receiver, - stdin_receiver, - network_sender, - ); - tokio::spawn(async move { - let handle = WebSocketClient::new(incoming_sender, network_receiver).await; - handle.call("start".to_string()).unwrap(); - }); + // Finally, create the node and return it + let handle = WebSocketClient::new(incoming_sender).await; + let node = SideNode::new(crdt, keys, incoming_receiver, stdin_receiver, handle); println!("Node setup complete."); node } diff --git a/side-node/src/node.rs b/side-node/src/node.rs index 4cd2028..52c7092 100644 --- a/side-node/src/node.rs +++ b/side-node/src/node.rs @@ -2,14 +2,14 @@ use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp}; use fastcrypto::ed25519::Ed25519KeyPair; use tokio::sync::mpsc; -use crate::{crdt::TransactionList, utils}; +use crate::{crdt::TransactionList, utils, websocket::WebSocketClient}; pub(crate) struct SideNode { crdt: BaseCrdt, keys: fastcrypto::ed25519::Ed25519KeyPair, incoming_receiver: mpsc::Receiver, stdin_receiver: std::sync::mpsc::Receiver, - network_sender: mpsc::Sender, + handle: ezsockets::Client, } impl SideNode { @@ -18,14 +18,14 @@ impl SideNode { keys: Ed25519KeyPair, incoming_receiver: mpsc::Receiver, stdin_receiver: std::sync::mpsc::Receiver, - network_sender: mpsc::Sender, + handle: ezsockets::Client, ) -> Self { let node = Self { crdt, keys, incoming_receiver, stdin_receiver, - network_sender, + handle, }; node } @@ -55,7 +55,8 @@ impl SideNode { async fn send_to_network(&self, signed_op: SignedOp) { println!("sending to network: {:?}", signed_op); - self.network_sender.send(signed_op).await.unwrap(); + let to_send = serde_json::to_string(&signed_op).unwrap(); + self.handle.call(to_send).unwrap(); } fn handle_incoming(&mut self, incoming: &SignedOp) { diff --git a/side-node/src/websocket.rs b/side-node/src/websocket.rs index 480da61..67a3f89 100644 --- a/side-node/src/websocket.rs +++ b/side-node/src/websocket.rs @@ -5,7 +5,6 @@ use tokio::sync::mpsc; pub(crate) struct WebSocketClient { incoming_sender: mpsc::Sender, - network_receiver: mpsc::Receiver, handle: ezsockets::Client, } @@ -13,14 +12,12 @@ impl WebSocketClient { /// Start the websocket client pub(crate) async fn new( incoming_sender: mpsc::Sender, - network_receiver: mpsc::Receiver, ) -> ezsockets::Client { tracing_subscriber::fmt::init(); let config = ClientConfig::new("ws://localhost:8080/websocket"); let (handle, future) = ezsockets::connect( |client| WebSocketClient { incoming_sender, - network_receiver, handle: client, }, config, @@ -31,18 +28,6 @@ impl WebSocketClient { }); handle } - - pub(crate) async fn start(&mut self) { - loop { - match self.network_receiver.try_recv() { - Ok(signed_op) => { - let to_send = serde_json::to_string(&signed_op).unwrap(); - self.handle.text(to_send).unwrap(); - } - Err(_) => {} // ignore empty channel errors in this PoC - } - } - } } #[async_trait] @@ -50,9 +35,9 @@ impl ezsockets::ClientExt for WebSocketClient { type Call = String; async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> { + tracing::info!("received text: {text:?}"); let incoming: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&text).unwrap(); - tracing::info!("received signed op: {incoming:?}"); - self.incoming_sender.send(incoming).await.unwrap(); + self.incoming_sender.send(incoming).await?; Ok(()) } @@ -62,8 +47,9 @@ impl ezsockets::ClientExt for WebSocketClient { } async fn on_call(&mut self, call: Self::Call) -> Result<(), ezsockets::Error> { - println!("received call: {call}"); - self.start().await; + let to_send = serde_json::to_string(&call).unwrap(); + tracing::info!("sending signed op: {to_send:?}"); + self.handle.text(to_send)?; Ok(()) } }