From e85683e86551de47869deeec92c772db6bf0a1d2 Mon Sep 17 00:00:00 2001 From: Dave Hrycyszyn Date: Wed, 5 Jun 2024 18:07:59 +0100 Subject: [PATCH] Fixed transaction structure. Remote transactions not apply()ing yet. --- crates/bft-json-crdt/src/json_crdt.rs | 2 +- crates/bft-json-crdt/src/op.rs | 2 +- side-node/src/websocket/mod.rs | 78 ++++++++++++++++----------- 3 files changed, 48 insertions(+), 34 deletions(-) diff --git a/crates/bft-json-crdt/src/json_crdt.rs b/crates/bft-json-crdt/src/json_crdt.rs index abcfc30..d8cd558 100644 --- a/crates/bft-json-crdt/src/json_crdt.rs +++ b/crates/bft-json-crdt/src/json_crdt.rs @@ -109,7 +109,7 @@ pub struct BaseCrdt { /// An [`Op`] with a few bits of extra metadata #[serde_as] -#[derive(Clone, Serialize, Deserialize, Debug)] +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] pub struct SignedOp { // Note that this can be different from the author of the inner op as the inner op could have been created // by a different person diff --git a/crates/bft-json-crdt/src/op.rs b/crates/bft-json-crdt/src/op.rs index 9c28070..aaa6f02 100644 --- a/crates/bft-json-crdt/src/op.rs +++ b/crates/bft-json-crdt/src/op.rs @@ -81,7 +81,7 @@ pub fn parse_field(path: Vec) -> Option { } /// Represents a single node in a CRDT -#[derive(Clone, Serialize, Deserialize, Debug)] +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] pub struct Op where T: CrdtNode, diff --git a/side-node/src/websocket/mod.rs b/side-node/src/websocket/mod.rs index 736ada7..7d7fadd 100644 --- a/side-node/src/websocket/mod.rs +++ b/side-node/src/websocket/mod.rs @@ -1,14 +1,17 @@ +use std::io::{self, BufRead}; + use bft_crdt_derive::add_crdt_fields; use bft_json_crdt::json_crdt::SignedOp; -use bft_json_crdt::keypair::KeyPair; +use bft_json_crdt::keypair::{Ed25519KeyPair, KeyPair}; use bft_json_crdt::{ json_crdt::{BaseCrdt, CrdtNode, IntoCrdtNode}, - keypair::{make_keypair, ED25519_PUBLIC_KEY_LENGTH}, + keypair::make_keypair, list_crdt::ListCrdt, op::ROOT_ID, }; use serde::{Deserialize, Serialize}; -use tokio::time::{self}; +use serde_json::{json, Value}; +// use tokio::time::{self}; use websockets::WebSocket; /// Starts a websocket and periodically sends a BFT-CRDT message to the websocket server @@ -21,21 +24,16 @@ pub(crate) async fn start() -> Result<(), websockets::WebSocketError> { let mut bft_crdt = BaseCrdt::::new(&keys); println!("Generated a new CRDT with public key: {}", keys.public()); - // generate a placeholder transaction - let transaction = generate_transaction(); - - let json = convert_to_json(&transaction).unwrap(); - - // next job is to keep adding to this guy - let signed_op = bft_crdt.doc.list.insert(ROOT_ID, json.clone()).sign(&keys); - println!("SignedOp before send is: {:?}", signed_op); - - let mut interval = every_two_seconds(); + // let mut interval = every_two_seconds(); loop { - interval.tick().await; - println!("Sending: {:?}", signed_op); - ws.send_text(serde_json::to_string(&signed_op).unwrap()) - .await?; + let stdin = io::stdin(); + let mut handle = stdin.lock(); + + let mut line = String::new(); + handle.read_line(&mut line).unwrap(); + + println!("You entered: {}", line); + let _ = send_a_transaction(line, &mut bft_crdt, &mut ws, &keys).await; let msg = ws.receive().await?; println!("Received: {:?}", msg); @@ -46,15 +44,15 @@ pub(crate) async fn start() -> Result<(), websockets::WebSocketError> { // deserialize the message into a Transaction struct let operation: SignedOp = serde_json::from_str(&msg).unwrap(); - // TODO: bft_crdt.apply() changes in here when we receive socket input from other nodes bft_crdt.apply(operation); + println!("New crdt state is: {}", bft_crdt.doc.view()) } } -fn every_two_seconds() -> time::Interval { - time::interval(time::Duration::from_secs(2)) -} +// fn every_two_seconds() -> time::Interval { +// time::interval(time::Duration::from_secs(2)) +// } #[add_crdt_fields] #[derive(Clone, CrdtNode, Serialize, Deserialize)] @@ -71,17 +69,33 @@ struct Transaction { amount: f64, } -fn generate_transaction() -> Transaction { - Transaction { - from: "Alice".to_string(), - to: "Bob".to_string(), - amount: 100.0, - path: vec![], - id: [0; ED25519_PUBLIC_KEY_LENGTH], - } +fn generate_transaction(from: String) -> Value { + json!({ + "from": from, + "to": "Bob", + "amount": 100.0 + }) } -fn convert_to_json(transaction: &Transaction) -> serde_json::Result { - let json = serde_json::to_string(&transaction).unwrap(); - Ok(json) +async fn send_a_transaction( + input: String, + bft_crdt: &mut BaseCrdt, + ws: &mut WebSocket, + keys: &Ed25519KeyPair, +) -> Result<(), websockets::WebSocketError> { + // generate a placeholder transaction + let transaction = generate_transaction(input); + + // next job is to keep adding to this guy + let signed_op = bft_crdt + .doc + .list + .insert(ROOT_ID, transaction.clone()) + .sign(&keys); + println!("SignedOp being sent is: {:?}", signed_op); + + println!("Sending: {:?}", signed_op); + Ok(ws + .send_text(serde_json::to_string(&signed_op).unwrap()) + .await?) }