diff --git a/side-node/src/list_transaction_crdt.rs b/side-node/src/list_transaction_crdt.rs index 8f19bd2..0bd0e43 100644 --- a/side-node/src/list_transaction_crdt.rs +++ b/side-node/src/list_transaction_crdt.rs @@ -41,14 +41,14 @@ pub(crate) async fn send( let transaction = generate_transaction(count, keys.public().to_string()); // next job is to keep adding to this guy + let next = bft_crdt.doc.list.ops.len(); let signed_op = bft_crdt .doc .list - .insert_idx(0, transaction.clone()) + .insert_idx(next - 1, transaction.clone()) .sign(&keys); - println!("SignedOp being sent is: {:?}", signed_op); + // println!("SignedOp being sent is: {:?}", signed_op); - println!("Sending: {:?}", signed_op); Ok(ws .send_text(serde_json::to_string(&signed_op).unwrap()) .await?) diff --git a/side-node/src/websocket/mod.rs b/side-node/src/websocket/mod.rs index 34c2e87..b982e84 100644 --- a/side-node/src/websocket/mod.rs +++ b/side-node/src/websocket/mod.rs @@ -1,10 +1,10 @@ use crate::list_transaction_crdt::{self, CrdtList}; +use base64::{engine::general_purpose, Engine as _}; +use bft_json_crdt::json_crdt::BaseCrdt; use bft_json_crdt::json_crdt::SignedOp; -use bft_json_crdt::json_crdt::{BaseCrdt, CrdtNode}; use bft_json_crdt::keypair::Ed25519KeyPair; use tokio::time; use websockets::WebSocket; - /// Starts a websocket and periodically sends a BFT-CRDT message to the websocket server pub(crate) async fn start( keys: Ed25519KeyPair, @@ -13,29 +13,36 @@ pub(crate) async fn start( println!("connecting to websocket at ws://127.0.0.1:8080/"); let mut ws = WebSocket::connect("ws://127.0.0.1:8080/").await?; - let mut interval = every_two_seconds(); + let mut interval = every_ten_seconds(); let mut count = 0; loop { let _ = list_transaction_crdt::send(count, bft_crdt, &mut ws, &keys).await; let msg = ws.receive().await?; - println!("Received: {:?}", msg); + // println!("Received: {:?}", msg); // deserialize the received websocket Frame into a string let msg = msg.into_text().unwrap().0; // deserialize the message into a Transaction struct let incoming_operation: SignedOp = serde_json::from_str(&msg).unwrap(); - println!("Received a new network operation: {:?}", incoming_operation); + println!( + "Received a new network operation from: {:?}", + general_purpose::STANDARD.encode(&incoming_operation.author()) + ); bft_crdt.apply(incoming_operation); + // println!("New crdt state is: {}", bft_crdt.doc.view()); - println!("New crdt state is: {}", bft_crdt.doc.view()); count = count + 1; interval.tick().await; } } -fn every_two_seconds() -> time::Interval { - time::interval(time::Duration::from_secs(2)) +// fn every_two_seconds() -> time::Interval { +// time::interval(time::Duration::from_secs(2)) +// } + +fn every_ten_seconds() -> time::Interval { + time::interval(time::Duration::from_secs(10)) } diff --git a/side-watcher/src/main.rs b/side-watcher/src/main.rs index c26bc48..94ad67a 100644 --- a/side-watcher/src/main.rs +++ b/side-watcher/src/main.rs @@ -24,9 +24,12 @@ fn main() { client_id, message ); // retrieve this client's `Responder`: - let responder = clients.get(&client_id).unwrap(); - // echo the message back: - responder.send(message); + // let responder = clients.get(&client_id).unwrap(); + let all_clients = clients.keys().collect::>(); + for client in all_clients { + let other_responder = clients.get(client).unwrap(); + other_responder.send(message.clone()); + } } } }