Compare commits
21 Commits
ezsockets
...
feature/ez
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
546a45bb3a | ||
|
|
950a63c103 | ||
|
|
f9c4fce398 | ||
|
|
014462c187 | ||
|
|
b1daec3b84 | ||
|
|
e0c991d0f9 | ||
|
|
a53b5bd94c | ||
|
|
d13df41b82 | ||
|
|
4496a0916b | ||
|
|
f3bea8c62d | ||
|
|
443c4e1dac | ||
|
|
6077c3a519 | ||
|
|
91fbe7f9bd | ||
|
|
4717ffa7e8 | ||
|
|
c3f5b2890b | ||
|
|
9dc515fb78 | ||
|
|
6f756d4fb6 | ||
|
|
5d6a1e806a | ||
|
|
d91a631fdc | ||
|
|
a81d1f913a | ||
|
|
b1f5d2b75a |
@@ -123,12 +123,6 @@ pub struct SignedOp {
|
|||||||
pub depends_on: Vec<SignedDigest>,
|
pub depends_on: Vec<SignedDigest>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Display for SignedOp {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
write!(f, "SignedOp({})", self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SignedOp {
|
impl SignedOp {
|
||||||
pub fn id(&self) -> OpId {
|
pub fn id(&self) -> OpId {
|
||||||
self.inner.id
|
self.inner.id
|
||||||
|
|||||||
34
side-node/src/crdt.rs
Normal file
34
side-node/src/crdt.rs
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
use bft_crdt_derive::add_crdt_fields;
|
||||||
|
use bft_json_crdt::{
|
||||||
|
json_crdt::{CrdtNode, IntoCrdtNode},
|
||||||
|
list_crdt::ListCrdt,
|
||||||
|
};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[add_crdt_fields]
|
||||||
|
#[derive(Clone, CrdtNode, Serialize, Deserialize)]
|
||||||
|
pub(crate) struct TransactionList {
|
||||||
|
pub(crate) list: ListCrdt<Transaction>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A fake Transaction struct we can use as a simulated payload
|
||||||
|
#[add_crdt_fields]
|
||||||
|
#[derive(Clone, CrdtNode, Serialize, Deserialize)]
|
||||||
|
pub(crate) struct Transaction {
|
||||||
|
from: String,
|
||||||
|
to: String,
|
||||||
|
amount: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
// impl TransactionList {
|
||||||
|
// fn create(&mut self, keys: &Ed25519KeyPair) -> bft_json_crdt::json_crdt::SignedOp {
|
||||||
|
// // generate a placeholder transaction
|
||||||
|
// let transaction = _fake_transaction(keys.public().to_string());
|
||||||
|
|
||||||
|
// // next job is to keep adding to this guy
|
||||||
|
// let last: &Op<Transaction>;
|
||||||
|
// last = self.list.ops.last().expect("couldn't find last op");
|
||||||
|
// let signed_op = self.list.insert(last.id, transaction.clone()).sign(&keys);
|
||||||
|
// signed_op
|
||||||
|
// }
|
||||||
|
// }
|
||||||
@@ -10,7 +10,7 @@ pub(crate) struct SideNodeConfig {
|
|||||||
pub(crate) name: String,
|
pub(crate) name: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn write(
|
pub(crate) fn write_toml(
|
||||||
config: &SideNodeConfig,
|
config: &SideNodeConfig,
|
||||||
file_path: &PathBuf,
|
file_path: &PathBuf,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ pub(crate) fn init(home: PathBuf, config: SideNodeConfig) -> Result<(), std::io:
|
|||||||
keys::write(key_path)?;
|
keys::write(key_path)?;
|
||||||
|
|
||||||
println!("Writing config to: {:?}", config_path);
|
println!("Writing config to: {:?}", config_path);
|
||||||
config::write(&config, &config_path).expect("unable to write config file");
|
config::write_toml(&config, &config_path).expect("unable to write config file");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,61 +0,0 @@
|
|||||||
use std::path::PathBuf;
|
|
||||||
|
|
||||||
use bft_crdt_derive::add_crdt_fields;
|
|
||||||
|
|
||||||
use bft_json_crdt::{
|
|
||||||
json_crdt::{BaseCrdt, CrdtNode, IntoCrdtNode},
|
|
||||||
keypair::{Ed25519KeyPair, KeyPair},
|
|
||||||
list_crdt::ListCrdt,
|
|
||||||
};
|
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use serde_json::{json, Value};
|
|
||||||
|
|
||||||
use crate::keys;
|
|
||||||
|
|
||||||
#[add_crdt_fields]
|
|
||||||
#[derive(Clone, CrdtNode, Serialize, Deserialize)]
|
|
||||||
pub(crate) struct CrdtList {
|
|
||||||
pub(crate) list: ListCrdt<Transaction>, // switch to Transaction as soon as char is working
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A fake Transaction struct we can use as a simulated payload
|
|
||||||
#[add_crdt_fields]
|
|
||||||
#[derive(Clone, CrdtNode, Serialize, Deserialize)]
|
|
||||||
pub(crate) struct Transaction {
|
|
||||||
from: String,
|
|
||||||
to: String,
|
|
||||||
amount: f64,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn new(side_dir: PathBuf) -> (BaseCrdt<CrdtList>, Ed25519KeyPair) {
|
|
||||||
let keys = keys::load_from_file(side_dir);
|
|
||||||
let bft_crdt = BaseCrdt::<CrdtList>::new(&keys);
|
|
||||||
println!("Author is {}", keys.public().to_string());
|
|
||||||
(bft_crdt, keys)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn send(
|
|
||||||
bft_crdt: &mut BaseCrdt<CrdtList>,
|
|
||||||
keys: &Ed25519KeyPair,
|
|
||||||
) -> bft_json_crdt::json_crdt::SignedOp {
|
|
||||||
// generate a placeholder transaction
|
|
||||||
let transaction = generate_transaction(keys.public().to_string());
|
|
||||||
|
|
||||||
// next job is to keep adding to this guy
|
|
||||||
let next = bft_crdt.doc.list.ops.len();
|
|
||||||
let signed_op = bft_crdt
|
|
||||||
.doc
|
|
||||||
.list
|
|
||||||
.insert_idx(next - 1, transaction.clone())
|
|
||||||
.sign(&keys);
|
|
||||||
signed_op
|
|
||||||
}
|
|
||||||
|
|
||||||
fn generate_transaction(pubkey: String) -> Value {
|
|
||||||
json!({
|
|
||||||
"from": pubkey,
|
|
||||||
"to": "Bob",
|
|
||||||
"amount": 1
|
|
||||||
})
|
|
||||||
}
|
|
||||||
@@ -1,9 +1,16 @@
|
|||||||
|
use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp};
|
||||||
use cli::{parse_args, Commands};
|
use cli::{parse_args, Commands};
|
||||||
|
use crdt::TransactionList;
|
||||||
|
use node::SideNode;
|
||||||
|
use tokio::{sync::mpsc, task};
|
||||||
|
use websocket::WebSocketClient;
|
||||||
|
|
||||||
pub(crate) mod cli;
|
pub(crate) mod cli;
|
||||||
|
pub(crate) mod crdt;
|
||||||
pub(crate) mod init;
|
pub(crate) mod init;
|
||||||
pub(crate) mod keys;
|
pub(crate) mod keys;
|
||||||
pub(crate) mod list_transaction_crdt;
|
pub(crate) mod node;
|
||||||
|
pub(crate) mod stdin;
|
||||||
pub(crate) mod utils;
|
pub(crate) mod utils;
|
||||||
pub(crate) mod websocket;
|
pub(crate) mod websocket;
|
||||||
|
|
||||||
@@ -17,20 +24,33 @@ async fn main() {
|
|||||||
name: name.to_string(),
|
name: name.to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let _ = init::init(home(name), config);
|
let _ = init::init(utils::home(name), config);
|
||||||
}
|
}
|
||||||
Some(Commands::Run { name }) => {
|
Some(Commands::Run { name }) => {
|
||||||
let side_dir = home(name);
|
let mut node = setup(name).await;
|
||||||
let (mut bft_crdt, keys) = list_transaction_crdt::new(side_dir);
|
node.start().await;
|
||||||
websocket::start(keys, &mut bft_crdt).await;
|
|
||||||
}
|
}
|
||||||
None => println!("No command provided. Exiting. See --help for more information."),
|
None => println!("No command provided. Exiting. See --help for more information."),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn home(name: &String) -> std::path::PathBuf {
|
/// Wire everything up outside the application so we can test more easily later
|
||||||
let mut path = dirs::home_dir().unwrap();
|
async fn setup(name: &String) -> SideNode {
|
||||||
path.push(".side");
|
// First, load up the keys and create a bft-crdt
|
||||||
path.push(name);
|
let side_dir = utils::home(name);
|
||||||
path
|
let keys = keys::load_from_file(side_dir);
|
||||||
|
let crdt = BaseCrdt::<TransactionList>::new(&keys);
|
||||||
|
|
||||||
|
// Channels for internal communication, and a tokio task for stdin input
|
||||||
|
let (incoming_sender, incoming_receiver) = mpsc::channel::<SignedOp>(32);
|
||||||
|
let (stdin_sender, stdin_receiver) = std::sync::mpsc::channel();
|
||||||
|
task::spawn(async move {
|
||||||
|
stdin::input(stdin_sender);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Finally, create the node and return it
|
||||||
|
let handle = WebSocketClient::new(incoming_sender).await;
|
||||||
|
let node = SideNode::new(crdt, keys, incoming_receiver, stdin_receiver, handle);
|
||||||
|
println!("Node setup complete.");
|
||||||
|
node
|
||||||
}
|
}
|
||||||
|
|||||||
91
side-node/src/node.rs
Normal file
91
side-node/src/node.rs
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp};
|
||||||
|
use fastcrypto::ed25519::Ed25519KeyPair;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
use crate::{crdt::TransactionList, utils, websocket::WebSocketClient};
|
||||||
|
|
||||||
|
pub(crate) struct SideNode {
|
||||||
|
crdt: BaseCrdt<TransactionList>,
|
||||||
|
keys: fastcrypto::ed25519::Ed25519KeyPair,
|
||||||
|
incoming_receiver: mpsc::Receiver<SignedOp>,
|
||||||
|
stdin_receiver: std::sync::mpsc::Receiver<String>,
|
||||||
|
handle: ezsockets::Client<WebSocketClient>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SideNode {
|
||||||
|
pub(crate) fn new(
|
||||||
|
crdt: BaseCrdt<TransactionList>,
|
||||||
|
keys: Ed25519KeyPair,
|
||||||
|
incoming_receiver: mpsc::Receiver<SignedOp>,
|
||||||
|
stdin_receiver: std::sync::mpsc::Receiver<String>,
|
||||||
|
handle: ezsockets::Client<WebSocketClient>,
|
||||||
|
) -> Self {
|
||||||
|
let node = Self {
|
||||||
|
crdt,
|
||||||
|
keys,
|
||||||
|
incoming_receiver,
|
||||||
|
stdin_receiver,
|
||||||
|
handle,
|
||||||
|
};
|
||||||
|
node
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn start(&mut self) {
|
||||||
|
println!("Starting node...");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match self.stdin_receiver.try_recv() {
|
||||||
|
Ok(stdin) => {
|
||||||
|
println!("Received stdin input: {:?}", stdin);
|
||||||
|
let transaction = utils::fake_transaction(stdin);
|
||||||
|
let json = serde_json::to_value(transaction).unwrap();
|
||||||
|
let signed_op = self.add_transaction_local(json);
|
||||||
|
self.send_to_network(signed_op).await;
|
||||||
|
}
|
||||||
|
Err(_) => {} // ignore empty channel errors in this PoC
|
||||||
|
}
|
||||||
|
match self.incoming_receiver.try_recv() {
|
||||||
|
Ok(incoming) => {
|
||||||
|
self.handle_incoming(&incoming);
|
||||||
|
}
|
||||||
|
Err(_) => {} // ignore empty channel errors in this PoC
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_to_network(&self, signed_op: SignedOp) {
|
||||||
|
println!("sending to network: {:?}", signed_op);
|
||||||
|
let to_send = serde_json::to_string(&signed_op).unwrap();
|
||||||
|
self.handle.call(to_send).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_incoming(&mut self, incoming: &SignedOp) {
|
||||||
|
println!("WINNNINGINGINGINGINGIGNIGN");
|
||||||
|
self.crdt.apply(incoming.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn add_transaction_local(
|
||||||
|
&mut self,
|
||||||
|
transaction: serde_json::Value,
|
||||||
|
) -> bft_json_crdt::json_crdt::SignedOp {
|
||||||
|
let last = self
|
||||||
|
.crdt
|
||||||
|
.doc
|
||||||
|
.list
|
||||||
|
.ops
|
||||||
|
.last()
|
||||||
|
.expect("couldn't find last op");
|
||||||
|
let signed_op = self
|
||||||
|
.crdt
|
||||||
|
.doc
|
||||||
|
.list
|
||||||
|
.insert(last.id, transaction)
|
||||||
|
.sign(&self.keys);
|
||||||
|
signed_op
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Print the current state of the CRDT, can be used to debug
|
||||||
|
pub(crate) fn _trace_crdt(&self) {
|
||||||
|
println!("{:?}", self.crdt.doc.list);
|
||||||
|
}
|
||||||
|
}
|
||||||
12
side-node/src/stdin.rs
Normal file
12
side-node/src/stdin.rs
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
use std::io::BufRead;
|
||||||
|
|
||||||
|
/// Wait for stdin terminal input and send it to the node if any arrives
|
||||||
|
pub(crate) fn input(stdin_sender: std::sync::mpsc::Sender<String>) {
|
||||||
|
let stdin = std::io::stdin();
|
||||||
|
let lines = stdin.lock().lines();
|
||||||
|
for line in lines {
|
||||||
|
println!("We're in stdin_input");
|
||||||
|
let line = line.unwrap();
|
||||||
|
stdin_sender.send(line).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,9 +1,11 @@
|
|||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
use serde_json::{json, Value};
|
||||||
|
|
||||||
pub(crate) const KEY_FILE: &str = "keys.pem";
|
pub(crate) const KEY_FILE: &str = "keys.pem";
|
||||||
pub(crate) const CONFIG_FILE: &str = "config.toml";
|
pub(crate) const CONFIG_FILE: &str = "config.toml";
|
||||||
|
|
||||||
/// Returns the path to the key file for this host OS.
|
/// Returns the path to the key file and config for this host OS.
|
||||||
pub(crate) fn side_paths(prefix: PathBuf) -> (PathBuf, PathBuf) {
|
pub(crate) fn side_paths(prefix: PathBuf) -> (PathBuf, PathBuf) {
|
||||||
let mut key_path = prefix.clone();
|
let mut key_path = prefix.clone();
|
||||||
key_path.push(KEY_FILE);
|
key_path.push(KEY_FILE);
|
||||||
@@ -13,3 +15,19 @@ pub(crate) fn side_paths(prefix: PathBuf) -> (PathBuf, PathBuf) {
|
|||||||
|
|
||||||
(key_path, config_path)
|
(key_path, config_path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn home(name: &String) -> std::path::PathBuf {
|
||||||
|
let mut path = dirs::home_dir().unwrap();
|
||||||
|
path.push(".side");
|
||||||
|
path.push(name);
|
||||||
|
path
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Generate a fake transaction with customizable from_pubkey String
|
||||||
|
pub(crate) fn fake_transaction(from_pubkey: String) -> Value {
|
||||||
|
json!({
|
||||||
|
"from": from_pubkey,
|
||||||
|
"to": "Bob",
|
||||||
|
"amount": 1
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
57
side-node/src/websocket.rs
Normal file
57
side-node/src/websocket.rs
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
use bft_json_crdt::json_crdt::SignedOp;
|
||||||
|
use ezsockets::ClientConfig;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
pub(crate) struct WebSocketClient {
|
||||||
|
incoming_sender: mpsc::Sender<SignedOp>,
|
||||||
|
handle: ezsockets::Client<WebSocketClient>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WebSocketClient {
|
||||||
|
/// Start the websocket client
|
||||||
|
pub(crate) async fn new(
|
||||||
|
incoming_sender: mpsc::Sender<SignedOp>,
|
||||||
|
) -> ezsockets::Client<WebSocketClient> {
|
||||||
|
tracing_subscriber::fmt::init();
|
||||||
|
let config = ClientConfig::new("ws://localhost:8080/websocket");
|
||||||
|
let (handle, future) = ezsockets::connect(
|
||||||
|
|client| WebSocketClient {
|
||||||
|
incoming_sender,
|
||||||
|
handle: client,
|
||||||
|
},
|
||||||
|
config,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
tokio::spawn(async move {
|
||||||
|
future.await.unwrap();
|
||||||
|
});
|
||||||
|
handle
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ezsockets::ClientExt for WebSocketClient {
|
||||||
|
// Right now we're only using the Call type for sending signed ops
|
||||||
|
// change this to an enum if we need to send other types of calls, and
|
||||||
|
// match on it.
|
||||||
|
type Call = String;
|
||||||
|
|
||||||
|
async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> {
|
||||||
|
tracing::info!("received text: {text:?}");
|
||||||
|
let incoming: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&text).unwrap();
|
||||||
|
self.incoming_sender.send(incoming).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn on_binary(&mut self, bytes: Vec<u8>) -> Result<(), ezsockets::Error> {
|
||||||
|
tracing::info!("received bytes: {bytes:?}");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn on_call(&mut self, call: Self::Call) -> Result<(), ezsockets::Error> {
|
||||||
|
tracing::info!("sending signed op: {call:?}");
|
||||||
|
self.handle.text(call)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,60 +0,0 @@
|
|||||||
/*
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let author = general_purpose::STANDARD.encode(&incoming_operation.author());
|
|
||||||
bft_crdt.apply(incoming_operation.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
*/
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use bft_json_crdt::json_crdt::BaseCrdt;
|
|
||||||
use ezsockets::ClientConfig;
|
|
||||||
use fastcrypto::ed25519::Ed25519KeyPair;
|
|
||||||
use std::io::BufRead;
|
|
||||||
|
|
||||||
use crate::list_transaction_crdt::{self, CrdtList};
|
|
||||||
|
|
||||||
struct Client {}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl ezsockets::ClientExt for Client {
|
|
||||||
type Call = ();
|
|
||||||
|
|
||||||
async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> {
|
|
||||||
tracing::info!("received message: {text}");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn on_binary(&mut self, bytes: Vec<u8>) -> Result<(), ezsockets::Error> {
|
|
||||||
tracing::info!("received bytes: {bytes:?}");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn on_call(&mut self, call: Self::Call) -> Result<(), ezsockets::Error> {
|
|
||||||
let () = call;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn start(keys: Ed25519KeyPair, bft_crdt: &mut BaseCrdt<CrdtList>) {
|
|
||||||
tracing_subscriber::fmt::init();
|
|
||||||
let config = ClientConfig::new("ws://localhost:8080/websocket");
|
|
||||||
let (handle, future) = ezsockets::connect(|_client| Client {}, config).await;
|
|
||||||
tokio::spawn(async move {
|
|
||||||
future.await.unwrap();
|
|
||||||
});
|
|
||||||
let stdin = std::io::stdin();
|
|
||||||
let lines = stdin.lock().lines();
|
|
||||||
for line in lines {
|
|
||||||
let line = line.unwrap();
|
|
||||||
let signed_op = if let "exit" = line.as_str() {
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
let op = list_transaction_crdt::send(bft_crdt, &keys);
|
|
||||||
op.to_string()
|
|
||||||
};
|
|
||||||
tracing::info!("sending {:?}", signed_op);
|
|
||||||
handle.text(signed_op).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user