From b1daec3b844e5cf517c06b68f6fa6c380a41ce40 Mon Sep 17 00:00:00 2001 From: Dave Hrycyszyn Date: Tue, 11 Jun 2024 18:13:51 +0100 Subject: [PATCH] Figured out what `on_call` is for --- side-node/src/main.rs | 14 ++++++++++++-- side-node/src/node.rs | 18 +++++++++++------- side-node/src/websocket.rs | 35 ++++++++++++++++++++++++++++------- 3 files changed, 51 insertions(+), 16 deletions(-) diff --git a/side-node/src/main.rs b/side-node/src/main.rs index 438462d..e7c973c 100644 --- a/side-node/src/main.rs +++ b/side-node/src/main.rs @@ -42,14 +42,24 @@ async fn setup(name: &String) -> SideNode { let (incoming_sender, incoming_receiver) = mpsc::channel::(32); let (stdin_sender, stdin_receiver) = std::sync::mpsc::channel(); + + let (network_sender, network_receiver) = mpsc::channel::(32); + task::spawn(async move { stdin_input(stdin_sender); }); let crdt = BaseCrdt::::new(&keys); - let node = SideNode::new(crdt, keys, incoming_receiver, stdin_receiver); + let node = SideNode::new( + crdt, + keys, + incoming_receiver, + stdin_receiver, + network_sender, + ); tokio::spawn(async move { - WebSocketClient::start(incoming_sender).await; + let handle = WebSocketClient::new(incoming_sender, network_receiver).await; + handle.call("start".to_string()).unwrap(); }); println!("Node setup complete."); node diff --git a/side-node/src/node.rs b/side-node/src/node.rs index fab29c3..4cd2028 100644 --- a/side-node/src/node.rs +++ b/side-node/src/node.rs @@ -9,6 +9,7 @@ pub(crate) struct SideNode { keys: fastcrypto::ed25519::Ed25519KeyPair, incoming_receiver: mpsc::Receiver, stdin_receiver: std::sync::mpsc::Receiver, + network_sender: mpsc::Sender, } impl SideNode { @@ -17,12 +18,14 @@ impl SideNode { keys: Ed25519KeyPair, incoming_receiver: mpsc::Receiver, stdin_receiver: std::sync::mpsc::Receiver, + network_sender: mpsc::Sender, ) -> Self { let node = Self { crdt, keys, incoming_receiver, stdin_receiver, + network_sender, }; node } @@ -31,27 +34,28 @@ impl SideNode { println!("Starting node..."); loop { - match self.stdin_receiver.recv() { + match self.stdin_receiver.try_recv() { Ok(stdin) => { println!("Received stdin input: {:?}", stdin); let transaction = utils::fake_transaction(stdin); let json = serde_json::to_value(transaction).unwrap(); - let signed_op = self._add_transaction_local(json); - self.send_to_network(signed_op); + let signed_op = self.add_transaction_local(json); + self.send_to_network(signed_op).await; } - Err(_) => {} + Err(_) => {} // ignore empty channel errors in this PoC } match self.incoming_receiver.try_recv() { Ok(incoming) => { self.handle_incoming(&incoming); } - Err(_) => {} + Err(_) => {} // ignore empty channel errors in this PoC } } } - fn send_to_network(&self, signed_op: SignedOp) { + async fn send_to_network(&self, signed_op: SignedOp) { println!("sending to network: {:?}", signed_op); + self.network_sender.send(signed_op).await.unwrap(); } fn handle_incoming(&mut self, incoming: &SignedOp) { @@ -59,7 +63,7 @@ impl SideNode { self.crdt.apply(incoming.clone()); } - pub(crate) fn _add_transaction_local( + pub(crate) fn add_transaction_local( &mut self, transaction: serde_json::Value, ) -> bft_json_crdt::json_crdt::SignedOp { diff --git a/side-node/src/websocket.rs b/side-node/src/websocket.rs index fa8e7c1..480da61 100644 --- a/side-node/src/websocket.rs +++ b/side-node/src/websocket.rs @@ -5,31 +5,51 @@ use tokio::sync::mpsc; pub(crate) struct WebSocketClient { incoming_sender: mpsc::Sender, + network_receiver: mpsc::Receiver, + handle: ezsockets::Client, } impl WebSocketClient { /// Start the websocket client - pub(crate) async fn start( + pub(crate) async fn new( incoming_sender: mpsc::Sender, + network_receiver: mpsc::Receiver, ) -> ezsockets::Client { tracing_subscriber::fmt::init(); let config = ClientConfig::new("ws://localhost:8080/websocket"); - let (handle, future) = - ezsockets::connect(|_client| WebSocketClient { incoming_sender }, config).await; + let (handle, future) = ezsockets::connect( + |client| WebSocketClient { + incoming_sender, + network_receiver, + handle: client, + }, + config, + ) + .await; tokio::spawn(async move { future.await.unwrap(); }); - loop {} handle } + + pub(crate) async fn start(&mut self) { + loop { + match self.network_receiver.try_recv() { + Ok(signed_op) => { + let to_send = serde_json::to_string(&signed_op).unwrap(); + self.handle.text(to_send).unwrap(); + } + Err(_) => {} // ignore empty channel errors in this PoC + } + } + } } #[async_trait] impl ezsockets::ClientExt for WebSocketClient { - type Call = (); + type Call = String; async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> { - tracing::info!("received message: {text}"); let incoming: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&text).unwrap(); tracing::info!("received signed op: {incoming:?}"); self.incoming_sender.send(incoming).await.unwrap(); @@ -42,7 +62,8 @@ impl ezsockets::ClientExt for WebSocketClient { } async fn on_call(&mut self, call: Self::Call) -> Result<(), ezsockets::Error> { - let () = call; + println!("received call: {call}"); + self.start().await; Ok(()) } }