21 Commits

Author SHA1 Message Date
Dave Hrycyszyn
546a45bb3a Deleting unused mod 2024-06-11 18:37:16 +01:00
Dave Hrycyszyn
950a63c103 Moved stdin_input to stdin::input module, pulling it out of main 2024-06-11 18:35:33 +01:00
Dave Hrycyszyn
f9c4fce398 Stopped double-encoding the SignedOp 2024-06-11 18:33:08 +01:00
Dave Hrycyszyn
014462c187 Using the ezsockets call methods to shoot text at the websocket. 2024-06-11 18:29:02 +01:00
Dave Hrycyszyn
b1daec3b84 Figured out what on_call is for 2024-06-11 18:13:51 +01:00
Dave Hrycyszyn
e0c991d0f9 Getting ready for network broadcast 2024-06-11 17:06:49 +01:00
Dave Hrycyszyn
a53b5bd94c Tidy 2024-06-11 16:52:40 +01:00
Dave Hrycyszyn
d13df41b82 Variable rename 2024-06-11 16:52:15 +01:00
Dave Hrycyszyn
4496a0916b Removing period send code 2024-06-11 16:51:13 +01:00
Dave Hrycyszyn
f3bea8c62d Restructuring tokio tasks and stdin receiver 2024-06-11 16:50:21 +01:00
Dave Hrycyszyn
443c4e1dac Simplifying 2024-06-10 16:43:45 +01:00
Dave Hrycyszyn
6077c3a519 Pinpointing blocking point 2024-06-10 16:33:03 +01:00
Dave Hrycyszyn
91fbe7f9bd Going back to blocking, need a new thread here 2024-06-10 14:26:00 +01:00
Dave Hrycyszyn
4717ffa7e8 Almost working, I've got a blocking I/O problem with stdin now :) 2024-06-10 14:25:05 +01:00
Dave Hrycyszyn
c3f5b2890b More pushing code around 2024-06-07 18:42:28 +01:00
Dave Hrycyszyn
9dc515fb78 Renamed write to write_toml in the config 2024-06-07 18:22:07 +01:00
Dave Hrycyszyn
6f756d4fb6 Minor cleanup 2024-06-07 18:20:02 +01:00
Dave Hrycyszyn
5d6a1e806a Nearly there 2024-06-07 17:35:38 +01:00
Dave Hrycyszyn
d91a631fdc More re-jigging 2024-06-07 17:18:46 +01:00
Dave Hrycyszyn
a81d1f913a Starting to modify things into container structs 2024-06-07 17:03:05 +01:00
Dave Hrycyszyn
b1f5d2b75a User serde_json for SignedOp serialization 2024-06-07 14:58:41 +01:00
12 changed files with 245 additions and 140 deletions

View File

@@ -123,12 +123,6 @@ pub struct SignedOp {
pub depends_on: Vec<SignedDigest>,
}
impl Display for SignedOp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "SignedOp({})", self)
}
}
impl SignedOp {
pub fn id(&self) -> OpId {
self.inner.id

34
side-node/src/crdt.rs Normal file
View File

@@ -0,0 +1,34 @@
use bft_crdt_derive::add_crdt_fields;
use bft_json_crdt::{
json_crdt::{CrdtNode, IntoCrdtNode},
list_crdt::ListCrdt,
};
use serde::{Deserialize, Serialize};
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Serialize, Deserialize)]
pub(crate) struct TransactionList {
pub(crate) list: ListCrdt<Transaction>,
}
/// A fake Transaction struct we can use as a simulated payload
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Serialize, Deserialize)]
pub(crate) struct Transaction {
from: String,
to: String,
amount: f64,
}
// impl TransactionList {
// fn create(&mut self, keys: &Ed25519KeyPair) -> bft_json_crdt::json_crdt::SignedOp {
// // generate a placeholder transaction
// let transaction = _fake_transaction(keys.public().to_string());
// // next job is to keep adding to this guy
// let last: &Op<Transaction>;
// last = self.list.ops.last().expect("couldn't find last op");
// let signed_op = self.list.insert(last.id, transaction.clone()).sign(&keys);
// signed_op
// }
// }

View File

@@ -10,7 +10,7 @@ pub(crate) struct SideNodeConfig {
pub(crate) name: String,
}
pub(crate) fn write(
pub(crate) fn write_toml(
config: &SideNodeConfig,
file_path: &PathBuf,
) -> Result<(), Box<dyn std::error::Error>> {

View File

@@ -14,7 +14,7 @@ pub(crate) fn init(home: PathBuf, config: SideNodeConfig) -> Result<(), std::io:
keys::write(key_path)?;
println!("Writing config to: {:?}", config_path);
config::write(&config, &config_path).expect("unable to write config file");
config::write_toml(&config, &config_path).expect("unable to write config file");
Ok(())
}

View File

@@ -1,61 +0,0 @@
use std::path::PathBuf;
use bft_crdt_derive::add_crdt_fields;
use bft_json_crdt::{
json_crdt::{BaseCrdt, CrdtNode, IntoCrdtNode},
keypair::{Ed25519KeyPair, KeyPair},
list_crdt::ListCrdt,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use crate::keys;
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Serialize, Deserialize)]
pub(crate) struct CrdtList {
pub(crate) list: ListCrdt<Transaction>, // switch to Transaction as soon as char is working
}
/// A fake Transaction struct we can use as a simulated payload
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Serialize, Deserialize)]
pub(crate) struct Transaction {
from: String,
to: String,
amount: f64,
}
pub(crate) fn new(side_dir: PathBuf) -> (BaseCrdt<CrdtList>, Ed25519KeyPair) {
let keys = keys::load_from_file(side_dir);
let bft_crdt = BaseCrdt::<CrdtList>::new(&keys);
println!("Author is {}", keys.public().to_string());
(bft_crdt, keys)
}
pub(crate) fn send(
bft_crdt: &mut BaseCrdt<CrdtList>,
keys: &Ed25519KeyPair,
) -> bft_json_crdt::json_crdt::SignedOp {
// generate a placeholder transaction
let transaction = generate_transaction(keys.public().to_string());
// next job is to keep adding to this guy
let next = bft_crdt.doc.list.ops.len();
let signed_op = bft_crdt
.doc
.list
.insert_idx(next - 1, transaction.clone())
.sign(&keys);
signed_op
}
fn generate_transaction(pubkey: String) -> Value {
json!({
"from": pubkey,
"to": "Bob",
"amount": 1
})
}

View File

@@ -1,9 +1,16 @@
use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp};
use cli::{parse_args, Commands};
use crdt::TransactionList;
use node::SideNode;
use tokio::{sync::mpsc, task};
use websocket::WebSocketClient;
pub(crate) mod cli;
pub(crate) mod crdt;
pub(crate) mod init;
pub(crate) mod keys;
pub(crate) mod list_transaction_crdt;
pub(crate) mod node;
pub(crate) mod stdin;
pub(crate) mod utils;
pub(crate) mod websocket;
@@ -17,20 +24,33 @@ async fn main() {
name: name.to_string(),
};
let _ = init::init(home(name), config);
let _ = init::init(utils::home(name), config);
}
Some(Commands::Run { name }) => {
let side_dir = home(name);
let (mut bft_crdt, keys) = list_transaction_crdt::new(side_dir);
websocket::start(keys, &mut bft_crdt).await;
let mut node = setup(name).await;
node.start().await;
}
None => println!("No command provided. Exiting. See --help for more information."),
}
}
fn home(name: &String) -> std::path::PathBuf {
let mut path = dirs::home_dir().unwrap();
path.push(".side");
path.push(name);
path
/// Wire everything up outside the application so we can test more easily later
async fn setup(name: &String) -> SideNode {
// First, load up the keys and create a bft-crdt
let side_dir = utils::home(name);
let keys = keys::load_from_file(side_dir);
let crdt = BaseCrdt::<TransactionList>::new(&keys);
// Channels for internal communication, and a tokio task for stdin input
let (incoming_sender, incoming_receiver) = mpsc::channel::<SignedOp>(32);
let (stdin_sender, stdin_receiver) = std::sync::mpsc::channel();
task::spawn(async move {
stdin::input(stdin_sender);
});
// Finally, create the node and return it
let handle = WebSocketClient::new(incoming_sender).await;
let node = SideNode::new(crdt, keys, incoming_receiver, stdin_receiver, handle);
println!("Node setup complete.");
node
}

91
side-node/src/node.rs Normal file
View File

@@ -0,0 +1,91 @@
use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp};
use fastcrypto::ed25519::Ed25519KeyPair;
use tokio::sync::mpsc;
use crate::{crdt::TransactionList, utils, websocket::WebSocketClient};
pub(crate) struct SideNode {
crdt: BaseCrdt<TransactionList>,
keys: fastcrypto::ed25519::Ed25519KeyPair,
incoming_receiver: mpsc::Receiver<SignedOp>,
stdin_receiver: std::sync::mpsc::Receiver<String>,
handle: ezsockets::Client<WebSocketClient>,
}
impl SideNode {
pub(crate) fn new(
crdt: BaseCrdt<TransactionList>,
keys: Ed25519KeyPair,
incoming_receiver: mpsc::Receiver<SignedOp>,
stdin_receiver: std::sync::mpsc::Receiver<String>,
handle: ezsockets::Client<WebSocketClient>,
) -> Self {
let node = Self {
crdt,
keys,
incoming_receiver,
stdin_receiver,
handle,
};
node
}
pub(crate) async fn start(&mut self) {
println!("Starting node...");
loop {
match self.stdin_receiver.try_recv() {
Ok(stdin) => {
println!("Received stdin input: {:?}", stdin);
let transaction = utils::fake_transaction(stdin);
let json = serde_json::to_value(transaction).unwrap();
let signed_op = self.add_transaction_local(json);
self.send_to_network(signed_op).await;
}
Err(_) => {} // ignore empty channel errors in this PoC
}
match self.incoming_receiver.try_recv() {
Ok(incoming) => {
self.handle_incoming(&incoming);
}
Err(_) => {} // ignore empty channel errors in this PoC
}
}
}
async fn send_to_network(&self, signed_op: SignedOp) {
println!("sending to network: {:?}", signed_op);
let to_send = serde_json::to_string(&signed_op).unwrap();
self.handle.call(to_send).unwrap();
}
fn handle_incoming(&mut self, incoming: &SignedOp) {
println!("WINNNINGINGINGINGINGIGNIGN");
self.crdt.apply(incoming.clone());
}
pub(crate) fn add_transaction_local(
&mut self,
transaction: serde_json::Value,
) -> bft_json_crdt::json_crdt::SignedOp {
let last = self
.crdt
.doc
.list
.ops
.last()
.expect("couldn't find last op");
let signed_op = self
.crdt
.doc
.list
.insert(last.id, transaction)
.sign(&self.keys);
signed_op
}
/// Print the current state of the CRDT, can be used to debug
pub(crate) fn _trace_crdt(&self) {
println!("{:?}", self.crdt.doc.list);
}
}

12
side-node/src/stdin.rs Normal file
View File

@@ -0,0 +1,12 @@
use std::io::BufRead;
/// Wait for stdin terminal input and send it to the node if any arrives
pub(crate) fn input(stdin_sender: std::sync::mpsc::Sender<String>) {
let stdin = std::io::stdin();
let lines = stdin.lock().lines();
for line in lines {
println!("We're in stdin_input");
let line = line.unwrap();
stdin_sender.send(line).unwrap();
}
}

View File

@@ -1,9 +1,11 @@
use std::path::PathBuf;
use serde_json::{json, Value};
pub(crate) const KEY_FILE: &str = "keys.pem";
pub(crate) const CONFIG_FILE: &str = "config.toml";
/// Returns the path to the key file for this host OS.
/// Returns the path to the key file and config for this host OS.
pub(crate) fn side_paths(prefix: PathBuf) -> (PathBuf, PathBuf) {
let mut key_path = prefix.clone();
key_path.push(KEY_FILE);
@@ -13,3 +15,19 @@ pub(crate) fn side_paths(prefix: PathBuf) -> (PathBuf, PathBuf) {
(key_path, config_path)
}
pub(crate) fn home(name: &String) -> std::path::PathBuf {
let mut path = dirs::home_dir().unwrap();
path.push(".side");
path.push(name);
path
}
/// Generate a fake transaction with customizable from_pubkey String
pub(crate) fn fake_transaction(from_pubkey: String) -> Value {
json!({
"from": from_pubkey,
"to": "Bob",
"amount": 1
})
}

View File

@@ -0,0 +1,57 @@
use async_trait::async_trait;
use bft_json_crdt::json_crdt::SignedOp;
use ezsockets::ClientConfig;
use tokio::sync::mpsc;
pub(crate) struct WebSocketClient {
incoming_sender: mpsc::Sender<SignedOp>,
handle: ezsockets::Client<WebSocketClient>,
}
impl WebSocketClient {
/// Start the websocket client
pub(crate) async fn new(
incoming_sender: mpsc::Sender<SignedOp>,
) -> ezsockets::Client<WebSocketClient> {
tracing_subscriber::fmt::init();
let config = ClientConfig::new("ws://localhost:8080/websocket");
let (handle, future) = ezsockets::connect(
|client| WebSocketClient {
incoming_sender,
handle: client,
},
config,
)
.await;
tokio::spawn(async move {
future.await.unwrap();
});
handle
}
}
#[async_trait]
impl ezsockets::ClientExt for WebSocketClient {
// Right now we're only using the Call type for sending signed ops
// change this to an enum if we need to send other types of calls, and
// match on it.
type Call = String;
async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> {
tracing::info!("received text: {text:?}");
let incoming: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&text).unwrap();
self.incoming_sender.send(incoming).await?;
Ok(())
}
async fn on_binary(&mut self, bytes: Vec<u8>) -> Result<(), ezsockets::Error> {
tracing::info!("received bytes: {bytes:?}");
Ok(())
}
async fn on_call(&mut self, call: Self::Call) -> Result<(), ezsockets::Error> {
tracing::info!("sending signed op: {call:?}");
self.handle.text(call)?;
Ok(())
}
}

View File

@@ -1,60 +0,0 @@
/*
loop {
let author = general_purpose::STANDARD.encode(&incoming_operation.author());
bft_crdt.apply(incoming_operation.clone());
}
*/
use async_trait::async_trait;
use bft_json_crdt::json_crdt::BaseCrdt;
use ezsockets::ClientConfig;
use fastcrypto::ed25519::Ed25519KeyPair;
use std::io::BufRead;
use crate::list_transaction_crdt::{self, CrdtList};
struct Client {}
#[async_trait]
impl ezsockets::ClientExt for Client {
type Call = ();
async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> {
tracing::info!("received message: {text}");
Ok(())
}
async fn on_binary(&mut self, bytes: Vec<u8>) -> Result<(), ezsockets::Error> {
tracing::info!("received bytes: {bytes:?}");
Ok(())
}
async fn on_call(&mut self, call: Self::Call) -> Result<(), ezsockets::Error> {
let () = call;
Ok(())
}
}
pub(crate) async fn start(keys: Ed25519KeyPair, bft_crdt: &mut BaseCrdt<CrdtList>) {
tracing_subscriber::fmt::init();
let config = ClientConfig::new("ws://localhost:8080/websocket");
let (handle, future) = ezsockets::connect(|_client| Client {}, config).await;
tokio::spawn(async move {
future.await.unwrap();
});
let stdin = std::io::stdin();
let lines = stdin.lock().lines();
for line in lines {
let line = line.unwrap();
let signed_op = if let "exit" = line.as_str() {
break;
} else {
let op = list_transaction_crdt::send(bft_crdt, &keys);
op.to_string()
};
tracing::info!("sending {:?}", signed_op);
handle.text(signed_op).unwrap();
}
}