Refactored into multiple modules

This commit is contained in:
Dave
2025-06-12 15:49:04 -04:00
parent 7878bb9149
commit e2d50144ca
6 changed files with 410 additions and 393 deletions

View File

@@ -1,9 +1,8 @@
use colored::*;
use rand::Rng;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::time::Duration;
mod network;
mod oracle;
mod utils;
/// A simple demonstration of the BFT-CRDT Oracle Network
/// Run with: cargo run -p oracle-demo
@@ -26,400 +25,13 @@ struct PriceAttestation {
timestamp: u64,
}
// ============ Simple CRDT ============
#[derive(Clone)]
struct OracleNetworkCRDT {
attestations: HashMap<String, PriceAttestation>,
oracle_scores: HashMap<OracleId, f64>,
}
impl OracleNetworkCRDT {
fn new() -> Self {
Self {
attestations: HashMap::new(),
oracle_scores: HashMap::new(),
}
}
fn submit_attestation(&mut self, attestation: PriceAttestation) {
self.attestations
.insert(attestation.id.clone(), attestation.clone());
// Update oracle reputation
let score = self
.oracle_scores
.entry(attestation.oracle_id.clone())
.or_insert(0.5);
*score = (*score * 0.95) + 0.05;
}
fn merge(&mut self, other: &Self) {
for (id, attestation) in &other.attestations {
if !self.attestations.contains_key(id) {
self.attestations.insert(id.clone(), attestation.clone());
}
}
for (oracle_id, score) in &other.oracle_scores {
self.oracle_scores.insert(oracle_id.clone(), *score);
}
}
fn get_aggregate_price(
&self,
asset_pair: &AssetPair,
max_age: u64,
) -> Option<(f64, u8, usize)> {
let now = timestamp();
let min_time = now.saturating_sub(max_age);
let mut prices = Vec::new();
for attestation in self.attestations.values() {
if attestation.asset_pair == *asset_pair && attestation.timestamp >= min_time {
let weight = self
.oracle_scores
.get(&attestation.oracle_id)
.unwrap_or(&0.5);
prices.push((attestation.price, attestation.confidence, *weight));
}
}
if prices.is_empty() {
return None;
}
// Remove outliers
prices.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
if prices.len() > 4 {
let q1 = prices[prices.len() / 4].0;
let q3 = prices[3 * prices.len() / 4].0;
let iqr = q3 - q1;
let lower = q1 - iqr * 1.5;
let upper = q3 + iqr * 1.5;
prices.retain(|(price, _, _)| *price >= lower && *price <= upper);
}
// Calculate weighted average
let mut total_weight = 0.0;
let mut weighted_sum = 0.0;
let mut confidence_sum = 0.0;
for (price, confidence, weight) in &prices {
let w = (*confidence as f64 / 100.0) * weight;
weighted_sum += price * w;
confidence_sum += *confidence as f64 * w;
total_weight += w;
}
let avg_price = weighted_sum / total_weight;
let avg_confidence = (confidence_sum / total_weight) as u8;
Some((avg_price, avg_confidence, prices.len()))
}
}
// ============ Oracle Node ============
struct OracleNode {
id: OracleId,
crdt: Arc<Mutex<OracleNetworkCRDT>>,
is_byzantine: bool,
base_price: f64,
}
impl OracleNode {
fn new(id: String, is_byzantine: bool) -> Self {
Self {
id: OracleId(id),
crdt: Arc::new(Mutex::new(OracleNetworkCRDT::new())),
is_byzantine,
base_price: 2500.0,
}
}
fn submit_price(&self) {
let mut rng = rand::rng();
let price = if self.is_byzantine {
self.base_price * 1.2 // Try to manipulate 20% higher
} else {
self.base_price * (1.0 + rng.random_range(-0.01..0.01))
};
let attestation = PriceAttestation {
id: format!("{}_{}", self.id.0, timestamp()),
oracle_id: self.id.clone(),
asset_pair: AssetPair("ETH/USD".to_string()),
price,
confidence: if self.is_byzantine { 50 } else { 95 },
timestamp: timestamp(),
};
let mut crdt = self.crdt.lock().unwrap();
crdt.submit_attestation(attestation);
}
}
// ============ Network Simulator ============
struct NetworkSimulator {
nodes: Vec<Arc<OracleNode>>,
partitioned: Arc<Mutex<bool>>,
}
impl NetworkSimulator {
fn new() -> Self {
let mut nodes = Vec::new();
// Create 5 honest nodes
for i in 1..=5 {
nodes.push(Arc::new(OracleNode::new(format!("honest_{}", i), false)));
}
// Create 2 Byzantine nodes
for i in 1..=2 {
nodes.push(Arc::new(OracleNode::new(format!("byzantine_{}", i), true)));
}
Self {
nodes,
partitioned: Arc::new(Mutex::new(false)),
}
}
fn run(&self, duration: Duration) {
println!(
"{}",
"🚀 Starting BFT-CRDT Oracle Network Demo".cyan().bold()
);
println!("{}", "=========================================".cyan());
println!("📊 Network: {} nodes ({} Byzantine)", self.nodes.len(), 2);
println!("⏱️ Duration: {:?}\n", duration);
let start = Instant::now();
// Spawn oracle threads
let handles: Vec<_> = self
.nodes
.iter()
.map(|node| {
let node_clone = Arc::clone(node);
let start_clone = start.clone();
thread::spawn(move || {
while start_clone.elapsed() < duration {
node_clone.submit_price();
thread::sleep(Duration::from_millis(1000));
}
})
})
.collect();
// Spawn network propagation thread
let nodes_clone = self.nodes.clone();
let partitioned_clone = Arc::clone(&self.partitioned);
let start_clone = start.clone();
let propagation_handle = thread::spawn(move || {
while start_clone.elapsed() < duration {
let is_partitioned = *partitioned_clone.lock().unwrap();
// Propagate between nodes
for i in 0..nodes_clone.len() {
for j in 0..nodes_clone.len() {
if i != j {
// Skip if partitioned
if is_partitioned && ((i < 3 && j >= 3) || (i >= 3 && j < 3)) {
continue;
}
let crdt1 = nodes_clone[i].crdt.lock().unwrap();
let mut crdt2 = nodes_clone[j].crdt.lock().unwrap();
crdt2.merge(&*crdt1);
}
}
}
thread::sleep(Duration::from_millis(100));
}
});
// Main monitoring loop
let mut last_partition = Instant::now();
while start.elapsed() < duration {
thread::sleep(Duration::from_secs(2));
// Print current state
self.print_network_state();
// Simulate network partition every 10 seconds
if last_partition.elapsed() > Duration::from_secs(10) {
let mut partitioned = self.partitioned.lock().unwrap();
*partitioned = !*partitioned;
if *partitioned {
println!(
"\n{}",
"⚠️ NETWORK PARTITION ACTIVE - Nodes split into two groups"
.yellow()
.bold()
);
} else {
println!(
"\n{}",
"✅ NETWORK PARTITION HEALED - All nodes can communicate"
.green()
.bold()
);
}
last_partition = Instant::now();
}
}
// Wait for threads
for handle in handles {
handle.join().unwrap();
}
propagation_handle.join().unwrap();
// Print final statistics
self.print_final_stats();
}
fn print_network_state(&self) {
println!("\n{}", "📈 Current Network State:".white().bold());
println!("{}", "------------------------".white());
// Get price from each node's perspective
let mut prices = Vec::new();
for node in &self.nodes {
let crdt = node.crdt.lock().unwrap();
if let Some((price, confidence, sources)) =
crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 60)
{
prices.push((node.id.0.clone(), price, confidence, sources));
let price_str = format!("${:.2}", price);
let confidence_str = format!("{}%", confidence);
let line = if node.is_byzantine {
format!(
" {} sees: {} (confidence: {}, sources: {})",
node.id.0.red(),
price_str.red(),
confidence_str.red(),
sources
)
} else {
format!(
" {} sees: {} (confidence: {}, sources: {})",
node.id.0.green(),
price_str.green(),
confidence_str.green(),
sources
)
};
println!("{}", line);
}
}
// Calculate network consensus
if !prices.is_empty() {
let avg_price: f64 =
prices.iter().map(|(_, p, _, _)| *p).sum::<f64>() / prices.len() as f64;
let min_price = prices
.iter()
.map(|(_, p, _, _)| *p)
.fold(f64::INFINITY, |a, b| a.min(b));
let max_price = prices
.iter()
.map(|(_, p, _, _)| *p)
.fold(f64::NEG_INFINITY, |a, b| a.max(b));
let deviation = ((max_price - min_price) / avg_price) * 100.0;
println!("\n{}", "📊 Network Consensus:".cyan().bold());
println!(" Average: {}", format!("${:.2}", avg_price).cyan());
println!(
" Range: {} - {}",
format!("${:.2}", min_price).cyan(),
format!("${:.2}", max_price).cyan()
);
println!(" Max Deviation: {}", format!("{:.2}%", deviation).cyan());
}
}
fn print_final_stats(&self) {
println!("\n\n{}", "🏁 Final Statistics".yellow().bold());
println!("{}", "===================".yellow());
let mut total_attestations = 0;
let mut oracle_stats = Vec::new();
for node in &self.nodes {
let crdt = node.crdt.lock().unwrap();
let node_attestations = crdt.attestations.len();
total_attestations += node_attestations;
let score = crdt.oracle_scores.get(&node.id).unwrap_or(&0.5);
oracle_stats.push((
node.id.0.clone(),
node_attestations,
*score,
node.is_byzantine,
));
}
println!("\n{}", "📈 Oracle Performance:".white().bold());
for (id, attestations, score, is_byzantine) in oracle_stats {
let icon = if is_byzantine { "🔴" } else { "🟢" };
let line = format!(
" {} {} - Attestations: {}, Reputation: {:.2}",
icon, id, attestations, score
);
if is_byzantine {
println!("{}", line.red());
} else {
println!("{}", line.green());
}
}
println!("\n{}", "📊 Network Totals:".cyan().bold());
println!(" Total Attestations: {}", total_attestations);
println!(
" Attestations/second: {:.2}",
total_attestations as f64 / 30.0
);
// Show that Byzantine nodes were filtered out
if let Some(node) = self.nodes.first() {
let crdt = node.crdt.lock().unwrap();
if let Some((price, confidence, _)) =
crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 300)
{
println!("\n{}", "✅ Final Aggregated Price:".green().bold());
println!(
" {} (confidence: {}%)",
format!("${:.2}", price).green().bold(),
confidence
);
println!(" {}", "Despite Byzantine manipulation attempts!".green());
}
}
}
}
// ============ Helper Functions ============
fn timestamp() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
}
// ============ Main Function ============
fn main() {
println!("{}", "BFT-CRDT Oracle Network Demo".cyan().bold());
println!("{}", "============================\n".cyan());
let simulator = NetworkSimulator::new();
let simulator = network::Simulator::new();
simulator.run(Duration::from_secs(30));
println!("\n{}", "✅ Demo completed!".green().bold());

View File

@@ -0,0 +1,253 @@
use std::{
sync::{Arc, Mutex},
thread,
time::{Duration, Instant},
};
use colored::Colorize;
use crate::{oracle, AssetPair};
pub(crate) struct Simulator {
nodes: Vec<Arc<oracle::Node>>,
partitioned: Arc<Mutex<bool>>,
}
impl Simulator {
pub(crate) fn new() -> Self {
let mut nodes = Vec::new();
// Create 5 honest nodes
for i in 1..=5 {
nodes.push(Arc::new(oracle::Node::new(format!("honest_{}", i), false)));
}
// Create 2 Byzantine nodes
for i in 1..=2 {
nodes.push(Arc::new(oracle::Node::new(
format!("byzantine_{}", i),
true,
)));
}
Self {
nodes,
partitioned: Arc::new(Mutex::new(false)),
}
}
pub(crate) fn run(&self, duration: Duration) {
println!(
"{}",
"🚀 Starting BFT-CRDT Oracle Network Demo".cyan().bold()
);
println!("{}", "=========================================".cyan());
println!("📊 Network: {} nodes ({} Byzantine)", self.nodes.len(), 2);
println!("⏱️ Duration: {:?}\n", duration);
let start = Instant::now();
// Spawn oracle threads
let handles: Vec<_> = self
.nodes
.iter()
.map(|node| {
let node_clone = Arc::clone(node);
let start_clone = start.clone();
thread::spawn(move || {
while start_clone.elapsed() < duration {
node_clone.submit_price();
thread::sleep(Duration::from_millis(1000));
}
})
})
.collect();
// Spawn network propagation thread
let nodes_clone = self.nodes.clone();
let partitioned_clone = Arc::clone(&self.partitioned);
let start_clone = start.clone();
let propagation_handle = thread::spawn(move || {
while start_clone.elapsed() < duration {
let is_partitioned = *partitioned_clone.lock().unwrap();
// Propagate between nodes
for i in 0..nodes_clone.len() {
for j in 0..nodes_clone.len() {
if i != j {
// Skip if partitioned
if is_partitioned && ((i < 3 && j >= 3) || (i >= 3 && j < 3)) {
continue;
}
let crdt1 = nodes_clone[i].crdt.lock().unwrap();
let mut crdt2 = nodes_clone[j].crdt.lock().unwrap();
crdt2.merge(&*crdt1);
}
}
}
thread::sleep(Duration::from_millis(100));
}
});
// Main monitoring loop
let mut last_partition = Instant::now();
while start.elapsed() < duration {
thread::sleep(Duration::from_secs(2));
// Print current state
self.print_network_state();
// Simulate network partition every 10 seconds
if last_partition.elapsed() > Duration::from_secs(10) {
let mut partitioned = self.partitioned.lock().unwrap();
*partitioned = !*partitioned;
if *partitioned {
println!(
"\n{}",
"⚠️ NETWORK PARTITION ACTIVE - Nodes split into two groups"
.yellow()
.bold()
);
} else {
println!(
"\n{}",
"✅ NETWORK PARTITION HEALED - All nodes can communicate"
.green()
.bold()
);
}
last_partition = Instant::now();
}
}
// Wait for threads
for handle in handles {
handle.join().unwrap();
}
propagation_handle.join().unwrap();
// Print final statistics
self.print_final_stats();
}
fn print_network_state(&self) {
println!("\n{}", "📈 Current Network State:".white().bold());
println!("{}", "------------------------".white());
// Get price from each node's perspective
let mut prices = Vec::new();
for node in &self.nodes {
let crdt = node.crdt.lock().unwrap();
if let Some((price, confidence, sources)) =
crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 60)
{
prices.push((node.id.0.clone(), price, confidence, sources));
let price_str = format!("${:.2}", price);
let confidence_str = format!("{}%", confidence);
let line = if node.is_byzantine {
format!(
" {} sees: {} (confidence: {}, sources: {})",
node.id.0.red(),
price_str.red(),
confidence_str.red(),
sources
)
} else {
format!(
" {} sees: {} (confidence: {}, sources: {})",
node.id.0.green(),
price_str.green(),
confidence_str.green(),
sources
)
};
println!("{}", line);
}
}
// Calculate network consensus
if !prices.is_empty() {
let avg_price: f64 =
prices.iter().map(|(_, p, _, _)| *p).sum::<f64>() / prices.len() as f64;
let min_price = prices
.iter()
.map(|(_, p, _, _)| *p)
.fold(f64::INFINITY, |a, b| a.min(b));
let max_price = prices
.iter()
.map(|(_, p, _, _)| *p)
.fold(f64::NEG_INFINITY, |a, b| a.max(b));
let deviation = ((max_price - min_price) / avg_price) * 100.0;
println!("\n{}", "📊 Network Consensus:".cyan().bold());
println!(" Average: {}", format!("${:.2}", avg_price).cyan());
println!(
" Range: {} - {}",
format!("${:.2}", min_price).cyan(),
format!("${:.2}", max_price).cyan()
);
println!(" Max Deviation: {}", format!("{:.2}%", deviation).cyan());
}
}
fn print_final_stats(&self) {
println!("\n\n{}", "🏁 Final Statistics".yellow().bold());
println!("{}", "===================".yellow());
let mut total_attestations = 0;
let mut oracle_stats = Vec::new();
for node in &self.nodes {
let crdt = node.crdt.lock().unwrap();
let node_attestations = crdt.attestations.len();
total_attestations += node_attestations;
let score = crdt.oracle_scores.get(&node.id).unwrap_or(&0.5);
oracle_stats.push((
node.id.0.clone(),
node_attestations,
*score,
node.is_byzantine,
));
}
println!("\n{}", "📈 Oracle Performance:".white().bold());
for (id, attestations, score, is_byzantine) in oracle_stats {
let icon = if is_byzantine { "🔴" } else { "🟢" };
let line = format!(
" {} {} - Attestations: {}, Reputation: {:.2}",
icon, id, attestations, score
);
if is_byzantine {
println!("{}", line.red());
} else {
println!("{}", line.green());
}
}
println!("\n{}", "📊 Network Totals:".cyan().bold());
println!(" Total Attestations: {}", total_attestations);
println!(
" Attestations/second: {:.2}",
total_attestations as f64 / 30.0
);
// Show that Byzantine nodes were filtered out
if let Some(node) = self.nodes.first() {
let crdt = node.crdt.lock().unwrap();
if let Some((price, confidence, _)) =
crdt.get_aggregate_price(&AssetPair("ETH/USD".to_string()), 300)
{
println!("\n{}", "✅ Final Aggregated Price:".green().bold());
println!(
" {} (confidence: {}%)",
format!("${:.2}", price).green().bold(),
confidence
);
println!(" {}", "Despite Byzantine manipulation attempts!".green());
}
}
}
}

View File

@@ -0,0 +1,5 @@
mod network_crdt;
mod node;
pub(crate) use network_crdt::OracleNetworkCRDT;
pub(crate) use node::OracleNode as Node;

View File

@@ -0,0 +1,94 @@
use std::collections::HashMap;
use crate::{utils, AssetPair, OracleId, PriceAttestation};
#[derive(Clone)]
pub(crate) struct OracleNetworkCRDT {
pub(crate) attestations: HashMap<String, PriceAttestation>,
pub(crate) oracle_scores: HashMap<OracleId, f64>,
}
impl OracleNetworkCRDT {
pub(crate) fn new() -> Self {
Self {
attestations: HashMap::new(),
oracle_scores: HashMap::new(),
}
}
pub(crate) fn submit_attestation(&mut self, attestation: PriceAttestation) {
self.attestations
.insert(attestation.id.clone(), attestation.clone());
// Update oracle reputation
let score = self
.oracle_scores
.entry(attestation.oracle_id.clone())
.or_insert(0.5);
*score = (*score * 0.95) + 0.05;
}
pub(crate) fn merge(&mut self, other: &Self) {
for (id, attestation) in &other.attestations {
if !self.attestations.contains_key(id) {
self.attestations.insert(id.clone(), attestation.clone());
}
}
for (oracle_id, score) in &other.oracle_scores {
self.oracle_scores.insert(oracle_id.clone(), *score);
}
}
pub(crate) fn get_aggregate_price(
&self,
asset_pair: &AssetPair,
max_age: u64,
) -> Option<(f64, u8, usize)> {
let now = utils::timestamp();
let min_time = now.saturating_sub(max_age);
let mut prices = Vec::new();
for attestation in self.attestations.values() {
if attestation.asset_pair == *asset_pair && attestation.timestamp >= min_time {
let weight = self
.oracle_scores
.get(&attestation.oracle_id)
.unwrap_or(&0.5);
prices.push((attestation.price, attestation.confidence, *weight));
}
}
if prices.is_empty() {
return None;
}
// Remove outliers
prices.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap());
if prices.len() > 4 {
let q1 = prices[prices.len() / 4].0;
let q3 = prices[3 * prices.len() / 4].0;
let iqr = q3 - q1;
let lower = q1 - iqr * 1.5;
let upper = q3 + iqr * 1.5;
prices.retain(|(price, _, _)| *price >= lower && *price <= upper);
}
// Calculate weighted average
let mut total_weight = 0.0;
let mut weighted_sum = 0.0;
let mut confidence_sum = 0.0;
for (price, confidence, weight) in &prices {
let w = (*confidence as f64 / 100.0) * weight;
weighted_sum += price * w;
confidence_sum += *confidence as f64 * w;
total_weight += w;
}
let avg_price = weighted_sum / total_weight;
let avg_confidence = (confidence_sum / total_weight) as u8;
Some((avg_price, avg_confidence, prices.len()))
}
}

View File

@@ -0,0 +1,45 @@
use std::sync::{Arc, Mutex};
use rand::Rng;
use crate::{utils, AssetPair, OracleId, PriceAttestation};
pub(crate) struct OracleNode {
pub(crate) id: OracleId,
pub(crate) crdt: Arc<Mutex<super::OracleNetworkCRDT>>,
pub(crate) is_byzantine: bool,
pub(crate) base_price: f64,
}
impl OracleNode {
pub(crate) fn new(id: String, is_byzantine: bool) -> Self {
Self {
id: OracleId(id),
crdt: Arc::new(Mutex::new(super::OracleNetworkCRDT::new())),
is_byzantine,
base_price: 2500.0,
}
}
pub(crate) fn submit_price(&self) {
let mut rng = rand::rng();
let price = if self.is_byzantine {
self.base_price * 1.2 // Try to manipulate 20% higher
} else {
self.base_price * (1.0 + rng.random_range(-0.01..0.01))
};
let attestation = PriceAttestation {
id: format!("{}_{}", self.id.0, utils::timestamp()),
oracle_id: self.id.clone(),
asset_pair: AssetPair("ETH/USD".to_string()),
price,
confidence: if self.is_byzantine { 50 } else { 95 },
timestamp: utils::timestamp(),
};
let mut crdt = self.crdt.lock().unwrap();
crdt.submit_attestation(attestation);
}
}

View File

@@ -0,0 +1,8 @@
use std::time::{SystemTime, UNIX_EPOCH};
pub(crate) fn timestamp() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
}