Compare commits
21 Commits
ezsockets
...
feature/ez
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
546a45bb3a | ||
|
|
950a63c103 | ||
|
|
f9c4fce398 | ||
|
|
014462c187 | ||
|
|
b1daec3b84 | ||
|
|
e0c991d0f9 | ||
|
|
a53b5bd94c | ||
|
|
d13df41b82 | ||
|
|
4496a0916b | ||
|
|
f3bea8c62d | ||
|
|
443c4e1dac | ||
|
|
6077c3a519 | ||
|
|
91fbe7f9bd | ||
|
|
4717ffa7e8 | ||
|
|
c3f5b2890b | ||
|
|
9dc515fb78 | ||
|
|
6f756d4fb6 | ||
|
|
5d6a1e806a | ||
|
|
d91a631fdc | ||
|
|
a81d1f913a | ||
|
|
b1f5d2b75a |
@@ -123,12 +123,6 @@ pub struct SignedOp {
|
||||
pub depends_on: Vec<SignedDigest>,
|
||||
}
|
||||
|
||||
impl Display for SignedOp {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "SignedOp({})", self)
|
||||
}
|
||||
}
|
||||
|
||||
impl SignedOp {
|
||||
pub fn id(&self) -> OpId {
|
||||
self.inner.id
|
||||
|
||||
34
side-node/src/crdt.rs
Normal file
34
side-node/src/crdt.rs
Normal file
@@ -0,0 +1,34 @@
|
||||
use bft_crdt_derive::add_crdt_fields;
|
||||
use bft_json_crdt::{
|
||||
json_crdt::{CrdtNode, IntoCrdtNode},
|
||||
list_crdt::ListCrdt,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[add_crdt_fields]
|
||||
#[derive(Clone, CrdtNode, Serialize, Deserialize)]
|
||||
pub(crate) struct TransactionList {
|
||||
pub(crate) list: ListCrdt<Transaction>,
|
||||
}
|
||||
|
||||
/// A fake Transaction struct we can use as a simulated payload
|
||||
#[add_crdt_fields]
|
||||
#[derive(Clone, CrdtNode, Serialize, Deserialize)]
|
||||
pub(crate) struct Transaction {
|
||||
from: String,
|
||||
to: String,
|
||||
amount: f64,
|
||||
}
|
||||
|
||||
// impl TransactionList {
|
||||
// fn create(&mut self, keys: &Ed25519KeyPair) -> bft_json_crdt::json_crdt::SignedOp {
|
||||
// // generate a placeholder transaction
|
||||
// let transaction = _fake_transaction(keys.public().to_string());
|
||||
|
||||
// // next job is to keep adding to this guy
|
||||
// let last: &Op<Transaction>;
|
||||
// last = self.list.ops.last().expect("couldn't find last op");
|
||||
// let signed_op = self.list.insert(last.id, transaction.clone()).sign(&keys);
|
||||
// signed_op
|
||||
// }
|
||||
// }
|
||||
@@ -10,7 +10,7 @@ pub(crate) struct SideNodeConfig {
|
||||
pub(crate) name: String,
|
||||
}
|
||||
|
||||
pub(crate) fn write(
|
||||
pub(crate) fn write_toml(
|
||||
config: &SideNodeConfig,
|
||||
file_path: &PathBuf,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
@@ -14,7 +14,7 @@ pub(crate) fn init(home: PathBuf, config: SideNodeConfig) -> Result<(), std::io:
|
||||
keys::write(key_path)?;
|
||||
|
||||
println!("Writing config to: {:?}", config_path);
|
||||
config::write(&config, &config_path).expect("unable to write config file");
|
||||
config::write_toml(&config, &config_path).expect("unable to write config file");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,61 +0,0 @@
|
||||
use std::path::PathBuf;
|
||||
|
||||
use bft_crdt_derive::add_crdt_fields;
|
||||
|
||||
use bft_json_crdt::{
|
||||
json_crdt::{BaseCrdt, CrdtNode, IntoCrdtNode},
|
||||
keypair::{Ed25519KeyPair, KeyPair},
|
||||
list_crdt::ListCrdt,
|
||||
};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use crate::keys;
|
||||
|
||||
#[add_crdt_fields]
|
||||
#[derive(Clone, CrdtNode, Serialize, Deserialize)]
|
||||
pub(crate) struct CrdtList {
|
||||
pub(crate) list: ListCrdt<Transaction>, // switch to Transaction as soon as char is working
|
||||
}
|
||||
|
||||
/// A fake Transaction struct we can use as a simulated payload
|
||||
#[add_crdt_fields]
|
||||
#[derive(Clone, CrdtNode, Serialize, Deserialize)]
|
||||
pub(crate) struct Transaction {
|
||||
from: String,
|
||||
to: String,
|
||||
amount: f64,
|
||||
}
|
||||
|
||||
pub(crate) fn new(side_dir: PathBuf) -> (BaseCrdt<CrdtList>, Ed25519KeyPair) {
|
||||
let keys = keys::load_from_file(side_dir);
|
||||
let bft_crdt = BaseCrdt::<CrdtList>::new(&keys);
|
||||
println!("Author is {}", keys.public().to_string());
|
||||
(bft_crdt, keys)
|
||||
}
|
||||
|
||||
pub(crate) fn send(
|
||||
bft_crdt: &mut BaseCrdt<CrdtList>,
|
||||
keys: &Ed25519KeyPair,
|
||||
) -> bft_json_crdt::json_crdt::SignedOp {
|
||||
// generate a placeholder transaction
|
||||
let transaction = generate_transaction(keys.public().to_string());
|
||||
|
||||
// next job is to keep adding to this guy
|
||||
let next = bft_crdt.doc.list.ops.len();
|
||||
let signed_op = bft_crdt
|
||||
.doc
|
||||
.list
|
||||
.insert_idx(next - 1, transaction.clone())
|
||||
.sign(&keys);
|
||||
signed_op
|
||||
}
|
||||
|
||||
fn generate_transaction(pubkey: String) -> Value {
|
||||
json!({
|
||||
"from": pubkey,
|
||||
"to": "Bob",
|
||||
"amount": 1
|
||||
})
|
||||
}
|
||||
@@ -1,9 +1,16 @@
|
||||
use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp};
|
||||
use cli::{parse_args, Commands};
|
||||
use crdt::TransactionList;
|
||||
use node::SideNode;
|
||||
use tokio::{sync::mpsc, task};
|
||||
use websocket::WebSocketClient;
|
||||
|
||||
pub(crate) mod cli;
|
||||
pub(crate) mod crdt;
|
||||
pub(crate) mod init;
|
||||
pub(crate) mod keys;
|
||||
pub(crate) mod list_transaction_crdt;
|
||||
pub(crate) mod node;
|
||||
pub(crate) mod stdin;
|
||||
pub(crate) mod utils;
|
||||
pub(crate) mod websocket;
|
||||
|
||||
@@ -17,20 +24,33 @@ async fn main() {
|
||||
name: name.to_string(),
|
||||
};
|
||||
|
||||
let _ = init::init(home(name), config);
|
||||
let _ = init::init(utils::home(name), config);
|
||||
}
|
||||
Some(Commands::Run { name }) => {
|
||||
let side_dir = home(name);
|
||||
let (mut bft_crdt, keys) = list_transaction_crdt::new(side_dir);
|
||||
websocket::start(keys, &mut bft_crdt).await;
|
||||
let mut node = setup(name).await;
|
||||
node.start().await;
|
||||
}
|
||||
None => println!("No command provided. Exiting. See --help for more information."),
|
||||
}
|
||||
}
|
||||
|
||||
fn home(name: &String) -> std::path::PathBuf {
|
||||
let mut path = dirs::home_dir().unwrap();
|
||||
path.push(".side");
|
||||
path.push(name);
|
||||
path
|
||||
/// Wire everything up outside the application so we can test more easily later
|
||||
async fn setup(name: &String) -> SideNode {
|
||||
// First, load up the keys and create a bft-crdt
|
||||
let side_dir = utils::home(name);
|
||||
let keys = keys::load_from_file(side_dir);
|
||||
let crdt = BaseCrdt::<TransactionList>::new(&keys);
|
||||
|
||||
// Channels for internal communication, and a tokio task for stdin input
|
||||
let (incoming_sender, incoming_receiver) = mpsc::channel::<SignedOp>(32);
|
||||
let (stdin_sender, stdin_receiver) = std::sync::mpsc::channel();
|
||||
task::spawn(async move {
|
||||
stdin::input(stdin_sender);
|
||||
});
|
||||
|
||||
// Finally, create the node and return it
|
||||
let handle = WebSocketClient::new(incoming_sender).await;
|
||||
let node = SideNode::new(crdt, keys, incoming_receiver, stdin_receiver, handle);
|
||||
println!("Node setup complete.");
|
||||
node
|
||||
}
|
||||
|
||||
91
side-node/src/node.rs
Normal file
91
side-node/src/node.rs
Normal file
@@ -0,0 +1,91 @@
|
||||
use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp};
|
||||
use fastcrypto::ed25519::Ed25519KeyPair;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::{crdt::TransactionList, utils, websocket::WebSocketClient};
|
||||
|
||||
pub(crate) struct SideNode {
|
||||
crdt: BaseCrdt<TransactionList>,
|
||||
keys: fastcrypto::ed25519::Ed25519KeyPair,
|
||||
incoming_receiver: mpsc::Receiver<SignedOp>,
|
||||
stdin_receiver: std::sync::mpsc::Receiver<String>,
|
||||
handle: ezsockets::Client<WebSocketClient>,
|
||||
}
|
||||
|
||||
impl SideNode {
|
||||
pub(crate) fn new(
|
||||
crdt: BaseCrdt<TransactionList>,
|
||||
keys: Ed25519KeyPair,
|
||||
incoming_receiver: mpsc::Receiver<SignedOp>,
|
||||
stdin_receiver: std::sync::mpsc::Receiver<String>,
|
||||
handle: ezsockets::Client<WebSocketClient>,
|
||||
) -> Self {
|
||||
let node = Self {
|
||||
crdt,
|
||||
keys,
|
||||
incoming_receiver,
|
||||
stdin_receiver,
|
||||
handle,
|
||||
};
|
||||
node
|
||||
}
|
||||
|
||||
pub(crate) async fn start(&mut self) {
|
||||
println!("Starting node...");
|
||||
|
||||
loop {
|
||||
match self.stdin_receiver.try_recv() {
|
||||
Ok(stdin) => {
|
||||
println!("Received stdin input: {:?}", stdin);
|
||||
let transaction = utils::fake_transaction(stdin);
|
||||
let json = serde_json::to_value(transaction).unwrap();
|
||||
let signed_op = self.add_transaction_local(json);
|
||||
self.send_to_network(signed_op).await;
|
||||
}
|
||||
Err(_) => {} // ignore empty channel errors in this PoC
|
||||
}
|
||||
match self.incoming_receiver.try_recv() {
|
||||
Ok(incoming) => {
|
||||
self.handle_incoming(&incoming);
|
||||
}
|
||||
Err(_) => {} // ignore empty channel errors in this PoC
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_to_network(&self, signed_op: SignedOp) {
|
||||
println!("sending to network: {:?}", signed_op);
|
||||
let to_send = serde_json::to_string(&signed_op).unwrap();
|
||||
self.handle.call(to_send).unwrap();
|
||||
}
|
||||
|
||||
fn handle_incoming(&mut self, incoming: &SignedOp) {
|
||||
println!("WINNNINGINGINGINGINGIGNIGN");
|
||||
self.crdt.apply(incoming.clone());
|
||||
}
|
||||
|
||||
pub(crate) fn add_transaction_local(
|
||||
&mut self,
|
||||
transaction: serde_json::Value,
|
||||
) -> bft_json_crdt::json_crdt::SignedOp {
|
||||
let last = self
|
||||
.crdt
|
||||
.doc
|
||||
.list
|
||||
.ops
|
||||
.last()
|
||||
.expect("couldn't find last op");
|
||||
let signed_op = self
|
||||
.crdt
|
||||
.doc
|
||||
.list
|
||||
.insert(last.id, transaction)
|
||||
.sign(&self.keys);
|
||||
signed_op
|
||||
}
|
||||
|
||||
/// Print the current state of the CRDT, can be used to debug
|
||||
pub(crate) fn _trace_crdt(&self) {
|
||||
println!("{:?}", self.crdt.doc.list);
|
||||
}
|
||||
}
|
||||
12
side-node/src/stdin.rs
Normal file
12
side-node/src/stdin.rs
Normal file
@@ -0,0 +1,12 @@
|
||||
use std::io::BufRead;
|
||||
|
||||
/// Wait for stdin terminal input and send it to the node if any arrives
|
||||
pub(crate) fn input(stdin_sender: std::sync::mpsc::Sender<String>) {
|
||||
let stdin = std::io::stdin();
|
||||
let lines = stdin.lock().lines();
|
||||
for line in lines {
|
||||
println!("We're in stdin_input");
|
||||
let line = line.unwrap();
|
||||
stdin_sender.send(line).unwrap();
|
||||
}
|
||||
}
|
||||
@@ -1,9 +1,11 @@
|
||||
use std::path::PathBuf;
|
||||
|
||||
use serde_json::{json, Value};
|
||||
|
||||
pub(crate) const KEY_FILE: &str = "keys.pem";
|
||||
pub(crate) const CONFIG_FILE: &str = "config.toml";
|
||||
|
||||
/// Returns the path to the key file for this host OS.
|
||||
/// Returns the path to the key file and config for this host OS.
|
||||
pub(crate) fn side_paths(prefix: PathBuf) -> (PathBuf, PathBuf) {
|
||||
let mut key_path = prefix.clone();
|
||||
key_path.push(KEY_FILE);
|
||||
@@ -13,3 +15,19 @@ pub(crate) fn side_paths(prefix: PathBuf) -> (PathBuf, PathBuf) {
|
||||
|
||||
(key_path, config_path)
|
||||
}
|
||||
|
||||
pub(crate) fn home(name: &String) -> std::path::PathBuf {
|
||||
let mut path = dirs::home_dir().unwrap();
|
||||
path.push(".side");
|
||||
path.push(name);
|
||||
path
|
||||
}
|
||||
|
||||
/// Generate a fake transaction with customizable from_pubkey String
|
||||
pub(crate) fn fake_transaction(from_pubkey: String) -> Value {
|
||||
json!({
|
||||
"from": from_pubkey,
|
||||
"to": "Bob",
|
||||
"amount": 1
|
||||
})
|
||||
}
|
||||
|
||||
57
side-node/src/websocket.rs
Normal file
57
side-node/src/websocket.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
use async_trait::async_trait;
|
||||
use bft_json_crdt::json_crdt::SignedOp;
|
||||
use ezsockets::ClientConfig;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub(crate) struct WebSocketClient {
|
||||
incoming_sender: mpsc::Sender<SignedOp>,
|
||||
handle: ezsockets::Client<WebSocketClient>,
|
||||
}
|
||||
|
||||
impl WebSocketClient {
|
||||
/// Start the websocket client
|
||||
pub(crate) async fn new(
|
||||
incoming_sender: mpsc::Sender<SignedOp>,
|
||||
) -> ezsockets::Client<WebSocketClient> {
|
||||
tracing_subscriber::fmt::init();
|
||||
let config = ClientConfig::new("ws://localhost:8080/websocket");
|
||||
let (handle, future) = ezsockets::connect(
|
||||
|client| WebSocketClient {
|
||||
incoming_sender,
|
||||
handle: client,
|
||||
},
|
||||
config,
|
||||
)
|
||||
.await;
|
||||
tokio::spawn(async move {
|
||||
future.await.unwrap();
|
||||
});
|
||||
handle
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ezsockets::ClientExt for WebSocketClient {
|
||||
// Right now we're only using the Call type for sending signed ops
|
||||
// change this to an enum if we need to send other types of calls, and
|
||||
// match on it.
|
||||
type Call = String;
|
||||
|
||||
async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> {
|
||||
tracing::info!("received text: {text:?}");
|
||||
let incoming: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&text).unwrap();
|
||||
self.incoming_sender.send(incoming).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_binary(&mut self, bytes: Vec<u8>) -> Result<(), ezsockets::Error> {
|
||||
tracing::info!("received bytes: {bytes:?}");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_call(&mut self, call: Self::Call) -> Result<(), ezsockets::Error> {
|
||||
tracing::info!("sending signed op: {call:?}");
|
||||
self.handle.text(call)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,60 +0,0 @@
|
||||
/*
|
||||
|
||||
loop {
|
||||
let author = general_purpose::STANDARD.encode(&incoming_operation.author());
|
||||
bft_crdt.apply(incoming_operation.clone());
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bft_json_crdt::json_crdt::BaseCrdt;
|
||||
use ezsockets::ClientConfig;
|
||||
use fastcrypto::ed25519::Ed25519KeyPair;
|
||||
use std::io::BufRead;
|
||||
|
||||
use crate::list_transaction_crdt::{self, CrdtList};
|
||||
|
||||
struct Client {}
|
||||
|
||||
#[async_trait]
|
||||
impl ezsockets::ClientExt for Client {
|
||||
type Call = ();
|
||||
|
||||
async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> {
|
||||
tracing::info!("received message: {text}");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_binary(&mut self, bytes: Vec<u8>) -> Result<(), ezsockets::Error> {
|
||||
tracing::info!("received bytes: {bytes:?}");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn on_call(&mut self, call: Self::Call) -> Result<(), ezsockets::Error> {
|
||||
let () = call;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn start(keys: Ed25519KeyPair, bft_crdt: &mut BaseCrdt<CrdtList>) {
|
||||
tracing_subscriber::fmt::init();
|
||||
let config = ClientConfig::new("ws://localhost:8080/websocket");
|
||||
let (handle, future) = ezsockets::connect(|_client| Client {}, config).await;
|
||||
tokio::spawn(async move {
|
||||
future.await.unwrap();
|
||||
});
|
||||
let stdin = std::io::stdin();
|
||||
let lines = stdin.lock().lines();
|
||||
for line in lines {
|
||||
let line = line.unwrap();
|
||||
let signed_op = if let "exit" = line.as_str() {
|
||||
break;
|
||||
} else {
|
||||
let op = list_transaction_crdt::send(bft_crdt, &keys);
|
||||
op.to_string()
|
||||
};
|
||||
tracing::info!("sending {:?}", signed_op);
|
||||
handle.text(signed_op).unwrap();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user