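//! Cross-chain message relay state kept as a mergeable, CRDT-style structure,
//! plus an example oracle price aggregation built on top of it.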
use serde::{Deserialize, Serialize};
|
|
use std::collections::{HashMap, HashSet};
|
|
use std::time::{SystemTime, UNIX_EPOCH};
|
|
|
|
/// Represents a blockchain identifier
|
|
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
|
|
pub struct ChainId(pub String);
|
|
|
|
/// A message being relayed between chains
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct CrossChainMessage {
|
|
/// Unique identifier for this message
|
|
pub id: String,
|
|
/// Source blockchain
|
|
pub source_chain: ChainId,
|
|
/// Destination blockchain
|
|
pub dest_chain: ChainId,
|
|
/// Block height on source chain when message was created
|
|
pub source_block: u64,
|
|
/// Nonce for ordering messages from same block
|
|
pub nonce: u64,
|
|
/// The actual message payload
|
|
pub payload: Vec<u8>,
|
|
/// Timestamp when message was created
|
|
pub timestamp: u64,
|
|
/// Signatures from source chain validators
|
|
pub validator_signatures: Vec<ValidatorSignature>,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct ValidatorSignature {
|
|
pub validator_id: String,
|
|
pub signature: Vec<u8>,
|
|
}
|
|
|
|
/// CRDT structure for accumulating cross-chain messages
|
|
#[derive(Debug, Clone)]
|
|
pub struct CrossChainRelayCRDT {
|
|
/// All messages seen by this node
|
|
messages: HashMap<String, CrossChainMessage>,
|
|
/// Track which messages have been delivered to which chains
|
|
deliveries: HashMap<ChainId, HashSet<String>>,
|
|
/// Byzantine fault detection - track conflicting messages
|
|
conflicts: HashMap<String, Vec<CrossChainMessage>>,
|
|
}
|
|
|
|
impl CrossChainRelayCRDT {
|
|
pub fn new() -> Self {
|
|
Self {
|
|
messages: HashMap::new(),
|
|
deliveries: HashMap::new(),
|
|
conflicts: HashMap::new(),
|
|
}
|
|
}
|
|
|
|
/// Add a new message to the CRDT
|
|
pub fn add_message(&mut self, message: CrossChainMessage) -> Result<(), String> {
|
|
// Verify the message has enough validator signatures
|
|
if !self.verify_signatures(&message) {
|
|
return Err("Insufficient valid signatures".to_string());
|
|
}
|
|
|
|
// Check for Byzantine behavior - same ID but different content
|
|
if let Some(existing) = self.messages.get(&message.id) {
|
|
if !self.messages_equal(existing, &message) {
|
|
self.conflicts
|
|
.entry(message.id.clone())
|
|
.or_insert_with(Vec::new)
|
|
.push(message.clone());
|
|
return Err("Conflicting message detected".to_string());
|
|
}
|
|
}
|
|
|
|
// Add the message
|
|
self.messages.insert(message.id.clone(), message);
|
|
Ok(())
|
|
}
|
|
|
|
/// Merge another CRDT instance into this one
|
|
pub fn merge(&mut self, other: &CrossChainRelayCRDT) {
|
|
// Merge messages
|
|
for (id, message) in &other.messages {
|
|
if !self.messages.contains_key(id) {
|
|
let _ = self.add_message(message.clone());
|
|
}
|
|
}
|
|
|
|
// Merge delivery tracking
|
|
for (chain, delivered) in &other.deliveries {
|
|
self.deliveries
|
|
.entry(chain.clone())
|
|
.or_insert_with(HashSet::new)
|
|
.extend(delivered.clone());
|
|
}
|
|
|
|
// Merge conflict tracking
|
|
for (id, conflicts) in &other.conflicts {
|
|
self.conflicts
|
|
.entry(id.clone())
|
|
.or_insert_with(Vec::new)
|
|
.extend(conflicts.clone());
|
|
}
|
|
}
|
|
|
|
/// Get all messages destined for a specific chain that haven't been delivered yet
|
|
pub fn get_pending_messages(&self, dest_chain: &ChainId) -> Vec<&CrossChainMessage> {
|
|
let delivered = self.deliveries.get(dest_chain).cloned().unwrap_or_default();
|
|
|
|
let mut pending: Vec<&CrossChainMessage> = self
|
|
.messages
|
|
.values()
|
|
.filter(|msg| msg.dest_chain == *dest_chain && !delivered.contains(&msg.id))
|
|
.collect();
|
|
|
|
// Sort by source block height, then nonce for deterministic ordering
|
|
pending.sort_by(|a, b| {
|
|
a.source_block
|
|
.cmp(&b.source_block)
|
|
.then(a.nonce.cmp(&b.nonce))
|
|
});
|
|
|
|
pending
|
|
}
|
|
|
|
/// Mark messages as delivered to a chain
|
|
pub fn mark_delivered(&mut self, chain: &ChainId, message_ids: Vec<String>) {
|
|
let delivered = self
|
|
.deliveries
|
|
.entry(chain.clone())
|
|
.or_insert_with(HashSet::new);
|
|
delivered.extend(message_ids);
|
|
}
|
|
|
|
/// Get messages within a time window (for oracle-like use cases)
|
|
pub fn get_messages_in_window(
|
|
&self,
|
|
chain: &ChainId,
|
|
start_time: u64,
|
|
end_time: u64,
|
|
) -> Vec<&CrossChainMessage> {
|
|
self.messages
|
|
.values()
|
|
.filter(|msg| {
|
|
msg.dest_chain == *chain && msg.timestamp >= start_time && msg.timestamp <= end_time
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
/// Verify validator signatures (simplified - real implementation would check against validator set)
|
|
fn verify_signatures(&self, message: &CrossChainMessage) -> bool {
|
|
// In real implementation:
|
|
// 1. Get validator set for source chain at source block height
|
|
// 2. Verify each signature
|
|
// 3. Check if we have enough stake represented
|
|
|
|
// For now, just check we have at least 2 signatures
|
|
message.validator_signatures.len() >= 2
|
|
}
|
|
|
|
fn messages_equal(&self, a: &CrossChainMessage, b: &CrossChainMessage) -> bool {
|
|
a.source_chain == b.source_chain
|
|
&& a.dest_chain == b.dest_chain
|
|
&& a.source_block == b.source_block
|
|
&& a.nonce == b.nonce
|
|
&& a.payload == b.payload
|
|
}
|
|
}
|
|
|
|
/// Example: Oracle price aggregation use case
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct PriceOracleMessage {
|
|
pub oracle_id: String,
|
|
pub asset_pair: String,
|
|
pub price: u128,
|
|
pub confidence: u8,
|
|
}
|
|
|
|
impl CrossChainRelayCRDT {
|
|
/// Aggregate oracle prices from messages in a time window
|
|
pub fn aggregate_oracle_prices(
|
|
&self,
|
|
dest_chain: &ChainId,
|
|
asset_pair: &str,
|
|
time_window: u64,
|
|
) -> Option<u128> {
|
|
let now = SystemTime::now()
|
|
.duration_since(UNIX_EPOCH)
|
|
.unwrap()
|
|
.as_secs();
|
|
let messages = self.get_messages_in_window(dest_chain, now - time_window, now);
|
|
|
|
let mut prices = Vec::new();
|
|
|
|
for msg in messages {
|
|
if let Ok(oracle_msg) = serde_json::from_slice::<PriceOracleMessage>(&msg.payload) {
|
|
if oracle_msg.asset_pair == asset_pair {
|
|
prices.push(oracle_msg.price);
|
|
}
|
|
}
|
|
}
|
|
|
|
if prices.is_empty() {
|
|
return None;
|
|
}
|
|
|
|
// Calculate median price
|
|
prices.sort();
|
|
Some(prices[prices.len() / 2])
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Two placeholder signatures from distinct validators, enough to pass the
    /// relay's simplified signature check.
    fn sample_signatures() -> Vec<ValidatorSignature> {
        vec![
            ValidatorSignature {
                validator_id: "val1".to_string(),
                signature: vec![1, 2, 3],
            },
            ValidatorSignature {
                validator_id: "val2".to_string(),
                signature: vec![4, 5, 6],
            },
        ]
    }

    /// A message added on one relay shows up as pending after a merge and
    /// disappears from the pending list once it is marked delivered.
    #[test]
    fn test_cross_chain_relay() {
        let mut relay1 = CrossChainRelayCRDT::new();
        let mut relay2 = CrossChainRelayCRDT::new();

        let ethereum = ChainId("ethereum".to_string());
        let polygon = ChainId("polygon".to_string());

        // Create a message from Ethereum to Polygon
        let message = CrossChainMessage {
            id: "msg1".to_string(),
            source_chain: ethereum.clone(),
            dest_chain: polygon.clone(),
            source_block: 1000,
            nonce: 1,
            payload: b"transfer:100:USDC:0x123...".to_vec(),
            timestamp: 1234567890,
            validator_signatures: sample_signatures(),
        };

        // Add to first relay
        relay1.add_message(message.clone()).unwrap();

        // Get pending messages for Polygon
        let pending = relay1.get_pending_messages(&polygon);
        assert_eq!(pending.len(), 1);

        // Merge relay1 into relay2
        relay2.merge(&relay1);

        // Both should now have the same message
        assert_eq!(relay2.get_pending_messages(&polygon).len(), 1);

        // Mark as delivered
        relay2.mark_delivered(&polygon, vec!["msg1".to_string()]);
        assert_eq!(relay2.get_pending_messages(&polygon).len(), 0);
    }

    /// Five oracle submissions inside the time window aggregate to their median.
    #[test]
    fn test_oracle_aggregation() {
        let mut relay = CrossChainRelayCRDT::new();
        let ethereum = ChainId("ethereum".to_string());
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // Add price submissions from multiple oracles
        for i in 0..5 {
            let price_msg = PriceOracleMessage {
                oracle_id: format!("oracle{}", i),
                asset_pair: "ETH/USD".to_string(),
                price: 2000 + (i as u128 * 10), // Prices: 2000, 2010, 2020, 2030, 2040
                confidence: 95,
            };

            let message = CrossChainMessage {
                id: format!("price{}", i),
                source_chain: ChainId(format!("oracle{}", i)),
                dest_chain: ethereum.clone(),
                source_block: 1000,
                nonce: i as u64,
                payload: serde_json::to_vec(&price_msg).unwrap(),
                timestamp: now - 30, // 30 seconds ago
                validator_signatures: sample_signatures(),
            };

            relay.add_message(message).unwrap();
        }

        // Aggregate prices from last minute
        let median_price = relay
            .aggregate_oracle_prices(&ethereum, "ETH/USD", 60)
            .unwrap();
        assert_eq!(median_price, 2020); // Median of 2000, 2010, 2020, 2030, 2040
    }
}
|