use super::*;
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc, time::{Duration, SystemTime}};
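/// Per-network replication state: the current Lamport clock, the clock of the
/// last entry handed to the application, and the nodes in the replica network.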
#[derive(Clone, Default, Debug)]
pub struct ReplConfigData {
pub lamport_clock: Nonce,
pub last_clock: Nonce,
pub nodes: HashMap<String, String>,
}
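/// Shared handle to the replication state of every network this node belongs to.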
#[derive(Clone)]
pub struct ReplInfo {
pub state: Arc<Mutex<HashMap<String, ReplConfigData>>>,
}
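/// Consistency guarantee enforced when buffering replicated data.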
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConsistencyModel {
Eventual,
Strong(ConsensusModel),
}
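/// Number of peer confirmations required before data becomes visible under strong consistency.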
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConsensusModel {
All,
MinPeers(u64),
}
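/// Replication network configuration: either tuned per field or the library defaults.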
#[derive(Clone, Debug)]
pub enum ReplNetworkConfig {
Custom {
queue_length: u64,
expiry_time: Option<Seconds>,
sync_wait_time: Seconds,
consistency_model: ConsistencyModel,
data_aging_period: Seconds,
},
Default,
}
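/// A single buffered replication entry and its confirmation metadata.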
#[derive(Clone, Debug)]
pub struct ReplBufferData {
pub data: StringVector,
pub lamport_clock: Nonce,
pub outgoing_timestamp: Seconds,
pub incoming_timestamp: Seconds,
pub message_id: String,
pub sender: PeerId,
pub confirmations: Option<Nonce>,
}
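// Entries are ordered by Lamport clock, with the message id as a tiebreaker,
// so iteration always yields the causally earliest message first.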
impl Ord for ReplBufferData {
fn cmp(&self, other: &Self) -> Ordering {
self.lamport_clock
.cmp(&other.lamport_clock)
.then_with(|| self.message_id.cmp(&other.message_id))
}
}
impl PartialOrd for ReplBufferData {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Eq for ReplBufferData {}
impl PartialEq for ReplBufferData {
fn eq(&self, other: &Self) -> bool {
self.lamport_clock == other.lamport_clock && self.message_id == other.message_id
}
}
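/// Queue that buffers incoming replica data until the application consumes it.
/// Strongly consistent entries wait in `temporary_queue` until confirmed.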
pub(crate) struct ReplicaBufferQueue {
config: ReplNetworkConfig,
temporary_queue: Mutex<BTreeMap<String, BTreeMap<String, ReplBufferData>>>,
queue: Mutex<BTreeMap<String, BTreeSet<ReplBufferData>>>,
}
impl ReplicaBufferQueue {
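// Limits applied when the user supplies no custom configuration.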
const MAX_CAPACITY: u64 = 150;
const EXPIRY_TIME: Seconds = 60;
const SYNC_WAIT_TIME: Seconds = 5;
const DATA_AGING_PERIOD: Seconds = 5;
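/// Create a new buffer queue with the supplied network configuration.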
pub fn new(config: ReplNetworkConfig) -> Self {
Self {
config,
temporary_queue: Mutex::new(Default::default()),
queue: Mutex::new(Default::default()),
}
}
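/// Return the consistency model the buffer was configured with.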
pub fn consistency_model(&self) -> ConsistencyModel {
match self.config {
ReplNetworkConfig::Default => ConsistencyModel::Eventual,
ReplNetworkConfig::Custom {
consistency_model, ..
} => consistency_model,
}
}
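/// Set up the buffers for a newly joined replica network.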
pub async fn init(&self, repl_network: String) {
let mut queue = self.queue.lock().await;
queue.insert(repl_network.clone(), Default::default());
if self.consistency_model() != ConsistencyModel::Eventual {
let mut queue = self.temporary_queue.lock().await;
queue.insert(repl_network.clone(), Default::default());
}
}
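/// Buffer incoming data, evicting expired or oldest entries once the queue is
/// full. Under strong consistency the data is parked in the temporary queue
/// and broadcast to peers for confirmation first.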
pub async fn push(&self, mut core: Core, replica_network: String, data: ReplBufferData) {
match self.config {
ReplNetworkConfig::Default => {
let mut queue = self.queue.lock().await;
let queue = queue.entry(replica_network).or_default();
while queue.len() as u64 >= Self::MAX_CAPACITY {
let current_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let mut expired_items = Vec::new();
for entry in queue.iter() {
if current_time.saturating_sub(entry.outgoing_timestamp)
>= Self::EXPIRY_TIME
{
expired_items.push(entry.clone());
}
}
for expired in expired_items {
queue.remove(&expired);
}
if queue.len() as u64 >= Self::MAX_CAPACITY {
if let Some(first) = queue.iter().next().cloned() {
queue.remove(&first);
}
}
}
queue.insert(data);
},
ReplNetworkConfig::Custom {
queue_length,
expiry_time,
consistency_model,
..
} => {
match consistency_model {
ConsistencyModel::Eventual => {
let mut queue = self.queue.lock().await;
let queue = queue.entry(replica_network).or_default();
while queue.len() as u64 >= queue_length {
if let Some(expiry_time) = expiry_time {
let current_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let mut expired_items = Vec::new();
for entry in queue.iter() {
if current_time.saturating_sub(entry.outgoing_timestamp)
>= expiry_time
{
expired_items.push(entry.clone());
}
}
for expired in expired_items {
queue.remove(&expired);
}
}
if queue.len() as u64 >= queue_length {
if let Some(first) = queue.iter().next().cloned() {
queue.remove(&first);
}
}
}
queue.insert(data);
},
ConsistencyModel::Strong(consensus_model) => {
let mut temp_queue = self.temporary_queue.lock().await;
let temp_queue = temp_queue.entry(replica_network.clone()).or_default();
if temp_queue.len() as u64 >= Self::MAX_CAPACITY {
if let Some(first_key) = temp_queue.keys().next().cloned() {
temp_queue.remove(&first_key);
}
}
let replica_peers = core.replica_peers(&replica_network).await.len();
if replica_peers == 1 || consensus_model == ConsensusModel::MinPeers(1) {
let mut queue = self.queue.lock().await;
let entry = queue.entry(replica_network.clone()).or_default();
entry.insert(data);
} else {
let message_id = data.message_id.clone();
temp_queue.insert(data.message_id.clone(), data);
let message = vec![
Core::STRONG_CONSISTENCY_FLAG.as_bytes().to_vec(),
replica_network.clone().into(),
message_id.as_bytes().into(),
];
let gossip_request = AppData::GossipsubBroadcastMessage {
topic: replica_network.into(),
message,
};
let _ = core.query_network(gossip_request).await;
}
},
}
},
}
}
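/// Pop the earliest entry (smallest Lamport clock) and record its clock as the
/// network's last delivered clock.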
pub async fn pop_front(&self, core: Core, replica_network: &str) -> Option<ReplBufferData> {
let first_data = {
let mut queue = self.queue.lock().await;
if let Some(queue) = queue.get_mut(replica_network) {
if let Some(first) = queue.iter().next().cloned() {
queue.remove(&first);
Some(first)
} else {
None
}
} else {
None
}
};
let first = first_data?;
{
let mut cfg = core.network_info.replication.state.lock().await;
let entry = cfg
.entry(replica_network.to_owned())
.or_insert_with(|| ReplConfigData {
lamport_clock: 0,
last_clock: first.lamport_clock,
nodes: Default::default(),
});
entry.last_clock = first.lamport_clock;
}
Some(first)
}
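/// Count a peer's confirmation for a buffered message and, once the required
/// number of confirmations is reached, move it into the public queue.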
pub async fn handle_data_confirmation(
&self,
mut core: Core,
replica_network: String,
message_id: String,
) {
let peers_count = match self.config {
ReplNetworkConfig::Custom {
consistency_model, ..
} => match consistency_model {
ConsistencyModel::Eventual => 0,
ConsistencyModel::Strong(consensus_model) => match consensus_model {
ConsensusModel::All => {
core.replica_peers(&replica_network).await.len() as u64
},
ConsensusModel::MinPeers(required_peers) => required_peers,
},
},
ReplNetworkConfig::Default => 0,
};
let is_fully_confirmed = {
let mut flag = false;
let mut temporary_queue = self.temporary_queue.lock().await;
if let Some(temp_queue) = temporary_queue.get_mut(&replica_network) {
if let Some(data_entry) = temp_queue.get_mut(&message_id) {
// Default to a single (local) confirmation so a missing count cannot panic
let confirmed = data_entry.confirmations.unwrap_or(1);
if confirmed < peers_count {
data_entry.confirmations = Some(confirmed + 1);
}
flag = peers_count != 0 && data_entry.confirmations == Some(peers_count);
}
}
flag
};
if is_fully_confirmed {
let mut public_queue = self.queue.lock().await;
let public_queue = public_queue
.entry(replica_network.clone())
.or_insert_with(BTreeSet::new);
if let ReplNetworkConfig::Custom {
queue_length,
expiry_time,
..
} = self.config
{
if public_queue.len() as u64 >= queue_length {
let current_time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
if let Some(expiry_time) = expiry_time {
public_queue.retain(|entry| {
current_time.saturating_sub(entry.outgoing_timestamp) < expiry_time
});
}
}
while public_queue.len() as u64 >= queue_length {
if let Some(first) = public_queue.iter().next().cloned() {
public_queue.remove(&first);
}
}
}
let mut temporary_queue = self.temporary_queue.lock().await;
if let Some(temp_queue) = temporary_queue.get_mut(&replica_network) {
if let Some(data_entry) = temp_queue.remove(&message_id) {
public_queue.insert(data_entry);
}
}
}
}
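/// Background task: periodically broadcast the ids of sufficiently aged local
/// entries so peers can detect and pull anything they are missing.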
pub async fn sync_with_eventual_consistency(&self, core: Core, repl_network: String) {
loop {
let repl_network = repl_network.clone();
let mut core = core.clone();
let (data_aging_time, sync_wait_time) = match self.config {
ReplNetworkConfig::Default => (Self::DATA_AGING_PERIOD, Self::SYNC_WAIT_TIME),
ReplNetworkConfig::Custom {
data_aging_period,
sync_wait_time,
..
} => (data_aging_period, sync_wait_time),
};
let local_data_state = {
let queue = self.queue.lock().await;
queue.get(&repl_network).cloned()
};
if let Some(local_data_state) = local_data_state {
let local_data = local_data_state
.iter()
.filter(|&d| {
util::get_unix_timestamp().saturating_sub(d.incoming_timestamp)
> data_aging_time
})
.cloned()
.collect::<BTreeSet<_>>();
let (min_clock, max_clock) =
if let (Some(first), Some(last)) = (local_data.first(), local_data.last()) {
(first.lamport_clock, last.lamport_clock)
} else {
(0, 0)
};
let mut message_ids = local_data
.iter()
.map(|data| {
let id = data.message_id.clone()
+ Core::ENTRY_DELIMITER
+ &data.sender.to_string();
id.into()
})
.collect::<ByteVector>();
let mut message = vec![
Core::EVENTUAL_CONSISTENCY_FLAG.as_bytes().to_vec(),
core.peer_id().to_string().into(),
repl_network.clone().into(),
min_clock.to_string().into(),
max_clock.to_string().into(),
];
message.append(&mut message_ids);
let gossip_request = AppData::GossipsubBroadcastMessage {
topic: repl_network.into(),
message,
};
let _ = core.query_network(gossip_request).await;
}
// Respect the configured sync interval rather than the hard-coded default
#[cfg(feature = "tokio-runtime")]
tokio::time::sleep(Duration::from_secs(sync_wait_time)).await;
#[cfg(feature = "async-std-runtime")]
async_std::task::sleep(Duration::from_secs(sync_wait_time)).await;
}
}
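/// Compare a peer's advertised buffer state with ours and pull any messages we
/// are missing within the given Lamport clock bounds.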
pub async fn sync_buffer_image(
&self,
mut core: Core,
repl_peer_id: PeerIdString,
repl_network: String,
lamports_clock_bound: (u64, u64),
replica_data_state: StringVector,
) {
let state = core.network_info.replication.state.lock().await;
if let Some(state) = state.get(&repl_network) {
if state.last_clock >= lamports_clock_bound.1 {
return;
}
}
drop(state);
let replica_buffer_state = replica_data_state
.into_iter()
.filter(|id| !id.contains(&core.peer_id().to_string()))
.map(|id| {
let msg_id = id.split(Core::ENTRY_DELIMITER).collect::<Vec<_>>()[0];
msg_id.into()
})
.collect::<BTreeSet<_>>();
let mut missing_msgs = {
let mut queue = self.queue.lock().await;
if let Some(local_state) = queue.get_mut(&repl_network) {
let local_buffer_state = local_state
.iter()
.filter(|data| {
data.lamport_clock >= lamports_clock_bound.0
&& data.lamport_clock <= lamports_clock_bound.1
})
.map(|data| data.message_id.clone())
.collect::<BTreeSet<_>>();
replica_buffer_state
.difference(&local_buffer_state)
.cloned()
.map(|id| id.into())
.collect::<ByteVector>()
} else {
return;
}
};
if !missing_msgs.is_empty() {
if let Ok(repl_peer_id) = repl_peer_id.parse::<PeerId>() {
let mut rpc_data: ByteVector = vec![
Core::RPC_SYNC_PULL_FLAG.into(),
repl_network.clone().into(),
];
rpc_data.append(&mut missing_msgs);
let fetch_request = AppData::SendRpc {
keys: rpc_data,
peer: repl_peer_id,
};
if let Ok(response) = core.query_network(fetch_request).await {
if let AppResponse::SendRpc(messages) = response {
let response = util::unmarshal_messages(messages);
let mut queue = self.queue.lock().await;
if let Some(local_state) = queue.get_mut(&repl_network) {
for missing_msg in response {
local_state.insert(missing_msg);
}
}
}
}
}
}
}
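/// Serialize the requested buffered messages (or the whole buffer if no ids
/// are given) for an RPC reply to a syncing peer.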
pub async fn pull_missing_data(
&self,
repl_network: String,
message_ids: &[Vec<u8>],
) -> ByteVector {
let local_state = {
let queue = self.queue.lock().await;
queue.get(&repl_network).cloned()
};
if let Some(local_state) = local_state {
// An empty (or absent) first id means the caller wants the whole buffer
let requested_msgs = if message_ids.first().map_or(true, |id| id.is_empty()) {
local_state.iter().collect::<Vec<_>>()
} else {
local_state
.iter()
.filter(|&data| message_ids.contains(&data.message_id.as_bytes().to_vec()))
.collect::<Vec<_>>()
};
let mut result = Vec::new();
for msg in requested_msgs {
let joined_data = msg.data.join(Core::DATA_DELIMITER);
let mut entry = Vec::new();
entry.extend_from_slice(joined_data.as_bytes());
entry.extend_from_slice(Core::FIELD_DELIMITER.to_string().as_bytes());
entry.extend_from_slice(msg.lamport_clock.to_string().as_bytes());
entry.extend_from_slice(Core::FIELD_DELIMITER.to_string().as_bytes());
entry.extend_from_slice(msg.outgoing_timestamp.to_string().as_bytes());
entry.extend_from_slice(Core::FIELD_DELIMITER.to_string().as_bytes());
entry.extend_from_slice(msg.incoming_timestamp.to_string().as_bytes());
entry.extend_from_slice(Core::FIELD_DELIMITER.to_string().as_bytes());
entry.extend_from_slice(msg.message_id.as_bytes());
entry.extend_from_slice(Core::FIELD_DELIMITER.to_string().as_bytes());
entry.extend_from_slice(msg.sender.to_base58().as_bytes());
if !result.is_empty() {
result.extend_from_slice(Core::ENTRY_DELIMITER.to_string().as_bytes());
}
result.extend(entry);
}
return vec![result];
}
Default::default()
}
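/// Pull the entire buffer image from a replica node, e.g. when joining a
/// network late, and merge it into the local queue.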
pub async fn replicate_buffer(
&self,
mut core: Core,
repl_network: String,
replica_node: PeerId,
) -> Result<(), NetworkError> {
let rpc_data: ByteVector = vec![
Core::RPC_SYNC_PULL_FLAG.into(),
repl_network.clone().into(),
vec![],
];
let fetch_request = AppData::SendRpc {
keys: rpc_data,
peer: replica_node,
};
// Fail fast if the network is unknown, and avoid holding the lock across the RPC
{
let queue = self.queue.lock().await;
if !queue.contains_key(&repl_network) {
return Err(NetworkError::MissingReplNetwork);
}
}
match core.query_network(fetch_request).await? {
AppResponse::SendRpc(messages) => {
let response = util::unmarshal_messages(messages);
let mut queue = self.queue.lock().await;
if let Some(local_state) = queue.get_mut(&repl_network) {
for missing_msg in response {
local_state.insert(missing_msg);
}
}
Ok(())
},
AppResponse::Error(err) => Err(err),
_ => Err(NetworkError::RpcDataFetchError),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
const CUSTOM_TCP_PORT: Port = 49666;
const CUSTOM_UDP_PORT: Port = 49852;
pub async fn setup_node(ports: (Port, Port)) -> Core {
let config = BootstrapConfig::default()
.with_tcp(ports.0)
.with_udp(ports.1);
CoreBuilder::with_config(config).build().await.unwrap()
}
#[test]
fn test_initialization_with_default_config() {
let buffer = ReplicaBufferQueue::new(ReplNetworkConfig::Default);
assert_eq!(buffer.consistency_model(), ConsistencyModel::Eventual);
}
#[test]
fn test_initialization_with_custom_config() {
let config = ReplNetworkConfig::Custom {
queue_length: 200,
expiry_time: Some(120),
sync_wait_time: 10,
consistency_model: ConsistencyModel::Strong(ConsensusModel::All),
data_aging_period: 15,
};
let buffer = ReplicaBufferQueue::new(config);
assert_eq!(
buffer.consistency_model(),
ConsistencyModel::Strong(ConsensusModel::All)
);
match buffer.config {
ReplNetworkConfig::Custom {
queue_length,
expiry_time,
sync_wait_time,
data_aging_period,
..
} => {
assert_eq!(queue_length, 200);
assert_eq!(expiry_time, Some(120));
assert_eq!(sync_wait_time, 10);
assert_eq!(data_aging_period, 15);
},
_ => panic!("Custom config fields not initialized correctly"),
}
}
#[test]
fn test_buffer_overflow_expiry_behavior() {
tokio::runtime::Runtime::new().unwrap().block_on(async {
let expiry_period: u64 = 2;
let config = ReplNetworkConfig::Custom {
queue_length: 4,
expiry_time: Some(expiry_period),
sync_wait_time: 10,
consistency_model: ConsistencyModel::Eventual,
data_aging_period: 10,
};
let network = setup_node((CUSTOM_TCP_PORT, CUSTOM_UDP_PORT)).await;
let buffer = ReplicaBufferQueue::new(config);
for clock in 1..5 {
let data = ReplBufferData {
data: vec!["Data 1".into()],
lamport_clock: clock,
outgoing_timestamp: util::get_unix_timestamp(),
incoming_timestamp: util::get_unix_timestamp(),
message_id: "msg1".into(),
sender: PeerId::random(),
confirmations: None,
};
buffer
.push(network.clone(), "network1".into(), data.clone())
.await;
}
assert_eq!(
buffer
.pop_front(network.clone(), "network1")
.await
.unwrap()
.lamport_clock,
1
);
tokio::time::sleep(std::time::Duration::from_secs(expiry_period)).await;
// Eviction is lazy: expired entries remain until a push hits capacity
assert_eq!(buffer.queue.lock().await.get("network1").unwrap().len(), 3);
buffer
.push(
network.clone(),
"network1".into(),
ReplBufferData {
data: vec!["Data 1".into()],
lamport_clock: 6,
outgoing_timestamp: util::get_unix_timestamp(),
incoming_timestamp: util::get_unix_timestamp(),
message_id: "msg1".into(),
sender: PeerId::random(),
confirmations: None,
},
)
.await;
assert_eq!(buffer.queue.lock().await.get("network1").unwrap().len(), 4);
buffer
.push(
network.clone(),
"network1".into(),
ReplBufferData {
data: vec!["Data 1".into()],
lamport_clock: 42,
outgoing_timestamp: util::get_unix_timestamp(),
incoming_timestamp: util::get_unix_timestamp(),
message_id: "msg1".into(),
sender: PeerId::random(),
confirmations: None,
},
)
.await;
assert_eq!(
buffer
.pop_front(network.clone(), "network1")
.await
.unwrap()
.lamport_clock,
6
);
assert_eq!(
buffer
.pop_front(network.clone(), "network1")
.await
.unwrap()
.lamport_clock,
42
);
});
}
#[test]
fn test_buffer_overflow_no_expiry_behavior() {
tokio::runtime::Runtime::new().unwrap().block_on(async {
let config = ReplNetworkConfig::Custom {
queue_length: 4,
expiry_time: None,
sync_wait_time: 10,
consistency_model: ConsistencyModel::Eventual,
data_aging_period: 10,
};
let network = setup_node((15555, 6666)).await;
let buffer = ReplicaBufferQueue::new(config);
for clock in 1..5 {
let data = ReplBufferData {
data: vec!["Data 1".into()],
lamport_clock: clock,
outgoing_timestamp: util::get_unix_timestamp(),
incoming_timestamp: util::get_unix_timestamp(),
message_id: "msg1".into(),
sender: PeerId::random(),
confirmations: None,
};
buffer
.push(network.clone(), "network1".into(), data.clone())
.await;
}
assert_eq!(
buffer
.pop_front(network.clone(), "network1")
.await
.unwrap()
.lamport_clock,
1
);
buffer
.push(
network.clone(),
"network1".into(),
ReplBufferData {
data: vec!["Data 1".into()],
lamport_clock: 6,
outgoing_timestamp: util::get_unix_timestamp(),
incoming_timestamp: util::get_unix_timestamp(),
message_id: "msg1".into(),
sender: PeerId::random(),
confirmations: None,
},
)
.await;
assert_eq!(
buffer
.pop_front(network.clone(), "network1")
.await
.unwrap()
.lamport_clock,
2
);
assert_eq!(
buffer
.pop_front(network.clone(), "network1")
.await
.unwrap()
.lamport_clock,
3
);
});
}
#[test]
fn test_pop_from_empty_buffer() {
tokio::runtime::Runtime::new().unwrap().block_on(async {
let config = ReplNetworkConfig::Default;
let buffer = ReplicaBufferQueue::new(config);
let network = setup_node((15551, 6661)).await;
let result = buffer.pop_front(network.clone(), "network1").await;
assert!(result.is_none());
});
}
}