From 4717ffa7e8a5e770324003b42fe78806c0ae7a49 Mon Sep 17 00:00:00 2001 From: Dave Hrycyszyn Date: Mon, 10 Jun 2024 14:25:05 +0100 Subject: [PATCH] Almost working, I've got a blocking I/O problem with stdin now :) --- side-node/src/main.rs | 19 +++++++------ side-node/src/node.rs | 35 ++++++++++++++++++------ side-node/src/websocket.rs | 56 +++++++++++++++++++++++--------------- 3 files changed, 70 insertions(+), 40 deletions(-) diff --git a/side-node/src/main.rs b/side-node/src/main.rs index b7aa8de..6b6252b 100644 --- a/side-node/src/main.rs +++ b/side-node/src/main.rs @@ -1,8 +1,8 @@ -use bft_json_crdt::json_crdt::BaseCrdt; +use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp}; use cli::{parse_args, Commands}; use crdt::TransactionList; -use fastcrypto::ed25519::Ed25519KeyPair; use node::SideNode; +use tokio::sync::mpsc; use websocket::WebSocketClient; pub(crate) mod cli; @@ -26,20 +26,21 @@ async fn main() { let _ = init::init(utils::home(name), config); } Some(Commands::Run { name }) => { - let (crdt, websocket_client, keys) = setup(name); - let side_node = &mut SideNode::new(websocket_client, crdt, keys); - side_node.start().await; + let mut node = setup(name).await; + node.start().await; } None => println!("No command provided. Exiting. See --help for more information."), } } /// Wire everything up outside the application so we can test more easily later -fn setup(name: &String) -> (BaseCrdt, WebSocketClient, Ed25519KeyPair) { +async fn setup(name: &String) -> SideNode { let side_dir = utils::home(name); let keys = keys::load_from_file(side_dir); - let websocket_client = WebSocketClient::new(); + let (incoming_sender, incoming_receiver) = mpsc::channel::(32); let crdt = BaseCrdt::::new(&keys); - - (crdt, websocket_client, keys) + let mut node = SideNode::new(crdt, keys, incoming_receiver); + println!("Node setup complete."); + WebSocketClient::start(&mut node, incoming_sender).await; + node } diff --git a/side-node/src/node.rs b/side-node/src/node.rs index 300765a..008cbfb 100644 --- a/side-node/src/node.rs +++ b/side-node/src/node.rs @@ -1,32 +1,44 @@ -use bft_json_crdt::json_crdt::BaseCrdt; +use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp}; use fastcrypto::ed25519::Ed25519KeyPair; +use tokio::sync::mpsc; -use crate::{crdt::TransactionList, websocket::WebSocketClient}; +use crate::crdt::TransactionList; pub(crate) struct SideNode { crdt: BaseCrdt, keys: fastcrypto::ed25519::Ed25519KeyPair, - websocket_client: WebSocketClient, + incoming_receiver: mpsc::Receiver, } impl SideNode { pub(crate) fn new( - websocket_client: WebSocketClient, crdt: BaseCrdt, keys: Ed25519KeyPair, + incoming_receiver: mpsc::Receiver, ) -> Self { - Self { + let node = Self { crdt, keys, - websocket_client, - } + incoming_receiver, + }; + node } pub(crate) async fn start(&mut self) { - self.websocket_client.start().await; + println!("Starting node..."); + loop { + let incoming = self.incoming_receiver.recv().await.unwrap(); + println!("Received incoming message: {:?}", incoming); + self.handle_incoming(incoming); + } } - fn add_transaction_local( + fn handle_incoming(&mut self, incoming: SignedOp) { + println!("WINNNINGINGINGINGINGIGNIGN"); + self.crdt.apply(incoming.clone()); + } + + pub(crate) fn add_transaction_local( &mut self, transaction: serde_json::Value, ) -> bft_json_crdt::json_crdt::SignedOp { @@ -45,4 +57,9 @@ impl SideNode { .sign(&self.keys); signed_op } + + /// Print the current state of the CRDT, can be used to debug + pub(crate) fn trace_crdt(&self) { + println!("{:?}", self.crdt.doc.list); + } } diff --git a/side-node/src/websocket.rs b/side-node/src/websocket.rs index a358084..8ef7ae3 100644 --- a/side-node/src/websocket.rs +++ b/side-node/src/websocket.rs @@ -1,35 +1,47 @@ use async_trait::async_trait; +use bft_json_crdt::json_crdt::SignedOp; use ezsockets::ClientConfig; -use std::io::BufRead; +use tokio::fs::File; +use tokio::io; +use tokio::sync::mpsc; -pub(crate) struct WebSocketClient {} +use crate::{node::SideNode, utils}; + +pub(crate) struct WebSocketClient { + incoming_sender: mpsc::Sender, +} impl WebSocketClient { - pub(crate) fn new() -> Self { - Self {} + pub(crate) fn new(incoming_sender: mpsc::Sender) -> Self { + Self { incoming_sender } } /// Start the websocket client - pub(crate) async fn start(&mut self) { + pub(crate) async fn start(node: &mut SideNode, incoming_sender: mpsc::Sender) { tracing_subscriber::fmt::init(); let config = ClientConfig::new("ws://localhost:8080/websocket"); - let (handle, future) = ezsockets::connect(|_client| WebSocketClient {}, config).await; + let (handle, future) = + ezsockets::connect(|_client| WebSocketClient { incoming_sender }, config).await; tokio::spawn(async move { future.await.unwrap(); }); - let stdin = std::io::stdin(); - let lines = stdin.lock().lines(); - for line in lines { - let line = line.unwrap(); - let signed_op = if let "exit" = line.as_str() { - break; - } else { - // list_transaction_crdt::create(bft_crdt, &keys) - }; - tracing::info!("sending {:?}", signed_op); - let json = serde_json::to_string(&signed_op).unwrap(); - handle.text(json).unwrap(); - } + let stdin = tokio::io::stdin(); + // let lines = stdin.lock().lines(); + // + let mut reader = FramedRead::new(stdin, LinesCodec::new()); + let line = reader.next().await.transpose()?.unwrap(); + let signed_op = if let "exit" = line.as_str() { + break; + } else if let "trace" = line.as_str() { + node.trace_crdt(); + continue; + } else { + let fake = utils::fake_transaction("foo123".to_string()); + node.add_transaction_local(fake) + }; + tracing::info!("sending {:?}", signed_op); + let json = serde_json::to_string(&signed_op).unwrap(); + handle.text(json).unwrap(); } } @@ -39,9 +51,9 @@ impl ezsockets::ClientExt for WebSocketClient { async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> { tracing::info!("received message: {text}"); - let _incoming: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&text).unwrap(); - // TODO: make this sucker work - // self.bft_crdt.apply(incoming.clone()); + let incoming: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&text).unwrap(); + tracing::info!("received signed op: {incoming:?}"); + self.incoming_sender.send(incoming).await.unwrap(); Ok(()) }