use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; /// A runnable demonstration of the BFT-CRDT Oracle Network /// /// This example shows: /// 1. Multiple oracle nodes submitting prices independently /// 2. Byzantine oracles trying to manipulate prices /// 3. Network partitions and healing /// 4. Real-time price aggregation without consensus /// /// Run with: cargo run --example run_oracle_demo // ============ Core Types ============ #[derive(Debug, Clone, Hash, Eq, PartialEq)] struct OracleId(String); #[derive(Debug, Clone, Hash, Eq, PartialEq)] struct AssetPair(String); #[derive(Debug, Clone)] struct PriceAttestation { id: String, oracle_id: OracleId, asset_pair: AssetPair, price: u128, confidence: u8, timestamp: u64, sources: Vec, } #[derive(Debug, Clone)] struct DataSource { name: String, price: u128, volume: u128, } // ============ Simple CRDT Implementation ============ #[derive(Clone)] struct OracleNetworkCRDT { attestations: HashMap, oracle_scores: HashMap, } impl OracleNetworkCRDT { fn new() -> Self { Self { attestations: HashMap::new(), oracle_scores: HashMap::new(), } } fn submit_attestation(&mut self, attestation: PriceAttestation) { self.attestations .insert(attestation.id.clone(), attestation.clone()); // Update oracle score let score = self .oracle_scores .entry(attestation.oracle_id.clone()) .or_insert(0.5); *score = (*score * 0.95) + 0.05; // Simple reputation update } fn merge(&mut self, other: &Self) { // Merge attestations for (id, attestation) in &other.attestations { if !self.attestations.contains_key(id) { self.attestations.insert(id.clone(), attestation.clone()); } } // Merge oracle scores for (oracle_id, score) in &other.oracle_scores { self.oracle_scores.insert(oracle_id.clone(), *score); } } fn get_aggregate_price( &self, asset_pair: &AssetPair, max_age: u64, ) -> Option<(u128, u8, usize)> { let now = timestamp(); let min_time = now.saturating_sub(max_age); let mut prices = Vec::new(); for attestation in self.attestations.values() { if attestation.asset_pair == *asset_pair && attestation.timestamp >= min_time { let weight = self .oracle_scores .get(&attestation.oracle_id) .unwrap_or(&0.5); prices.push((attestation.price, attestation.confidence, *weight)); } } if prices.is_empty() { return None; } // Remove outliers using simple IQR method prices.sort_by_key(|(price, _, _)| *price); let q1_idx = prices.len() / 4; let q3_idx = 3 * prices.len() / 4; if prices.len() > 4 { let q1 = prices[q1_idx].0; let q3 = prices[q3_idx].0; let iqr = q3.saturating_sub(q1); let lower = q1.saturating_sub(iqr * 3 / 2); let upper = q3.saturating_add(iqr * 3 / 2); prices.retain(|(price, _, _)| *price >= lower && *price <= upper); } // Calculate weighted average let mut total_weight = 0.0; let mut weighted_sum = 0.0; let mut confidence_sum = 0.0; for (price, confidence, weight) in &prices { let w = (*confidence as f64 / 100.0) * weight; weighted_sum += *price as f64 * w; confidence_sum += *confidence as f64 * w; total_weight += w; } let avg_price = (weighted_sum / total_weight) as u128; let avg_confidence = (confidence_sum / total_weight) as u8; Some((avg_price, avg_confidence, prices.len())) } } // ============ Oracle Node ============ struct OracleNode { id: OracleId, crdt: Arc>, is_byzantine: bool, base_price: u128, } impl OracleNode { fn new(id: String, is_byzantine: bool) -> Self { Self { id: OracleId(id), crdt: Arc::new(Mutex::new(OracleNetworkCRDT::new())), is_byzantine, base_price: 2500_000_000, // $2500 with 6 decimals } } fn submit_price(&self) { let price = if self.is_byzantine { // Byzantine nodes try to manipulate self.base_price * 120 / 100 // 20% higher } else { // Honest nodes add realistic variance let variance = (rand() * 0.02 - 0.01) as f64; ((self.base_price as f64) * (1.0 + variance)) as u128 }; let attestation = PriceAttestation { id: format!("{}_{}", self.id.0, timestamp()), oracle_id: self.id.clone(), asset_pair: AssetPair("ETH/USD".to_string()), price, confidence: if self.is_byzantine { 50 } else { 95 }, timestamp: timestamp(), sources: vec![ DataSource { name: "Binance".to_string(), price, volume: 1000_000_000, }, DataSource { name: "Coinbase".to_string(), price: price + 1_000_000, // Slight difference volume: 800_000_000, }, ], }; let mut crdt = self.crdt.lock().unwrap(); crdt.submit_attestation(attestation); } } // ============ Network Simulation ============ struct NetworkSimulator { nodes: Vec>, partitioned: Arc>, } impl NetworkSimulator { fn new() -> Self { let mut nodes = Vec::new(); // Create 5 honest nodes for i in 1..=5 { nodes.push(Arc::new(OracleNode::new(format!("honest_{}", i), false))); } // Create 2 Byzantine nodes for i in 1..=2 { nodes.push(Arc::new(OracleNode::new(format!("byzantine_{}", i), true))); } Self { nodes, partitioned: Arc::new(Mutex::new(false)), } } fn run(&self, duration: Duration) { println!("šŸš€ Starting BFT-CRDT Oracle Network Demo"); println!("========================================="); println!("šŸ“Š Network: {} nodes ({} Byzantine)", self.nodes.len(), 2); println!("ā±ļø Duration: {:?}\n", duration); let start = Instant::now(); // Spawn oracle threads let handles: Vec<_> = self .nodes .iter() .map(|node| { let node_clone = Arc::clone(node); thread::spawn(move || { while start.elapsed() < duration { node_clone.submit_price(); thread::sleep(Duration::from_millis(1000)); } }) }) .collect(); // Spawn network propagation thread let nodes_clone = self.nodes.clone(); let partitioned_clone = Arc::clone(&self.partitioned); let propagation_handle = thread::spawn(move || { while start.elapsed() < duration { let is_partitioned = *partitioned_clone.lock().unwrap(); // Propagate between nodes for i in 0..nodes_clone.len() { for j in 0..nodes_clone.len() { if i != j { // Skip if partitioned if is_partitioned && ((i < 3 && j >= 3) || (i >= 3 && j < 3)) { continue; } let crdt1 = nodes_clone[i].crdt.lock().unwrap(); let mut crdt2 = nodes_clone[j].crdt.lock().unwrap(); crdt2.merge(&*crdt1); } } } thread::sleep(Duration::from_millis(100)); } }); // Main monitoring loop let mut last_partition = Instant::now(); while start.elapsed() < duration { thread::sleep(Duration::from_secs(2)); // Print current state self.print_network_state(); // Simulate network partition every 10 seconds if last_partition.elapsed() > Duration::from_secs(10) { let mut partitioned = self.partitioned.lock().unwrap(); *partitioned = !*partitioned; if *partitioned { println!("\nāš ļø NETWORK PARTITION ACTIVE - Nodes split into two groups"); } else { println!("\nāœ… NETWORK PARTITION HEALED - All nodes can communicate"); } last_partition = Instant::now(); } } // Wait for threads for handle in handles { handle.join().unwrap(); } propagation_handle.join().unwrap(); // Print final statistics self.print_final_stats(); } fn print_network_state(&self) { println!("\nšŸ“ˆ Current Network State:"); println!("------------------------"); // Get price from each node's perspective let mut prices = Vec::new(); for node in &self.nodes { let crdt = node.crdt.lock().unwrap(); if let Some((price, confidence, sources)) = crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 60) { prices.push((node.id.0.clone(), price, confidence, sources)); println!( " {} sees: ${:.2} (confidence: {}%, sources: {})", node.id.0, price as f64 / 1_000_000.0, confidence, sources ); } } // Calculate network consensus if !prices.is_empty() { let avg_price: u128 = prices.iter().map(|(_, p, _, _)| *p).sum::() / prices.len() as u128; let min_price = prices.iter().map(|(_, p, _, _)| *p).min().unwrap(); let max_price = prices.iter().map(|(_, p, _, _)| *p).max().unwrap(); let deviation = ((max_price - min_price) as f64 / avg_price as f64) * 100.0; println!("\nšŸ“Š Network Consensus:"); println!(" Average: ${:.2}", avg_price as f64 / 1_000_000.0); println!( " Range: ${:.2} - ${:.2}", min_price as f64 / 1_000_000.0, max_price as f64 / 1_000_000.0 ); println!(" Max Deviation: {:.2}%", deviation); } } fn print_final_stats(&self) { println!("\n\nšŸ Final Statistics"); println!("==================="); let mut total_attestations = 0; let mut oracle_stats = Vec::new(); for node in &self.nodes { let crdt = node.crdt.lock().unwrap(); let node_attestations = crdt.attestations.len(); total_attestations += node_attestations; let score = crdt.oracle_scores.get(&node.id).unwrap_or(&0.5); oracle_stats.push((node.id.0.clone(), node_attestations, *score)); } println!("\nšŸ“ˆ Oracle Performance:"); for (id, attestations, score) in oracle_stats { let node_type = if id.starts_with("byzantine") { "šŸ”“" } else { "🟢" }; println!( " {} {} - Attestations: {}, Reputation: {:.2}", node_type, id, attestations, score ); } println!("\nšŸ“Š Network Totals:"); println!(" Total Attestations: {}", total_attestations); println!( " Attestations/second: {:.2}", total_attestations as f64 / 30.0 ); // Show that Byzantine nodes were filtered out if let Some(node) = self.nodes.first() { let crdt = node.crdt.lock().unwrap(); if let Some((price, confidence, sources)) = crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 300) { println!( "\nāœ… Final Aggregated Price: ${:.2} (confidence: {}%)", price as f64 / 1_000_000.0, confidence ); println!(" Despite Byzantine manipulation attempts!"); } } } } // ============ Helper Functions ============ fn timestamp() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs() } fn rand() -> f64 { let nanos = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_nanos(); ((nanos % 1000) as f64) / 1000.0 } // ============ Main Function ============ fn main() { println!("BFT-CRDT Oracle Network Demo"); println!("============================\n"); let simulator = NetworkSimulator::new(); simulator.run(Duration::from_secs(30)); println!("\nāœ… Demo completed!"); println!("\nšŸ’” Key Takeaways:"); println!(" • Oracles submitted prices without coordination"); println!(" • Byzantine nodes couldn't corrupt the aggregate price"); println!(" • Network partitions were handled gracefully"); println!(" • No consensus protocol was needed!"); }