diff --git a/side-node/src/main.rs b/side-node/src/main.rs index 3e07c35..2b50c12 100644 --- a/side-node/src/main.rs +++ b/side-node/src/main.rs @@ -1,8 +1,10 @@ +use std::{io::BufRead, thread}; + use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp}; use cli::{parse_args, Commands}; use crdt::TransactionList; use node::SideNode; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, task}; use websocket::WebSocketClient; pub(crate) mod cli; @@ -38,13 +40,31 @@ async fn setup(name: &String) -> SideNode { let side_dir = utils::home(name); let keys = keys::load_from_file(side_dir); let (incoming_sender, incoming_receiver) = mpsc::channel::(32); + + let (stdin_sender, stdin_receiver) = std::sync::mpsc::channel(); + task::spawn(async move { + stdin_input(stdin_sender); + }); + let crdt = BaseCrdt::::new(&keys); - let node = SideNode::new(crdt, keys, incoming_receiver); - WebSocketClient::start(incoming_sender).await; + let node = SideNode::new(crdt, keys, incoming_receiver, stdin_receiver); + tokio::spawn(async move { + WebSocketClient::start(incoming_sender).await; + }); println!("Node setup complete."); node } +fn stdin_input(stdin_input_sender: std::sync::mpsc::Sender) { + let stdin = std::io::stdin(); + let lines = stdin.lock().lines(); + for line in lines { + println!("We're in stdin_input"); + let line = line.unwrap(); + stdin_input_sender.send(line).unwrap(); + } +} + // async fn maybe_autosend( // autosend: bool, // handle: ezsockets::Client, diff --git a/side-node/src/node.rs b/side-node/src/node.rs index b4f7247..c3bd046 100644 --- a/side-node/src/node.rs +++ b/side-node/src/node.rs @@ -8,6 +8,7 @@ pub(crate) struct SideNode { crdt: BaseCrdt, keys: fastcrypto::ed25519::Ed25519KeyPair, incoming_receiver: mpsc::Receiver, + stdin_receiver: std::sync::mpsc::Receiver, } impl SideNode { @@ -15,25 +16,37 @@ impl SideNode { crdt: BaseCrdt, keys: Ed25519KeyPair, incoming_receiver: mpsc::Receiver, + stdin_receiver: std::sync::mpsc::Receiver, ) -> Self { let node = Self { crdt, keys, incoming_receiver, + stdin_receiver, }; node } pub(crate) async fn start(&mut self) { println!("Starting node..."); + loop { - let incoming = self.incoming_receiver.recv().await.unwrap(); - println!("Received incoming message: {:?}", incoming); - self.handle_incoming(incoming); + match self.stdin_receiver.try_recv() { + Ok(stdin) => { + println!("Received stdin input: {:?}", stdin); + } + Err(_) => {} + } + match self.incoming_receiver.try_recv() { + Ok(incoming) => { + self.handle_incoming(&incoming); + } + Err(_) => {} + } } } - fn handle_incoming(&mut self, incoming: SignedOp) { + fn handle_incoming(&mut self, incoming: &SignedOp) { println!("WINNNINGINGINGINGINGIGNIGN"); self.crdt.apply(incoming.clone()); } diff --git a/side-node/src/websocket.rs b/side-node/src/websocket.rs index 56709c3..fa8e7c1 100644 --- a/side-node/src/websocket.rs +++ b/side-node/src/websocket.rs @@ -1,11 +1,8 @@ use async_trait::async_trait; use bft_json_crdt::json_crdt::SignedOp; use ezsockets::ClientConfig; -use std::io::BufRead; use tokio::sync::mpsc; -use crate::utils; - pub(crate) struct WebSocketClient { incoming_sender: mpsc::Sender, } @@ -14,8 +11,7 @@ impl WebSocketClient { /// Start the websocket client pub(crate) async fn start( incoming_sender: mpsc::Sender, - // stdin_sender: mpsc::Sender, - ) -> tokio::task::JoinHandle<()> { + ) -> ezsockets::Client { tracing_subscriber::fmt::init(); let config = ClientConfig::new("ws://localhost:8080/websocket"); let (handle, future) = @@ -23,27 +19,8 @@ impl WebSocketClient { tokio::spawn(async move { future.await.unwrap(); }); - tokio::spawn(async move { - let stdin = std::io::stdin(); - let lines = stdin.lock().lines(); - for line in lines { - println!("We don't get here until we send a message"); - let line = line.unwrap(); - let signed_op = if let "exit" = line.as_str() { - break; - // } else if let "trace" = line.as_str() { - // node.trace_crdt(); - // continue; - } else { - let fake = utils::fake_transaction("foo123".to_string()); - // stdin_sender.send(fake).await.unwrap(); - fake - }; - tracing::info!("sending {:?}", signed_op); - let json = serde_json::to_string(&signed_op).unwrap(); - handle.text(json).unwrap(); - } - }) + loop {} + handle } }