WIP: hash inequality seems to be happening from something on the wire

This commit is contained in:
Dave Hrycyszyn
2024-06-18 10:17:59 +01:00
parent e9870241cb
commit 416d1ad88b
8 changed files with 92 additions and 21 deletions

4
Cargo.lock generated
View File

@@ -2074,6 +2074,10 @@ version = "0.1.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"ezsockets", "ezsockets",
"serde",
"serde_json",
"sha256",
"side-node",
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",

View File

@@ -212,7 +212,7 @@ impl<T: CrdtNode + DebugView> BaseCrdt<T> {
/// Apply a signed operation to this BaseCRDT, verifying integrity and routing to the right /// Apply a signed operation to this BaseCRDT, verifying integrity and routing to the right
/// nested CRDT /// nested CRDT
pub fn apply(&mut self, op: SignedOp) -> OpState { pub fn apply(&mut self, op: SignedOp) -> OpState {
self.log_try_apply(&op); // self.log_try_apply(&op);
#[cfg(feature = "bft")] #[cfg(feature = "bft")]
if !op.is_valid_digest() { if !op.is_valid_digest() {
@@ -232,9 +232,9 @@ impl<T: CrdtNode + DebugView> BaseCrdt<T> {
} }
// apply // apply
self.log_actually_apply(&op); // self.log_actually_apply(&op);
let status = self.doc.apply(op.inner); let status = self.doc.apply(op.inner);
self.debug_view(); // self.debug_view();
self.received.insert(op_id); self.received.insert(op_id);
// apply all of its causal dependents if there are any // apply all of its causal dependents if there are any

View File

@@ -8,11 +8,11 @@ use websocket::WebSocketClient;
pub(crate) mod cli; pub(crate) mod cli;
pub mod crdt; pub mod crdt;
pub(crate) mod init; pub(crate) mod init;
pub(crate) mod keys; pub mod keys;
pub(crate) mod node; pub mod node;
pub(crate) mod stdin; pub(crate) mod stdin;
pub mod utils; pub mod utils;
pub(crate) mod websocket; pub mod websocket;
#[tokio::main] #[tokio::main]
pub async fn run() { pub async fn run() {

View File

@@ -4,7 +4,7 @@ use tokio::sync::mpsc;
use crate::{crdt::TransactionList, utils, websocket::WebSocketClient}; use crate::{crdt::TransactionList, utils, websocket::WebSocketClient};
pub(crate) struct SideNode { pub struct SideNode {
crdt: BaseCrdt<TransactionList>, crdt: BaseCrdt<TransactionList>,
keys: fastcrypto::ed25519::Ed25519KeyPair, keys: fastcrypto::ed25519::Ed25519KeyPair,
incoming_receiver: mpsc::Receiver<SignedOp>, incoming_receiver: mpsc::Receiver<SignedOp>,
@@ -13,7 +13,7 @@ pub(crate) struct SideNode {
} }
impl SideNode { impl SideNode {
pub(crate) fn new( pub fn new(
crdt: BaseCrdt<TransactionList>, crdt: BaseCrdt<TransactionList>,
keys: Ed25519KeyPair, keys: Ed25519KeyPair,
incoming_receiver: mpsc::Receiver<SignedOp>, incoming_receiver: mpsc::Receiver<SignedOp>,
@@ -39,12 +39,14 @@ impl SideNode {
let transaction = utils::fake_transaction_json(stdin); let transaction = utils::fake_transaction_json(stdin);
let json = serde_json::to_value(transaction).unwrap(); let json = serde_json::to_value(transaction).unwrap();
let signed_op = self.add_transaction_local(json); let signed_op = self.add_transaction_local(json);
println!("STDIN: {}", shappy(signed_op.clone()));
self.send_to_network(signed_op).await; self.send_to_network(signed_op).await;
} }
Err(_) => {} // ignore empty channel errors in this PoC Err(_) => {} // ignore empty channel errors in this PoC
} }
match self.incoming_receiver.try_recv() { match self.incoming_receiver.try_recv() {
Ok(incoming) => { Ok(incoming) => {
println!("INCOMING");
self.handle_incoming(incoming); self.handle_incoming(incoming);
} }
Err(_) => {} // ignore empty channel errors in this PoC Err(_) => {} // ignore empty channel errors in this PoC
@@ -57,12 +59,13 @@ impl SideNode {
self.handle.call(to_send).unwrap(); self.handle.call(to_send).unwrap();
} }
fn handle_incoming(&mut self, incoming: SignedOp) { pub fn handle_incoming(&mut self, incoming: SignedOp) {
println!("handle_incoming: {}", shappy(incoming.clone()));
self.crdt.apply(incoming); self.crdt.apply(incoming);
self.trace_crdt(); // self.trace_crdt();
} }
pub(crate) fn add_transaction_local( pub fn add_transaction_local(
&mut self, &mut self,
transaction: serde_json::Value, transaction: serde_json::Value,
) -> bft_json_crdt::json_crdt::SignedOp { ) -> bft_json_crdt::json_crdt::SignedOp {
@@ -79,12 +82,21 @@ impl SideNode {
.list .list
.insert(last.id, transaction) .insert(last.id, transaction)
.sign(&self.keys); .sign(&self.keys);
self.trace_crdt(); // self.trace_crdt();
signed_op signed_op
} }
/// Print the current state of the CRDT, can be used to debug /// Print the current state of the CRDT, can be used to debug
pub(crate) fn trace_crdt(&self) { pub fn trace_crdt(&self) {
println!("{:?}", self.crdt.doc.view_sha()); println!("{:?}", self.crdt.doc.view_sha());
} }
pub fn current_sha(&self) -> String {
self.crdt.doc.view_sha()
}
}
fn shappy(op: SignedOp) -> String {
let b = serde_json::to_string(&op).unwrap().into_bytes();
sha256::digest(b).to_string()
} }

View File

@@ -3,17 +3,16 @@ use bft_json_crdt::json_crdt::SignedOp;
use ezsockets::ClientConfig; use ezsockets::ClientConfig;
use tokio::sync::mpsc; use tokio::sync::mpsc;
pub(crate) struct WebSocketClient { pub struct WebSocketClient {
incoming_sender: mpsc::Sender<SignedOp>, incoming_sender: mpsc::Sender<SignedOp>,
handle: ezsockets::Client<WebSocketClient>, handle: ezsockets::Client<WebSocketClient>,
} }
impl WebSocketClient { impl WebSocketClient {
/// Start the websocket client /// Start the websocket client
pub(crate) async fn new( pub async fn new(
incoming_sender: mpsc::Sender<SignedOp>, incoming_sender: mpsc::Sender<SignedOp>,
) -> ezsockets::Client<WebSocketClient> { ) -> ezsockets::Client<WebSocketClient> {
tracing_subscriber::fmt::init();
let config = ClientConfig::new("ws://localhost:8080/websocket"); let config = ClientConfig::new("ws://localhost:8080/websocket");
let (handle, future) = ezsockets::connect( let (handle, future) = ezsockets::connect(
|client| WebSocketClient { |client| WebSocketClient {
@@ -40,7 +39,6 @@ impl ezsockets::ClientExt for WebSocketClient {
/// When we receive a text message, apply the crdt operation contained in it to our /// When we receive a text message, apply the crdt operation contained in it to our
/// local crdt. /// local crdt.
async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> { async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> {
tracing::info!("received text: {text:?}");
let incoming: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&text).unwrap(); let incoming: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&text).unwrap();
self.incoming_sender.send(incoming).await?; self.incoming_sender.send(incoming).await?;
Ok(()) Ok(())
@@ -55,7 +53,6 @@ impl ezsockets::ClientExt for WebSocketClient {
/// Call this with the `Call` type to send application data to the websocket client /// Call this with the `Call` type to send application data to the websocket client
/// (and from there, to the server). /// (and from there, to the server).
async fn on_call(&mut self, call: Self::Call) -> Result<(), ezsockets::Error> { async fn on_call(&mut self, call: Self::Call) -> Result<(), ezsockets::Error> {
tracing::info!("sending signed op: {call:?}");
self.handle.text(call)?; self.handle.text(call)?;
Ok(()) Ok(())
} }

View File

@@ -0,0 +1,48 @@
use bft_json_crdt::{
json_crdt::{BaseCrdt, SignedOp},
keypair::make_keypair,
};
use side_node::{crdt::TransactionList, node::SideNode, utils, WebSocketClient};
use tokio::sync::mpsc;
#[tokio::test]
async fn test_distribute_via_websockets() {
let mut node1 = setup("alice").await;
let mut node2 = setup("bob").await;
assert_eq!(node1.current_sha(), node2.current_sha());
let transaction = utils::fake_transaction_json("from_alice".to_string());
let signed_op = node1.add_transaction_local(transaction);
node2.handle_incoming(signed_op);
assert_eq!(node1.current_sha(), node2.current_sha());
let transaction = utils::fake_transaction_json("from_alice2".to_string());
let signed_op = node1.add_transaction_local(transaction);
node2.handle_incoming(signed_op);
assert_eq!(node1.current_sha(), node2.current_sha());
let transaction = utils::fake_transaction_json("from_alice3".to_string());
let signed_op = node1.add_transaction_local(transaction);
node2.handle_incoming(signed_op);
assert_eq!(node1.current_sha(), node2.current_sha());
}
/// Wire everything up, ignoring things we are not using in the test
async fn setup(_: &str) -> SideNode {
// First, load up the keys and create a bft-crdt
let keys = make_keypair();
let crdt = BaseCrdt::<TransactionList>::new(&keys);
// Channels for internal communication, and a tokio task for stdin input
let (incoming_sender, incoming_receiver) = mpsc::channel::<SignedOp>(32);
let (_, stdin_receiver) = std::sync::mpsc::channel();
// Finally, create the node and return it
let handle = WebSocketClient::new(incoming_sender).await;
let node = SideNode::new(crdt, keys, incoming_receiver, stdin_receiver, handle);
node
}

View File

@@ -8,6 +8,10 @@ edition = "2021"
[dependencies] [dependencies]
async-trait = "0.1.52" async-trait = "0.1.52"
ezsockets = { version = "*", features = ["tungstenite"] } ezsockets = { version = "*", features = ["tungstenite"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.117"
sha256 = "1.5.0"
side-node = { path = "../side-node" }
tokio = { version = "1.17.0", features = ["full"] } tokio = { version = "1.17.0", features = ["full"] }
tracing = "0.1.32" tracing = "0.1.32"
tracing-subscriber = "0.3.9" tracing-subscriber = "0.3.9"

View File

@@ -85,12 +85,13 @@ impl ezsockets::ServerExt for ChatServer {
.unzip(); .unzip();
tracing::info!( tracing::info!(
"sending {text} to [{sessions}] at `{room}`", "sending {hash} to [{sessions}] at `{room}`",
sessions = ids sessions = ids
.iter() .iter()
.map(|id| id.to_string()) .map(|id| id.to_string())
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(",") .join(","),
hash = shappy(text.clone())
); );
for session in sessions { for session in sessions {
session.text(text.clone()).unwrap(); session.text(text.clone()).unwrap();
@@ -144,7 +145,7 @@ impl ezsockets::SessionExt for SessionActor {
} }
async fn on_text(&mut self, text: String) -> Result<(), Error> { async fn on_text(&mut self, text: String) -> Result<(), Error> {
tracing::info!("received: {text}"); tracing::info!("received: {}", shappy(text.clone()));
if text.starts_with('/') { if text.starts_with('/') {
let mut args = text.split_whitespace(); let mut args = text.split_whitespace();
let command = args.next().unwrap(); let command = args.next().unwrap();
@@ -183,6 +184,11 @@ impl ezsockets::SessionExt for SessionActor {
} }
} }
fn shappy(text: String) -> String {
let b = text.into_bytes();
sha256::digest(b).to_string()
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
tracing_subscriber::fmt::init(); tracing_subscriber::fmt::init();