diff --git a/crates/oracle-demo/src/main.rs b/crates/oracle-demo/src/main.rs index e6ea906..364b4b0 100644 --- a/crates/oracle-demo/src/main.rs +++ b/crates/oracle-demo/src/main.rs @@ -1,9 +1,8 @@ use colored::*; -use rand::Rng; -use std::collections::HashMap; -use std::sync::{Arc, Mutex}; -use std::thread; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use std::time::Duration; +mod network; +mod oracle; +mod utils; /// A simple demonstration of the BFT-CRDT Oracle Network /// Run with: cargo run -p oracle-demo @@ -26,400 +25,13 @@ struct PriceAttestation { timestamp: u64, } -// ============ Simple CRDT ============ - -#[derive(Clone)] -struct OracleNetworkCRDT { - attestations: HashMap, - oracle_scores: HashMap, -} - -impl OracleNetworkCRDT { - fn new() -> Self { - Self { - attestations: HashMap::new(), - oracle_scores: HashMap::new(), - } - } - - fn submit_attestation(&mut self, attestation: PriceAttestation) { - self.attestations - .insert(attestation.id.clone(), attestation.clone()); - - // Update oracle reputation - let score = self - .oracle_scores - .entry(attestation.oracle_id.clone()) - .or_insert(0.5); - *score = (*score * 0.95) + 0.05; - } - - fn merge(&mut self, other: &Self) { - for (id, attestation) in &other.attestations { - if !self.attestations.contains_key(id) { - self.attestations.insert(id.clone(), attestation.clone()); - } - } - - for (oracle_id, score) in &other.oracle_scores { - self.oracle_scores.insert(oracle_id.clone(), *score); - } - } - - fn get_aggregate_price( - &self, - asset_pair: &AssetPair, - max_age: u64, - ) -> Option<(f64, u8, usize)> { - let now = timestamp(); - let min_time = now.saturating_sub(max_age); - - let mut prices = Vec::new(); - for attestation in self.attestations.values() { - if attestation.asset_pair == *asset_pair && attestation.timestamp >= min_time { - let weight = self - .oracle_scores - .get(&attestation.oracle_id) - .unwrap_or(&0.5); - prices.push((attestation.price, attestation.confidence, *weight)); - } - } - - if prices.is_empty() { - return None; - } - - // Remove outliers - prices.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap()); - if prices.len() > 4 { - let q1 = prices[prices.len() / 4].0; - let q3 = prices[3 * prices.len() / 4].0; - let iqr = q3 - q1; - let lower = q1 - iqr * 1.5; - let upper = q3 + iqr * 1.5; - prices.retain(|(price, _, _)| *price >= lower && *price <= upper); - } - - // Calculate weighted average - let mut total_weight = 0.0; - let mut weighted_sum = 0.0; - let mut confidence_sum = 0.0; - - for (price, confidence, weight) in &prices { - let w = (*confidence as f64 / 100.0) * weight; - weighted_sum += price * w; - confidence_sum += *confidence as f64 * w; - total_weight += w; - } - - let avg_price = weighted_sum / total_weight; - let avg_confidence = (confidence_sum / total_weight) as u8; - - Some((avg_price, avg_confidence, prices.len())) - } -} - -// ============ Oracle Node ============ - -struct OracleNode { - id: OracleId, - crdt: Arc>, - is_byzantine: bool, - base_price: f64, -} - -impl OracleNode { - fn new(id: String, is_byzantine: bool) -> Self { - Self { - id: OracleId(id), - crdt: Arc::new(Mutex::new(OracleNetworkCRDT::new())), - is_byzantine, - base_price: 2500.0, - } - } - - fn submit_price(&self) { - let mut rng = rand::rng(); - - let price = if self.is_byzantine { - self.base_price * 1.2 // Try to manipulate 20% higher - } else { - self.base_price * (1.0 + rng.random_range(-0.01..0.01)) - }; - - let attestation = PriceAttestation { - id: format!("{}_{}", self.id.0, timestamp()), - oracle_id: self.id.clone(), - asset_pair: AssetPair("ETH/USD".to_string()), - price, - confidence: if self.is_byzantine { 50 } else { 95 }, - timestamp: timestamp(), - }; - - let mut crdt = self.crdt.lock().unwrap(); - crdt.submit_attestation(attestation); - } -} - -// ============ Network Simulator ============ - -struct NetworkSimulator { - nodes: Vec>, - partitioned: Arc>, -} - -impl NetworkSimulator { - fn new() -> Self { - let mut nodes = Vec::new(); - - // Create 5 honest nodes - for i in 1..=5 { - nodes.push(Arc::new(OracleNode::new(format!("honest_{}", i), false))); - } - - // Create 2 Byzantine nodes - for i in 1..=2 { - nodes.push(Arc::new(OracleNode::new(format!("byzantine_{}", i), true))); - } - - Self { - nodes, - partitioned: Arc::new(Mutex::new(false)), - } - } - - fn run(&self, duration: Duration) { - println!( - "{}", - "🚀 Starting BFT-CRDT Oracle Network Demo".cyan().bold() - ); - println!("{}", "=========================================".cyan()); - println!("📊 Network: {} nodes ({} Byzantine)", self.nodes.len(), 2); - println!("⏱️ Duration: {:?}\n", duration); - - let start = Instant::now(); - - // Spawn oracle threads - let handles: Vec<_> = self - .nodes - .iter() - .map(|node| { - let node_clone = Arc::clone(node); - let start_clone = start.clone(); - thread::spawn(move || { - while start_clone.elapsed() < duration { - node_clone.submit_price(); - thread::sleep(Duration::from_millis(1000)); - } - }) - }) - .collect(); - - // Spawn network propagation thread - let nodes_clone = self.nodes.clone(); - let partitioned_clone = Arc::clone(&self.partitioned); - let start_clone = start.clone(); - let propagation_handle = thread::spawn(move || { - while start_clone.elapsed() < duration { - let is_partitioned = *partitioned_clone.lock().unwrap(); - - // Propagate between nodes - for i in 0..nodes_clone.len() { - for j in 0..nodes_clone.len() { - if i != j { - // Skip if partitioned - if is_partitioned && ((i < 3 && j >= 3) || (i >= 3 && j < 3)) { - continue; - } - - let crdt1 = nodes_clone[i].crdt.lock().unwrap(); - let mut crdt2 = nodes_clone[j].crdt.lock().unwrap(); - crdt2.merge(&*crdt1); - } - } - } - thread::sleep(Duration::from_millis(100)); - } - }); - - // Main monitoring loop - let mut last_partition = Instant::now(); - while start.elapsed() < duration { - thread::sleep(Duration::from_secs(2)); - - // Print current state - self.print_network_state(); - - // Simulate network partition every 10 seconds - if last_partition.elapsed() > Duration::from_secs(10) { - let mut partitioned = self.partitioned.lock().unwrap(); - *partitioned = !*partitioned; - if *partitioned { - println!( - "\n{}", - "⚠️ NETWORK PARTITION ACTIVE - Nodes split into two groups" - .yellow() - .bold() - ); - } else { - println!( - "\n{}", - "✅ NETWORK PARTITION HEALED - All nodes can communicate" - .green() - .bold() - ); - } - last_partition = Instant::now(); - } - } - - // Wait for threads - for handle in handles { - handle.join().unwrap(); - } - propagation_handle.join().unwrap(); - - // Print final statistics - self.print_final_stats(); - } - - fn print_network_state(&self) { - println!("\n{}", "📈 Current Network State:".white().bold()); - println!("{}", "------------------------".white()); - - // Get price from each node's perspective - let mut prices = Vec::new(); - for node in &self.nodes { - let crdt = node.crdt.lock().unwrap(); - if let Some((price, confidence, sources)) = - crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 60) - { - prices.push((node.id.0.clone(), price, confidence, sources)); - let price_str = format!("${:.2}", price); - let confidence_str = format!("{}%", confidence); - - let line = if node.is_byzantine { - format!( - " {} sees: {} (confidence: {}, sources: {})", - node.id.0.red(), - price_str.red(), - confidence_str.red(), - sources - ) - } else { - format!( - " {} sees: {} (confidence: {}, sources: {})", - node.id.0.green(), - price_str.green(), - confidence_str.green(), - sources - ) - }; - println!("{}", line); - } - } - - // Calculate network consensus - if !prices.is_empty() { - let avg_price: f64 = - prices.iter().map(|(_, p, _, _)| *p).sum::() / prices.len() as f64; - let min_price = prices - .iter() - .map(|(_, p, _, _)| *p) - .fold(f64::INFINITY, |a, b| a.min(b)); - let max_price = prices - .iter() - .map(|(_, p, _, _)| *p) - .fold(f64::NEG_INFINITY, |a, b| a.max(b)); - let deviation = ((max_price - min_price) / avg_price) * 100.0; - - println!("\n{}", "📊 Network Consensus:".cyan().bold()); - println!(" Average: {}", format!("${:.2}", avg_price).cyan()); - println!( - " Range: {} - {}", - format!("${:.2}", min_price).cyan(), - format!("${:.2}", max_price).cyan() - ); - println!(" Max Deviation: {}", format!("{:.2}%", deviation).cyan()); - } - } - - fn print_final_stats(&self) { - println!("\n\n{}", "🏁 Final Statistics".yellow().bold()); - println!("{}", "===================".yellow()); - - let mut total_attestations = 0; - let mut oracle_stats = Vec::new(); - - for node in &self.nodes { - let crdt = node.crdt.lock().unwrap(); - let node_attestations = crdt.attestations.len(); - total_attestations += node_attestations; - - let score = crdt.oracle_scores.get(&node.id).unwrap_or(&0.5); - oracle_stats.push(( - node.id.0.clone(), - node_attestations, - *score, - node.is_byzantine, - )); - } - - println!("\n{}", "📈 Oracle Performance:".white().bold()); - for (id, attestations, score, is_byzantine) in oracle_stats { - let icon = if is_byzantine { "🔴" } else { "🟢" }; - let line = format!( - " {} {} - Attestations: {}, Reputation: {:.2}", - icon, id, attestations, score - ); - if is_byzantine { - println!("{}", line.red()); - } else { - println!("{}", line.green()); - } - } - - println!("\n{}", "📊 Network Totals:".cyan().bold()); - println!(" Total Attestations: {}", total_attestations); - println!( - " Attestations/second: {:.2}", - total_attestations as f64 / 30.0 - ); - - // Show that Byzantine nodes were filtered out - if let Some(node) = self.nodes.first() { - let crdt = node.crdt.lock().unwrap(); - if let Some((price, confidence, _)) = - crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 300) - { - println!("\n{}", "✅ Final Aggregated Price:".green().bold()); - println!( - " {} (confidence: {}%)", - format!("${:.2}", price).green().bold(), - confidence - ); - println!(" {}", "Despite Byzantine manipulation attempts!".green()); - } - } - } -} - -// ============ Helper Functions ============ - -fn timestamp() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs() -} - // ============ Main Function ============ fn main() { println!("{}", "BFT-CRDT Oracle Network Demo".cyan().bold()); println!("{}", "============================\n".cyan()); - let simulator = NetworkSimulator::new(); + let simulator = network::Simulator::new(); simulator.run(Duration::from_secs(30)); println!("\n{}", "✅ Demo completed!".green().bold()); diff --git a/crates/oracle-demo/src/network.rs b/crates/oracle-demo/src/network.rs new file mode 100644 index 0000000..4c1634d --- /dev/null +++ b/crates/oracle-demo/src/network.rs @@ -0,0 +1,253 @@ +use std::{ + sync::{Arc, Mutex}, + thread, + time::{Duration, Instant}, +}; + +use colored::Colorize; + +use crate::{oracle, AssetPair}; + +pub(crate) struct Simulator { + nodes: Vec>, + partitioned: Arc>, +} + +impl Simulator { + pub(crate) fn new() -> Self { + let mut nodes = Vec::new(); + + // Create 5 honest nodes + for i in 1..=5 { + nodes.push(Arc::new(oracle::Node::new(format!("honest_{}", i), false))); + } + + // Create 2 Byzantine nodes + for i in 1..=2 { + nodes.push(Arc::new(oracle::Node::new( + format!("byzantine_{}", i), + true, + ))); + } + + Self { + nodes, + partitioned: Arc::new(Mutex::new(false)), + } + } + + pub(crate) fn run(&self, duration: Duration) { + println!( + "{}", + "🚀 Starting BFT-CRDT Oracle Network Demo".cyan().bold() + ); + println!("{}", "=========================================".cyan()); + println!("📊 Network: {} nodes ({} Byzantine)", self.nodes.len(), 2); + println!("⏱️ Duration: {:?}\n", duration); + + let start = Instant::now(); + + // Spawn oracle threads + let handles: Vec<_> = self + .nodes + .iter() + .map(|node| { + let node_clone = Arc::clone(node); + let start_clone = start.clone(); + thread::spawn(move || { + while start_clone.elapsed() < duration { + node_clone.submit_price(); + thread::sleep(Duration::from_millis(1000)); + } + }) + }) + .collect(); + + // Spawn network propagation thread + let nodes_clone = self.nodes.clone(); + let partitioned_clone = Arc::clone(&self.partitioned); + let start_clone = start.clone(); + let propagation_handle = thread::spawn(move || { + while start_clone.elapsed() < duration { + let is_partitioned = *partitioned_clone.lock().unwrap(); + + // Propagate between nodes + for i in 0..nodes_clone.len() { + for j in 0..nodes_clone.len() { + if i != j { + // Skip if partitioned + if is_partitioned && ((i < 3 && j >= 3) || (i >= 3 && j < 3)) { + continue; + } + + let crdt1 = nodes_clone[i].crdt.lock().unwrap(); + let mut crdt2 = nodes_clone[j].crdt.lock().unwrap(); + crdt2.merge(&*crdt1); + } + } + } + thread::sleep(Duration::from_millis(100)); + } + }); + + // Main monitoring loop + let mut last_partition = Instant::now(); + while start.elapsed() < duration { + thread::sleep(Duration::from_secs(2)); + + // Print current state + self.print_network_state(); + + // Simulate network partition every 10 seconds + if last_partition.elapsed() > Duration::from_secs(10) { + let mut partitioned = self.partitioned.lock().unwrap(); + *partitioned = !*partitioned; + if *partitioned { + println!( + "\n{}", + "⚠️ NETWORK PARTITION ACTIVE - Nodes split into two groups" + .yellow() + .bold() + ); + } else { + println!( + "\n{}", + "✅ NETWORK PARTITION HEALED - All nodes can communicate" + .green() + .bold() + ); + } + last_partition = Instant::now(); + } + } + + // Wait for threads + for handle in handles { + handle.join().unwrap(); + } + propagation_handle.join().unwrap(); + + // Print final statistics + self.print_final_stats(); + } + + fn print_network_state(&self) { + println!("\n{}", "📈 Current Network State:".white().bold()); + println!("{}", "------------------------".white()); + + // Get price from each node's perspective + let mut prices = Vec::new(); + for node in &self.nodes { + let crdt = node.crdt.lock().unwrap(); + if let Some((price, confidence, sources)) = + crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 60) + { + prices.push((node.id.0.clone(), price, confidence, sources)); + let price_str = format!("${:.2}", price); + let confidence_str = format!("{}%", confidence); + + let line = if node.is_byzantine { + format!( + " {} sees: {} (confidence: {}, sources: {})", + node.id.0.red(), + price_str.red(), + confidence_str.red(), + sources + ) + } else { + format!( + " {} sees: {} (confidence: {}, sources: {})", + node.id.0.green(), + price_str.green(), + confidence_str.green(), + sources + ) + }; + println!("{}", line); + } + } + + // Calculate network consensus + if !prices.is_empty() { + let avg_price: f64 = + prices.iter().map(|(_, p, _, _)| *p).sum::() / prices.len() as f64; + let min_price = prices + .iter() + .map(|(_, p, _, _)| *p) + .fold(f64::INFINITY, |a, b| a.min(b)); + let max_price = prices + .iter() + .map(|(_, p, _, _)| *p) + .fold(f64::NEG_INFINITY, |a, b| a.max(b)); + let deviation = ((max_price - min_price) / avg_price) * 100.0; + + println!("\n{}", "📊 Network Consensus:".cyan().bold()); + println!(" Average: {}", format!("${:.2}", avg_price).cyan()); + println!( + " Range: {} - {}", + format!("${:.2}", min_price).cyan(), + format!("${:.2}", max_price).cyan() + ); + println!(" Max Deviation: {}", format!("{:.2}%", deviation).cyan()); + } + } + + fn print_final_stats(&self) { + println!("\n\n{}", "🏁 Final Statistics".yellow().bold()); + println!("{}", "===================".yellow()); + + let mut total_attestations = 0; + let mut oracle_stats = Vec::new(); + + for node in &self.nodes { + let crdt = node.crdt.lock().unwrap(); + let node_attestations = crdt.attestations.len(); + total_attestations += node_attestations; + + let score = crdt.oracle_scores.get(&node.id).unwrap_or(&0.5); + oracle_stats.push(( + node.id.0.clone(), + node_attestations, + *score, + node.is_byzantine, + )); + } + + println!("\n{}", "📈 Oracle Performance:".white().bold()); + for (id, attestations, score, is_byzantine) in oracle_stats { + let icon = if is_byzantine { "🔴" } else { "🟢" }; + let line = format!( + " {} {} - Attestations: {}, Reputation: {:.2}", + icon, id, attestations, score + ); + if is_byzantine { + println!("{}", line.red()); + } else { + println!("{}", line.green()); + } + } + + println!("\n{}", "📊 Network Totals:".cyan().bold()); + println!(" Total Attestations: {}", total_attestations); + println!( + " Attestations/second: {:.2}", + total_attestations as f64 / 30.0 + ); + + // Show that Byzantine nodes were filtered out + if let Some(node) = self.nodes.first() { + let crdt = node.crdt.lock().unwrap(); + if let Some((price, confidence, _)) = + crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 300) + { + println!("\n{}", "✅ Final Aggregated Price:".green().bold()); + println!( + " {} (confidence: {}%)", + format!("${:.2}", price).green().bold(), + confidence + ); + println!(" {}", "Despite Byzantine manipulation attempts!".green()); + } + } + } +} diff --git a/crates/oracle-demo/src/oracle/mod.rs b/crates/oracle-demo/src/oracle/mod.rs new file mode 100644 index 0000000..5f38ad7 --- /dev/null +++ b/crates/oracle-demo/src/oracle/mod.rs @@ -0,0 +1,5 @@ +mod network_crdt; +mod node; + +pub(crate) use network_crdt::OracleNetworkCRDT; +pub(crate) use node::OracleNode as Node; diff --git a/crates/oracle-demo/src/oracle/network_crdt.rs b/crates/oracle-demo/src/oracle/network_crdt.rs new file mode 100644 index 0000000..5c3964c --- /dev/null +++ b/crates/oracle-demo/src/oracle/network_crdt.rs @@ -0,0 +1,94 @@ +use std::collections::HashMap; + +use crate::{utils, AssetPair, OracleId, PriceAttestation}; + +#[derive(Clone)] +pub(crate) struct OracleNetworkCRDT { + pub(crate) attestations: HashMap, + pub(crate) oracle_scores: HashMap, +} + +impl OracleNetworkCRDT { + pub(crate) fn new() -> Self { + Self { + attestations: HashMap::new(), + oracle_scores: HashMap::new(), + } + } + + pub(crate) fn submit_attestation(&mut self, attestation: PriceAttestation) { + self.attestations + .insert(attestation.id.clone(), attestation.clone()); + + // Update oracle reputation + let score = self + .oracle_scores + .entry(attestation.oracle_id.clone()) + .or_insert(0.5); + *score = (*score * 0.95) + 0.05; + } + + pub(crate) fn merge(&mut self, other: &Self) { + for (id, attestation) in &other.attestations { + if !self.attestations.contains_key(id) { + self.attestations.insert(id.clone(), attestation.clone()); + } + } + + for (oracle_id, score) in &other.oracle_scores { + self.oracle_scores.insert(oracle_id.clone(), *score); + } + } + + pub(crate) fn get_aggregate_price( + &self, + asset_pair: &AssetPair, + max_age: u64, + ) -> Option<(f64, u8, usize)> { + let now = utils::timestamp(); + let min_time = now.saturating_sub(max_age); + + let mut prices = Vec::new(); + for attestation in self.attestations.values() { + if attestation.asset_pair == *asset_pair && attestation.timestamp >= min_time { + let weight = self + .oracle_scores + .get(&attestation.oracle_id) + .unwrap_or(&0.5); + prices.push((attestation.price, attestation.confidence, *weight)); + } + } + + if prices.is_empty() { + return None; + } + + // Remove outliers + prices.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap()); + if prices.len() > 4 { + let q1 = prices[prices.len() / 4].0; + let q3 = prices[3 * prices.len() / 4].0; + let iqr = q3 - q1; + let lower = q1 - iqr * 1.5; + let upper = q3 + iqr * 1.5; + prices.retain(|(price, _, _)| *price >= lower && *price <= upper); + } + + // Calculate weighted average + let mut total_weight = 0.0; + let mut weighted_sum = 0.0; + let mut confidence_sum = 0.0; + + for (price, confidence, weight) in &prices { + let w = (*confidence as f64 / 100.0) * weight; + weighted_sum += price * w; + confidence_sum += *confidence as f64 * w; + total_weight += w; + } + + let avg_price = weighted_sum / total_weight; + let avg_confidence = (confidence_sum / total_weight) as u8; + + Some((avg_price, avg_confidence, prices.len())) + } +} diff --git a/crates/oracle-demo/src/oracle/node.rs b/crates/oracle-demo/src/oracle/node.rs new file mode 100644 index 0000000..f059287 --- /dev/null +++ b/crates/oracle-demo/src/oracle/node.rs @@ -0,0 +1,45 @@ +use std::sync::{Arc, Mutex}; + +use rand::Rng; + +use crate::{utils, AssetPair, OracleId, PriceAttestation}; + +pub(crate) struct OracleNode { + pub(crate) id: OracleId, + pub(crate) crdt: Arc>, + pub(crate) is_byzantine: bool, + pub(crate) base_price: f64, +} + +impl OracleNode { + pub(crate) fn new(id: String, is_byzantine: bool) -> Self { + Self { + id: OracleId(id), + crdt: Arc::new(Mutex::new(super::OracleNetworkCRDT::new())), + is_byzantine, + base_price: 2500.0, + } + } + + pub(crate) fn submit_price(&self) { + let mut rng = rand::rng(); + + let price = if self.is_byzantine { + self.base_price * 1.2 // Try to manipulate 20% higher + } else { + self.base_price * (1.0 + rng.random_range(-0.01..0.01)) + }; + + let attestation = PriceAttestation { + id: format!("{}_{}", self.id.0, utils::timestamp()), + oracle_id: self.id.clone(), + asset_pair: AssetPair("ETH/USD".to_string()), + price, + confidence: if self.is_byzantine { 50 } else { 95 }, + timestamp: utils::timestamp(), + }; + + let mut crdt = self.crdt.lock().unwrap(); + crdt.submit_attestation(attestation); + } +} diff --git a/crates/oracle-demo/src/utils.rs b/crates/oracle-demo/src/utils.rs new file mode 100644 index 0000000..063bf91 --- /dev/null +++ b/crates/oracle-demo/src/utils.rs @@ -0,0 +1,8 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +pub(crate) fn timestamp() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() +}