use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp}; use fastcrypto::ed25519::Ed25519KeyPair; use tokio::sync::mpsc; use crate::{crdt::TransactionList, utils}; pub(crate) struct SideNode { crdt: BaseCrdt, keys: fastcrypto::ed25519::Ed25519KeyPair, incoming_receiver: mpsc::Receiver, stdin_receiver: std::sync::mpsc::Receiver, network_sender: mpsc::Sender, } impl SideNode { pub(crate) fn new( crdt: BaseCrdt, keys: Ed25519KeyPair, incoming_receiver: mpsc::Receiver, stdin_receiver: std::sync::mpsc::Receiver, network_sender: mpsc::Sender, ) -> Self { let node = Self { crdt, keys, incoming_receiver, stdin_receiver, network_sender, }; node } pub(crate) async fn start(&mut self) { println!("Starting node..."); loop { match self.stdin_receiver.try_recv() { Ok(stdin) => { println!("Received stdin input: {:?}", stdin); let transaction = utils::fake_transaction(stdin); let json = serde_json::to_value(transaction).unwrap(); let signed_op = self.add_transaction_local(json); self.send_to_network(signed_op).await; } Err(_) => {} // ignore empty channel errors in this PoC } match self.incoming_receiver.try_recv() { Ok(incoming) => { self.handle_incoming(&incoming); } Err(_) => {} // ignore empty channel errors in this PoC } } } async fn send_to_network(&self, signed_op: SignedOp) { println!("sending to network: {:?}", signed_op); self.network_sender.send(signed_op).await.unwrap(); } fn handle_incoming(&mut self, incoming: &SignedOp) { println!("WINNNINGINGINGINGINGIGNIGN"); self.crdt.apply(incoming.clone()); } pub(crate) fn add_transaction_local( &mut self, transaction: serde_json::Value, ) -> bft_json_crdt::json_crdt::SignedOp { let last = self .crdt .doc .list .ops .last() .expect("couldn't find last op"); let signed_op = self .crdt .doc .list .insert(last.id, transaction) .sign(&self.keys); signed_op } /// Print the current state of the CRDT, can be used to debug pub(crate) fn _trace_crdt(&self) { println!("{:?}", self.crdt.doc.list); } }