Compare commits
12 Commits
7878bb9149
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
10c0c64984 | ||
|
|
3d746a8073 | ||
|
|
2e4510679a | ||
|
|
51dd81e145 | ||
|
|
c528160d34 | ||
|
|
0126614dd3 | ||
|
|
b933f8d6fc | ||
|
|
365cfd7b01 | ||
|
|
79ce80a4a4 | ||
|
|
32d7b62cfe | ||
|
|
97711e2ecf | ||
|
|
e2d50144ca |
30
README.md
30
README.md
@@ -1,12 +1,18 @@
|
||||
# BFT-CRDT PoC
|
||||
|
||||
This is a proof of concept implementation of a [BFT-CRDT](https://jzhao.xyz/posts/bft-json-crdt) blockchain-like system. It is willfully, wildly insecure as a blockchain right now. Think of it as an experiment which is strictly for fun and poking at ideas.
|
||||
This is a proof of concept implementation of a [BFT-CRDT](https://jzhao.xyz/posts/bft-json-crdt) system.
|
||||
|
||||
This code is based on the ideas of [Martin Kleppmann](https://martin.kleppmann.com/papers/bft-crdt-papoc22.pdf) and the ideas and code of [Jacky Zhao](https://jzhao.xyz/). Have a read, they are both excellent writers and have some of the most interesting computing ideas I've run across in quite a while.
|
||||
|
||||
It is not clear what this thing is for, yet. It's not a blockchain. It makes a kind of secure DAG. It uses BFT-CRDTs to make a Sybil-proof and secure information transmission system for messages, with eventual consistency guarantees.
|
||||
|
||||
The idea that it could be possible to set up a secure Sybil-proof system, negating the energy burn required for proof of work, the financially exclusionary proof of stake, or the meat space hassle of a proof of personhood ceremony, is too attractive to ignore. At least, if you're interested in cool P2P systems.
|
||||
Initially I was thinking it could perhaps be used to make a kind of opt-in blockchain, but I don't think it'll work (and reading up on things like e.g. vector clocks, which I had initially thought about for ordering, the literature goes out of its way to note that they can't work in Byzantine environments).
|
||||
|
||||
So if it can't be a blockchain, what can it be? Is it useful at all?
|
||||
|
||||
Potentially, yes. There are lots of things in crypto land which do not necessarily need consensus and/or a Total Global Ordering. Some brainstormed ideas for these are in the `docs/` folder.
|
||||
|
||||
I also wonder: can we use George's insights about blockchain conflicts here? Assume a CRDT based system where participating users have public/private keypairs. The system is initialized with an initial distribution (like a regular blockchain is). Coins change hands when users sign transfers to each other. Is there a way to make such transfers update properly *without* making a block and having a total global ordering?
|
||||
|
||||
## Prerequisites
|
||||
|
||||
@@ -45,6 +51,10 @@ You can then type directly into each of the Crdt Node consoles. Messages will be
|
||||
|
||||
What we have here is a very simple system comprised of two parts: the Crdt Node, and the Crdt Relayer.
|
||||
|
||||
It is pretty cool in the sense that it is actually Sybil-proof. But you can't get a Total Global Ordering out of it, so you can't use it for e.g. account transfers in a blockchain.
|
||||
|
||||
However, there may be other cases that are interesting which we can't see yet: secure distributed filesystems, some kind of Lightning replacement, etc.
|
||||
|
||||
### Crdt Node(s)
|
||||
|
||||
The Crdt Nodes make up a system of BFT-CRDT-producing nodes that can make a sort of wildly insecure blockchain. Currently, they can reliably send transactions to each other in a secure way, such that all nodes they communicate with can tell whether received transactions are obeying the rules of the system.
|
||||
@@ -53,9 +63,9 @@ The Crdt Node does not download any chain state, and if one goes off-line it wil
|
||||
|
||||
### Crdt Relayer
|
||||
|
||||
The Crdt Relayer replicates transactions between nodes using a websocket. We aim to eliminate this component from the architecture, but for the moment it simplifies networking and consensus agreement while we experiment with higher-value concepts.
|
||||
The Crdt Relayer replicates transactions between nodes using a websocket. We aim to eliminate this component from the architecture, but for the moment it simplifies networking while we experiment with higher-value concepts.
|
||||
|
||||
Later, we will aim to remove the Crdt Relayer from the architecture, by (a) moving to pure P2P transactions between Crdt Nodes, and (b) doing leader election of a Crdt Node to reach agreement on the submitted block.
|
||||
Later, we will aim to remove the Crdt Relayer from the architecture, moving to pure P2P transactions between Crdt Nodes
|
||||
|
||||
## Possible uses
|
||||
|
||||
@@ -69,15 +79,13 @@ It is not necessarily the case that e.g. signer participants and Cosmos validato
|
||||
|
||||
Might the ability to be part of multiple consensus groups at once provide new opportunities for cross-chain transfers?
|
||||
|
||||
### Others
|
||||
|
||||
There are some brainstormed ideas in `docs/` and `examples/` as well as an ai-generated example in `crates/oracle-demo`. Have a look.
|
||||
|
||||
## Next dev tasks:
|
||||
|
||||
- [ ] we don't need a relayer, the first crdt node can act as a leader until people decide they don't want to trust it any more
|
||||
- [ ] the leader node can have a timer in it for block creation
|
||||
- [ ] code up the ability to switch leaders (can be a human decision at first, later an (optional) automated choice)
|
||||
- [ ] pick a commit and reveal scheme to remove MEV. One thing to investigate is [single-use seals](https://docs.rgb.info/distributed-computing-concepts/single-use-seals)
|
||||
- [ ] enable Crdt Nodes should download current P2P chain/dag state so that they start - out with a consistent copy of transaction data, and also do catch-up after going off-line
|
||||
- [ ] enable Crdt Nodes should download current P2P dag state so that they start out with a consistent copy of dag data, and also do catch-up after going off-line
|
||||
- [ ] remove the proc macro code from bft-json-crdt, it's not really needed in this implementation
|
||||
- [ ] add smart contract execution engine (CosmWasm would be a good first choice)
|
||||
- [ ] enable Crdt Nodes to download contract code for a given contract
|
||||
- [ ] enable Crdt Nodes to download current contract state for a given contract
|
||||
- [ ] switch to full P2P messaging instead of websockets
|
||||
|
||||
@@ -41,10 +41,10 @@ pub fn add_crdt_fields(args: OgTokenStream, input: OgTokenStream) -> OgTokenStre
|
||||
);
|
||||
}
|
||||
|
||||
return quote! {
|
||||
quote! {
|
||||
#input
|
||||
}
|
||||
.into();
|
||||
.into()
|
||||
}
|
||||
|
||||
/// Proc macro to automatically derive the CRDTNode trait
|
||||
@@ -56,7 +56,7 @@ pub fn derive_json_crdt(input: OgTokenStream) -> OgTokenStream {
|
||||
|
||||
// used in the quasi-quotation below as `#name`
|
||||
let ident = input.ident;
|
||||
let ident_str = LitStr::new(&*ident.to_string(), ident.span());
|
||||
let ident_str = LitStr::new(&ident.to_string(), ident.span());
|
||||
|
||||
let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
|
||||
match input.data {
|
||||
@@ -74,7 +74,7 @@ pub fn derive_json_crdt(input: OgTokenStream) -> OgTokenStream {
|
||||
Type::Path(t) => t.to_token_stream(),
|
||||
_ => return quote_spanned! { field.span() => compile_error!("Field should be a primitive or struct which implements CRDTNode") }.into(),
|
||||
};
|
||||
let str_literal = LitStr::new(&*ident.to_string(), ident.span());
|
||||
let str_literal = LitStr::new(&ident.to_string(), ident.span());
|
||||
ident_strings.push(str_literal.clone());
|
||||
ident_literals.push(ident.clone());
|
||||
tys.push(ty.clone());
|
||||
@@ -185,10 +185,10 @@ pub fn derive_json_crdt(input: OgTokenStream) -> OgTokenStream {
|
||||
expanded.into()
|
||||
}
|
||||
_ => {
|
||||
return quote_spanned! { ident.span() => compile_error!("Cannot derive CRDT on tuple or unit structs"); }
|
||||
quote_spanned! { ident.span() => compile_error!("Cannot derive CRDT on tuple or unit structs"); }
|
||||
.into()
|
||||
}
|
||||
},
|
||||
_ => return quote_spanned! { ident.span() => compile_error!("Cannot derive CRDT on enums or unions"); }.into(),
|
||||
_ => quote_spanned! { ident.span() => compile_error!("Cannot derive CRDT on enums or unions"); }.into(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
use bft_json_crdt::{
|
||||
json_crdt::{CrdtNode, JsonValue},
|
||||
keypair::make_author,
|
||||
list_crdt::ListCrdt,
|
||||
op::{Op, OpId, ROOT_ID}, json_crdt::{CrdtNode, JsonValue},
|
||||
op::{Op, OpId, ROOT_ID},
|
||||
};
|
||||
use rand::{rngs::ThreadRng, seq::SliceRandom, Rng};
|
||||
|
||||
fn random_op<T: CrdtNode>(arr: &Vec<Op<T>>, rng: &mut ThreadRng) -> OpId {
|
||||
fn random_op<T: CrdtNode>(arr: &[Op<T>], rng: &mut ThreadRng) -> OpId {
|
||||
arr.choose(rng).map(|op| op.id).unwrap_or(ROOT_ID)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
# BFT-CRDT Oracle Network Demo
|
||||
|
||||
THIS IS JUST AI SLOP. DON'T TRUST IT. :) DH
|
||||
|
||||
A live demonstration of a decentralized oracle network using Byzantine Fault Tolerant Conflict-free Replicated Data Types (BFT-CRDTs).
|
||||
|
||||
## What This Demo Shows
|
||||
@@ -75,4 +77,4 @@ See the [full documentation](../../docs/use-case-2-oracle-networks.md) for:
|
||||
- Detailed architecture
|
||||
- Smart contract integration
|
||||
- Production deployment guide
|
||||
- Security analysis
|
||||
- Security analysis
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
use colored::*;
|
||||
use rand::Rng;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
use std::time::Duration;
|
||||
mod network;
|
||||
mod oracle;
|
||||
mod utils;
|
||||
|
||||
/// A simple demonstration of the BFT-CRDT Oracle Network
|
||||
/// Run with: cargo run -p oracle-demo
|
||||
@@ -26,400 +25,13 @@ struct PriceAttestation {
|
||||
timestamp: u64,
|
||||
}
|
||||
|
||||
// ============ Simple CRDT ============
|
||||
|
||||
#[derive(Clone)]
|
||||
struct OracleNetworkCRDT {
|
||||
attestations: HashMap<String, PriceAttestation>,
|
||||
oracle_scores: HashMap<OracleId, f64>,
|
||||
}
|
||||
|
||||
impl OracleNetworkCRDT {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
attestations: HashMap::new(),
|
||||
oracle_scores: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn submit_attestation(&mut self, attestation: PriceAttestation) {
|
||||
self.attestations
|
||||
.insert(attestation.id.clone(), attestation.clone());
|
||||
|
||||
// Update oracle reputation
|
||||
let score = self
|
||||
.oracle_scores
|
||||
.entry(attestation.oracle_id.clone())
|
||||
.or_insert(0.5);
|
||||
*score = (*score * 0.95) + 0.05;
|
||||
}
|
||||
|
||||
fn merge(&mut self, other: &Self) {
|
||||
for (id, attestation) in &other.attestations {
|
||||
if !self.attestations.contains_key(id) {
|
||||
self.attestations.insert(id.clone(), attestation.clone());
|
||||
}
|
||||
}
|
||||
|
||||
for (oracle_id, score) in &other.oracle_scores {
|
||||
self.oracle_scores.insert(oracle_id.clone(), *score);
|
||||
}
|
||||
}
|
||||
|
||||
fn get_aggregate_price(
|
||||
&self,
|
||||
asset_pair: &AssetPair,
|
||||
max_age: u64,
|
||||
) -> Option<(f64, u8, usize)> {
|
||||
let now = timestamp();
|
||||
let min_time = now.saturating_sub(max_age);
|
||||
|
||||
let mut prices = Vec::new();
|
||||
for attestation in self.attestations.values() {
|
||||
if attestation.asset_pair == *asset_pair && attestation.timestamp >= min_time {
|
||||
let weight = self
|
||||
.oracle_scores
|
||||
.get(&attestation.oracle_id)
|
||||
.unwrap_or(&0.5);
|
||||
prices.push((attestation.price, attestation.confidence, *weight));
|
||||
}
|
||||
}
|
||||
|
||||
if prices.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Remove outliers
|
||||
prices.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
|
||||
if prices.len() > 4 {
|
||||
let q1 = prices[prices.len() / 4].0;
|
||||
let q3 = prices[3 * prices.len() / 4].0;
|
||||
let iqr = q3 - q1;
|
||||
let lower = q1 - iqr * 1.5;
|
||||
let upper = q3 + iqr * 1.5;
|
||||
prices.retain(|(price, _, _)| *price >= lower && *price <= upper);
|
||||
}
|
||||
|
||||
// Calculate weighted average
|
||||
let mut total_weight = 0.0;
|
||||
let mut weighted_sum = 0.0;
|
||||
let mut confidence_sum = 0.0;
|
||||
|
||||
for (price, confidence, weight) in &prices {
|
||||
let w = (*confidence as f64 / 100.0) * weight;
|
||||
weighted_sum += price * w;
|
||||
confidence_sum += *confidence as f64 * w;
|
||||
total_weight += w;
|
||||
}
|
||||
|
||||
let avg_price = weighted_sum / total_weight;
|
||||
let avg_confidence = (confidence_sum / total_weight) as u8;
|
||||
|
||||
Some((avg_price, avg_confidence, prices.len()))
|
||||
}
|
||||
}
|
||||
|
||||
// ============ Oracle Node ============
|
||||
|
||||
struct OracleNode {
|
||||
id: OracleId,
|
||||
crdt: Arc<Mutex<OracleNetworkCRDT>>,
|
||||
is_byzantine: bool,
|
||||
base_price: f64,
|
||||
}
|
||||
|
||||
impl OracleNode {
|
||||
fn new(id: String, is_byzantine: bool) -> Self {
|
||||
Self {
|
||||
id: OracleId(id),
|
||||
crdt: Arc::new(Mutex::new(OracleNetworkCRDT::new())),
|
||||
is_byzantine,
|
||||
base_price: 2500.0,
|
||||
}
|
||||
}
|
||||
|
||||
fn submit_price(&self) {
|
||||
let mut rng = rand::rng();
|
||||
|
||||
let price = if self.is_byzantine {
|
||||
self.base_price * 1.2 // Try to manipulate 20% higher
|
||||
} else {
|
||||
self.base_price * (1.0 + rng.random_range(-0.01..0.01))
|
||||
};
|
||||
|
||||
let attestation = PriceAttestation {
|
||||
id: format!("{}_{}", self.id.0, timestamp()),
|
||||
oracle_id: self.id.clone(),
|
||||
asset_pair: AssetPair("ETH/USD".to_string()),
|
||||
price,
|
||||
confidence: if self.is_byzantine { 50 } else { 95 },
|
||||
timestamp: timestamp(),
|
||||
};
|
||||
|
||||
let mut crdt = self.crdt.lock().unwrap();
|
||||
crdt.submit_attestation(attestation);
|
||||
}
|
||||
}
|
||||
|
||||
// ============ Network Simulator ============
|
||||
|
||||
struct NetworkSimulator {
|
||||
nodes: Vec<Arc<OracleNode>>,
|
||||
partitioned: Arc<Mutex<bool>>,
|
||||
}
|
||||
|
||||
impl NetworkSimulator {
|
||||
fn new() -> Self {
|
||||
let mut nodes = Vec::new();
|
||||
|
||||
// Create 5 honest nodes
|
||||
for i in 1..=5 {
|
||||
nodes.push(Arc::new(OracleNode::new(format!("honest_{}", i), false)));
|
||||
}
|
||||
|
||||
// Create 2 Byzantine nodes
|
||||
for i in 1..=2 {
|
||||
nodes.push(Arc::new(OracleNode::new(format!("byzantine_{}", i), true)));
|
||||
}
|
||||
|
||||
Self {
|
||||
nodes,
|
||||
partitioned: Arc::new(Mutex::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
fn run(&self, duration: Duration) {
|
||||
println!(
|
||||
"{}",
|
||||
"🚀 Starting BFT-CRDT Oracle Network Demo".cyan().bold()
|
||||
);
|
||||
println!("{}", "=========================================".cyan());
|
||||
println!("📊 Network: {} nodes ({} Byzantine)", self.nodes.len(), 2);
|
||||
println!("⏱️ Duration: {:?}\n", duration);
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
// Spawn oracle threads
|
||||
let handles: Vec<_> = self
|
||||
.nodes
|
||||
.iter()
|
||||
.map(|node| {
|
||||
let node_clone = Arc::clone(node);
|
||||
let start_clone = start.clone();
|
||||
thread::spawn(move || {
|
||||
while start_clone.elapsed() < duration {
|
||||
node_clone.submit_price();
|
||||
thread::sleep(Duration::from_millis(1000));
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Spawn network propagation thread
|
||||
let nodes_clone = self.nodes.clone();
|
||||
let partitioned_clone = Arc::clone(&self.partitioned);
|
||||
let start_clone = start.clone();
|
||||
let propagation_handle = thread::spawn(move || {
|
||||
while start_clone.elapsed() < duration {
|
||||
let is_partitioned = *partitioned_clone.lock().unwrap();
|
||||
|
||||
// Propagate between nodes
|
||||
for i in 0..nodes_clone.len() {
|
||||
for j in 0..nodes_clone.len() {
|
||||
if i != j {
|
||||
// Skip if partitioned
|
||||
if is_partitioned && ((i < 3 && j >= 3) || (i >= 3 && j < 3)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let crdt1 = nodes_clone[i].crdt.lock().unwrap();
|
||||
let mut crdt2 = nodes_clone[j].crdt.lock().unwrap();
|
||||
crdt2.merge(&*crdt1);
|
||||
}
|
||||
}
|
||||
}
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
}
|
||||
});
|
||||
|
||||
// Main monitoring loop
|
||||
let mut last_partition = Instant::now();
|
||||
while start.elapsed() < duration {
|
||||
thread::sleep(Duration::from_secs(2));
|
||||
|
||||
// Print current state
|
||||
self.print_network_state();
|
||||
|
||||
// Simulate network partition every 10 seconds
|
||||
if last_partition.elapsed() > Duration::from_secs(10) {
|
||||
let mut partitioned = self.partitioned.lock().unwrap();
|
||||
*partitioned = !*partitioned;
|
||||
if *partitioned {
|
||||
println!(
|
||||
"\n{}",
|
||||
"⚠️ NETWORK PARTITION ACTIVE - Nodes split into two groups"
|
||||
.yellow()
|
||||
.bold()
|
||||
);
|
||||
} else {
|
||||
println!(
|
||||
"\n{}",
|
||||
"✅ NETWORK PARTITION HEALED - All nodes can communicate"
|
||||
.green()
|
||||
.bold()
|
||||
);
|
||||
}
|
||||
last_partition = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for threads
|
||||
for handle in handles {
|
||||
handle.join().unwrap();
|
||||
}
|
||||
propagation_handle.join().unwrap();
|
||||
|
||||
// Print final statistics
|
||||
self.print_final_stats();
|
||||
}
|
||||
|
||||
fn print_network_state(&self) {
|
||||
println!("\n{}", "📈 Current Network State:".white().bold());
|
||||
println!("{}", "------------------------".white());
|
||||
|
||||
// Get price from each node's perspective
|
||||
let mut prices = Vec::new();
|
||||
for node in &self.nodes {
|
||||
let crdt = node.crdt.lock().unwrap();
|
||||
if let Some((price, confidence, sources)) =
|
||||
crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 60)
|
||||
{
|
||||
prices.push((node.id.0.clone(), price, confidence, sources));
|
||||
let price_str = format!("${:.2}", price);
|
||||
let confidence_str = format!("{}%", confidence);
|
||||
|
||||
let line = if node.is_byzantine {
|
||||
format!(
|
||||
" {} sees: {} (confidence: {}, sources: {})",
|
||||
node.id.0.red(),
|
||||
price_str.red(),
|
||||
confidence_str.red(),
|
||||
sources
|
||||
)
|
||||
} else {
|
||||
format!(
|
||||
" {} sees: {} (confidence: {}, sources: {})",
|
||||
node.id.0.green(),
|
||||
price_str.green(),
|
||||
confidence_str.green(),
|
||||
sources
|
||||
)
|
||||
};
|
||||
println!("{}", line);
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate network consensus
|
||||
if !prices.is_empty() {
|
||||
let avg_price: f64 =
|
||||
prices.iter().map(|(_, p, _, _)| *p).sum::<f64>() / prices.len() as f64;
|
||||
let min_price = prices
|
||||
.iter()
|
||||
.map(|(_, p, _, _)| *p)
|
||||
.fold(f64::INFINITY, |a, b| a.min(b));
|
||||
let max_price = prices
|
||||
.iter()
|
||||
.map(|(_, p, _, _)| *p)
|
||||
.fold(f64::NEG_INFINITY, |a, b| a.max(b));
|
||||
let deviation = ((max_price - min_price) / avg_price) * 100.0;
|
||||
|
||||
println!("\n{}", "📊 Network Consensus:".cyan().bold());
|
||||
println!(" Average: {}", format!("${:.2}", avg_price).cyan());
|
||||
println!(
|
||||
" Range: {} - {}",
|
||||
format!("${:.2}", min_price).cyan(),
|
||||
format!("${:.2}", max_price).cyan()
|
||||
);
|
||||
println!(" Max Deviation: {}", format!("{:.2}%", deviation).cyan());
|
||||
}
|
||||
}
|
||||
|
||||
fn print_final_stats(&self) {
|
||||
println!("\n\n{}", "🏁 Final Statistics".yellow().bold());
|
||||
println!("{}", "===================".yellow());
|
||||
|
||||
let mut total_attestations = 0;
|
||||
let mut oracle_stats = Vec::new();
|
||||
|
||||
for node in &self.nodes {
|
||||
let crdt = node.crdt.lock().unwrap();
|
||||
let node_attestations = crdt.attestations.len();
|
||||
total_attestations += node_attestations;
|
||||
|
||||
let score = crdt.oracle_scores.get(&node.id).unwrap_or(&0.5);
|
||||
oracle_stats.push((
|
||||
node.id.0.clone(),
|
||||
node_attestations,
|
||||
*score,
|
||||
node.is_byzantine,
|
||||
));
|
||||
}
|
||||
|
||||
println!("\n{}", "📈 Oracle Performance:".white().bold());
|
||||
for (id, attestations, score, is_byzantine) in oracle_stats {
|
||||
let icon = if is_byzantine { "🔴" } else { "🟢" };
|
||||
let line = format!(
|
||||
" {} {} - Attestations: {}, Reputation: {:.2}",
|
||||
icon, id, attestations, score
|
||||
);
|
||||
if is_byzantine {
|
||||
println!("{}", line.red());
|
||||
} else {
|
||||
println!("{}", line.green());
|
||||
}
|
||||
}
|
||||
|
||||
println!("\n{}", "📊 Network Totals:".cyan().bold());
|
||||
println!(" Total Attestations: {}", total_attestations);
|
||||
println!(
|
||||
" Attestations/second: {:.2}",
|
||||
total_attestations as f64 / 30.0
|
||||
);
|
||||
|
||||
// Show that Byzantine nodes were filtered out
|
||||
if let Some(node) = self.nodes.first() {
|
||||
let crdt = node.crdt.lock().unwrap();
|
||||
if let Some((price, confidence, _)) =
|
||||
crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 300)
|
||||
{
|
||||
println!("\n{}", "✅ Final Aggregated Price:".green().bold());
|
||||
println!(
|
||||
" {} (confidence: {}%)",
|
||||
format!("${:.2}", price).green().bold(),
|
||||
confidence
|
||||
);
|
||||
println!(" {}", "Despite Byzantine manipulation attempts!".green());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ============ Helper Functions ============
|
||||
|
||||
fn timestamp() -> u64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs()
|
||||
}
|
||||
|
||||
// ============ Main Function ============
|
||||
|
||||
fn main() {
|
||||
println!("{}", "BFT-CRDT Oracle Network Demo".cyan().bold());
|
||||
println!("{}", "============================\n".cyan());
|
||||
|
||||
let simulator = NetworkSimulator::new();
|
||||
let simulator = network::Simulator::new();
|
||||
simulator.run(Duration::from_secs(30));
|
||||
|
||||
println!("\n{}", "✅ Demo completed!".green().bold());
|
||||
|
||||
251
crates/oracle-demo/src/network.rs
Normal file
251
crates/oracle-demo/src/network.rs
Normal file
@@ -0,0 +1,251 @@
|
||||
use crate::{oracle, AssetPair};
|
||||
use colored::Colorize;
|
||||
use std::{
|
||||
sync::{Arc, Mutex},
|
||||
thread,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
pub(crate) struct Simulator {
|
||||
nodes: Vec<Arc<oracle::Node>>,
|
||||
partitioned: Arc<Mutex<bool>>,
|
||||
}
|
||||
|
||||
impl Simulator {
|
||||
pub(crate) fn new() -> Self {
|
||||
let mut nodes = Vec::new();
|
||||
|
||||
// Create 5 honest nodes
|
||||
for i in 1..=5 {
|
||||
nodes.push(Arc::new(oracle::Node::new(format!("honest_{}", i), false)));
|
||||
}
|
||||
|
||||
// Create 2 Byzantine nodes
|
||||
for i in 1..=2 {
|
||||
nodes.push(Arc::new(oracle::Node::new(
|
||||
format!("byzantine_{}", i),
|
||||
true,
|
||||
)));
|
||||
}
|
||||
|
||||
Self {
|
||||
nodes,
|
||||
partitioned: Arc::new(Mutex::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn run(&self, duration: Duration) {
|
||||
println!(
|
||||
"{}",
|
||||
"🚀 Starting BFT-CRDT Oracle Network Demo".cyan().bold()
|
||||
);
|
||||
println!("{}", "=========================================".cyan());
|
||||
println!("📊 Network: {} nodes ({} Byzantine)", self.nodes.len(), 2);
|
||||
println!("⏱️ Duration: {:?}\n", duration);
|
||||
|
||||
let start = Instant::now();
|
||||
|
||||
// Spawn oracle threads
|
||||
let handles: Vec<_> = self
|
||||
.nodes
|
||||
.iter()
|
||||
.map(|node| {
|
||||
let node_clone = Arc::clone(node);
|
||||
let start_clone = start;
|
||||
thread::spawn(move || {
|
||||
while start_clone.elapsed() < duration {
|
||||
node_clone.submit_price();
|
||||
thread::sleep(Duration::from_millis(1000));
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Spawn network propagation thread
|
||||
let nodes_clone = self.nodes.clone();
|
||||
let partitioned_clone = Arc::clone(&self.partitioned);
|
||||
let start_clone = start;
|
||||
let propagation_handle = thread::spawn(move || {
|
||||
while start_clone.elapsed() < duration {
|
||||
let is_partitioned = *partitioned_clone.lock().unwrap();
|
||||
|
||||
// Propagate between nodes
|
||||
for i in 0..nodes_clone.len() {
|
||||
for j in 0..nodes_clone.len() {
|
||||
if i != j {
|
||||
// Skip if partitioned
|
||||
if is_partitioned && ((i < 3 && j >= 3) || (i >= 3 && j < 3)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let crdt1 = nodes_clone[i].crdt.lock().unwrap();
|
||||
let mut crdt2 = nodes_clone[j].crdt.lock().unwrap();
|
||||
crdt2.merge(&crdt1);
|
||||
}
|
||||
}
|
||||
}
|
||||
thread::sleep(Duration::from_millis(100));
|
||||
}
|
||||
});
|
||||
|
||||
// Main monitoring loop
|
||||
let mut last_partition = Instant::now();
|
||||
while start.elapsed() < duration {
|
||||
thread::sleep(Duration::from_secs(2));
|
||||
|
||||
// Print current state
|
||||
self.print_network_state();
|
||||
|
||||
// Simulate network partition every 10 seconds
|
||||
if last_partition.elapsed() > Duration::from_secs(10) {
|
||||
let mut partitioned = self.partitioned.lock().unwrap();
|
||||
*partitioned = !*partitioned;
|
||||
if *partitioned {
|
||||
println!(
|
||||
"\n{}",
|
||||
"⚠️ NETWORK PARTITION ACTIVE - Nodes split into two groups"
|
||||
.yellow()
|
||||
.bold()
|
||||
);
|
||||
} else {
|
||||
println!(
|
||||
"\n{}",
|
||||
"✅ NETWORK PARTITION HEALED - All nodes can communicate"
|
||||
.green()
|
||||
.bold()
|
||||
);
|
||||
}
|
||||
last_partition = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for threads
|
||||
for handle in handles {
|
||||
handle.join().unwrap();
|
||||
}
|
||||
propagation_handle.join().unwrap();
|
||||
|
||||
// Print final statistics
|
||||
self.print_final_stats();
|
||||
}
|
||||
|
||||
fn print_network_state(&self) {
|
||||
println!("\n{}", "📈 Current Network State:".white().bold());
|
||||
println!("{}", "------------------------".white());
|
||||
|
||||
// Get price from each node's perspective
|
||||
let mut prices = Vec::new();
|
||||
for node in &self.nodes {
|
||||
let crdt = node.crdt.lock().unwrap();
|
||||
if let Some((price, confidence, sources)) =
|
||||
crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 60)
|
||||
{
|
||||
prices.push((node.id.0.clone(), price, confidence, sources));
|
||||
let price_str = format!("${:.2}", price);
|
||||
let confidence_str = format!("{}%", confidence);
|
||||
|
||||
let line = if node.is_byzantine {
|
||||
format!(
|
||||
" {} sees: {} (confidence: {}, sources: {})",
|
||||
node.id.0.red(),
|
||||
price_str.red(),
|
||||
confidence_str.red(),
|
||||
sources
|
||||
)
|
||||
} else {
|
||||
format!(
|
||||
" {} sees: {} (confidence: {}, sources: {})",
|
||||
node.id.0.green(),
|
||||
price_str.green(),
|
||||
confidence_str.green(),
|
||||
sources
|
||||
)
|
||||
};
|
||||
println!("{}", line);
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate network consensus
|
||||
if !prices.is_empty() {
|
||||
let avg_price: f64 =
|
||||
prices.iter().map(|(_, p, _, _)| *p).sum::<f64>() / prices.len() as f64;
|
||||
let min_price = prices
|
||||
.iter()
|
||||
.map(|(_, p, _, _)| *p)
|
||||
.fold(f64::INFINITY, |a, b| a.min(b));
|
||||
let max_price = prices
|
||||
.iter()
|
||||
.map(|(_, p, _, _)| *p)
|
||||
.fold(f64::NEG_INFINITY, |a, b| a.max(b));
|
||||
let deviation = ((max_price - min_price) / avg_price) * 100.0;
|
||||
|
||||
println!("\n{}", "📊 Network Consensus:".cyan().bold());
|
||||
println!(" Average: {}", format!("${:.2}", avg_price).cyan());
|
||||
println!(
|
||||
" Range: {} - {}",
|
||||
format!("${:.2}", min_price).cyan(),
|
||||
format!("${:.2}", max_price).cyan()
|
||||
);
|
||||
println!(" Max Deviation: {}", format!("{:.2}%", deviation).cyan());
|
||||
}
|
||||
}
|
||||
|
||||
fn print_final_stats(&self) {
|
||||
println!("\n\n{}", "🏁 Final Statistics".yellow().bold());
|
||||
println!("{}", "===================".yellow());
|
||||
|
||||
let mut total_attestations = 0;
|
||||
let mut oracle_stats = Vec::new();
|
||||
|
||||
for node in &self.nodes {
|
||||
let crdt = node.crdt.lock().unwrap();
|
||||
let node_attestations = crdt.attestations.len();
|
||||
total_attestations += node_attestations;
|
||||
|
||||
let score = crdt.oracle_scores.get(&node.id).unwrap_or(&0.5);
|
||||
oracle_stats.push((
|
||||
node.id.0.clone(),
|
||||
node_attestations,
|
||||
*score,
|
||||
node.is_byzantine,
|
||||
));
|
||||
}
|
||||
|
||||
println!("\n{}", "📈 Oracle Performance:".white().bold());
|
||||
for (id, attestations, score, is_byzantine) in oracle_stats {
|
||||
let icon = if is_byzantine { "🔴" } else { "🟢" };
|
||||
let line = format!(
|
||||
" {} {} - Attestations: {}, Reputation: {:.2}",
|
||||
icon, id, attestations, score
|
||||
);
|
||||
if is_byzantine {
|
||||
println!("{}", line.red());
|
||||
} else {
|
||||
println!("{}", line.green());
|
||||
}
|
||||
}
|
||||
|
||||
println!("\n{}", "📊 Network Totals:".cyan().bold());
|
||||
println!(" Total Attestations: {}", total_attestations);
|
||||
println!(
|
||||
" Attestations/second: {:.2}",
|
||||
total_attestations as f64 / 30.0
|
||||
);
|
||||
|
||||
// Show that Byzantine nodes were filtered out
|
||||
if let Some(node) = self.nodes.first() {
|
||||
let crdt = node.crdt.lock().unwrap();
|
||||
if let Some((price, confidence, _)) =
|
||||
crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 300)
|
||||
{
|
||||
println!("\n{}", "✅ Final Aggregated Price:".green().bold());
|
||||
println!(
|
||||
" {} (confidence: {}%)",
|
||||
format!("${:.2}", price).green().bold(),
|
||||
confidence
|
||||
);
|
||||
println!(" {}", "Despite Byzantine manipulation attempts!".green());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
5
crates/oracle-demo/src/oracle/mod.rs
Normal file
5
crates/oracle-demo/src/oracle/mod.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
mod network_crdt;
|
||||
mod node;
|
||||
|
||||
pub(crate) use network_crdt::NetworkCRDT;
|
||||
pub(crate) use node::OracleNode as Node;
|
||||
94
crates/oracle-demo/src/oracle/network_crdt.rs
Normal file
94
crates/oracle-demo/src/oracle/network_crdt.rs
Normal file
@@ -0,0 +1,94 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::{utils, AssetPair, OracleId, PriceAttestation};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct NetworkCRDT {
|
||||
pub(crate) attestations: HashMap<String, PriceAttestation>,
|
||||
pub(crate) oracle_scores: HashMap<OracleId, f64>,
|
||||
}
|
||||
|
||||
impl NetworkCRDT {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
attestations: HashMap::new(),
|
||||
oracle_scores: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn submit_attestation(&mut self, attestation: PriceAttestation) {
|
||||
self.attestations
|
||||
.insert(attestation.id.clone(), attestation.clone());
|
||||
|
||||
// Update oracle reputation
|
||||
let score = self
|
||||
.oracle_scores
|
||||
.entry(attestation.oracle_id.clone())
|
||||
.or_insert(0.5);
|
||||
*score = (*score * 0.95) + 0.05;
|
||||
}
|
||||
|
||||
pub(crate) fn merge(&mut self, other: &Self) {
|
||||
for (id, attestation) in &other.attestations {
|
||||
if !self.attestations.contains_key(id) {
|
||||
self.attestations.insert(id.clone(), attestation.clone());
|
||||
}
|
||||
}
|
||||
|
||||
for (oracle_id, score) in &other.oracle_scores {
|
||||
self.oracle_scores.insert(oracle_id.clone(), *score);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_aggregate_price(
|
||||
&self,
|
||||
asset_pair: &AssetPair,
|
||||
max_age: u64,
|
||||
) -> Option<(f64, u8, usize)> {
|
||||
let now = utils::timestamp();
|
||||
let min_time = now.saturating_sub(max_age);
|
||||
|
||||
let mut prices = Vec::new();
|
||||
for attestation in self.attestations.values() {
|
||||
if attestation.asset_pair == *asset_pair && attestation.timestamp >= min_time {
|
||||
let weight = self
|
||||
.oracle_scores
|
||||
.get(&attestation.oracle_id)
|
||||
.unwrap_or(&0.5);
|
||||
prices.push((attestation.price, attestation.confidence, *weight));
|
||||
}
|
||||
}
|
||||
|
||||
if prices.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Remove outliers
|
||||
prices.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
|
||||
if prices.len() > 4 {
|
||||
let q1 = prices[prices.len() / 4].0;
|
||||
let q3 = prices[3 * prices.len() / 4].0;
|
||||
let iqr = q3 - q1;
|
||||
let lower = q1 - iqr * 1.5;
|
||||
let upper = q3 + iqr * 1.5;
|
||||
prices.retain(|(price, _, _)| *price >= lower && *price <= upper);
|
||||
}
|
||||
|
||||
// Calculate weighted average
|
||||
let mut total_weight = 0.0;
|
||||
let mut weighted_sum = 0.0;
|
||||
let mut confidence_sum = 0.0;
|
||||
|
||||
for (price, confidence, weight) in &prices {
|
||||
let w = (*confidence as f64 / 100.0) * weight;
|
||||
weighted_sum += price * w;
|
||||
confidence_sum += *confidence as f64 * w;
|
||||
total_weight += w;
|
||||
}
|
||||
|
||||
let avg_price = weighted_sum / total_weight;
|
||||
let avg_confidence = (confidence_sum / total_weight) as u8;
|
||||
|
||||
Some((avg_price, avg_confidence, prices.len()))
|
||||
}
|
||||
}
|
||||
43
crates/oracle-demo/src/oracle/node.rs
Normal file
43
crates/oracle-demo/src/oracle/node.rs
Normal file
@@ -0,0 +1,43 @@
|
||||
use crate::{oracle, utils, AssetPair, OracleId, PriceAttestation};
|
||||
use rand::Rng;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
pub(crate) struct OracleNode {
|
||||
pub(crate) id: OracleId,
|
||||
pub(crate) crdt: Arc<Mutex<oracle::NetworkCRDT>>,
|
||||
pub(crate) is_byzantine: bool,
|
||||
pub(crate) base_price: f64,
|
||||
}
|
||||
|
||||
impl OracleNode {
|
||||
pub(crate) fn new(id: String, is_byzantine: bool) -> Self {
|
||||
Self {
|
||||
id: OracleId(id),
|
||||
crdt: Arc::new(Mutex::new(oracle::NetworkCRDT::new())),
|
||||
is_byzantine,
|
||||
base_price: 2500.0,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn submit_price(&self) {
|
||||
let mut rng = rand::rng();
|
||||
|
||||
let price = if self.is_byzantine {
|
||||
self.base_price * 1.2 // Try to manipulate 20% higher
|
||||
} else {
|
||||
self.base_price * (1.0 + rng.random_range(-0.01..0.01))
|
||||
};
|
||||
|
||||
let attestation = PriceAttestation {
|
||||
id: format!("{}_{}", self.id.0, utils::timestamp()),
|
||||
oracle_id: self.id.clone(),
|
||||
asset_pair: AssetPair("ETH/USD".to_string()),
|
||||
price,
|
||||
confidence: if self.is_byzantine { 50 } else { 95 },
|
||||
timestamp: utils::timestamp(),
|
||||
};
|
||||
|
||||
let mut crdt = self.crdt.lock().unwrap();
|
||||
crdt.submit_attestation(attestation);
|
||||
}
|
||||
}
|
||||
8
crates/oracle-demo/src/utils.rs
Normal file
8
crates/oracle-demo/src/utils.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
pub(crate) fn timestamp() -> u64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs()
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::{
|
||||
fs::{self, File},
|
||||
io::Write,
|
||||
path::PathBuf,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use bft_json_crdt::keypair::{make_keypair, Ed25519KeyPair};
|
||||
@@ -13,12 +13,12 @@ pub(crate) fn write(key_path: &PathBuf) -> Result<(), std::io::Error> {
|
||||
|
||||
let mut file = File::create(key_path)?;
|
||||
let out = keys.encode_base64();
|
||||
file.write(out.as_bytes())?;
|
||||
file.write_all(out.as_bytes())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn load_from_file(side_dir: &PathBuf) -> Ed25519KeyPair {
|
||||
let key_path = crate::utils::side_paths(side_dir.clone()).0;
|
||||
pub(crate) fn load_from_file(side_dir: &Path) -> Ed25519KeyPair {
|
||||
let key_path = crate::utils::side_paths(side_dir.to_path_buf()).0;
|
||||
|
||||
let data = fs::read_to_string(key_path).expect("couldn't read bft-bft-crdt key file");
|
||||
println!("data: {:?}", data);
|
||||
|
||||
@@ -6,8 +6,6 @@ use bft_json_crdt::{
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub mod keys;
|
||||
pub mod stdin;
|
||||
pub mod websocket;
|
||||
|
||||
#[add_crdt_fields]
|
||||
#[derive(Clone, CrdtNode, Serialize, Deserialize, Debug)]
|
||||
|
||||
@@ -2,9 +2,7 @@ use clap::Parser;
|
||||
use clap::Subcommand;
|
||||
|
||||
pub(crate) fn parse_args() -> Args {
|
||||
let args = Args::parse();
|
||||
|
||||
args
|
||||
Args::parse()
|
||||
}
|
||||
|
||||
/// A P2P BFT info sharing node
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use bft_crdt::websocket;
|
||||
use bft_crdt::TransactionList;
|
||||
use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp};
|
||||
use cli::{parse_args, Commands};
|
||||
use cli::Commands;
|
||||
use node::SideNode;
|
||||
use tokio::{sync::mpsc, task};
|
||||
|
||||
@@ -9,11 +8,13 @@ pub mod bft_crdt;
|
||||
pub(crate) mod cli;
|
||||
pub(crate) mod init;
|
||||
pub mod node;
|
||||
mod stdin;
|
||||
pub mod utils;
|
||||
pub mod websocket;
|
||||
|
||||
#[tokio::main]
|
||||
pub async fn run() {
|
||||
let args = parse_args();
|
||||
let args = cli::parse_args();
|
||||
|
||||
match &args.command {
|
||||
Some(Commands::Init { name }) => {
|
||||
@@ -32,21 +33,22 @@ pub async fn run() {
|
||||
}
|
||||
|
||||
/// Wire everything up outside the application so that we can test more easily later
|
||||
async fn setup(name: &String) -> SideNode {
|
||||
// First, load up the keys and create a bft-bft-crdt
|
||||
let side_dir = utils::home(name);
|
||||
let bft_crdt_keys = bft_crdt::keys::load_from_file(&side_dir);
|
||||
async fn setup(name: &str) -> SideNode {
|
||||
// First, load up the keys and create a bft-crdt node
|
||||
let home_dir = utils::home(name);
|
||||
let bft_crdt_keys = bft_crdt::keys::load_from_file(&home_dir);
|
||||
let crdt = BaseCrdt::<TransactionList>::new(&bft_crdt_keys);
|
||||
|
||||
// Channels for internal communication, and a tokio task for stdin input
|
||||
let (incoming_sender, incoming_receiver) = mpsc::channel::<SignedOp>(32);
|
||||
let (stdin_sender, stdin_receiver) = std::sync::mpsc::channel();
|
||||
task::spawn(async move {
|
||||
bft_crdt::stdin::input(stdin_sender);
|
||||
stdin::input(stdin_sender);
|
||||
});
|
||||
|
||||
// Finally, create the node and return it
|
||||
// Wire the websocket client to the incoming channel
|
||||
let handle = websocket::Client::new(incoming_sender).await;
|
||||
// Finally, create the node and return it
|
||||
let node = SideNode::new(
|
||||
crdt,
|
||||
bft_crdt_keys,
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
use crdt_node;
|
||||
|
||||
fn main() {
|
||||
crdt_node::run();
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp};
|
||||
use fastcrypto::ed25519::Ed25519KeyPair;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::{bft_crdt::websocket::Client, bft_crdt::TransactionList, utils};
|
||||
use crate::{bft_crdt::TransactionList, utils, websocket::Client};
|
||||
|
||||
pub struct SideNode {
|
||||
crdt: BaseCrdt<TransactionList>,
|
||||
@@ -21,36 +21,29 @@ impl SideNode {
|
||||
stdin_receiver: std::sync::mpsc::Receiver<String>,
|
||||
handle: ezsockets::Client<Client>,
|
||||
) -> Self {
|
||||
let node = Self {
|
||||
Self {
|
||||
crdt,
|
||||
bft_crdt_keys,
|
||||
incoming_receiver,
|
||||
stdin_receiver,
|
||||
handle,
|
||||
};
|
||||
node
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn start(&mut self) {
|
||||
println!("Starting node...");
|
||||
|
||||
loop {
|
||||
match self.stdin_receiver.try_recv() {
|
||||
Ok(stdin) => {
|
||||
let transaction = utils::fake_generic_transaction_json(stdin);
|
||||
let json = serde_json::to_value(transaction).unwrap();
|
||||
let signed_op = self.add_transaction_local(json);
|
||||
println!("STDIN: {}", utils::shappy(signed_op.clone()));
|
||||
self.send_to_network(signed_op).await;
|
||||
}
|
||||
Err(_) => {} // ignore empty channel errors in this PoC
|
||||
if let Ok(stdin) = self.stdin_receiver.try_recv() {
|
||||
let transaction = utils::fake_generic_transaction_json(stdin);
|
||||
let json = serde_json::to_value(transaction).unwrap();
|
||||
let signed_op = self.add_transaction_local(json);
|
||||
println!("STDIN: {}", utils::sha_op(signed_op.clone()));
|
||||
self.send_to_network(signed_op).await;
|
||||
}
|
||||
match self.incoming_receiver.try_recv() {
|
||||
Ok(incoming) => {
|
||||
println!("INCOMING: {}", utils::shappy(incoming.clone()));
|
||||
self.handle_incoming(incoming);
|
||||
}
|
||||
Err(_) => {} // ignore empty channel errors in this PoC
|
||||
if let Ok(incoming) = self.incoming_receiver.try_recv() {
|
||||
println!("INCOMING: {}", utils::sha_op(incoming.clone()));
|
||||
self.handle_incoming(incoming);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -76,14 +69,13 @@ impl SideNode {
|
||||
.ops
|
||||
.last()
|
||||
.expect("couldn't find last op");
|
||||
let signed_op = self
|
||||
.crdt
|
||||
|
||||
// self.trace_crdt();
|
||||
self.crdt
|
||||
.doc
|
||||
.list
|
||||
.insert(last.id, transaction)
|
||||
.sign(&self.bft_crdt_keys);
|
||||
// self.trace_crdt();
|
||||
signed_op
|
||||
.sign(&self.bft_crdt_keys)
|
||||
}
|
||||
|
||||
/// Print the current state of the CRDT, can be used to debug
|
||||
|
||||
@@ -33,12 +33,12 @@ pub fn fake_generic_transaction_json(from: String) -> Value {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn shappy(op: SignedOp) -> String {
|
||||
pub fn sha_op(op: SignedOp) -> String {
|
||||
let b = serde_json::to_string(&op).unwrap().into_bytes();
|
||||
sha256::digest(b).to_string()
|
||||
}
|
||||
|
||||
pub fn shassy(text: String) -> String {
|
||||
pub fn sha_string(text: String) -> String {
|
||||
let b = text.into_bytes();
|
||||
sha256::digest(b).to_string()
|
||||
}
|
||||
|
||||
@@ -39,10 +39,10 @@ impl ezsockets::ClientExt for Client {
|
||||
/// When we receive a text message, apply the bft-crdt operation contained in it to our
|
||||
/// local bft-crdt.
|
||||
async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> {
|
||||
let string_sha = utils::shassy(text.clone());
|
||||
let string_sha = utils::sha_string(text.clone());
|
||||
println!("received text, sha: {string_sha}");
|
||||
let incoming: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&text).unwrap();
|
||||
let object_sha = utils::shappy(incoming.clone());
|
||||
let object_sha = utils::sha_op(incoming.clone());
|
||||
println!("deserialized: {}", object_sha);
|
||||
if string_sha != object_sha {
|
||||
panic!("sha mismatch: {string_sha} != {object_sha}, bft-bft-crdt has failed");
|
||||
@@ -2,7 +2,7 @@ use bft_json_crdt::{
|
||||
json_crdt::{BaseCrdt, SignedOp},
|
||||
keypair::make_keypair,
|
||||
};
|
||||
use crdt_node::{bft_crdt::websocket::Client, bft_crdt::TransactionList, node::SideNode, utils};
|
||||
use crdt_node::{bft_crdt::TransactionList, node::SideNode, utils, websocket};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
#[tokio::test]
|
||||
@@ -42,13 +42,13 @@ async fn setup(_: &str) -> SideNode {
|
||||
let (_, stdin_receiver) = std::sync::mpsc::channel();
|
||||
|
||||
// Finally, create the node and return it
|
||||
let handle = Client::new(incoming_sender).await;
|
||||
let node = SideNode::new(
|
||||
let handle = websocket::Client::new(incoming_sender).await;
|
||||
|
||||
SideNode::new(
|
||||
crdt,
|
||||
bft_crdt_keys,
|
||||
incoming_receiver,
|
||||
stdin_receiver,
|
||||
handle,
|
||||
);
|
||||
node
|
||||
)
|
||||
}
|
||||
|
||||
41
docs/use-case-5-payment-cluster.md
Normal file
41
docs/use-case-5-payment-cluster.md
Normal file
@@ -0,0 +1,41 @@
|
||||
# Use Case 5: Payment Cluster
|
||||
|
||||
## Abstract case (no external integrations)
|
||||
|
||||
1. Node1 is initialized, with a data store and 100 tokens.
|
||||
2. Node2 starts up.
|
||||
3. Node1 sends 1 token to Node2 by signing a transfer statement.
|
||||
a. if Node1 is honest, it will update its own store to 99 tokens
|
||||
b. Node2 will always update its internal store. It also has proof that Node1 has transferred 1 token.
|
||||
4. Node3 shows up. It asks the other two nodes for state.
|
||||
a. Node2 will send correct state
|
||||
b. if Node1 is honest, it will send correct state
|
||||
c. if Node1 is dishonest, it will send 100 tokens as its state.
|
||||
d. Node3 asks Node2 for proof. Node2 sends it.
|
||||
|
||||
So far, it kind of works. Alternately there could be two ways to go:
|
||||
|
||||
1. all nodes send all history to newcomers or,
|
||||
2. if all nodes send the same state, there may be no need for anybody to request the total state. Processing can start from this point.
|
||||
|
||||
There are a lot of questions here. As soon as a node goes offline, others can play dishonesty games, obviously.
|
||||
|
||||
Are there ways of
|
||||
|
||||
(a) checkpointing to some outside system
|
||||
(b) checkpointing internally
|
||||
(c) playing some economic game where some "core" nodes are always up and notionally have some kind of financial incentives?
|
||||
|
||||
We can make it pretty decentralised, but still, the system would need to have *somewhere* that new nodes could look up where they could start interacting, and also get some notion of who was currently connected (so that they would be able to broadcast messages to all nodes).
|
||||
|
||||
As long as there is 1 honest node does the system work?
|
||||
|
||||
Maybe not. There could be like 99 dishonest nodes that broadcast messages to each other, but not to the 100th (honest) node. In such a case, how could anybody tell what the "real" history was?
|
||||
|
||||
Could a system of acks fix this?
|
||||
|
||||
Only if there was a way of determining who was "in" and who was "out".
|
||||
|
||||
So like maybe there is a core node for the cluster that people pay fees to? And those fees could go into a big pot and be paid out periodically. If anybody comes up with proof that a core node has behaved dishonestly, they get the fee pot. There could be even maybe 4 "core" nodes per cluster so that there was some redundancy available.
|
||||
|
||||
Enforcement against a core node, or any other node that had "misbehaved" would be an interesting question...
|
||||
Reference in New Issue
Block a user