Files
bft-crdt-experiment/examples/cross_chain_relay.rs

311 lines
10 KiB
Rust
Raw Normal View History

use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::time::{SystemTime, UNIX_EPOCH};
/// Represents a blockchain identifier
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct ChainId(pub String);
/// A message being relayed between chains
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrossChainMessage {
/// Unique identifier for this message
pub id: String,
/// Source blockchain
pub source_chain: ChainId,
/// Destination blockchain
pub dest_chain: ChainId,
/// Block height on source chain when message was created
pub source_block: u64,
/// Nonce for ordering messages from same block
pub nonce: u64,
/// The actual message payload
pub payload: Vec<u8>,
/// Timestamp when message was created
pub timestamp: u64,
/// Signatures from source chain validators
pub validator_signatures: Vec<ValidatorSignature>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidatorSignature {
pub validator_id: String,
pub signature: Vec<u8>,
}
/// CRDT structure for accumulating cross-chain messages
#[derive(Debug, Clone)]
pub struct CrossChainRelayCRDT {
/// All messages seen by this node
messages: HashMap<String, CrossChainMessage>,
/// Track which messages have been delivered to which chains
deliveries: HashMap<ChainId, HashSet<String>>,
/// Byzantine fault detection - track conflicting messages
conflicts: HashMap<String, Vec<CrossChainMessage>>,
}
impl CrossChainRelayCRDT {
pub fn new() -> Self {
Self {
messages: HashMap::new(),
deliveries: HashMap::new(),
conflicts: HashMap::new(),
}
}
/// Add a new message to the CRDT
pub fn add_message(&mut self, message: CrossChainMessage) -> Result<(), String> {
// Verify the message has enough validator signatures
if !self.verify_signatures(&message) {
return Err("Insufficient valid signatures".to_string());
}
// Check for Byzantine behavior - same ID but different content
if let Some(existing) = self.messages.get(&message.id) {
if !self.messages_equal(existing, &message) {
self.conflicts
.entry(message.id.clone())
.or_insert_with(Vec::new)
.push(message.clone());
return Err("Conflicting message detected".to_string());
}
}
// Add the message
self.messages.insert(message.id.clone(), message);
Ok(())
}
/// Merge another CRDT instance into this one
pub fn merge(&mut self, other: &CrossChainRelayCRDT) {
// Merge messages
for (id, message) in &other.messages {
if !self.messages.contains_key(id) {
let _ = self.add_message(message.clone());
}
}
// Merge delivery tracking
for (chain, delivered) in &other.deliveries {
self.deliveries
.entry(chain.clone())
.or_insert_with(HashSet::new)
.extend(delivered.clone());
}
// Merge conflict tracking
for (id, conflicts) in &other.conflicts {
self.conflicts
.entry(id.clone())
.or_insert_with(Vec::new)
.extend(conflicts.clone());
}
}
/// Get all messages destined for a specific chain that haven't been delivered yet
pub fn get_pending_messages(&self, dest_chain: &ChainId) -> Vec<&CrossChainMessage> {
let delivered = self.deliveries.get(dest_chain).cloned().unwrap_or_default();
let mut pending: Vec<&CrossChainMessage> = self
.messages
.values()
.filter(|msg| msg.dest_chain == *dest_chain && !delivered.contains(&msg.id))
.collect();
// Sort by source block height, then nonce for deterministic ordering
pending.sort_by(|a, b| {
a.source_block
.cmp(&b.source_block)
.then(a.nonce.cmp(&b.nonce))
});
pending
}
/// Mark messages as delivered to a chain
pub fn mark_delivered(&mut self, chain: &ChainId, message_ids: Vec<String>) {
let delivered = self
.deliveries
.entry(chain.clone())
.or_insert_with(HashSet::new);
delivered.extend(message_ids);
}
/// Get messages within a time window (for oracle-like use cases)
pub fn get_messages_in_window(
&self,
chain: &ChainId,
start_time: u64,
end_time: u64,
) -> Vec<&CrossChainMessage> {
self.messages
.values()
.filter(|msg| {
msg.dest_chain == *chain && msg.timestamp >= start_time && msg.timestamp <= end_time
})
.collect()
}
/// Verify validator signatures (simplified - real implementation would check against validator set)
fn verify_signatures(&self, message: &CrossChainMessage) -> bool {
// In real implementation:
// 1. Get validator set for source chain at source block height
// 2. Verify each signature
// 3. Check if we have enough stake represented
// For now, just check we have at least 2 signatures
message.validator_signatures.len() >= 2
}
fn messages_equal(&self, a: &CrossChainMessage, b: &CrossChainMessage) -> bool {
a.source_chain == b.source_chain
&& a.dest_chain == b.dest_chain
&& a.source_block == b.source_block
&& a.nonce == b.nonce
&& a.payload == b.payload
}
}
/// Example: Oracle price aggregation use case
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PriceOracleMessage {
pub oracle_id: String,
pub asset_pair: String,
pub price: u128,
pub confidence: u8,
}
impl CrossChainRelayCRDT {
/// Aggregate oracle prices from messages in a time window
pub fn aggregate_oracle_prices(
&self,
dest_chain: &ChainId,
asset_pair: &str,
time_window: u64,
) -> Option<u128> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let messages = self.get_messages_in_window(dest_chain, now - time_window, now);
let mut prices = Vec::new();
for msg in messages {
if let Ok(oracle_msg) = serde_json::from_slice::<PriceOracleMessage>(&msg.payload) {
if oracle_msg.asset_pair == asset_pair {
prices.push(oracle_msg.price);
}
}
}
if prices.is_empty() {
return None;
}
// Calculate median price
prices.sort();
Some(prices[prices.len() / 2])
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_cross_chain_relay() {
let mut relay1 = CrossChainRelayCRDT::new();
let mut relay2 = CrossChainRelayCRDT::new();
let ethereum = ChainId("ethereum".to_string());
let polygon = ChainId("polygon".to_string());
// Create a message from Ethereum to Polygon
let message = CrossChainMessage {
id: "msg1".to_string(),
source_chain: ethereum.clone(),
dest_chain: polygon.clone(),
source_block: 1000,
nonce: 1,
payload: b"transfer:100:USDC:0x123...".to_vec(),
timestamp: 1234567890,
validator_signatures: vec![
ValidatorSignature {
validator_id: "val1".to_string(),
signature: vec![1, 2, 3],
},
ValidatorSignature {
validator_id: "val2".to_string(),
signature: vec![4, 5, 6],
},
],
};
// Add to first relay
relay1.add_message(message.clone()).unwrap();
// Get pending messages for Polygon
let pending = relay1.get_pending_messages(&polygon);
assert_eq!(pending.len(), 1);
// Merge relay1 into relay2
relay2.merge(&relay1);
// Both should now have the same message
assert_eq!(relay2.get_pending_messages(&polygon).len(), 1);
// Mark as delivered
relay2.mark_delivered(&polygon, vec!["msg1".to_string()]);
assert_eq!(relay2.get_pending_messages(&polygon).len(), 0);
}
#[test]
fn test_oracle_aggregation() {
let mut relay = CrossChainRelayCRDT::new();
let ethereum = ChainId("ethereum".to_string());
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
// Add price submissions from multiple oracles
for i in 0..5 {
let price_msg = PriceOracleMessage {
oracle_id: format!("oracle{}", i),
asset_pair: "ETH/USD".to_string(),
price: 2000 + (i as u128 * 10), // Prices: 2000, 2010, 2020, 2030, 2040
confidence: 95,
};
let message = CrossChainMessage {
id: format!("price{}", i),
source_chain: ChainId(format!("oracle{}", i)),
dest_chain: ethereum.clone(),
source_block: 1000,
nonce: i as u64,
payload: serde_json::to_vec(&price_msg).unwrap(),
timestamp: now - 30, // 30 seconds ago
validator_signatures: vec![
ValidatorSignature {
validator_id: "val1".to_string(),
signature: vec![1, 2, 3],
},
ValidatorSignature {
validator_id: "val2".to_string(),
signature: vec![4, 5, 6],
},
],
};
relay.add_message(message).unwrap();
}
// Aggregate prices from last minute
let median_price = relay
.aggregate_oracle_prices(&ethereum, "ETH/USD", 60)
.unwrap();
assert_eq!(median_price, 2020); // Median of 2000, 2010, 2020, 2030, 2040
}
}