430 lines
14 KiB
Rust
430 lines
14 KiB
Rust
use std::collections::{BTreeMap, HashMap, HashSet};
|
|
use std::sync::{Arc, Mutex};
|
|
use std::thread;
|
|
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
|
|
|
// A runnable demonstration of the BFT-CRDT Oracle Network.
//
// This example shows:
// 1. Multiple oracle nodes submitting prices independently
// 2. Byzantine oracles trying to manipulate prices
// 3. Network partitions and healing
// 4. Real-time price aggregation without consensus
//
// Run with: cargo run --example run_oracle_demo
//
// (Plain `//` comments on purpose: as `///` this text would be attached by
// rustdoc to the first item that follows, not to the file.)
|
|
|
|
// ============ Core Types ============
|
|
|
|
/// Name of an oracle node, used as the key for per-oracle reputation scores.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct OracleId(String);
|
|
|
|
/// Trading pair an attestation refers to, e.g. `"ETH/USD"`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct AssetPair(String);
|
|
|
|
#[derive(Debug, Clone)]
|
|
struct PriceAttestation {
|
|
id: String,
|
|
oracle_id: OracleId,
|
|
asset_pair: AssetPair,
|
|
price: u128,
|
|
confidence: u8,
|
|
timestamp: u64,
|
|
sources: Vec<DataSource>,
|
|
}
|
|
|
|
/// One upstream venue quote attached to an attestation.
#[derive(Clone, Debug)]
struct DataSource {
    /// Venue name, e.g. `"Binance"`.
    name: String,
    /// Price quoted by the venue, in the same fixed-point units as the attestation.
    price: u128,
    /// Volume figure reported alongside the quote.
    volume: u128,
}
|
|
|
|
// ============ Simple CRDT Implementation ============
|
|
|
|
#[derive(Clone)]
|
|
struct OracleNetworkCRDT {
|
|
attestations: HashMap<String, PriceAttestation>,
|
|
oracle_scores: HashMap<OracleId, f64>,
|
|
}
|
|
|
|
impl OracleNetworkCRDT {
|
|
fn new() -> Self {
|
|
Self {
|
|
attestations: HashMap::new(),
|
|
oracle_scores: HashMap::new(),
|
|
}
|
|
}
|
|
|
|
fn submit_attestation(&mut self, attestation: PriceAttestation) {
|
|
self.attestations
|
|
.insert(attestation.id.clone(), attestation.clone());
|
|
|
|
// Update oracle score
|
|
let score = self
|
|
.oracle_scores
|
|
.entry(attestation.oracle_id.clone())
|
|
.or_insert(0.5);
|
|
*score = (*score * 0.95) + 0.05; // Simple reputation update
|
|
}
|
|
|
|
fn merge(&mut self, other: &Self) {
|
|
// Merge attestations
|
|
for (id, attestation) in &other.attestations {
|
|
if !self.attestations.contains_key(id) {
|
|
self.attestations.insert(id.clone(), attestation.clone());
|
|
}
|
|
}
|
|
|
|
// Merge oracle scores
|
|
for (oracle_id, score) in &other.oracle_scores {
|
|
self.oracle_scores.insert(oracle_id.clone(), *score);
|
|
}
|
|
}
|
|
|
|
fn get_aggregate_price(
|
|
&self,
|
|
asset_pair: &AssetPair,
|
|
max_age: u64,
|
|
) -> Option<(u128, u8, usize)> {
|
|
let now = timestamp();
|
|
let min_time = now.saturating_sub(max_age);
|
|
|
|
let mut prices = Vec::new();
|
|
for attestation in self.attestations.values() {
|
|
if attestation.asset_pair == *asset_pair && attestation.timestamp >= min_time {
|
|
let weight = self
|
|
.oracle_scores
|
|
.get(&attestation.oracle_id)
|
|
.unwrap_or(&0.5);
|
|
prices.push((attestation.price, attestation.confidence, *weight));
|
|
}
|
|
}
|
|
|
|
if prices.is_empty() {
|
|
return None;
|
|
}
|
|
|
|
// Remove outliers using simple IQR method
|
|
prices.sort_by_key(|(price, _, _)| *price);
|
|
let q1_idx = prices.len() / 4;
|
|
let q3_idx = 3 * prices.len() / 4;
|
|
|
|
if prices.len() > 4 {
|
|
let q1 = prices[q1_idx].0;
|
|
let q3 = prices[q3_idx].0;
|
|
let iqr = q3.saturating_sub(q1);
|
|
let lower = q1.saturating_sub(iqr * 3 / 2);
|
|
let upper = q3.saturating_add(iqr * 3 / 2);
|
|
|
|
prices.retain(|(price, _, _)| *price >= lower && *price <= upper);
|
|
}
|
|
|
|
// Calculate weighted average
|
|
let mut total_weight = 0.0;
|
|
let mut weighted_sum = 0.0;
|
|
let mut confidence_sum = 0.0;
|
|
|
|
for (price, confidence, weight) in &prices {
|
|
let w = (*confidence as f64 / 100.0) * weight;
|
|
weighted_sum += *price as f64 * w;
|
|
confidence_sum += *confidence as f64 * w;
|
|
total_weight += w;
|
|
}
|
|
|
|
let avg_price = (weighted_sum / total_weight) as u128;
|
|
let avg_confidence = (confidence_sum / total_weight) as u8;
|
|
|
|
Some((avg_price, avg_confidence, prices.len()))
|
|
}
|
|
}
|
|
|
|
// ============ Oracle Node ============
|
|
|
|
struct OracleNode {
|
|
id: OracleId,
|
|
crdt: Arc<Mutex<OracleNetworkCRDT>>,
|
|
is_byzantine: bool,
|
|
base_price: u128,
|
|
}
|
|
|
|
impl OracleNode {
|
|
fn new(id: String, is_byzantine: bool) -> Self {
|
|
Self {
|
|
id: OracleId(id),
|
|
crdt: Arc::new(Mutex::new(OracleNetworkCRDT::new())),
|
|
is_byzantine,
|
|
base_price: 2500_000_000, // $2500 with 6 decimals
|
|
}
|
|
}
|
|
|
|
fn submit_price(&self) {
|
|
let price = if self.is_byzantine {
|
|
// Byzantine nodes try to manipulate
|
|
self.base_price * 120 / 100 // 20% higher
|
|
} else {
|
|
// Honest nodes add realistic variance
|
|
let variance = (rand() * 0.02 - 0.01) as f64;
|
|
((self.base_price as f64) * (1.0 + variance)) as u128
|
|
};
|
|
|
|
let attestation = PriceAttestation {
|
|
id: format!("{}_{}", self.id.0, timestamp()),
|
|
oracle_id: self.id.clone(),
|
|
asset_pair: AssetPair("ETH/USD".to_string()),
|
|
price,
|
|
confidence: if self.is_byzantine { 50 } else { 95 },
|
|
timestamp: timestamp(),
|
|
sources: vec![
|
|
DataSource {
|
|
name: "Binance".to_string(),
|
|
price,
|
|
volume: 1000_000_000,
|
|
},
|
|
DataSource {
|
|
name: "Coinbase".to_string(),
|
|
price: price + 1_000_000, // Slight difference
|
|
volume: 800_000_000,
|
|
},
|
|
],
|
|
};
|
|
|
|
let mut crdt = self.crdt.lock().unwrap();
|
|
crdt.submit_attestation(attestation);
|
|
}
|
|
}
|
|
|
|
// ============ Network Simulation ============
|
|
|
|
struct NetworkSimulator {
|
|
nodes: Vec<Arc<OracleNode>>,
|
|
partitioned: Arc<Mutex<bool>>,
|
|
}
|
|
|
|
impl NetworkSimulator {
|
|
fn new() -> Self {
|
|
let mut nodes = Vec::new();
|
|
|
|
// Create 5 honest nodes
|
|
for i in 1..=5 {
|
|
nodes.push(Arc::new(OracleNode::new(format!("honest_{}", i), false)));
|
|
}
|
|
|
|
// Create 2 Byzantine nodes
|
|
for i in 1..=2 {
|
|
nodes.push(Arc::new(OracleNode::new(format!("byzantine_{}", i), true)));
|
|
}
|
|
|
|
Self {
|
|
nodes,
|
|
partitioned: Arc::new(Mutex::new(false)),
|
|
}
|
|
}
|
|
|
|
fn run(&self, duration: Duration) {
|
|
println!("🚀 Starting BFT-CRDT Oracle Network Demo");
|
|
println!("=========================================");
|
|
println!("📊 Network: {} nodes ({} Byzantine)", self.nodes.len(), 2);
|
|
println!("⏱️ Duration: {:?}\n", duration);
|
|
|
|
let start = Instant::now();
|
|
|
|
// Spawn oracle threads
|
|
let handles: Vec<_> = self
|
|
.nodes
|
|
.iter()
|
|
.map(|node| {
|
|
let node_clone = Arc::clone(node);
|
|
thread::spawn(move || {
|
|
while start.elapsed() < duration {
|
|
node_clone.submit_price();
|
|
thread::sleep(Duration::from_millis(1000));
|
|
}
|
|
})
|
|
})
|
|
.collect();
|
|
|
|
// Spawn network propagation thread
|
|
let nodes_clone = self.nodes.clone();
|
|
let partitioned_clone = Arc::clone(&self.partitioned);
|
|
let propagation_handle = thread::spawn(move || {
|
|
while start.elapsed() < duration {
|
|
let is_partitioned = *partitioned_clone.lock().unwrap();
|
|
|
|
// Propagate between nodes
|
|
for i in 0..nodes_clone.len() {
|
|
for j in 0..nodes_clone.len() {
|
|
if i != j {
|
|
// Skip if partitioned
|
|
if is_partitioned && ((i < 3 && j >= 3) || (i >= 3 && j < 3)) {
|
|
continue;
|
|
}
|
|
|
|
let crdt1 = nodes_clone[i].crdt.lock().unwrap();
|
|
let mut crdt2 = nodes_clone[j].crdt.lock().unwrap();
|
|
crdt2.merge(&*crdt1);
|
|
}
|
|
}
|
|
}
|
|
thread::sleep(Duration::from_millis(100));
|
|
}
|
|
});
|
|
|
|
// Main monitoring loop
|
|
let mut last_partition = Instant::now();
|
|
while start.elapsed() < duration {
|
|
thread::sleep(Duration::from_secs(2));
|
|
|
|
// Print current state
|
|
self.print_network_state();
|
|
|
|
// Simulate network partition every 10 seconds
|
|
if last_partition.elapsed() > Duration::from_secs(10) {
|
|
let mut partitioned = self.partitioned.lock().unwrap();
|
|
*partitioned = !*partitioned;
|
|
if *partitioned {
|
|
println!("\n⚠️ NETWORK PARTITION ACTIVE - Nodes split into two groups");
|
|
} else {
|
|
println!("\n✅ NETWORK PARTITION HEALED - All nodes can communicate");
|
|
}
|
|
last_partition = Instant::now();
|
|
}
|
|
}
|
|
|
|
// Wait for threads
|
|
for handle in handles {
|
|
handle.join().unwrap();
|
|
}
|
|
propagation_handle.join().unwrap();
|
|
|
|
// Print final statistics
|
|
self.print_final_stats();
|
|
}
|
|
|
|
fn print_network_state(&self) {
|
|
println!("\n📈 Current Network State:");
|
|
println!("------------------------");
|
|
|
|
// Get price from each node's perspective
|
|
let mut prices = Vec::new();
|
|
for node in &self.nodes {
|
|
let crdt = node.crdt.lock().unwrap();
|
|
if let Some((price, confidence, sources)) =
|
|
crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 60)
|
|
{
|
|
prices.push((node.id.0.clone(), price, confidence, sources));
|
|
println!(
|
|
" {} sees: ${:.2} (confidence: {}%, sources: {})",
|
|
node.id.0,
|
|
price as f64 / 1_000_000.0,
|
|
confidence,
|
|
sources
|
|
);
|
|
}
|
|
}
|
|
|
|
// Calculate network consensus
|
|
if !prices.is_empty() {
|
|
let avg_price: u128 =
|
|
prices.iter().map(|(_, p, _, _)| *p).sum::<u128>() / prices.len() as u128;
|
|
let min_price = prices.iter().map(|(_, p, _, _)| *p).min().unwrap();
|
|
let max_price = prices.iter().map(|(_, p, _, _)| *p).max().unwrap();
|
|
let deviation = ((max_price - min_price) as f64 / avg_price as f64) * 100.0;
|
|
|
|
println!("\n📊 Network Consensus:");
|
|
println!(" Average: ${:.2}", avg_price as f64 / 1_000_000.0);
|
|
println!(
|
|
" Range: ${:.2} - ${:.2}",
|
|
min_price as f64 / 1_000_000.0,
|
|
max_price as f64 / 1_000_000.0
|
|
);
|
|
println!(" Max Deviation: {:.2}%", deviation);
|
|
}
|
|
}
|
|
|
|
fn print_final_stats(&self) {
|
|
println!("\n\n🏁 Final Statistics");
|
|
println!("===================");
|
|
|
|
let mut total_attestations = 0;
|
|
let mut oracle_stats = Vec::new();
|
|
|
|
for node in &self.nodes {
|
|
let crdt = node.crdt.lock().unwrap();
|
|
let node_attestations = crdt.attestations.len();
|
|
total_attestations += node_attestations;
|
|
|
|
let score = crdt.oracle_scores.get(&node.id).unwrap_or(&0.5);
|
|
oracle_stats.push((node.id.0.clone(), node_attestations, *score));
|
|
}
|
|
|
|
println!("\n📈 Oracle Performance:");
|
|
for (id, attestations, score) in oracle_stats {
|
|
let node_type = if id.starts_with("byzantine") {
|
|
"🔴"
|
|
} else {
|
|
"🟢"
|
|
};
|
|
println!(
|
|
" {} {} - Attestations: {}, Reputation: {:.2}",
|
|
node_type, id, attestations, score
|
|
);
|
|
}
|
|
|
|
println!("\n📊 Network Totals:");
|
|
println!(" Total Attestations: {}", total_attestations);
|
|
println!(
|
|
" Attestations/second: {:.2}",
|
|
total_attestations as f64 / 30.0
|
|
);
|
|
|
|
// Show that Byzantine nodes were filtered out
|
|
if let Some(node) = self.nodes.first() {
|
|
let crdt = node.crdt.lock().unwrap();
|
|
if let Some((price, confidence, sources)) =
|
|
crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 300)
|
|
{
|
|
println!(
|
|
"\n✅ Final Aggregated Price: ${:.2} (confidence: {}%)",
|
|
price as f64 / 1_000_000.0,
|
|
confidence
|
|
);
|
|
println!(" Despite Byzantine manipulation attempts!");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ============ Helper Functions ============
|
|
|
|
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn timestamp() -> u64 {
    // `UNIX_EPOCH.elapsed()` is `SystemTime::now().duration_since(UNIX_EPOCH)`.
    let since_epoch = UNIX_EPOCH.elapsed().unwrap();
    since_epoch.as_secs()
}
|
|
|
|
/// Returns a pseudo-random value in `[0.0, 1.0)` for demo purposes.
///
/// The value is derived from the wall clock and a process-wide call counter,
/// scrambled with the SplitMix64 finalizer. Taking `nanos % 1000` directly
/// yields at most 1000 distinct values and is constant on platforms whose
/// clock resolution is a microsecond or coarser; mixing all bits and adding
/// the counter avoids both problems.
///
/// Not suitable for anything security-sensitive.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn rand() -> f64 {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Makes successive calls differ even when the clock has not advanced.
    static CALLS: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_nanos() as u64; // low 64 bits are plenty as seed material
    let sequence = CALLS.fetch_add(1, Ordering::Relaxed);

    // SplitMix64 finalizer over clock XOR (counter * golden-ratio increment).
    let mut z = nanos ^ sequence.wrapping_mul(0x9E37_79B9_7F4A_7C15);
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^= z >> 31;

    // Keep the top 53 bits: every such integer is exactly representable in an
    // f64, so the quotient is strictly below 1.0.
    (z >> 11) as f64 / (1u64 << 53) as f64
}
|
|
|
|
// ============ Main Function ============
|
|
|
|
fn main() {
|
|
println!("BFT-CRDT Oracle Network Demo");
|
|
println!("============================\n");
|
|
|
|
let simulator = NetworkSimulator::new();
|
|
simulator.run(Duration::from_secs(30));
|
|
|
|
println!("\n✅ Demo completed!");
|
|
println!("\n💡 Key Takeaways:");
|
|
println!(" • Oracles submitted prices without coordination");
|
|
println!(" • Byzantine nodes couldn't corrupt the aggregate price");
|
|
println!(" • Network partitions were handled gracefully");
|
|
println!(" • No consensus protocol was needed!");
|
|
}
|