diff --git a/Cargo.lock b/Cargo.lock index 4fba59b..52c283d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2074,6 +2074,10 @@ version = "0.1.0" dependencies = [ "async-trait", "ezsockets", + "serde", + "serde_json", + "sha256", + "side-node", "tokio", "tracing", "tracing-subscriber", diff --git a/crates/bft-json-crdt/src/json_crdt.rs b/crates/bft-json-crdt/src/json_crdt.rs index d8cd558..b2cb62c 100644 --- a/crates/bft-json-crdt/src/json_crdt.rs +++ b/crates/bft-json-crdt/src/json_crdt.rs @@ -212,7 +212,7 @@ impl BaseCrdt { /// Apply a signed operation to this BaseCRDT, verifying integrity and routing to the right /// nested CRDT pub fn apply(&mut self, op: SignedOp) -> OpState { - self.log_try_apply(&op); + // self.log_try_apply(&op); #[cfg(feature = "bft")] if !op.is_valid_digest() { @@ -232,9 +232,9 @@ impl BaseCrdt { } // apply - self.log_actually_apply(&op); + // self.log_actually_apply(&op); let status = self.doc.apply(op.inner); - self.debug_view(); + // self.debug_view(); self.received.insert(op_id); // apply all of its causal dependents if there are any diff --git a/side-node/src/lib.rs b/side-node/src/lib.rs index 713c800..cb199a4 100644 --- a/side-node/src/lib.rs +++ b/side-node/src/lib.rs @@ -8,11 +8,11 @@ use websocket::WebSocketClient; pub(crate) mod cli; pub mod crdt; pub(crate) mod init; -pub(crate) mod keys; -pub(crate) mod node; +pub mod keys; +pub mod node; pub(crate) mod stdin; pub mod utils; -pub(crate) mod websocket; +pub mod websocket; #[tokio::main] pub async fn run() { diff --git a/side-node/src/node.rs b/side-node/src/node.rs index c69e9c1..1326d06 100644 --- a/side-node/src/node.rs +++ b/side-node/src/node.rs @@ -4,7 +4,7 @@ use tokio::sync::mpsc; use crate::{crdt::TransactionList, utils, websocket::WebSocketClient}; -pub(crate) struct SideNode { +pub struct SideNode { crdt: BaseCrdt, keys: fastcrypto::ed25519::Ed25519KeyPair, incoming_receiver: mpsc::Receiver, @@ -13,7 +13,7 @@ pub(crate) struct SideNode { } impl SideNode { - pub(crate) fn new( + pub fn new( crdt: BaseCrdt, keys: Ed25519KeyPair, incoming_receiver: mpsc::Receiver, @@ -39,12 +39,14 @@ impl SideNode { let transaction = utils::fake_transaction_json(stdin); let json = serde_json::to_value(transaction).unwrap(); let signed_op = self.add_transaction_local(json); + println!("STDIN: {}", shappy(signed_op.clone())); self.send_to_network(signed_op).await; } Err(_) => {} // ignore empty channel errors in this PoC } match self.incoming_receiver.try_recv() { Ok(incoming) => { + println!("INCOMING"); self.handle_incoming(incoming); } Err(_) => {} // ignore empty channel errors in this PoC @@ -57,12 +59,13 @@ impl SideNode { self.handle.call(to_send).unwrap(); } - fn handle_incoming(&mut self, incoming: SignedOp) { + pub fn handle_incoming(&mut self, incoming: SignedOp) { + println!("handle_incoming: {}", shappy(incoming.clone())); self.crdt.apply(incoming); - self.trace_crdt(); + // self.trace_crdt(); } - pub(crate) fn add_transaction_local( + pub fn add_transaction_local( &mut self, transaction: serde_json::Value, ) -> bft_json_crdt::json_crdt::SignedOp { @@ -79,12 +82,21 @@ impl SideNode { .list .insert(last.id, transaction) .sign(&self.keys); - self.trace_crdt(); + // self.trace_crdt(); signed_op } /// Print the current state of the CRDT, can be used to debug - pub(crate) fn trace_crdt(&self) { + pub fn trace_crdt(&self) { println!("{:?}", self.crdt.doc.view_sha()); } + + pub fn current_sha(&self) -> String { + self.crdt.doc.view_sha() + } +} + +fn shappy(op: SignedOp) -> String { + let b = serde_json::to_string(&op).unwrap().into_bytes(); + sha256::digest(b).to_string() } diff --git a/side-node/src/websocket.rs b/side-node/src/websocket.rs index f7d5caa..46db012 100644 --- a/side-node/src/websocket.rs +++ b/side-node/src/websocket.rs @@ -3,17 +3,16 @@ use bft_json_crdt::json_crdt::SignedOp; use ezsockets::ClientConfig; use tokio::sync::mpsc; -pub(crate) struct WebSocketClient { +pub struct WebSocketClient { incoming_sender: mpsc::Sender, handle: ezsockets::Client, } impl WebSocketClient { /// Start the websocket client - pub(crate) async fn new( + pub async fn new( incoming_sender: mpsc::Sender, ) -> ezsockets::Client { - tracing_subscriber::fmt::init(); let config = ClientConfig::new("ws://localhost:8080/websocket"); let (handle, future) = ezsockets::connect( |client| WebSocketClient { @@ -40,7 +39,6 @@ impl ezsockets::ClientExt for WebSocketClient { /// When we receive a text message, apply the crdt operation contained in it to our /// local crdt. async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> { - tracing::info!("received text: {text:?}"); let incoming: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&text).unwrap(); self.incoming_sender.send(incoming).await?; Ok(()) @@ -55,7 +53,6 @@ impl ezsockets::ClientExt for WebSocketClient { /// Call this with the `Call` type to send application data to the websocket client /// (and from there, to the server). async fn on_call(&mut self, call: Self::Call) -> Result<(), ezsockets::Error> { - tracing::info!("sending signed op: {call:?}"); self.handle.text(call)?; Ok(()) } diff --git a/side-node/tests/side_node.rs b/side-node/tests/side_node.rs new file mode 100644 index 0000000..1757efd --- /dev/null +++ b/side-node/tests/side_node.rs @@ -0,0 +1,48 @@ +use bft_json_crdt::{ + json_crdt::{BaseCrdt, SignedOp}, + keypair::make_keypair, +}; +use side_node::{crdt::TransactionList, node::SideNode, utils, WebSocketClient}; +use tokio::sync::mpsc; + +#[tokio::test] +async fn test_distribute_via_websockets() { + let mut node1 = setup("alice").await; + let mut node2 = setup("bob").await; + + assert_eq!(node1.current_sha(), node2.current_sha()); + + let transaction = utils::fake_transaction_json("from_alice".to_string()); + let signed_op = node1.add_transaction_local(transaction); + node2.handle_incoming(signed_op); + + assert_eq!(node1.current_sha(), node2.current_sha()); + + let transaction = utils::fake_transaction_json("from_alice2".to_string()); + let signed_op = node1.add_transaction_local(transaction); + node2.handle_incoming(signed_op); + + assert_eq!(node1.current_sha(), node2.current_sha()); + + let transaction = utils::fake_transaction_json("from_alice3".to_string()); + let signed_op = node1.add_transaction_local(transaction); + node2.handle_incoming(signed_op); + + assert_eq!(node1.current_sha(), node2.current_sha()); +} + +/// Wire everything up, ignoring things we are not using in the test +async fn setup(_: &str) -> SideNode { + // First, load up the keys and create a bft-crdt + let keys = make_keypair(); + let crdt = BaseCrdt::::new(&keys); + + // Channels for internal communication, and a tokio task for stdin input + let (incoming_sender, incoming_receiver) = mpsc::channel::(32); + let (_, stdin_receiver) = std::sync::mpsc::channel(); + + // Finally, create the node and return it + let handle = WebSocketClient::new(incoming_sender).await; + let node = SideNode::new(crdt, keys, incoming_receiver, stdin_receiver, handle); + node +} diff --git a/side-watcher/Cargo.toml b/side-watcher/Cargo.toml index c4ce9d8..1c46769 100644 --- a/side-watcher/Cargo.toml +++ b/side-watcher/Cargo.toml @@ -8,6 +8,10 @@ edition = "2021" [dependencies] async-trait = "0.1.52" ezsockets = { version = "*", features = ["tungstenite"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0.117" +sha256 = "1.5.0" +side-node = { path = "../side-node" } tokio = { version = "1.17.0", features = ["full"] } tracing = "0.1.32" tracing-subscriber = "0.3.9" diff --git a/side-watcher/src/main.rs b/side-watcher/src/main.rs index 5973b1a..01a711b 100644 --- a/side-watcher/src/main.rs +++ b/side-watcher/src/main.rs @@ -85,12 +85,13 @@ impl ezsockets::ServerExt for ChatServer { .unzip(); tracing::info!( - "sending {text} to [{sessions}] at `{room}`", + "sending {hash} to [{sessions}] at `{room}`", sessions = ids .iter() .map(|id| id.to_string()) .collect::>() - .join(",") + .join(","), + hash = shappy(text.clone()) ); for session in sessions { session.text(text.clone()).unwrap(); @@ -144,7 +145,7 @@ impl ezsockets::SessionExt for SessionActor { } async fn on_text(&mut self, text: String) -> Result<(), Error> { - tracing::info!("received: {text}"); + tracing::info!("received: {}", shappy(text.clone())); if text.starts_with('/') { let mut args = text.split_whitespace(); let command = args.next().unwrap(); @@ -183,6 +184,11 @@ impl ezsockets::SessionExt for SessionActor { } } +fn shappy(text: String) -> String { + let b = text.into_bytes(); + sha256::digest(b).to_string() +} + #[tokio::main] async fn main() { tracing_subscriber::fmt::init();