use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp}; use fastcrypto::ed25519::Ed25519KeyPair; use tokio::sync::mpsc; use crate::{crdt::TransactionList, utils, websocket::WebSocketClient}; pub(crate) struct SideNode { crdt: BaseCrdt, keys: fastcrypto::ed25519::Ed25519KeyPair, incoming_receiver: mpsc::Receiver, stdin_receiver: std::sync::mpsc::Receiver, handle: ezsockets::Client, } impl SideNode { pub(crate) fn new( crdt: BaseCrdt, keys: Ed25519KeyPair, incoming_receiver: mpsc::Receiver, stdin_receiver: std::sync::mpsc::Receiver, handle: ezsockets::Client, ) -> Self { let node = Self { crdt, keys, incoming_receiver, stdin_receiver, handle, }; node } pub(crate) async fn start(&mut self) { println!("Starting node..."); loop { match self.stdin_receiver.try_recv() { Ok(stdin) => { let transaction = utils::fake_transaction_json(stdin); let json = serde_json::to_value(transaction).unwrap(); let signed_op = self.add_transaction_local(json); self.send_to_network(signed_op).await; } Err(_) => {} // ignore empty channel errors in this PoC } match self.incoming_receiver.try_recv() { Ok(incoming) => { self.handle_incoming(incoming); } Err(_) => {} // ignore empty channel errors in this PoC } } } async fn send_to_network(&self, signed_op: SignedOp) { let to_send = serde_json::to_string(&signed_op).unwrap(); self.handle.call(to_send).unwrap(); } fn handle_incoming(&mut self, incoming: SignedOp) { self.crdt.apply(incoming); self.trace_crdt(); } pub(crate) fn add_transaction_local( &mut self, transaction: serde_json::Value, ) -> bft_json_crdt::json_crdt::SignedOp { let last = self .crdt .doc .list .ops .last() .expect("couldn't find last op"); let signed_op = self .crdt .doc .list .insert(last.id, transaction) .sign(&self.keys); self.trace_crdt(); signed_op } /// Print the current state of the CRDT, can be used to debug pub(crate) fn trace_crdt(&self) { println!("{:?}", self.crdt.doc.view_sha()); } }