Files
bft-crdt-experiment/examples/oracle_network.rs

803 lines
25 KiB
Rust

use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
// A comprehensive example of a decentralized oracle network using BFT-CRDTs
// that eliminates the need for consensus on price updates.
// (Plain comments on purpose: written as `///` these two lines become the
// rustdoc of the next item, `OracleId`, rather than of the file.)
// ==== Core Types ====
/// Newtype wrapper around the string that identifies an oracle.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct OracleId(pub String);
/// Newtype wrapper around the string that names a trading pair
/// (the tests in this file use values such as `"ETH/USD"`).
///
/// `PartialOrd`/`Ord` are derived because the pair is the leading component of
/// the `BTreeMap<(AssetPair, u64), _>` key used by
/// `OracleNetworkCRDT::price_index`; `BTreeMap::entry` and `BTreeMap::range`
/// both require the key type to be `Ord`. The derived order is the
/// lexicographic order of the inner string.
#[derive(Debug, Clone, Hash, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct AssetPair(pub String);
/// Newtype wrapper around the string that identifies a single attestation.
/// It is the key of both the attestation map and the anomaly map in
/// `OracleNetworkCRDT`.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct AttestationId(pub String);
/// A price attestation from an oracle
///
/// One oracle's claim about the price of one asset pair at one point in time,
/// together with the raw source data, a proof and a signature.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PriceAttestation {
/// Key under which the attestation is stored; `submit_attestation` treats an
/// id it already holds as a duplicate and ignores the new value.
pub id: AttestationId,
/// Oracle that produced this attestation.
pub oracle_id: OracleId,
/// Asset pair the price refers to.
pub asset_pair: AssetPair,
/// Attested price as an integer.
/// NOTE(review): the fixed-point scale is not defined here; the tests use
/// 6 implied decimals - confirm.
pub price: u128,
/// Self-reported confidence; divided by 100 and used as a weight factor in
/// `aggregate_prices`. The 0-100 range is not enforced by `validate_attestation`.
pub confidence: u8, // 0-100
/// Seconds since the Unix epoch (compared against `OracleNetworkCRDT::timestamp()`
/// during validation and used as the second component of the price index key).
pub timestamp: u64,
/// Raw observations the price was derived from; validation requires at least
/// `NetworkParams::min_sources` entries.
pub sources: Vec<DataSource>,
/// Evidence that the underlying data is authentic.
pub proof: AttestationProof,
/// Oracle signature bytes; validation currently only checks that this is non-empty.
pub signature: Vec<u8>,
}
/// Data source information
///
/// A single observation fetched from one upstream venue. `OracleClient::submit_price`
/// combines several of these into one volume-weighted price.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataSource {
/// Human-readable name of the venue (the tests use exchange names).
pub name: String,
/// Price reported by this source, in the same integer representation as
/// `PriceAttestation::price`.
pub price: u128,
/// Volume reported by this source; used as the weight of `price` in
/// `OracleClient::submit_price`.
pub volume: u128,
/// Time of the observation. The tests fill it from `OracleNetworkCRDT::timestamp()`
/// (Unix seconds); no code in this file reads it.
pub timestamp: u64,
}
/// Proof that the price data is authentic
///
/// Only the `TlsProof` timestamp is inspected by `validate_attestation`; the
/// other variants are accepted as-is (their verification is marked as future
/// work in that function).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AttestationProof {
/// TLS proof from HTTPS API
TlsProof {
/// Certificate bytes of the server; not inspected by code in this file.
server_cert: Vec<u8>,
/// Hash of the HTTP response; not inspected by code in this file.
response_hash: Vec<u8>,
/// Time of the proof; validation rejects the attestation when this is more
/// than 300 seconds older than the attestation's own timestamp.
timestamp: u64,
},
/// Signed data from WebSocket feed
SignedFeed {
/// Signature supplied by the exchange; not verified by code in this file.
exchange_signature: Vec<u8>,
/// Sequence number of the feed message; not inspected by code in this file.
sequence_number: u64,
},
/// On-chain proof (e.g., from DEX)
OnChainProof {
/// Identifier of the chain (the tests use `"ethereum"`).
chain_id: String,
/// Block the referenced transaction belongs to.
block_number: u64,
/// Hash of the referenced transaction; not verified by code in this file.
transaction_hash: Vec<u8>,
},
}
/// Oracle reputation and performance metrics
///
/// Maintained per oracle by `OracleNetworkCRDT::update_oracle_metrics`, which
/// recomputes the values from the attestations currently stored.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OracleMetrics {
/// Oracle these metrics describe.
pub oracle_id: OracleId,
/// Number of stored attestations submitted by this oracle.
pub total_attestations: u64,
/// Initialised to 0.0; no code in this file updates it afterwards.
pub average_deviation: f64,
/// Initialised to 100.0; no code in this file updates it afterwards.
pub uptime_percentage: f64,
/// Largest attestation timestamp (Unix seconds) seen from this oracle.
pub last_submission: u64,
/// `0.5 + min(total_attestations, 100) / 200`, i.e. between 0.5 and 1.0;
/// used as a weight factor when prices are aggregated.
pub quality_score: f64,
}
/// The main CRDT structure for the oracle network
///
/// Holds every accepted attestation plus the derived index, per-oracle metrics
/// and anomaly reports. Replicas exchange state through `merge`.
#[derive(Debug, Clone)]
pub struct OracleNetworkCRDT {
/// All price attestations
/// (keyed by attestation id; entries are only ever added in this file).
attestations: HashMap<AttestationId, PriceAttestation>,
/// Index by asset pair and time for efficient queries
/// (key is `(pair, attestation timestamp)`; the value lists every attestation
/// id sharing that pair and second).
price_index: BTreeMap<(AssetPair, u64), Vec<AttestationId>>,
/// Oracle performance metrics
/// (recomputed from `attestations` whenever an attestation is stored).
oracle_metrics: HashMap<OracleId, OracleMetrics>,
/// Detected anomalies and disputes
/// (at most one report per attestation id, because the id is the map key).
anomalies: HashMap<AttestationId, AnomalyReport>,
/// Network parameters
params: NetworkParams,
}
/// Tunable parameters shared by every operation on an `OracleNetworkCRDT`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkParams {
/// Minimum stake an oracle must hold.
/// NOTE(review): no code in this file reads this field - confirm where it is enforced.
pub min_oracle_stake: u128,
/// Maximum age of an attestation, relative to the local clock, that
/// validation still accepts (compared at one-second resolution).
pub max_price_age: Duration,
/// Threshold for outlier detection.
/// NOTE(review): the unit is not established by this struct; the tests pass
/// `0.1`, presumably meaning a 10% relative deviation - confirm.
pub outlier_threshold: f64,
/// Minimum number of `DataSource` entries an attestation must carry.
pub min_sources: usize,
}
/// A complaint filed by one oracle about a stored attestation.
/// Created by `OracleNetworkCRDT::report_anomaly`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnomalyReport {
/// Attestation the report refers to (also the key the report is stored under).
pub attestation_id: AttestationId,
/// Category of the suspected problem.
pub report_type: AnomalyType,
/// Oracle that filed the report.
pub reporter: OracleId,
/// Free-form justification supplied by the reporter.
pub evidence: String,
/// Local Unix time (seconds) at which the report was created.
pub timestamp: u64,
}
/// Category of an `AnomalyReport`. The variants are supplied by the reporter;
/// no code in this file derives or checks them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AnomalyType {
/// The attested price deviates from the reporter's reference by the given percentage.
OutlierPrice { deviation_percentage: f64 },
/// The attached proof did not verify.
InvalidProof,
/// The data was older than acceptable, by the given number of seconds.
StaleData { age_seconds: u64 },
/// Any other suspicious behaviour.
SuspiciousPattern,
}
impl OracleNetworkCRDT {
/// Creates an empty network state governed by `params`.
///
/// All maps start empty; attestations, metrics and anomaly reports are added
/// later through `submit_attestation`, `report_anomaly` and `merge`.
pub fn new(params: NetworkParams) -> Self {
    let attestations = HashMap::new();
    let price_index = BTreeMap::new();
    let oracle_metrics = HashMap::new();
    let anomalies = HashMap::new();
    Self {
        attestations,
        price_index,
        oracle_metrics,
        anomalies,
        params,
    }
}
/// Submit a new price attestation to the network
///
/// The operation is idempotent: an attestation whose id is already stored is
/// ignored and `Ok(())` is returned. The duplicate check deliberately runs
/// *before* validation, because validation depends on the wall clock - a
/// re-delivered attestation that was accepted earlier must not start failing
/// just because it has aged past `max_price_age` in the meantime.
///
/// # Errors
/// Returns the message produced by `validate_attestation` when a previously
/// unseen attestation is rejected; nothing is stored in that case.
pub fn submit_attestation(
    &mut self,
    attestation: PriceAttestation,
) -> Result<(), String> {
    // Already known: nothing to do, regardless of the current clock.
    if self.attestations.contains_key(&attestation.id) {
        return Ok(());
    }

    self.validate_attestation(&attestation)?;

    // Capture everything needed after `attestation` is moved into the map.
    let id = attestation.id.clone();
    let oracle_id = attestation.oracle_id.clone();
    let index_key = (attestation.asset_pair.clone(), attestation.timestamp);

    self.attestations.insert(id.clone(), attestation);
    self.price_index.entry(index_key).or_default().push(id);

    // Metrics are derived from the stored attestations, so refresh them last.
    self.update_oracle_metrics(&oracle_id);
    Ok(())
}
/// Validate an attestation before accepting it
///
/// Checks, in order: the timestamp is at most 60 seconds ahead of the local
/// clock, it is not older than `params.max_price_age`, the attestation carries
/// at least `params.min_sources` sources, the signature is non-empty, and a
/// `TlsProof` is at most 300 seconds older than the attestation itself.
///
/// All time arithmetic saturates, so a `max_price_age` larger than the current
/// Unix time or an attestation timestamp below 300 cannot underflow.
///
/// # Errors
/// Returns a human-readable message naming the first check that failed.
fn validate_attestation(&self, attestation: &PriceAttestation) -> Result<(), String> {
    // Check timestamp is reasonable (60 s of clock skew is tolerated).
    let now = Self::timestamp();
    if attestation.timestamp > now.saturating_add(60) {
        return Err("Attestation timestamp is in the future".to_string());
    }
    let oldest_allowed = now.saturating_sub(self.params.max_price_age.as_secs());
    if attestation.timestamp < oldest_allowed {
        return Err("Attestation is too old".to_string());
    }

    // Verify minimum sources
    if attestation.sources.len() < self.params.min_sources {
        return Err("Insufficient data sources".to_string());
    }

    // Verify signature (placeholder - real implementation would verify cryptographically)
    if attestation.signature.is_empty() {
        return Err("Missing signature".to_string());
    }

    // Validate proof
    match &attestation.proof {
        AttestationProof::TlsProof { timestamp, .. } => {
            let oldest_proof = attestation.timestamp.saturating_sub(300);
            if *timestamp < oldest_proof {
                return Err("TLS proof too old".to_string());
            }
        }
        AttestationProof::SignedFeed { .. } => {
            // Verify exchange signature in real implementation
        }
        AttestationProof::OnChainProof { .. } => {
            // Verify on-chain data in real implementation
        }
    }
    Ok(())
}
/// Get aggregated price for an asset pair within a time window
pub fn get_aggregate_price(
&self,
asset_pair: &AssetPair,
time_window: Duration,
) -> Option<AggregatedPrice> {
let now = Self::timestamp();
let start_time = now.saturating_sub(time_window.as_secs());
// Collect all attestations in time window
let mut attestations = Vec::new();
for ((pair, timestamp), attestation_ids) in self.price_index.range(
(asset_pair.clone(), start_time)..=(asset_pair.clone(), now)
) {
if pair == asset_pair {
for id in attestation_ids {
if let Some(attestation) = self.attestations.get(id) {
attestations.push(attestation);
}
}
}
}
if attestations.is_empty() {
return None;
}
// Calculate aggregated price
self.aggregate_prices(attestations, now)
}
/// Aggregate multiple price attestations into a single price
///
/// Each attestation is weighted by the product of
/// 1. the submitting oracle's quality score (0.5 when the oracle has no metrics),
/// 2. the attestation's confidence divided by 100, and
/// 3. a recency factor `1 / (1 + age / 300)`, which halves the weight of a
///    5-minute-old attestation (hyperbolic decay, not an exponential half-life).
///
/// Outliers are removed before the weighted average is taken.
///
/// Returns `None` when nothing survives outlier removal, or when the
/// surviving weights do not add up to a positive finite number (for example
/// every attestation reports confidence 0). Without that guard the division
/// would produce NaN, which casts to a price of 0.
fn aggregate_prices(
    &self,
    attestations: Vec<&PriceAttestation>,
    current_time: u64,
) -> Option<AggregatedPrice> {
    let weighted_prices: Vec<(u128, f64)> = attestations
        .iter()
        .map(|attestation| {
            let oracle_score = self
                .oracle_metrics
                .get(&attestation.oracle_id)
                .map_or(0.5, |metrics| metrics.quality_score);
            let confidence_weight = attestation.confidence as f64 / 100.0;
            let age = current_time.saturating_sub(attestation.timestamp);
            let recency_weight = 1.0 / (1.0 + (age as f64 / 300.0));
            (
                attestation.price,
                oracle_score * confidence_weight * recency_weight,
            )
        })
        .collect();

    // Remove outliers
    let filtered_prices = self.remove_outliers(weighted_prices);
    if filtered_prices.is_empty() {
        return None;
    }

    // Calculate weighted average
    let total_weight: f64 = filtered_prices.iter().map(|(_, weight)| *weight).sum();
    if !(total_weight.is_finite() && total_weight > 0.0) {
        // No usable weight: refuse to report a price rather than return 0.
        return None;
    }
    let weighted_sum: f64 = filtered_prices
        .iter()
        .map(|(price, weight)| *price as f64 * weight)
        .sum();
    let average_price = (weighted_sum / total_weight) as u128;

    // Calculate confidence metrics
    let prices: Vec<u128> = filtered_prices.iter().map(|(price, _)| *price).collect();
    let std_dev = self.calculate_std_dev(&prices, average_price);
    let confidence = self.calculate_confidence(&prices, std_dev, average_price);

    Some(AggregatedPrice {
        price: average_price,
        confidence,
        num_sources: filtered_prices.len(),
        std_deviation: std_dev,
        timestamp: current_time,
    })
}
/// Remove statistical outliers from price data
///
/// Keeps every `(price, weight)` pair whose price lies within
/// `params.outlier_threshold` (a fraction, e.g. `0.1` = 10%) of the median
/// price. The result is sorted by price.
///
/// A median-relative band is used instead of index-based quartiles because
/// quartiles degenerate on the small samples this network typically sees:
/// with 3 or 4 prices the upper quartile *is* the maximum, so a high outlier
/// could never be rejected.
///
/// Inputs with fewer than 3 prices are returned unchanged, as is any input
/// when the configured threshold is negative or not finite (filtering is
/// then considered disabled).
fn remove_outliers(&self, mut prices: Vec<(u128, f64)>) -> Vec<(u128, f64)> {
    if prices.len() < 3 {
        return prices; // Not enough data to detect outliers
    }

    // Sort by price
    prices.sort_by_key(|(price, _)| *price);

    let threshold = self.params.outlier_threshold;
    if !threshold.is_finite() || threshold < 0.0 {
        return prices;
    }

    // Median of the sorted prices; the even case averages without overflowing.
    let mid = prices.len() / 2;
    let median = if prices.len() % 2 == 1 {
        prices[mid].0
    } else {
        let (low, high) = (prices[mid - 1].0, prices[mid].0);
        low + (high - low) / 2
    };

    // Float-to-int `as` saturates, so an enormous tolerance cannot wrap.
    let tolerance = (median as f64 * threshold) as u128;
    let lower_bound = median.saturating_sub(tolerance);
    let upper_bound = median.saturating_add(tolerance);

    prices
        .into_iter()
        .filter(|(price, _)| *price >= lower_bound && *price <= upper_bound)
        .collect()
}
/// Calculate standard deviation
///
/// Population standard deviation of `prices` around the supplied `mean`
/// (the mean is taken as given, not recomputed). The squared distances are
/// accumulated in `f64` and the square root is truncated to an integer.
/// Returns 0 for an empty slice.
fn calculate_std_dev(&self, prices: &[u128], mean: u128) -> u128 {
    if prices.is_empty() {
        return 0;
    }

    let mut squared_total = 0.0_f64;
    for &price in prices {
        // Unsigned distance from the mean, computed without underflow.
        let gap = if price > mean { price - mean } else { mean - price };
        let gap = gap as f64;
        squared_total += gap * gap;
    }

    let variance = squared_total / prices.len() as f64;
    variance.sqrt() as u128
}
/// Calculate confidence score based on data quality
///
/// The result is the integer average of two scores:
/// * a source-count score of 10 points per price, capped at 100, and
/// * a spread score derived from `std_dev / mean`: 100 below 1%, 80 below 5%,
///   60 below 10%, otherwise 40 (a zero mean counts as a ratio of 1.0).
fn calculate_confidence(&self, prices: &[u128], std_dev: u128, mean: u128) -> u8 {
    let source_score = (prices.len().min(10) * 10) as u8;

    let relative_spread = match mean {
        0 => 1.0,
        nonzero => std_dev as f64 / nonzero as f64,
    };

    let spread_score: u8 = match relative_spread {
        ratio if ratio < 0.01 => 100,
        ratio if ratio < 0.05 => 80,
        ratio if ratio < 0.10 => 60,
        _ => 40,
    };

    (source_score.min(100) + spread_score) / 2
}
/// Update oracle performance metrics
///
/// Recomputes the metrics of `oracle_id` from scratch by scanning every stored
/// attestation: the attestation count, the latest attestation timestamp and a
/// quality score of `0.5 + min(count, 100) / 200`. Does nothing when the
/// oracle has no stored attestation. A metrics entry is created on first use.
fn update_oracle_metrics(&mut self, oracle_id: &OracleId) {
    // Single pass: count the oracle's attestations and track the newest one.
    let mut count: u64 = 0;
    let mut newest: u64 = 0;
    for attestation in self.attestations.values() {
        if &attestation.oracle_id == oracle_id {
            count += 1;
            newest = newest.max(attestation.timestamp);
        }
    }
    if count == 0 {
        return;
    }

    let metrics = self
        .oracle_metrics
        .entry(oracle_id.clone())
        .or_insert_with(|| OracleMetrics {
            oracle_id: oracle_id.clone(),
            total_attestations: 0,
            average_deviation: 0.0,
            uptime_percentage: 100.0,
            last_submission: 0,
            quality_score: 0.5,
        });

    metrics.total_attestations = count;
    metrics.last_submission = newest;
    // Calculate quality score based on historical performance
    // (simplified - real implementation would track accuracy over time)
    metrics.quality_score = 0.5 + (metrics.total_attestations.min(100) as f64 / 200.0);
}
/// Report an anomaly in an attestation
///
/// Stores a report, stamped with the local clock, under the attestation's id.
/// Because the id is the map key, a new report for the same attestation
/// replaces the one stored before.
///
/// # Errors
/// Returns `"Attestation not found"` when `attestation_id` is not stored.
pub fn report_anomaly(
    &mut self,
    attestation_id: AttestationId,
    reporter: OracleId,
    anomaly_type: AnomalyType,
    evidence: String,
) -> Result<(), String> {
    let known = self.attestations.contains_key(&attestation_id);
    if !known {
        return Err("Attestation not found".to_string());
    }

    let key = attestation_id.clone();
    self.anomalies.insert(
        key,
        AnomalyReport {
            attestation_id,
            report_type: anomaly_type,
            reporter,
            evidence,
            timestamp: Self::timestamp(),
        },
    );
    Ok(())
}
/// Get oracle reputation score
///
/// Combines the stored metrics of `oracle_id` with the number of anomaly
/// reports that point at attestations submitted by that oracle. Reports whose
/// attestation is not stored are not counted.
///
/// Returns `None` when the oracle has no metrics entry.
pub fn get_oracle_reputation(&self, oracle_id: &OracleId) -> Option<OracleReputation> {
    let metrics = self.oracle_metrics.get(oracle_id)?;

    // Count anomalies reported against this oracle
    let mut anomaly_reports: u64 = 0;
    for report in self.anomalies.values() {
        if let Some(attestation) = self.attestations.get(&report.attestation_id) {
            if &attestation.oracle_id == oracle_id {
                anomaly_reports += 1;
            }
        }
    }

    Some(OracleReputation {
        oracle_id: oracle_id.clone(),
        quality_score: metrics.quality_score,
        total_attestations: metrics.total_attestations,
        anomaly_reports,
        last_active: metrics.last_submission,
    })
}
/// Merge another CRDT instance
pub fn merge(&mut self, other: &Self) {
// Merge attestations
for (id, attestation) in &other.attestations {
if !self.attestations.contains_key(id) {
let _ = self.submit_attestation(attestation.clone());
}
}
// Merge anomaly reports
for (id, report) in &other.anomalies {
self.anomalies.entry(id.clone()).or_insert(report.clone());
}
// Recalculate metrics after merge
for oracle_id in other.oracle_metrics.keys() {
self.update_oracle_metrics(oracle_id);
}
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns 0 instead of panicking when the system clock is set before the
/// epoch; callers then see every real attestation as lying in the future.
fn timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs())
}
}
/// Aggregated price result
///
/// Produced by `OracleNetworkCRDT::aggregate_prices`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregatedPrice {
/// Weighted average of the attested prices that survived outlier removal.
pub price: u128,
/// Score computed by `calculate_confidence` from the number of prices and
/// their relative spread.
pub confidence: u8,
/// Number of attestations (not `DataSource` entries) that contributed.
pub num_sources: usize,
/// Standard deviation of the contributing prices around `price`.
pub std_deviation: u128,
/// Unix time (seconds) at which the aggregation was performed.
pub timestamp: u64,
}
/// Oracle reputation information
///
/// Snapshot returned by `OracleNetworkCRDT::get_oracle_reputation`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OracleReputation {
/// Oracle the snapshot describes.
pub oracle_id: OracleId,
/// Copy of `OracleMetrics::quality_score`.
pub quality_score: f64,
/// Copy of `OracleMetrics::total_attestations`.
pub total_attestations: u64,
/// Number of stored anomaly reports that target this oracle's attestations.
pub anomaly_reports: u64,
/// Copy of `OracleMetrics::last_submission` (Unix seconds).
pub last_active: u64,
}
/// Example oracle client implementation
///
/// Owns a local replica of the network state and a set of upstream data
/// sources; `submit_price` queries the sources and records an attestation in
/// the local replica.
pub struct OracleClient {
/// Identity used as `oracle_id` of every attestation this client creates.
oracle_id: OracleId,
/// Local replica that receives the attestations.
network: OracleNetworkCRDT,
/// Upstream price feeds queried on every submission.
data_sources: Vec<Box<dyn DataSourceClient>>,
}
/// Abstraction over one upstream price feed used by `OracleClient`.
/// The trait is private to this file and the call is synchronous.
trait DataSourceClient {
/// Fetches the current observation for `asset_pair`, or an error message
/// when the source is unavailable. `OracleClient::submit_price` skips
/// sources that return an error.
fn fetch_price(&self, asset_pair: &AssetPair) -> Result<DataSource, String>;
}
impl OracleClient {
pub async fn submit_price(&mut self, asset_pair: AssetPair) -> Result<(), String> {
// Fetch prices from multiple sources
let mut sources = Vec::new();
for client in &self.data_sources {
match client.fetch_price(&asset_pair) {
Ok(source) => sources.push(source),
Err(_) => continue, // Skip failed sources
}
}
if sources.len() < self.network.params.min_sources {
return Err("Insufficient data sources available".to_string());
}
// Calculate aggregate price from sources
let total_volume: u128 = sources.iter().map(|s| s.volume).sum();
let weighted_sum: u128 = sources
.iter()
.map(|s| s.price * s.volume)
.sum();
let price = weighted_sum / total_volume;
// Calculate confidence based on source agreement
let prices: Vec<u128> = sources.iter().map(|s| s.price).collect();
let std_dev = self.network.calculate_std_dev(&prices, price);
let confidence = if std_dev < price / 100 { 95 } else { 80 };
// Create attestation
let attestation = PriceAttestation {
id: AttestationId(format!("{}_{}", self.oracle_id.0, Self::timestamp())),
oracle_id: self.oracle_id.clone(),
asset_pair,
price,
confidence,
timestamp: Self::timestamp(),
sources,
proof: AttestationProof::SignedFeed {
exchange_signature: vec![1, 2, 3], // Placeholder
sequence_number: Self::timestamp(),
},
signature: vec![4, 5, 6], // Placeholder
};
// Submit to network
self.network.submit_attestation(attestation)
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns 0 instead of panicking when the system clock is set before the
/// epoch.
fn timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs())
}
}
// Unit tests for the oracle network: one end-to-end aggregation scenario and
// one two-replica merge scenario.
#[cfg(test)]
mod tests {
use super::*;
// Three oracles attest ETH/USD; the third submits a far-off price that the
// aggregation is expected to discard. Afterwards an anomaly is reported
// against the third oracle and its reputation is checked.
#[test]
fn test_oracle_network() {
let params = NetworkParams {
min_oracle_stake: 1000,
max_price_age: Duration::from_secs(3600),
outlier_threshold: 0.1,
min_sources: 2,
};
let mut network = OracleNetworkCRDT::new(params);
// Create test oracles
let oracle1 = OracleId("oracle1".to_string());
let oracle2 = OracleId("oracle2".to_string());
let oracle3 = OracleId("oracle3".to_string());
let eth_usd = AssetPair("ETH/USD".to_string());
// Submit prices from multiple oracles
let attestation1 = PriceAttestation {
id: AttestationId("att1".to_string()),
oracle_id: oracle1.clone(),
asset_pair: eth_usd.clone(),
price: 2500_000_000, // $2500 with 6 decimals
confidence: 95,
timestamp: OracleNetworkCRDT::timestamp(),
sources: vec![
DataSource {
name: "Binance".to_string(),
price: 2501_000_000,
volume: 1000_000_000,
timestamp: OracleNetworkCRDT::timestamp(),
},
DataSource {
name: "Coinbase".to_string(),
price: 2499_000_000,
volume: 800_000_000,
timestamp: OracleNetworkCRDT::timestamp(),
},
],
proof: AttestationProof::SignedFeed {
exchange_signature: vec![1, 2, 3],
sequence_number: 1,
},
signature: vec![4, 5, 6],
};
network.submit_attestation(attestation1).unwrap();
// Submit from oracle 2
let attestation2 = PriceAttestation {
id: AttestationId("att2".to_string()),
oracle_id: oracle2.clone(),
asset_pair: eth_usd.clone(),
price: 2502_000_000,
confidence: 90,
timestamp: OracleNetworkCRDT::timestamp(),
sources: vec![
DataSource {
name: "Kraken".to_string(),
price: 2502_000_000,
volume: 500_000_000,
timestamp: OracleNetworkCRDT::timestamp(),
},
DataSource {
name: "Gemini".to_string(),
price: 2502_000_000,
volume: 300_000_000,
timestamp: OracleNetworkCRDT::timestamp(),
},
],
proof: AttestationProof::SignedFeed {
exchange_signature: vec![7, 8, 9],
sequence_number: 2,
},
signature: vec![10, 11, 12],
};
network.submit_attestation(attestation2).unwrap();
// Submit outlier price from oracle 3
let attestation3 = PriceAttestation {
id: AttestationId("att3".to_string()),
oracle_id: oracle3.clone(),
asset_pair: eth_usd.clone(),
price: 3000_000_000, // Outlier price
confidence: 50,
timestamp: OracleNetworkCRDT::timestamp(),
sources: vec![
DataSource {
name: "Unknown".to_string(),
price: 3000_000_000,
volume: 100_000_000,
timestamp: OracleNetworkCRDT::timestamp(),
},
DataSource {
name: "Sketchy".to_string(),
price: 3000_000_000,
volume: 50_000_000,
timestamp: OracleNetworkCRDT::timestamp(),
},
],
proof: AttestationProof::SignedFeed {
exchange_signature: vec![13, 14, 15],
sequence_number: 3,
},
signature: vec![16, 17, 18],
};
network.submit_attestation(attestation3).unwrap();
// Get aggregated price
let aggregated = network
.get_aggregate_price(&eth_usd, Duration::from_secs(300))
.unwrap();
// Should filter out the outlier
assert!(aggregated.price > 2490_000_000 && aggregated.price < 2510_000_000);
// NOTE(review): `calculate_confidence` averages a source-count score of 10
// points per price (capped at 100) with a deviation score of at most 100.
// With `num_sources == 2`, as asserted on the next line, the result cannot
// exceed (20 + 100) / 2 = 60, so this assertion and the next one look
// mutually unsatisfiable - confirm the intended formula or expectation.
assert!(aggregated.confidence > 80);
assert_eq!(aggregated.num_sources, 2); // Outlier filtered out
// Report anomaly
network
.report_anomaly(
AttestationId("att3".to_string()),
oracle1.clone(),
AnomalyType::OutlierPrice {
deviation_percentage: 20.0,
},
"Price deviates significantly from market".to_string(),
)
.unwrap();
// Check oracle reputation
let reputation = network.get_oracle_reputation(&oracle3).unwrap();
assert_eq!(reputation.anomaly_reports, 1);
}
// Two replicas each receive a different BTC/USD attestation from the same
// oracle, merge in both directions, and must end up with the same
// attestations and the same aggregate price.
#[test]
fn test_crdt_merge() {
let params = NetworkParams {
min_oracle_stake: 1000,
max_price_age: Duration::from_secs(3600),
outlier_threshold: 0.1,
min_sources: 2,
};
let mut network1 = OracleNetworkCRDT::new(params.clone());
let mut network2 = OracleNetworkCRDT::new(params);
let oracle1 = OracleId("oracle1".to_string());
let btc_usd = AssetPair("BTC/USD".to_string());
// Submit to network1
let attestation1 = PriceAttestation {
id: AttestationId("att1".to_string()),
oracle_id: oracle1.clone(),
asset_pair: btc_usd.clone(),
price: 45000_000_000,
confidence: 95,
timestamp: OracleNetworkCRDT::timestamp(),
sources: vec![
DataSource {
name: "Binance".to_string(),
price: 45000_000_000,
volume: 2000_000_000,
timestamp: OracleNetworkCRDT::timestamp(),
},
DataSource {
name: "Coinbase".to_string(),
price: 45000_000_000,
volume: 1500_000_000,
timestamp: OracleNetworkCRDT::timestamp(),
},
],
proof: AttestationProof::OnChainProof {
chain_id: "ethereum".to_string(),
block_number: 18000000,
transaction_hash: vec![1, 2, 3],
},
signature: vec![4, 5, 6],
};
network1.submit_attestation(attestation1).unwrap();
// Submit different attestation to network2
let attestation2 = PriceAttestation {
id: AttestationId("att2".to_string()),
oracle_id: oracle1.clone(),
asset_pair: btc_usd.clone(),
price: 45100_000_000,
confidence: 90,
// One second ahead of the local clock; validation tolerates up to 60 s.
timestamp: OracleNetworkCRDT::timestamp() + 1,
sources: vec![
DataSource {
name: "Kraken".to_string(),
price: 45100_000_000,
volume: 1000_000_000,
timestamp: OracleNetworkCRDT::timestamp(),
},
DataSource {
name: "Gemini".to_string(),
price: 45100_000_000,
volume: 800_000_000,
timestamp: OracleNetworkCRDT::timestamp(),
},
],
proof: AttestationProof::SignedFeed {
exchange_signature: vec![7, 8, 9],
sequence_number: 100,
},
signature: vec![10, 11, 12],
};
network2.submit_attestation(attestation2).unwrap();
// Merge networks
network1.merge(&network2);
network2.merge(&network1);
// Both should have same data
assert_eq!(network1.attestations.len(), 2);
assert_eq!(network2.attestations.len(), 2);
// Both should calculate same aggregate price
// NOTE(review): each query reads the clock on its own, and the query range
// ends at "now". If a second boundary falls between the two calls, the
// future-dated attestation may be inside one range and outside the other
// (and the recency weights differ), so this comparison is potentially
// flaky - confirm.
let price1 = network1
.get_aggregate_price(&btc_usd, Duration::from_secs(300))
.unwrap();
let price2 = network2
.get_aggregate_price(&btc_usd, Duration::from_secs(300))
.unwrap();
assert_eq!(price1.price, price2.price);
}
}