Files
bft-crdt-experiment/examples/oracle_simulation.rs

560 lines
19 KiB
Rust
Raw Normal View History

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
// Import our oracle network types
use crate::oracle_network::*;
/// A practical simulation showing how the BFT-CRDT oracle network operates.
///
/// This demonstrates:
/// 1. Multiple oracles submitting prices independently
/// 2. Network partitions and reunification
/// 3. Byzantine oracle behavior
/// 4. Real-time price aggregation
/// 5. Performance under various conditions
///
/// Each node is driven by its own thread while the thread that calls `run`
/// periodically schedules pairwise CRDT merges between nodes.
pub struct OracleSimulation {
/// Oracle nodes keyed by name (e.g. `"oracle_1"`, `"byzantine_6"`). Every node
/// sits behind `Arc<Mutex<_>>` because it is shared with its submitter thread
/// and with the short-lived merge threads.
oracle_nodes: HashMap<String, Arc<Mutex<OracleNode>>>,
/// Current simulated network conditions (latency, packet loss, partitions).
network_conditions: NetworkConditions,
/// Counters collected while the simulation runs.
stats: SimulationStats,
}
/// A single participant in the simulated oracle network.
pub struct OracleNode {
/// Identifier stamped on every attestation this node produces.
pub id: OracleId,
/// This node's local replica of the oracle network CRDT.
pub crdt: OracleNetworkCRDT,
/// Mock price feeds the node samples when building an attestation.
pub data_sources: Vec<MockDataSource>,
/// When `true`, the node reports manipulated prices instead of realistic ones.
pub is_byzantine: bool,
/// Partition group assignment. It is only ever initialised to `None` and is
/// not read anywhere in this file; partition membership is tracked in
/// `NetworkConditions::partition_groups` instead.
pub partition_group: Option<u8>,
}
/// Tunable description of the simulated network between oracle nodes.
///
/// Derives `Debug`, `Clone` and `PartialEq` so conditions can be logged,
/// snapshotted and compared in tests.
#[derive(Debug, Clone, PartialEq)]
pub struct NetworkConditions {
    /// Simulated network latency, in milliseconds.
    pub latency_ms: u64,
    /// Probability in `[0.0, 1.0]` that an exchange between two nodes is dropped.
    pub packet_loss_rate: f64,
    /// Whether the network is currently split into `partition_groups`.
    pub partition_active: bool,
    /// Node names grouped by partition; while a partition is active, two nodes
    /// can only communicate if some group contains both of them.
    pub partition_groups: Vec<Vec<String>>,
}
/// Counters accumulated over a simulation run.
///
/// Derives `Default` (all counters zero) alongside `Debug`, `Clone` and
/// `PartialEq` so statistics can be reset, logged and compared.
///
/// NOTE(review): in this file only `network_merges` is ever updated; the other
/// fields are initialised to zero and never written again.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct SimulationStats {
    /// Number of attestations submitted.
    pub total_attestations: u64,
    /// Number of price aggregations that produced a result.
    pub successful_aggregations: u64,
    /// Number of manipulated submissions attempted by Byzantine nodes.
    pub byzantine_attempts: u64,
    /// Number of pairwise CRDT merges scheduled between nodes.
    pub network_merges: u64,
    /// Average deviation of reported prices.
    /// NOTE(review): unit (fraction vs. percent) is not established by this file.
    pub average_price_deviation: f64,
}
/// A fake exchange feed used to generate prices for the simulation.
///
/// Derives `Debug`, `Clone` and `PartialEq` so feeds can be logged, copied
/// between nodes and compared in tests.
#[derive(Debug, Clone, PartialEq)]
pub struct MockDataSource {
    /// Display name of the exchange (e.g. `"Binance"`).
    pub name: String,
    /// Reference price around which generated prices fluctuate.
    pub base_price: u128,
    /// Maximum relative swing applied to `base_price` (0.02 means up to ±2%).
    pub volatility: f64,
    /// Probability in `[0.0, 1.0]` that the feed answers a given poll.
    pub reliability: f64,
}
impl OracleSimulation {
pub fn new() -> Self {
let params = NetworkParams {
min_oracle_stake: 1000,
max_price_age: Duration::from_secs(300),
outlier_threshold: 0.15,
min_sources: 2,
};
let mut oracle_nodes = HashMap::new();
// Create honest oracles
for i in 1..=5 {
let oracle_id = OracleId(format!("oracle_{}", i));
let node = OracleNode {
id: oracle_id.clone(),
crdt: OracleNetworkCRDT::new(params.clone()),
data_sources: Self::create_data_sources(),
is_byzantine: false,
partition_group: None,
};
oracle_nodes.insert(format!("oracle_{}", i), Arc::new(Mutex::new(node)));
}
// Create Byzantine oracles
for i in 6..=7 {
let oracle_id = OracleId(format!("byzantine_{}", i));
let node = OracleNode {
id: oracle_id.clone(),
crdt: OracleNetworkCRDT::new(params.clone()),
data_sources: Self::create_data_sources(),
is_byzantine: true,
partition_group: None,
};
oracle_nodes.insert(format!("byzantine_{}", i), Arc::new(Mutex::new(node)));
}
Self {
oracle_nodes,
network_conditions: NetworkConditions {
latency_ms: 50,
packet_loss_rate: 0.01,
partition_active: false,
partition_groups: vec![],
},
stats: SimulationStats {
total_attestations: 0,
successful_aggregations: 0,
byzantine_attempts: 0,
network_merges: 0,
average_price_deviation: 0.0,
},
}
}
/// Build the fixed set of mock exchange feeds that every node samples.
///
/// Returns three feeds, in order: Binance, Coinbase and Kraken. Their base
/// prices differ slightly so that honest nodes do not report identical values.
fn create_data_sources() -> Vec<MockDataSource> {
    // (name, base price, volatility, reliability)
    let feeds: [(&str, u128, f64, f64); 3] = [
        ("Binance", 2_500_000_000, 0.02, 0.99),
        ("Coinbase", 2_501_000_000, 0.02, 0.98),
        ("Kraken", 2_499_000_000, 0.025, 0.97),
    ];

    feeds
        .iter()
        .map(|&(name, base_price, volatility, reliability)| MockDataSource {
            name: name.to_string(),
            base_price,
            volatility,
            reliability,
        })
        .collect()
}
/// Run the simulation for a specified duration.
///
/// Spawns one submitter thread per oracle node, then drives the network from
/// the calling thread. Roughly every 100 ms it schedules attestation
/// propagation, applies partition effects while a partition is active, prints
/// a status report once more than five seconds have passed since the previous
/// one, and injects a random network event with probability 0.1. Once
/// `duration` has elapsed it joins the submitter threads and prints the final
/// statistics.
///
/// # Panics
///
/// Panics if a node mutex is poisoned or if a submitter thread panicked.
pub fn run(&mut self, duration: Duration) {
    // Count Byzantine nodes from the nodes themselves rather than printing a
    // literal, so the banner stays correct if the roster ever changes.
    let byzantine_count = self
        .oracle_nodes
        .values()
        .filter(|node| {
            let guard = node.lock().unwrap();
            guard.is_byzantine
        })
        .count();

    println!("Starting Oracle Network Simulation");
    println!("==================================");
    println!(
        "Nodes: {} ({} Byzantine)",
        self.oracle_nodes.len(),
        byzantine_count
    );
    println!("Duration: {:?}", duration);
    println!();

    let start_time = Instant::now();
    let mut last_stats_print = Instant::now();

    // Spawn oracle threads; each owns a clone of its node handle and name.
    let handles: Vec<_> = self
        .oracle_nodes
        .iter()
        .map(|(name, node)| {
            let node = Arc::clone(node);
            let name = name.clone();
            thread::spawn(move || Self::oracle_thread(name, node, duration))
        })
        .collect();

    // Main simulation loop
    while start_time.elapsed() < duration {
        thread::sleep(Duration::from_millis(100));

        // Simulate network propagation
        self.propagate_attestations();

        // Simulate network partition if active
        if self.network_conditions.partition_active {
            self.simulate_partition();
        }

        // Print statistics every 5 seconds
        if last_stats_print.elapsed() > Duration::from_secs(5) {
            self.print_current_state();
            last_stats_print = Instant::now();
        }

        // Randomly introduce network events
        if rand::random::<f64>() < 0.1 {
            self.introduce_network_event();
        }
    }

    // Wait for oracle threads to complete; a panic in any of them is a bug in
    // the simulation, so surface it here.
    for handle in handles {
        handle.join().expect("oracle thread panicked");
    }

    self.print_final_statistics();
}
/// Oracle thread that submits prices periodically
fn oracle_thread(name: String, node: Arc<Mutex<OracleNode>>, duration: Duration) {
let start = Instant::now();
let mut last_submission = Instant::now();
while start.elapsed() < duration {
if last_submission.elapsed() > Duration::from_secs(1) {
let mut node_guard = node.lock().unwrap();
// Fetch prices from data sources
let mut sources = Vec::new();
for data_source in &node_guard.data_sources {
if rand::random::<f64>() < data_source.reliability {
let price = if node_guard.is_byzantine {
// Byzantine oracles submit manipulated prices
Self::generate_byzantine_price(data_source.base_price)
} else {
Self::generate_realistic_price(
data_source.base_price,
data_source.volatility,
)
};
sources.push(DataSource {
name: data_source.name.clone(),
price,
volume: (rand::random::<f64>() * 1000_000_000.0) as u128,
timestamp: Self::timestamp(),
});
}
}
if sources.len() >= 2 {
// Create attestation
let attestation = PriceAttestation {
id: AttestationId(format!("{}_{}", name, Self::timestamp())),
oracle_id: node_guard.id.clone(),
asset_pair: AssetPair("ETH/USD".to_string()),
price: Self::calculate_weighted_price(&sources),
confidence: if node_guard.is_byzantine {
50
} else {
90 + (rand::random::<f64>() * 10.0) as u8
},
timestamp: Self::timestamp(),
sources,
proof: AttestationProof::SignedFeed {
exchange_signature: vec![1, 2, 3],
sequence_number: Self::timestamp(),
},
signature: vec![4, 5, 6],
};
if let Err(e) = node_guard.crdt.submit_attestation(attestation) {
eprintln!("Failed to submit attestation from {}: {}", name, e);
}
}
last_submission = Instant::now();
}
thread::sleep(Duration::from_millis(100));
}
}
/// Propagate attestations between nodes based on network conditions.
///
/// Visits every unordered pair of nodes once. A pair is skipped when an active
/// partition separates the two nodes, or when a random draw falls below the
/// configured packet-loss rate. For each remaining pair a detached thread
/// sleeps for the configured latency (`network_conditions.latency_ms`) and
/// then merges the two CRDT replicas in both directions.
///
/// `stats.network_merges` counts merges that were *scheduled*; the merge
/// threads are not joined, so some may still be in flight when this returns.
///
/// # Panics
///
/// The spawned merge threads panic if a node mutex is poisoned.
fn propagate_attestations(&mut self) {
    // Snapshot the names so `self.stats` can be mutated inside the loop.
    let names: Vec<String> = self.oracle_nodes.keys().cloned().collect();

    // Use the configured latency rather than a fixed 50 ms, so that latency
    // changes made by network events actually affect propagation.
    let latency = Duration::from_millis(self.network_conditions.latency_ms);

    for (index, first_name) in names.iter().enumerate() {
        for second_name in &names[index + 1..] {
            // `can_communicate` already returns true when no partition is active.
            if !self.can_communicate(first_name, second_name) {
                continue;
            }

            // Simulate packet loss
            if rand::random::<f64>() < self.network_conditions.packet_loss_rate {
                continue;
            }

            let first = Arc::clone(&self.oracle_nodes[first_name]);
            let second = Arc::clone(&self.oracle_nodes[second_name]);

            thread::spawn(move || {
                thread::sleep(latency);

                // Always lock the earlier node of the snapshot first.
                // NOTE(review): deadlock freedom between merge threads relies
                // on `keys()` yielding the same order on every call while the
                // map is unmodified -- confirm this is acceptable.
                let mut first_guard = first.lock().unwrap();
                let mut second_guard = second.lock().unwrap();

                // Bidirectional merge
                first_guard.crdt.merge(&second_guard.crdt);
                second_guard.crdt.merge(&first_guard.crdt);
            });

            self.stats.network_merges += 1;
        }
    }
}
/// Report whether two nodes are currently able to exchange state.
///
/// With no active partition every pair can communicate. While a partition is
/// active, the pair can communicate only if at least one partition group
/// contains both `node1` and `node2`.
fn can_communicate(&self, node1: &str, node2: &str) -> bool {
    let conditions = &self.network_conditions;

    let shares_group = |group: &Vec<String>| {
        let has = |wanted: &str| group.iter().any(|member| member.as_str() == wanted);
        has(node1) && has(node2)
    };

    !conditions.partition_active || conditions.partition_groups.iter().any(shares_group)
}
/// Randomly perturb the simulated network.
///
/// Draws one random number and picks an outcome from it:
/// - below 0.3: latency is raised to 200 ms;
/// - below 0.5: a fixed two-group partition is activated;
/// - below 0.7: an active partition is healed (nothing happens if none is active);
/// - otherwise: latency and packet loss are reset to 50 ms and 1%.
///
/// Each outcome that changes something announces itself on stdout.
fn introduce_network_event(&mut self) {
    let conditions = &mut self.network_conditions;

    match rand::random::<f64>() {
        roll if roll < 0.3 => {
            conditions.latency_ms = 200;
            println!("Network event: High latency (200ms)");
        }
        roll if roll < 0.5 => {
            let group = |members: &[&str]| -> Vec<String> {
                members.iter().map(|member| String::from(*member)).collect()
            };
            conditions.partition_active = true;
            conditions.partition_groups = vec![
                group(&["oracle_1", "oracle_2", "oracle_3"]),
                group(&["oracle_4", "oracle_5", "byzantine_6", "byzantine_7"]),
            ];
            println!("Network event: Partition active");
        }
        roll if roll < 0.7 => {
            // The stale group list is left in place; it is ignored while
            // `partition_active` is false.
            if conditions.partition_active {
                conditions.partition_active = false;
                println!("Network event: Partition healed");
            }
        }
        _ => {
            conditions.latency_ms = 50;
            conditions.packet_loss_rate = 0.01;
            println!("Network event: Normal conditions restored");
        }
    }
}
/// Simulate network partition effects.
///
/// Currently a no-op hook: `run` calls it on every tick while a partition is
/// active, but the isolation itself is enforced in `propagate_attestations`
/// through `can_communicate`.
fn simulate_partition(&mut self) {
// Partitions are handled in propagate_attestations
// This method could add additional partition-specific logic
}
/// Print a snapshot of the network to stdout.
///
/// The report contains, in order: the ETH/USD aggregate over the last 60
/// seconds taken from the first node that can produce one (omitted if no node
/// can), the number of attestations each node holds, and the current network
/// conditions. Nodes are locked one at a time.
fn print_current_state(&self) {
    println!("\n--- Current Network State ---");

    let pair = AssetPair("ETH/USD".to_string());
    let window = Duration::from_secs(60);

    // First node (in map iteration order) that yields an aggregate wins.
    let aggregate = self.oracle_nodes.values().find_map(|node| {
        let guard = node.lock().unwrap();
        guard.crdt.get_aggregate_price(&pair, window)
    });

    if let Some(price) = aggregate {
        println!(
            "Aggregate ETH/USD Price: ${:.2} (confidence: {}%)",
            price.price as f64 / 1_000_000.0,
            price.confidence
        );
        println!(
            "Sources: {}, Std Dev: ${:.2}",
            price.num_sources,
            price.std_deviation as f64 / 1_000_000.0
        );
    }

    println!("\nNode Attestation Counts:");
    for (name, node) in &self.oracle_nodes {
        let held = node.lock().unwrap().crdt.attestations.len();
        println!("  {}: {} attestations", name, held);
    }

    let conditions = &self.network_conditions;
    println!("\nNetwork Conditions:");
    println!("  Latency: {}ms", conditions.latency_ms);
    println!("  Packet Loss: {:.1}%", conditions.packet_loss_rate * 100.0);
    println!("  Partition Active: {}", conditions.partition_active);
}
/// Print the end-of-run summary to stdout.
///
/// Reports the attestation count summed over all nodes, the number of merges
/// scheduled, and, when at least one node can produce a 300-second ETH/USD
/// aggregate, the mean aggregate price with the largest relative deviation
/// from that mean.
///
/// NOTE(review): both rates are computed by dividing by a fixed 300 seconds,
/// independent of the duration actually passed to `run`. The attestation total
/// sums per-node counts, so an attestation held by several nodes is counted
/// once per node.
fn print_final_statistics(&self) {
    println!("\n\n=== Final Simulation Statistics ===");

    let pair = AssetPair("ETH/USD".to_string());
    let window = Duration::from_secs(300);

    let mut total_attestations = 0;
    let mut price_samples: Vec<u128> = Vec::new();
    for node in self.oracle_nodes.values() {
        let guard = node.lock().unwrap();
        total_attestations += guard.crdt.attestations.len();
        if let Some(aggregate) = guard.crdt.get_aggregate_price(&pair, window) {
            price_samples.push(aggregate.price);
        }
    }

    println!("Total Attestations: {}", total_attestations);
    println!("Network Merges: {}", self.stats.network_merges);

    if !price_samples.is_empty() {
        let avg_price: u128 = price_samples.iter().sum::<u128>() / price_samples.len() as u128;

        // Largest absolute distance from the mean, as a percentage of the mean.
        let max_deviation = price_samples
            .iter()
            .map(|&sample| {
                let gap = if sample > avg_price {
                    sample - avg_price
                } else {
                    avg_price - sample
                };
                (gap as f64 / avg_price as f64) * 100.0
            })
            .fold(0.0, f64::max);

        println!("\nPrice Consistency:");
        println!("  Average Price: ${:.2}", avg_price as f64 / 1_000_000.0);
        println!("  Max Deviation: {:.2}%", max_deviation);
        println!(
            "  Nodes in Agreement: {}/{}",
            price_samples.len(),
            self.oracle_nodes.len()
        );
    }

    let assumed_seconds = 300.0;
    println!("\nPerformance:");
    println!(
        "  Attestation Rate: {:.1} per second",
        total_attestations as f64 / assumed_seconds
    );
    println!(
        "  Merge Rate: {:.1} per second",
        self.stats.network_merges as f64 / assumed_seconds
    );
}
/// Produce a price that fluctuates around `base_price`.
///
/// A random draw is centred on zero, scaled to span twice `volatility`, and
/// applied to `base_price` as a relative change. The result is computed in
/// `f64` and truncated back to `u128`.
fn generate_realistic_price(base_price: u128, volatility: f64) -> u128 {
    let centred = rand::random::<f64>() - 0.5;
    let scale = 1.0 + centred * 2.0 * volatility;
    (base_price as f64 * scale) as u128
}
/// Produce the price a Byzantine oracle reports for `base_price`.
///
/// One random draw selects the strategy: below 0.3 the price is inflated by
/// 5%, below 0.6 by 20%, and otherwise `base_price` is returned unchanged so
/// the node sometimes looks honest.
fn generate_byzantine_price(base_price: u128) -> u128 {
    let inflate = |factor: f64| (base_price as f64 * factor) as u128;

    match rand::random::<f64>() {
        roll if roll < 0.3 => inflate(1.05),
        roll if roll < 0.6 => inflate(1.20),
        // Returned as-is (no float round trip) to keep the exact value.
        _ => base_price,
    }
}
/// Calculate the volume-weighted average price of `sources`.
///
/// Returns `sum(price * volume) / sum(volume)` using integer division.
///
/// Degenerate inputs no longer panic with a division by zero:
/// - an empty slice yields `0`;
/// - if every source reports zero volume (the simulated volume is a truncated
///   random value and can be `0`), the plain arithmetic mean of the prices is
///   returned instead.
fn calculate_weighted_price(sources: &[DataSource]) -> u128 {
    if sources.is_empty() {
        return 0;
    }

    let total_volume: u128 = sources.iter().map(|s| s.volume).sum();
    if total_volume == 0 {
        // No volume information to weight by: fall back to an unweighted mean.
        let price_sum: u128 = sources.iter().map(|s| s.price).sum();
        return price_sum / sources.len() as u128;
    }

    let weighted_sum: u128 = sources.iter().map(|s| s.price * s.volume).sum();
    weighted_sum / total_volume
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// The `std::time` items are referenced by full path because the file's
/// `use std::time::{Duration, Instant}` line does not bring `SystemTime` or
/// `UNIX_EPOCH` into scope.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn timestamp() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
        .as_secs()
}
}
/// Example usage
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the full simulation for 30 seconds and checks that at least one
    /// pairwise merge was scheduled.
    #[test]
    fn test_oracle_simulation() {
        let mut sim = OracleSimulation::new();
        sim.run(Duration::from_secs(30));

        assert!(sim.stats.network_merges > 0);
    }

    /// Runs the simulation for 10 seconds and checks that every node able to
    /// produce a 60-second ETH/USD aggregate reports a price inside a sane
    /// band, despite the Byzantine nodes. Nodes without an aggregate are
    /// skipped.
    #[test]
    fn test_byzantine_resistance() {
        let mut sim = OracleSimulation::new();
        sim.run(Duration::from_secs(10));

        let pair = AssetPair("ETH/USD".to_string());
        let window = Duration::from_secs(60);

        for node in sim.oracle_nodes.values() {
            let guard = node.lock().unwrap();
            if let Some(price) = guard.crdt.get_aggregate_price(&pair, window) {
                assert!(price.price > 2000_000_000 && price.price < 3000_000_000);
            }
        }
    }
}
// Mock rand module for simulation
//
// Stand-in for the `rand` crate so the example needs no external dependency.
// Not suitable for anything security-sensitive.
mod rand {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Weyl-sequence counter shared by all threads; every call advances it.
    static STATE: AtomicU64 = AtomicU64::new(0);

    /// Return a pseudo-random value built from an `f64` in `[0.0, 1.0)`.
    ///
    /// The previous implementation used `nanos % 1000` directly, which yields
    /// at most 1000 distinct values, repeats for calls within one clock tick,
    /// and is constantly `0.0` on clocks with microsecond (or coarser)
    /// resolution. This version combines a per-call counter with the clock
    /// and scrambles the result with the SplitMix64 finalizer, so successive
    /// calls differ even when the clock does not advance.
    pub fn random<T>() -> T
    where
        T: From<f64>,
    {
        const GOLDEN_GAMMA: u64 = 0x9E37_79B9_7F4A_7C15;

        // Truncating to the low 64 bits is intentional: only entropy matters.
        // A clock set before the epoch simply contributes nothing.
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_nanos() as u64)
            .unwrap_or(0);

        // `fetch_add` hands every caller a distinct counter value.
        let ticket = STATE
            .fetch_add(GOLDEN_GAMMA, Ordering::Relaxed)
            .wrapping_add(GOLDEN_GAMMA);

        // SplitMix64 output mixing.
        let mut z = ticket.wrapping_add(nanos);
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^= z >> 31;

        // Keep the top 53 bits so the quotient is exactly representable and
        // strictly below 1.0.
        T::from((z >> 11) as f64 / (1u64 << 53) as f64)
    }
}