12 Commits

Author SHA1 Message Date
Dave
10c0c64984 Fixed a last warning 2025-11-25 14:17:57 +00:00
Dave
3d746a8073 Fixed some warnings 2025-11-25 13:58:27 +00:00
Dave
2e4510679a Some extra weird thoughts 2025-06-13 16:59:06 -04:00
Dave
51dd81e145 Noting presence of slop 2025-06-12 16:37:10 -04:00
Dave
c528160d34 Moved websocket out onto the root 2025-06-12 16:32:22 -04:00
Dave
0126614dd3 Moved stdin onto root 2025-06-12 16:27:41 -04:00
Dave
b933f8d6fc Renamed sha operations to sound less crazy 2025-06-12 16:25:49 -04:00
Dave
365cfd7b01 Minor refactors 2025-06-12 16:23:56 -04:00
Dave
79ce80a4a4 Removed the blockchain ambitions from the README. 2025-06-12 16:01:10 -04:00
Dave
32d7b62cfe Moved imports 2025-06-12 15:51:18 -04:00
Dave
97711e2ecf Renamed prefix on oracle network 2025-06-12 15:50:42 -04:00
Dave
e2d50144ca Refactored into multiple modules 2025-06-12 15:49:04 -04:00
21 changed files with 519 additions and 466 deletions

View File

@@ -1,12 +1,18 @@
# BFT-CRDT PoC
This is a proof of concept implementation of a [BFT-CRDT](https://jzhao.xyz/posts/bft-json-crdt) blockchain-like system. It is willfully, wildly insecure as a blockchain right now. Think of it as an experiment which is strictly for fun and poking at ideas.
This is a proof of concept implementation of a [BFT-CRDT](https://jzhao.xyz/posts/bft-json-crdt) system.
This code is based on the ideas of [Martin Kleppmann](https://martin.kleppmann.com/papers/bft-crdt-papoc22.pdf) and the ideas and code of [Jacky Zhao](https://jzhao.xyz/). Have a read, they are both excellent writers and have some of the most interesting computing ideas I've run across in quite a while.
It is not clear what this thing is for, yet. It's not a blockchain. It makes a kind of secure DAG. It uses BFT-CRDTs to make a Sybil-proof and secure information transmission system for messages, with eventual consistency guarantees.
The idea that it could be possible to set up a secure Sybil-proof system, negating the energy burn required for proof of work, the financially exclusionary proof of stake, or the meat space hassle of a proof of personhood ceremony, is too attractive to ignore. At least, if you're interested in cool P2P systems.
Initially I was thinking it could perhaps be used to make a kind of opt-in blockchain, but I don't think it'll work (and reading up on things like e.g. vector clocks, which I had initially thought about for ordering, the literature goes out of its way to note that they can't work in Byzantine environments).
So if it can't be a blockchain, what can it be? Is it useful at all?
Potentially, yes. There are lots of things in crypto land which do not necessarily need consensus and/or a Total Global Ordering. Some brainstormed ideas for these are in the `docs/` folder.
I also wonder: can we use George's insights about blockchain conflicts here? Assume a CRDT based system where participating users have public/private keypairs. The system is initialized with an initial distribution (like a regular blockchain is). Coins change hands when users sign transfers to each other. Is there a way to make such transfers update properly *without* making a block and having a total global ordering?
## Prerequisites
@@ -45,6 +51,10 @@ You can then type directly into each of the Crdt Node consoles. Messages will be
What we have here is a very simple system comprised of two parts: the Crdt Node, and the Crdt Relayer.
It is pretty cool in the sense that it is actually Sybil-proof. But you can't get a Total Global Ordering out of it, so you can't use it for e.g. account transfers in a blockchain.
However, there may be other cases that are interesting which we can't see yet: secure distributed filesystems, some kind of Lightning replacement, etc.
### Crdt Node(s)
The Crdt Nodes make up a system of BFT-CRDT-producing nodes that can make a sort of wildly insecure blockchain. Currently, they can reliably send transactions to each other in a secure way, such that all nodes they communicate with can tell whether received transactions are obeying the rules of the system.
@@ -53,9 +63,9 @@ The Crdt Node does not download any chain state, and if one goes off-line it wil
### Crdt Relayer
The Crdt Relayer replicates transactions between nodes using a websocket. We aim to eliminate this component from the architecture, but for the moment it simplifies networking and consensus agreement while we experiment with higher-value concepts.
The Crdt Relayer replicates transactions between nodes using a websocket. We aim to eliminate this component from the architecture, but for the moment it simplifies networking while we experiment with higher-value concepts.
Later, we will aim to remove the Crdt Relayer from the architecture, by (a) moving to pure P2P transactions between Crdt Nodes, and (b) doing leader election of a Crdt Node to reach agreement on the submitted block.
Later, we will aim to remove the Crdt Relayer from the architecture, moving to pure P2P transactions between Crdt Nodes
## Possible uses
@@ -69,15 +79,13 @@ It is not necessarily the case that e.g. signer participants and Cosmos validato
Might the ability to be part of multiple consensus groups at once provide new opportunities for cross-chain transfers?
### Others
There are some brainstormed ideas in `docs/` and `examples/` as well as an ai-generated example in `crates/oracle-demo`. Have a look.
## Next dev tasks:
- [ ] we don't need a relayer, the first crdt node can act as a leader until people decide they don't want to trust it any more
- [ ] the leader node can have a timer in it for block creation
- [ ] code up the ability to switch leaders (can be a human decision at first, later an (optional) automated choice)
- [ ] pick a commit and reveal scheme to remove MEV. One thing to investigate is [single-use seals](https://docs.rgb.info/distributed-computing-concepts/single-use-seals)
- [ ] enable Crdt Nodes should download current P2P chain/dag state so that they start - out with a consistent copy of transaction data, and also do catch-up after going off-line
- [ ] enable Crdt Nodes should download current P2P dag state so that they start out with a consistent copy of dag data, and also do catch-up after going off-line
- [ ] remove the proc macro code from bft-json-crdt, it's not really needed in this implementation
- [ ] add smart contract execution engine (CosmWasm would be a good first choice)
- [ ] enable Crdt Nodes to download contract code for a given contract
- [ ] enable Crdt Nodes to download current contract state for a given contract
- [ ] switch to full P2P messaging instead of websockets

View File

@@ -41,10 +41,10 @@ pub fn add_crdt_fields(args: OgTokenStream, input: OgTokenStream) -> OgTokenStre
);
}
return quote! {
quote! {
#input
}
.into();
.into()
}
/// Proc macro to automatically derive the CRDTNode trait
@@ -56,7 +56,7 @@ pub fn derive_json_crdt(input: OgTokenStream) -> OgTokenStream {
// used in the quasi-quotation below as `#name`
let ident = input.ident;
let ident_str = LitStr::new(&*ident.to_string(), ident.span());
let ident_str = LitStr::new(&ident.to_string(), ident.span());
let (impl_generics, ty_generics, where_clause) = input.generics.split_for_impl();
match input.data {
@@ -74,7 +74,7 @@ pub fn derive_json_crdt(input: OgTokenStream) -> OgTokenStream {
Type::Path(t) => t.to_token_stream(),
_ => return quote_spanned! { field.span() => compile_error!("Field should be a primitive or struct which implements CRDTNode") }.into(),
};
let str_literal = LitStr::new(&*ident.to_string(), ident.span());
let str_literal = LitStr::new(&ident.to_string(), ident.span());
ident_strings.push(str_literal.clone());
ident_literals.push(ident.clone());
tys.push(ty.clone());
@@ -185,10 +185,10 @@ pub fn derive_json_crdt(input: OgTokenStream) -> OgTokenStream {
expanded.into()
}
_ => {
return quote_spanned! { ident.span() => compile_error!("Cannot derive CRDT on tuple or unit structs"); }
quote_spanned! { ident.span() => compile_error!("Cannot derive CRDT on tuple or unit structs"); }
.into()
}
},
_ => return quote_spanned! { ident.span() => compile_error!("Cannot derive CRDT on enums or unions"); }.into(),
_ => quote_spanned! { ident.span() => compile_error!("Cannot derive CRDT on enums or unions"); }.into(),
}
}

View File

@@ -1,11 +1,12 @@
use bft_json_crdt::{
json_crdt::{CrdtNode, JsonValue},
keypair::make_author,
list_crdt::ListCrdt,
op::{Op, OpId, ROOT_ID}, json_crdt::{CrdtNode, JsonValue},
op::{Op, OpId, ROOT_ID},
};
use rand::{rngs::ThreadRng, seq::SliceRandom, Rng};
fn random_op<T: CrdtNode>(arr: &Vec<Op<T>>, rng: &mut ThreadRng) -> OpId {
fn random_op<T: CrdtNode>(arr: &[Op<T>], rng: &mut ThreadRng) -> OpId {
arr.choose(rng).map(|op| op.id).unwrap_or(ROOT_ID)
}

View File

@@ -1,5 +1,7 @@
# BFT-CRDT Oracle Network Demo
THIS IS JUST AI SLOP. DON'T TRUST IT. :) DH
A live demonstration of a decentralized oracle network using Byzantine Fault Tolerant Conflict-free Replicated Data Types (BFT-CRDTs).
## What This Demo Shows
@@ -75,4 +77,4 @@ See the [full documentation](../../docs/use-case-2-oracle-networks.md) for:
- Detailed architecture
- Smart contract integration
- Production deployment guide
- Security analysis
- Security analysis

View File

@@ -1,9 +1,8 @@
use colored::*;
use rand::Rng;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::time::Duration;
mod network;
mod oracle;
mod utils;
/// A simple demonstration of the BFT-CRDT Oracle Network
/// Run with: cargo run -p oracle-demo
@@ -26,400 +25,13 @@ struct PriceAttestation {
timestamp: u64,
}
// ============ Simple CRDT ============
#[derive(Clone)]
struct OracleNetworkCRDT {
attestations: HashMap<String, PriceAttestation>,
oracle_scores: HashMap<OracleId, f64>,
}
impl OracleNetworkCRDT {
fn new() -> Self {
Self {
attestations: HashMap::new(),
oracle_scores: HashMap::new(),
}
}
fn submit_attestation(&mut self, attestation: PriceAttestation) {
self.attestations
.insert(attestation.id.clone(), attestation.clone());
// Update oracle reputation
let score = self
.oracle_scores
.entry(attestation.oracle_id.clone())
.or_insert(0.5);
*score = (*score * 0.95) + 0.05;
}
fn merge(&mut self, other: &Self) {
for (id, attestation) in &other.attestations {
if !self.attestations.contains_key(id) {
self.attestations.insert(id.clone(), attestation.clone());
}
}
for (oracle_id, score) in &other.oracle_scores {
self.oracle_scores.insert(oracle_id.clone(), *score);
}
}
fn get_aggregate_price(
&self,
asset_pair: &AssetPair,
max_age: u64,
) -> Option<(f64, u8, usize)> {
let now = timestamp();
let min_time = now.saturating_sub(max_age);
let mut prices = Vec::new();
for attestation in self.attestations.values() {
if attestation.asset_pair == *asset_pair && attestation.timestamp >= min_time {
let weight = self
.oracle_scores
.get(&attestation.oracle_id)
.unwrap_or(&0.5);
prices.push((attestation.price, attestation.confidence, *weight));
}
}
if prices.is_empty() {
return None;
}
// Remove outliers
prices.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
if prices.len() > 4 {
let q1 = prices[prices.len() / 4].0;
let q3 = prices[3 * prices.len() / 4].0;
let iqr = q3 - q1;
let lower = q1 - iqr * 1.5;
let upper = q3 + iqr * 1.5;
prices.retain(|(price, _, _)| *price >= lower && *price <= upper);
}
// Calculate weighted average
let mut total_weight = 0.0;
let mut weighted_sum = 0.0;
let mut confidence_sum = 0.0;
for (price, confidence, weight) in &prices {
let w = (*confidence as f64 / 100.0) * weight;
weighted_sum += price * w;
confidence_sum += *confidence as f64 * w;
total_weight += w;
}
let avg_price = weighted_sum / total_weight;
let avg_confidence = (confidence_sum / total_weight) as u8;
Some((avg_price, avg_confidence, prices.len()))
}
}
// ============ Oracle Node ============
struct OracleNode {
id: OracleId,
crdt: Arc<Mutex<OracleNetworkCRDT>>,
is_byzantine: bool,
base_price: f64,
}
impl OracleNode {
fn new(id: String, is_byzantine: bool) -> Self {
Self {
id: OracleId(id),
crdt: Arc::new(Mutex::new(OracleNetworkCRDT::new())),
is_byzantine,
base_price: 2500.0,
}
}
fn submit_price(&self) {
let mut rng = rand::rng();
let price = if self.is_byzantine {
self.base_price * 1.2 // Try to manipulate 20% higher
} else {
self.base_price * (1.0 + rng.random_range(-0.01..0.01))
};
let attestation = PriceAttestation {
id: format!("{}_{}", self.id.0, timestamp()),
oracle_id: self.id.clone(),
asset_pair: AssetPair("ETH/USD".to_string()),
price,
confidence: if self.is_byzantine { 50 } else { 95 },
timestamp: timestamp(),
};
let mut crdt = self.crdt.lock().unwrap();
crdt.submit_attestation(attestation);
}
}
// ============ Network Simulator ============
struct NetworkSimulator {
nodes: Vec<Arc<OracleNode>>,
partitioned: Arc<Mutex<bool>>,
}
impl NetworkSimulator {
fn new() -> Self {
let mut nodes = Vec::new();
// Create 5 honest nodes
for i in 1..=5 {
nodes.push(Arc::new(OracleNode::new(format!("honest_{}", i), false)));
}
// Create 2 Byzantine nodes
for i in 1..=2 {
nodes.push(Arc::new(OracleNode::new(format!("byzantine_{}", i), true)));
}
Self {
nodes,
partitioned: Arc::new(Mutex::new(false)),
}
}
fn run(&self, duration: Duration) {
println!(
"{}",
"🚀 Starting BFT-CRDT Oracle Network Demo".cyan().bold()
);
println!("{}", "=========================================".cyan());
println!("📊 Network: {} nodes ({} Byzantine)", self.nodes.len(), 2);
println!("⏱️ Duration: {:?}\n", duration);
let start = Instant::now();
// Spawn oracle threads
let handles: Vec<_> = self
.nodes
.iter()
.map(|node| {
let node_clone = Arc::clone(node);
let start_clone = start.clone();
thread::spawn(move || {
while start_clone.elapsed() < duration {
node_clone.submit_price();
thread::sleep(Duration::from_millis(1000));
}
})
})
.collect();
// Spawn network propagation thread
let nodes_clone = self.nodes.clone();
let partitioned_clone = Arc::clone(&self.partitioned);
let start_clone = start.clone();
let propagation_handle = thread::spawn(move || {
while start_clone.elapsed() < duration {
let is_partitioned = *partitioned_clone.lock().unwrap();
// Propagate between nodes
for i in 0..nodes_clone.len() {
for j in 0..nodes_clone.len() {
if i != j {
// Skip if partitioned
if is_partitioned && ((i < 3 && j >= 3) || (i >= 3 && j < 3)) {
continue;
}
let crdt1 = nodes_clone[i].crdt.lock().unwrap();
let mut crdt2 = nodes_clone[j].crdt.lock().unwrap();
crdt2.merge(&*crdt1);
}
}
}
thread::sleep(Duration::from_millis(100));
}
});
// Main monitoring loop
let mut last_partition = Instant::now();
while start.elapsed() < duration {
thread::sleep(Duration::from_secs(2));
// Print current state
self.print_network_state();
// Simulate network partition every 10 seconds
if last_partition.elapsed() > Duration::from_secs(10) {
let mut partitioned = self.partitioned.lock().unwrap();
*partitioned = !*partitioned;
if *partitioned {
println!(
"\n{}",
"⚠️ NETWORK PARTITION ACTIVE - Nodes split into two groups"
.yellow()
.bold()
);
} else {
println!(
"\n{}",
"✅ NETWORK PARTITION HEALED - All nodes can communicate"
.green()
.bold()
);
}
last_partition = Instant::now();
}
}
// Wait for threads
for handle in handles {
handle.join().unwrap();
}
propagation_handle.join().unwrap();
// Print final statistics
self.print_final_stats();
}
fn print_network_state(&self) {
println!("\n{}", "📈 Current Network State:".white().bold());
println!("{}", "------------------------".white());
// Get price from each node's perspective
let mut prices = Vec::new();
for node in &self.nodes {
let crdt = node.crdt.lock().unwrap();
if let Some((price, confidence, sources)) =
crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 60)
{
prices.push((node.id.0.clone(), price, confidence, sources));
let price_str = format!("${:.2}", price);
let confidence_str = format!("{}%", confidence);
let line = if node.is_byzantine {
format!(
" {} sees: {} (confidence: {}, sources: {})",
node.id.0.red(),
price_str.red(),
confidence_str.red(),
sources
)
} else {
format!(
" {} sees: {} (confidence: {}, sources: {})",
node.id.0.green(),
price_str.green(),
confidence_str.green(),
sources
)
};
println!("{}", line);
}
}
// Calculate network consensus
if !prices.is_empty() {
let avg_price: f64 =
prices.iter().map(|(_, p, _, _)| *p).sum::<f64>() / prices.len() as f64;
let min_price = prices
.iter()
.map(|(_, p, _, _)| *p)
.fold(f64::INFINITY, |a, b| a.min(b));
let max_price = prices
.iter()
.map(|(_, p, _, _)| *p)
.fold(f64::NEG_INFINITY, |a, b| a.max(b));
let deviation = ((max_price - min_price) / avg_price) * 100.0;
println!("\n{}", "📊 Network Consensus:".cyan().bold());
println!(" Average: {}", format!("${:.2}", avg_price).cyan());
println!(
" Range: {} - {}",
format!("${:.2}", min_price).cyan(),
format!("${:.2}", max_price).cyan()
);
println!(" Max Deviation: {}", format!("{:.2}%", deviation).cyan());
}
}
fn print_final_stats(&self) {
println!("\n\n{}", "🏁 Final Statistics".yellow().bold());
println!("{}", "===================".yellow());
let mut total_attestations = 0;
let mut oracle_stats = Vec::new();
for node in &self.nodes {
let crdt = node.crdt.lock().unwrap();
let node_attestations = crdt.attestations.len();
total_attestations += node_attestations;
let score = crdt.oracle_scores.get(&node.id).unwrap_or(&0.5);
oracle_stats.push((
node.id.0.clone(),
node_attestations,
*score,
node.is_byzantine,
));
}
println!("\n{}", "📈 Oracle Performance:".white().bold());
for (id, attestations, score, is_byzantine) in oracle_stats {
let icon = if is_byzantine { "🔴" } else { "🟢" };
let line = format!(
" {} {} - Attestations: {}, Reputation: {:.2}",
icon, id, attestations, score
);
if is_byzantine {
println!("{}", line.red());
} else {
println!("{}", line.green());
}
}
println!("\n{}", "📊 Network Totals:".cyan().bold());
println!(" Total Attestations: {}", total_attestations);
println!(
" Attestations/second: {:.2}",
total_attestations as f64 / 30.0
);
// Show that Byzantine nodes were filtered out
if let Some(node) = self.nodes.first() {
let crdt = node.crdt.lock().unwrap();
if let Some((price, confidence, _)) =
crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 300)
{
println!("\n{}", "✅ Final Aggregated Price:".green().bold());
println!(
" {} (confidence: {}%)",
format!("${:.2}", price).green().bold(),
confidence
);
println!(" {}", "Despite Byzantine manipulation attempts!".green());
}
}
}
}
// ============ Helper Functions ============
fn timestamp() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
}
// ============ Main Function ============
fn main() {
println!("{}", "BFT-CRDT Oracle Network Demo".cyan().bold());
println!("{}", "============================\n".cyan());
let simulator = NetworkSimulator::new();
let simulator = network::Simulator::new();
simulator.run(Duration::from_secs(30));
println!("\n{}", "✅ Demo completed!".green().bold());

View File

@@ -0,0 +1,251 @@
use crate::{oracle, AssetPair};
use colored::Colorize;
use std::{
sync::{Arc, Mutex},
thread,
time::{Duration, Instant},
};
pub(crate) struct Simulator {
nodes: Vec<Arc<oracle::Node>>,
partitioned: Arc<Mutex<bool>>,
}
impl Simulator {
pub(crate) fn new() -> Self {
let mut nodes = Vec::new();
// Create 5 honest nodes
for i in 1..=5 {
nodes.push(Arc::new(oracle::Node::new(format!("honest_{}", i), false)));
}
// Create 2 Byzantine nodes
for i in 1..=2 {
nodes.push(Arc::new(oracle::Node::new(
format!("byzantine_{}", i),
true,
)));
}
Self {
nodes,
partitioned: Arc::new(Mutex::new(false)),
}
}
pub(crate) fn run(&self, duration: Duration) {
println!(
"{}",
"🚀 Starting BFT-CRDT Oracle Network Demo".cyan().bold()
);
println!("{}", "=========================================".cyan());
println!("📊 Network: {} nodes ({} Byzantine)", self.nodes.len(), 2);
println!("⏱️ Duration: {:?}\n", duration);
let start = Instant::now();
// Spawn oracle threads
let handles: Vec<_> = self
.nodes
.iter()
.map(|node| {
let node_clone = Arc::clone(node);
let start_clone = start;
thread::spawn(move || {
while start_clone.elapsed() < duration {
node_clone.submit_price();
thread::sleep(Duration::from_millis(1000));
}
})
})
.collect();
// Spawn network propagation thread
let nodes_clone = self.nodes.clone();
let partitioned_clone = Arc::clone(&self.partitioned);
let start_clone = start;
let propagation_handle = thread::spawn(move || {
while start_clone.elapsed() < duration {
let is_partitioned = *partitioned_clone.lock().unwrap();
// Propagate between nodes
for i in 0..nodes_clone.len() {
for j in 0..nodes_clone.len() {
if i != j {
// Skip if partitioned
if is_partitioned && ((i < 3 && j >= 3) || (i >= 3 && j < 3)) {
continue;
}
let crdt1 = nodes_clone[i].crdt.lock().unwrap();
let mut crdt2 = nodes_clone[j].crdt.lock().unwrap();
crdt2.merge(&crdt1);
}
}
}
thread::sleep(Duration::from_millis(100));
}
});
// Main monitoring loop
let mut last_partition = Instant::now();
while start.elapsed() < duration {
thread::sleep(Duration::from_secs(2));
// Print current state
self.print_network_state();
// Simulate network partition every 10 seconds
if last_partition.elapsed() > Duration::from_secs(10) {
let mut partitioned = self.partitioned.lock().unwrap();
*partitioned = !*partitioned;
if *partitioned {
println!(
"\n{}",
"⚠️ NETWORK PARTITION ACTIVE - Nodes split into two groups"
.yellow()
.bold()
);
} else {
println!(
"\n{}",
"✅ NETWORK PARTITION HEALED - All nodes can communicate"
.green()
.bold()
);
}
last_partition = Instant::now();
}
}
// Wait for threads
for handle in handles {
handle.join().unwrap();
}
propagation_handle.join().unwrap();
// Print final statistics
self.print_final_stats();
}
fn print_network_state(&self) {
println!("\n{}", "📈 Current Network State:".white().bold());
println!("{}", "------------------------".white());
// Get price from each node's perspective
let mut prices = Vec::new();
for node in &self.nodes {
let crdt = node.crdt.lock().unwrap();
if let Some((price, confidence, sources)) =
crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 60)
{
prices.push((node.id.0.clone(), price, confidence, sources));
let price_str = format!("${:.2}", price);
let confidence_str = format!("{}%", confidence);
let line = if node.is_byzantine {
format!(
" {} sees: {} (confidence: {}, sources: {})",
node.id.0.red(),
price_str.red(),
confidence_str.red(),
sources
)
} else {
format!(
" {} sees: {} (confidence: {}, sources: {})",
node.id.0.green(),
price_str.green(),
confidence_str.green(),
sources
)
};
println!("{}", line);
}
}
// Calculate network consensus
if !prices.is_empty() {
let avg_price: f64 =
prices.iter().map(|(_, p, _, _)| *p).sum::<f64>() / prices.len() as f64;
let min_price = prices
.iter()
.map(|(_, p, _, _)| *p)
.fold(f64::INFINITY, |a, b| a.min(b));
let max_price = prices
.iter()
.map(|(_, p, _, _)| *p)
.fold(f64::NEG_INFINITY, |a, b| a.max(b));
let deviation = ((max_price - min_price) / avg_price) * 100.0;
println!("\n{}", "📊 Network Consensus:".cyan().bold());
println!(" Average: {}", format!("${:.2}", avg_price).cyan());
println!(
" Range: {} - {}",
format!("${:.2}", min_price).cyan(),
format!("${:.2}", max_price).cyan()
);
println!(" Max Deviation: {}", format!("{:.2}%", deviation).cyan());
}
}
fn print_final_stats(&self) {
println!("\n\n{}", "🏁 Final Statistics".yellow().bold());
println!("{}", "===================".yellow());
let mut total_attestations = 0;
let mut oracle_stats = Vec::new();
for node in &self.nodes {
let crdt = node.crdt.lock().unwrap();
let node_attestations = crdt.attestations.len();
total_attestations += node_attestations;
let score = crdt.oracle_scores.get(&node.id).unwrap_or(&0.5);
oracle_stats.push((
node.id.0.clone(),
node_attestations,
*score,
node.is_byzantine,
));
}
println!("\n{}", "📈 Oracle Performance:".white().bold());
for (id, attestations, score, is_byzantine) in oracle_stats {
let icon = if is_byzantine { "🔴" } else { "🟢" };
let line = format!(
" {} {} - Attestations: {}, Reputation: {:.2}",
icon, id, attestations, score
);
if is_byzantine {
println!("{}", line.red());
} else {
println!("{}", line.green());
}
}
println!("\n{}", "📊 Network Totals:".cyan().bold());
println!(" Total Attestations: {}", total_attestations);
println!(
" Attestations/second: {:.2}",
total_attestations as f64 / 30.0
);
// Show that Byzantine nodes were filtered out
if let Some(node) = self.nodes.first() {
let crdt = node.crdt.lock().unwrap();
if let Some((price, confidence, _)) =
crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 300)
{
println!("\n{}", "✅ Final Aggregated Price:".green().bold());
println!(
" {} (confidence: {}%)",
format!("${:.2}", price).green().bold(),
confidence
);
println!(" {}", "Despite Byzantine manipulation attempts!".green());
}
}
}
}

View File

@@ -0,0 +1,5 @@
mod network_crdt;
mod node;
pub(crate) use network_crdt::NetworkCRDT;
pub(crate) use node::OracleNode as Node;

View File

@@ -0,0 +1,94 @@
use std::collections::HashMap;
use crate::{utils, AssetPair, OracleId, PriceAttestation};
#[derive(Clone)]
pub(crate) struct NetworkCRDT {
pub(crate) attestations: HashMap<String, PriceAttestation>,
pub(crate) oracle_scores: HashMap<OracleId, f64>,
}
impl NetworkCRDT {
pub(crate) fn new() -> Self {
Self {
attestations: HashMap::new(),
oracle_scores: HashMap::new(),
}
}
pub(crate) fn submit_attestation(&mut self, attestation: PriceAttestation) {
self.attestations
.insert(attestation.id.clone(), attestation.clone());
// Update oracle reputation
let score = self
.oracle_scores
.entry(attestation.oracle_id.clone())
.or_insert(0.5);
*score = (*score * 0.95) + 0.05;
}
pub(crate) fn merge(&mut self, other: &Self) {
for (id, attestation) in &other.attestations {
if !self.attestations.contains_key(id) {
self.attestations.insert(id.clone(), attestation.clone());
}
}
for (oracle_id, score) in &other.oracle_scores {
self.oracle_scores.insert(oracle_id.clone(), *score);
}
}
pub(crate) fn get_aggregate_price(
&self,
asset_pair: &AssetPair,
max_age: u64,
) -> Option<(f64, u8, usize)> {
let now = utils::timestamp();
let min_time = now.saturating_sub(max_age);
let mut prices = Vec::new();
for attestation in self.attestations.values() {
if attestation.asset_pair == *asset_pair && attestation.timestamp >= min_time {
let weight = self
.oracle_scores
.get(&attestation.oracle_id)
.unwrap_or(&0.5);
prices.push((attestation.price, attestation.confidence, *weight));
}
}
if prices.is_empty() {
return None;
}
// Remove outliers
prices.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
if prices.len() > 4 {
let q1 = prices[prices.len() / 4].0;
let q3 = prices[3 * prices.len() / 4].0;
let iqr = q3 - q1;
let lower = q1 - iqr * 1.5;
let upper = q3 + iqr * 1.5;
prices.retain(|(price, _, _)| *price >= lower && *price <= upper);
}
// Calculate weighted average
let mut total_weight = 0.0;
let mut weighted_sum = 0.0;
let mut confidence_sum = 0.0;
for (price, confidence, weight) in &prices {
let w = (*confidence as f64 / 100.0) * weight;
weighted_sum += price * w;
confidence_sum += *confidence as f64 * w;
total_weight += w;
}
let avg_price = weighted_sum / total_weight;
let avg_confidence = (confidence_sum / total_weight) as u8;
Some((avg_price, avg_confidence, prices.len()))
}
}

View File

@@ -0,0 +1,43 @@
use crate::{oracle, utils, AssetPair, OracleId, PriceAttestation};
use rand::Rng;
use std::sync::{Arc, Mutex};
pub(crate) struct OracleNode {
pub(crate) id: OracleId,
pub(crate) crdt: Arc<Mutex<oracle::NetworkCRDT>>,
pub(crate) is_byzantine: bool,
pub(crate) base_price: f64,
}
impl OracleNode {
pub(crate) fn new(id: String, is_byzantine: bool) -> Self {
Self {
id: OracleId(id),
crdt: Arc::new(Mutex::new(oracle::NetworkCRDT::new())),
is_byzantine,
base_price: 2500.0,
}
}
pub(crate) fn submit_price(&self) {
let mut rng = rand::rng();
let price = if self.is_byzantine {
self.base_price * 1.2 // Try to manipulate 20% higher
} else {
self.base_price * (1.0 + rng.random_range(-0.01..0.01))
};
let attestation = PriceAttestation {
id: format!("{}_{}", self.id.0, utils::timestamp()),
oracle_id: self.id.clone(),
asset_pair: AssetPair("ETH/USD".to_string()),
price,
confidence: if self.is_byzantine { 50 } else { 95 },
timestamp: utils::timestamp(),
};
let mut crdt = self.crdt.lock().unwrap();
crdt.submit_attestation(attestation);
}
}

View File

@@ -0,0 +1,8 @@
use std::time::{SystemTime, UNIX_EPOCH};
pub(crate) fn timestamp() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
}

View File

@@ -1,7 +1,7 @@
use std::{
fs::{self, File},
io::Write,
path::PathBuf,
path::{Path, PathBuf},
};
use bft_json_crdt::keypair::{make_keypair, Ed25519KeyPair};
@@ -13,12 +13,12 @@ pub(crate) fn write(key_path: &PathBuf) -> Result<(), std::io::Error> {
let mut file = File::create(key_path)?;
let out = keys.encode_base64();
file.write(out.as_bytes())?;
file.write_all(out.as_bytes())?;
Ok(())
}
pub(crate) fn load_from_file(side_dir: &PathBuf) -> Ed25519KeyPair {
let key_path = crate::utils::side_paths(side_dir.clone()).0;
pub(crate) fn load_from_file(side_dir: &Path) -> Ed25519KeyPair {
let key_path = crate::utils::side_paths(side_dir.to_path_buf()).0;
let data = fs::read_to_string(key_path).expect("couldn't read bft-bft-crdt key file");
println!("data: {:?}", data);

View File

@@ -6,8 +6,6 @@ use bft_json_crdt::{
use serde::{Deserialize, Serialize};
pub mod keys;
pub mod stdin;
pub mod websocket;
#[add_crdt_fields]
#[derive(Clone, CrdtNode, Serialize, Deserialize, Debug)]

View File

@@ -2,9 +2,7 @@ use clap::Parser;
use clap::Subcommand;
pub(crate) fn parse_args() -> Args {
let args = Args::parse();
args
Args::parse()
}
/// A P2P BFT info sharing node

View File

@@ -1,7 +1,6 @@
use bft_crdt::websocket;
use bft_crdt::TransactionList;
use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp};
use cli::{parse_args, Commands};
use cli::Commands;
use node::SideNode;
use tokio::{sync::mpsc, task};
@@ -9,11 +8,13 @@ pub mod bft_crdt;
pub(crate) mod cli;
pub(crate) mod init;
pub mod node;
mod stdin;
pub mod utils;
pub mod websocket;
#[tokio::main]
pub async fn run() {
let args = parse_args();
let args = cli::parse_args();
match &args.command {
Some(Commands::Init { name }) => {
@@ -32,21 +33,22 @@ pub async fn run() {
}
/// Wire everything up outside the application so that we can test more easily later
async fn setup(name: &String) -> SideNode {
// First, load up the keys and create a bft-bft-crdt
let side_dir = utils::home(name);
let bft_crdt_keys = bft_crdt::keys::load_from_file(&side_dir);
async fn setup(name: &str) -> SideNode {
// First, load up the keys and create a bft-crdt node
let home_dir = utils::home(name);
let bft_crdt_keys = bft_crdt::keys::load_from_file(&home_dir);
let crdt = BaseCrdt::<TransactionList>::new(&bft_crdt_keys);
// Channels for internal communication, and a tokio task for stdin input
let (incoming_sender, incoming_receiver) = mpsc::channel::<SignedOp>(32);
let (stdin_sender, stdin_receiver) = std::sync::mpsc::channel();
task::spawn(async move {
bft_crdt::stdin::input(stdin_sender);
stdin::input(stdin_sender);
});
// Finally, create the node and return it
// Wire the websocket client to the incoming channel
let handle = websocket::Client::new(incoming_sender).await;
// Finally, create the node and return it
let node = SideNode::new(
crdt,
bft_crdt_keys,

View File

@@ -1,5 +1,3 @@
use crdt_node;
fn main() {
crdt_node::run();
}

View File

@@ -3,7 +3,7 @@ use bft_json_crdt::json_crdt::{BaseCrdt, SignedOp};
use fastcrypto::ed25519::Ed25519KeyPair;
use tokio::sync::mpsc;
use crate::{bft_crdt::websocket::Client, bft_crdt::TransactionList, utils};
use crate::{bft_crdt::TransactionList, utils, websocket::Client};
pub struct SideNode {
crdt: BaseCrdt<TransactionList>,
@@ -21,36 +21,29 @@ impl SideNode {
stdin_receiver: std::sync::mpsc::Receiver<String>,
handle: ezsockets::Client<Client>,
) -> Self {
let node = Self {
Self {
crdt,
bft_crdt_keys,
incoming_receiver,
stdin_receiver,
handle,
};
node
}
}
pub(crate) async fn start(&mut self) {
println!("Starting node...");
loop {
match self.stdin_receiver.try_recv() {
Ok(stdin) => {
let transaction = utils::fake_generic_transaction_json(stdin);
let json = serde_json::to_value(transaction).unwrap();
let signed_op = self.add_transaction_local(json);
println!("STDIN: {}", utils::shappy(signed_op.clone()));
self.send_to_network(signed_op).await;
}
Err(_) => {} // ignore empty channel errors in this PoC
if let Ok(stdin) = self.stdin_receiver.try_recv() {
let transaction = utils::fake_generic_transaction_json(stdin);
let json = serde_json::to_value(transaction).unwrap();
let signed_op = self.add_transaction_local(json);
println!("STDIN: {}", utils::sha_op(signed_op.clone()));
self.send_to_network(signed_op).await;
}
match self.incoming_receiver.try_recv() {
Ok(incoming) => {
println!("INCOMING: {}", utils::shappy(incoming.clone()));
self.handle_incoming(incoming);
}
Err(_) => {} // ignore empty channel errors in this PoC
if let Ok(incoming) = self.incoming_receiver.try_recv() {
println!("INCOMING: {}", utils::sha_op(incoming.clone()));
self.handle_incoming(incoming);
}
}
}
@@ -76,14 +69,13 @@ impl SideNode {
.ops
.last()
.expect("couldn't find last op");
let signed_op = self
.crdt
// self.trace_crdt();
self.crdt
.doc
.list
.insert(last.id, transaction)
.sign(&self.bft_crdt_keys);
// self.trace_crdt();
signed_op
.sign(&self.bft_crdt_keys)
}
/// Print the current state of the CRDT, can be used to debug

View File

@@ -33,12 +33,12 @@ pub fn fake_generic_transaction_json(from: String) -> Value {
})
}
pub fn shappy(op: SignedOp) -> String {
pub fn sha_op(op: SignedOp) -> String {
let b = serde_json::to_string(&op).unwrap().into_bytes();
sha256::digest(b).to_string()
}
pub fn shassy(text: String) -> String {
pub fn sha_string(text: String) -> String {
let b = text.into_bytes();
sha256::digest(b).to_string()
}

View File

@@ -39,10 +39,10 @@ impl ezsockets::ClientExt for Client {
/// When we receive a text message, apply the bft-crdt operation contained in it to our
/// local bft-crdt.
async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> {
let string_sha = utils::shassy(text.clone());
let string_sha = utils::sha_string(text.clone());
println!("received text, sha: {string_sha}");
let incoming: bft_json_crdt::json_crdt::SignedOp = serde_json::from_str(&text).unwrap();
let object_sha = utils::shappy(incoming.clone());
let object_sha = utils::sha_op(incoming.clone());
println!("deserialized: {}", object_sha);
if string_sha != object_sha {
panic!("sha mismatch: {string_sha} != {object_sha}, bft-bft-crdt has failed");

View File

@@ -2,7 +2,7 @@ use bft_json_crdt::{
json_crdt::{BaseCrdt, SignedOp},
keypair::make_keypair,
};
use crdt_node::{bft_crdt::websocket::Client, bft_crdt::TransactionList, node::SideNode, utils};
use crdt_node::{bft_crdt::TransactionList, node::SideNode, utils, websocket};
use tokio::sync::mpsc;
#[tokio::test]
@@ -42,13 +42,13 @@ async fn setup(_: &str) -> SideNode {
let (_, stdin_receiver) = std::sync::mpsc::channel();
// Finally, create the node and return it
let handle = Client::new(incoming_sender).await;
let node = SideNode::new(
let handle = websocket::Client::new(incoming_sender).await;
SideNode::new(
crdt,
bft_crdt_keys,
incoming_receiver,
stdin_receiver,
handle,
);
node
)
}

View File

@@ -0,0 +1,41 @@
# Use Case 5: Payment Cluster
## Abstract case (no external integrations)
1. Node1 is initialized, with a data store and 100 tokens.
2. Node2 starts up.
3. Node1 sends 1 token to Node2 by signing a transfer statement.
a. if Node1 is honest, it will update its own store to 99 tokens
b. Node2 will always update its internal store. It also has proof that Node1 has transferred 1 token.
4. Node3 shows up. It asks the other two nodes for state.
a. Node2 will send correct state
b. if Node1 is honest, it will send correct state
c. if Node1 is dishonest, it will send 100 tokens as its state.
d. Node3 asks Node2 for proof. Node2 sends it.
So far, it kind of works. Alternately there could be two ways to go:
1. all nodes send all history to newcomers or,
2. if all nodes send the same state, there may be no need for anybody to request the total state. Processing can start from this point.
There are a lot of questions here. As soon as a node goes offline, others can play dishonesty games, obviously.
Are there ways of
(a) checkpointing to some outside system
(b) checkpointing internally
(c) playing some economic game where some "core" nodes are always up and notionally have some kind of financial incentives?
We can make it pretty decentralised, but still, the system would need to have *somewhere* that new nodes could look up where they could start interacting, and also get some notion of who was currently connected (so that they would be able to broadcast messages to all nodes).
As long as there is 1 honest node does the system work?
Maybe not. There could be like 99 dishonest nodes that broadcast messages to each other, but not to the 100th (honest) node. In such a case, how could anybody tell what the "real" history was?
Could a system of acks fix this?
Only if there was a way of determining who was "in" and who was "out".
So like maybe there is a core node for the cluster that people pay fees to? And those fees could go into a big pot and be paid out periodically. If anybody comes up with proof that a core node has behaved dishonestly, they get the fee pot. There could be even maybe 4 "core" nodes per cluster so that there was some redundancy available.
Enforcement against a core node, or any other node that had "misbehaved" would be an interesting question...