From 214d9111a67c6685a4c5ed9cda57f18865fbbde9 Mon Sep 17 00:00:00 2001 From: Dave Hrycyszyn Date: Wed, 5 Jun 2024 18:22:11 +0100 Subject: [PATCH] BFT-CRDT sending of transactions works on a timer. --- side-node/src/websocket/mod.rs | 44 ++++++++++++++++------------------ 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/side-node/src/websocket/mod.rs b/side-node/src/websocket/mod.rs index 7d7fadd..7871c03 100644 --- a/side-node/src/websocket/mod.rs +++ b/side-node/src/websocket/mod.rs @@ -1,5 +1,3 @@ -use std::io::{self, BufRead}; - use bft_crdt_derive::add_crdt_fields; use bft_json_crdt::json_crdt::SignedOp; use bft_json_crdt::keypair::{Ed25519KeyPair, KeyPair}; @@ -7,10 +5,11 @@ use bft_json_crdt::{ json_crdt::{BaseCrdt, CrdtNode, IntoCrdtNode}, keypair::make_keypair, list_crdt::ListCrdt, - op::ROOT_ID, + // op::ROOT_ID, }; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; +use tokio::time; // use tokio::time::{self}; use websockets::WebSocket; @@ -24,16 +23,10 @@ pub(crate) async fn start() -> Result<(), websockets::WebSocketError> { let mut bft_crdt = BaseCrdt::::new(&keys); println!("Generated a new CRDT with public key: {}", keys.public()); - // let mut interval = every_two_seconds(); + let mut interval = every_two_seconds(); + let mut count = 0; loop { - let stdin = io::stdin(); - let mut handle = stdin.lock(); - - let mut line = String::new(); - handle.read_line(&mut line).unwrap(); - - println!("You entered: {}", line); - let _ = send_a_transaction(line, &mut bft_crdt, &mut ws, &keys).await; + let _ = send_a_transaction(count, &mut bft_crdt, &mut ws, &keys).await; let msg = ws.receive().await?; println!("Received: {:?}", msg); @@ -42,17 +35,20 @@ pub(crate) async fn start() -> Result<(), websockets::WebSocketError> { let msg = msg.into_text().unwrap().0; // deserialize the message into a Transaction struct - let operation: SignedOp = serde_json::from_str(&msg).unwrap(); + let incoming_operation: SignedOp = serde_json::from_str(&msg).unwrap(); + println!("Received a new network operation: {:?}", incoming_operation); - bft_crdt.apply(operation); + bft_crdt.apply(incoming_operation); - println!("New crdt state is: {}", bft_crdt.doc.view()) + println!("New crdt state is: {}", bft_crdt.doc.view()); + count = count + 1; + interval.tick().await; } } -// fn every_two_seconds() -> time::Interval { -// time::interval(time::Duration::from_secs(2)) -// } +fn every_two_seconds() -> time::Interval { + time::interval(time::Duration::from_secs(2)) +} #[add_crdt_fields] #[derive(Clone, CrdtNode, Serialize, Deserialize)] @@ -69,28 +65,28 @@ struct Transaction { amount: f64, } -fn generate_transaction(from: String) -> Value { +fn generate_transaction(count: u32) -> Value { json!({ - "from": from, + "from": "Alice", "to": "Bob", - "amount": 100.0 + "amount": count }) } async fn send_a_transaction( - input: String, + count: u32, bft_crdt: &mut BaseCrdt, ws: &mut WebSocket, keys: &Ed25519KeyPair, ) -> Result<(), websockets::WebSocketError> { // generate a placeholder transaction - let transaction = generate_transaction(input); + let transaction = generate_transaction(count); // next job is to keep adding to this guy let signed_op = bft_crdt .doc .list - .insert(ROOT_ID, transaction.clone()) + .insert_idx(0, transaction.clone()) .sign(&keys); println!("SignedOp being sent is: {:?}", signed_op);