swarm_nl/core/replication.rs

// Copyright 2024 Algorealm, Inc.
// Apache 2.0 License

//! Module that contains important data structures to manage replication operations on the
//! network.

use super::*;
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc, time::SystemTime};

/// Struct representing data for configuring node replication.
#[derive(Clone, Default, Debug)]
pub struct ReplConfigData {
	/// Lamport's clock for synchronization.
	pub lamport_clock: Nonce,
	/// Clock of last data consumed from the replica buffer.
	pub last_clock: Nonce,
	/// Replica nodes described by their addresses.
	pub nodes: HashMap<String, String>,
}

/// Struct containing important information for replication.
#[derive(Clone)]
pub struct ReplInfo {
	/// Internal state for replication.
	pub state: Arc<Mutex<HashMap<String, ReplConfigData>>>,
}

/// The consistency models supported.
///
/// This is important as it determines the behaviour of the node in handling and delivering
/// replicated data to the application layer. There are trade-offs to consider before
/// choosing a model; pick the one that best suits your exact use case and objectives.
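///
/// # Example
///
/// A minimal sketch of the two models (values are illustrative):
///
/// ```ignore
/// // Deliver data to the application layer as soon as it arrives;
/// // replicas converge over time.
/// let eventual = ConsistencyModel::Eventual;
///
/// // Hold data in a transient buffer until three peers confirm it.
/// let strong = ConsistencyModel::Strong(ConsensusModel::MinPeers(3));
/// ```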
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConsistencyModel {
	/// Eventual consistency
	Eventual,
	/// Strong consistency
	Strong(ConsensusModel),
}

/// This enum dictates how many nodes need to come to an agreement for consensus to be reached
/// when a strong consistency synchronization model is in use.
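///
/// For example, `ConsensusModel::MinPeers(3)` releases buffered data once three peers
/// have confirmed it, while `ConsensusModel::All` waits for confirmation from every
/// replica peer in the network.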
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConsensusModel {
	/// All nodes in the network must contribute to consensus
	All,
	/// Just a subset of the network is needed for consensus
	MinPeers(u64),
}

/// Enum containing configurations for replication.
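///
/// # Example
///
/// A minimal sketch of a custom configuration (values are illustrative):
///
/// ```ignore
/// let config = ReplNetworkConfig::Custom {
/// 	queue_length: 100,
/// 	expiry_time: Some(60),  // seconds; `None` disables expiry
/// 	sync_wait_time: 5,      // seconds between synchronization attempts
/// 	consistency_model: ConsistencyModel::Strong(ConsensusModel::MinPeers(3)),
/// 	data_aging_period: 5,   // seconds data must age before synchronization
/// };
/// ```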
#[derive(Clone, Debug)]
pub enum ReplNetworkConfig {
	/// A custom configuration.
	Custom {
		/// Max capacity for transient storage.
		queue_length: u64,
		/// Expiry time of data in the buffer if the buffer is full. Set to `None` for no expiry.
		expiry_time: Option<Seconds>,
		/// Epoch to wait before attempting the next network synchronization of data in the buffer.
		sync_wait_time: Seconds,
		/// The data consistency model to be supported by the node. This must be uniform across all
		/// nodes to prevent undefined behaviour.
		consistency_model: ConsistencyModel,
		/// When data has arrived and is saved into the buffer, the time to wait for it to get to
		/// other peers after which it can be picked for synchronization.
		data_aging_period: Seconds,
	},
	/// A default configuration: `queue_length` = 100, `expiry_time` = 60 seconds,
	/// `sync_wait_time` = 5 seconds, `consistency_model` = `Eventual`, `data_aging_period` = 5
	/// seconds.
	Default,
}

/// Important data to marshall from an incoming replication payload and store in the replica
/// buffer.
#[derive(Clone, Debug)]
pub struct ReplBufferData {
	/// Raw incoming data.
	pub data: StringVector,
	/// Lamport's clock for synchronization.
	pub lamport_clock: Nonce,
	/// Timestamp at which the message left the sending node.
	pub outgoing_timestamp: Seconds,
	/// Timestamp at which the message arrived.
	pub incoming_timestamp: Seconds,
	/// Message ID used for deduplication. It is usually a hash of the incoming message.
	pub message_id: String,
	/// Sender PeerId.
	pub sender: PeerId,
	/// Number of confirmations. This is to help nodes using the strong consistency
	/// synchronization model to come to an agreement.
	pub confirmations: Option<Nonce>,
}

/// Implement Ord. Entries are ordered by Lamport clock first, then by message ID, so
/// iterating over the buffer yields the causally earliest data first.
impl Ord for ReplBufferData {
	fn cmp(&self, other: &Self) -> Ordering {
		self.lamport_clock
			.cmp(&other.lamport_clock) // Compare by lamport_clock first
			.then_with(|| self.message_id.cmp(&other.message_id)) // Then compare by message_id
	}
}

/// Implement PartialOrd.
impl PartialOrd for ReplBufferData {
	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
		Some(self.cmp(other))
	}
}

/// Implement Eq.
impl Eq for ReplBufferData {}

/// Implement PartialEq.
impl PartialEq for ReplBufferData {
	fn eq(&self, other: &Self) -> bool {
		self.lamport_clock == other.lamport_clock && self.message_id == other.message_id
	}
}

/// Replica buffer queue where incoming replicated data is stored temporarily.
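///
/// A minimal usage sketch (assuming an async context and a constructed [`Core`] handle;
/// illustrative only):
///
/// ```ignore
/// let buffer = ReplicaBufferQueue::new(ReplNetworkConfig::Default);
/// buffer.init("my_network".to_string()).await;
/// buffer.push(core.clone(), "my_network".into(), incoming_data).await;
/// let earliest = buffer.pop_front(core, "my_network").await;
/// ```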
pub(crate) struct ReplicaBufferQueue {
	/// Configuration for replication and general synchronization.
	config: ReplNetworkConfig,
	/// In the case of a strong consistency model, this is where data is buffered
	/// initially before it is agreed upon by the majority of the network. After that,
	/// it is moved to the queue exposed to the application layer.
	temporary_queue: Mutex<BTreeMap<String, BTreeMap<String, ReplBufferData>>>,
	/// Internal buffer containing replicated data to be consumed by the application layer.
	queue: Mutex<BTreeMap<String, BTreeSet<ReplBufferData>>>,
}

impl ReplicaBufferQueue {
	/// The default max capacity of the buffer.
	const MAX_CAPACITY: u64 = 150;

	/// The default expiry time of data in the buffer, when the buffer becomes full.
	const EXPIRY_TIME: Seconds = 60;

	/// The default epoch to wait before attempting the next network synchronization.
	const SYNC_WAIT_TIME: Seconds = 5;

	/// The default aging period after which the data can be synchronized across the network.
	const DATA_AGING_PERIOD: Seconds = 5;

	/// Create a new instance of [ReplicaBufferQueue].
	pub fn new(config: ReplNetworkConfig) -> Self {
		Self {
			config,
			temporary_queue: Mutex::new(Default::default()),
			queue: Mutex::new(Default::default()),
		}
	}

	/// Return the configured [`ConsistencyModel`] for data synchronization.
	pub fn consistency_model(&self) -> ConsistencyModel {
		match self.config {
			// Default config always supports eventual consistency
			ReplNetworkConfig::Default => ConsistencyModel::Eventual,
			ReplNetworkConfig::Custom {
				consistency_model, ..
			} => consistency_model,
		}
	}

	/// Initialize a replica network's data buffer. This occurs immediately after joining a new
	/// network.
	pub async fn init(&self, repl_network: String) {
		// Initialize primary public buffer
		let mut queue = self.queue.lock().await;
		queue.insert(repl_network.clone(), Default::default());

		// Initialize transient buffer, in case of a strong consistency model
		if self.consistency_model() != ConsistencyModel::Eventual {
			let mut queue = self.temporary_queue.lock().await;
			queue.insert(repl_network.clone(), Default::default());
		}
	}

	/// Push a new [ReplBufferData] item into the buffer.
	pub async fn push(&self, mut core: Core, replica_network: String, data: ReplBufferData) {
		// Different behaviours based on configurations
		match self.config {
			// Default implementation supports expiry of buffer items
			ReplNetworkConfig::Default => {
				// Lock the queue to modify it
				let mut queue = self.queue.lock().await;

				// Select the queue of the replica network the data belongs to.
				// If it doesn't exist, create a new one
				let queue = queue.entry(replica_network).or_default();

				// If the queue is full, remove expired data first
				while queue.len() as u64 >= Self::MAX_CAPACITY {
					// Check and remove expired data
					let current_time = SystemTime::now()
						.duration_since(SystemTime::UNIX_EPOCH)
						.unwrap()
						.as_secs();
					let mut expired_items = Vec::new();

					// Identify expired items and collect them for removal
					for entry in queue.iter() {
						if current_time.saturating_sub(entry.outgoing_timestamp)
							>= Self::EXPIRY_TIME
						{
							expired_items.push(entry.clone());
						}
					}

					// Remove expired items
					for expired in expired_items {
						queue.remove(&expired);
					}

					// If no expired items were removed, pop the front (oldest) item
					if queue.len() as u64 >= Self::MAX_CAPACITY {
						if let Some(first) = queue.iter().next().cloned() {
							queue.remove(&first);
						}
					}
				}

				// Insert data right into the final queue
				queue.insert(data);
			},
			// Here decay applies in addition to removal of excess buffer content
			ReplNetworkConfig::Custom {
				queue_length,
				expiry_time,
				consistency_model,
				..
			} => {
				// Which buffer the incoming data will interact with initially is determined by
				// the supported data consistency model
				match consistency_model {
					// For eventual consistency, data is written straight into the final queue
					// for consumption
					ConsistencyModel::Eventual => {
						// Lock the queue to modify it
						let mut queue = self.queue.lock().await;

						// Select the queue of the replica network the data belongs to.
						// If it doesn't exist, create a new one
						let queue = queue.entry(replica_network).or_default();

						// If the queue is full, remove expired data first
						while queue.len() as u64 >= queue_length {
							// Remove only when data expiration is supported
							if let Some(expiry_time) = expiry_time {
								// Check and remove expired data
								let current_time = SystemTime::now()
									.duration_since(SystemTime::UNIX_EPOCH)
									.unwrap()
									.as_secs();
								let mut expired_items = Vec::new();

								// Identify expired items and collect them for removal
								for entry in queue.iter() {
									if current_time.saturating_sub(entry.outgoing_timestamp)
										>= expiry_time
									{
										expired_items.push(entry.clone());
									}
								}

								// Remove expired items
								for expired in expired_items {
									queue.remove(&expired);
								}
							}

							// If no expired items were removed, pop the front (oldest) item
							if queue.len() as u64 >= queue_length {
								if let Some(first) = queue.iter().next().cloned() {
									queue.remove(&first);
								}
							}
						}

						// Insert data right into the final queue
						queue.insert(data);
					},
					// Here data is written into the temporary buffer first, for finalization to
					// occur. It is then moved into the final queue after favourable consensus
					// has been reached.
					ConsistencyModel::Strong(consensus_model) => {
						// Lock the queue to modify it
						let mut temp_queue = self.temporary_queue.lock().await;

						// Select the queue of the replica network the data belongs to.
						// If it doesn't exist, create a new one
						let temp_queue = temp_queue.entry(replica_network.clone()).or_default();

						// Remove the first item from the queue. No decay applies here
						if temp_queue.len() as u64 >= Self::MAX_CAPACITY {
							if let Some(first_key) = temp_queue.keys().next().cloned() {
								temp_queue.remove(&first_key);
							}
						}

						// Check whether we are one of only two members of the replica network
						// by sending a request to the swarm
						let replica_peers = core.replica_peers(&replica_network).await.len();

						// Put the data into the primary public buffer directly if we are only
						// two in the network, or if the consensus model requires only a single
						// node's confirmation
						if replica_peers == 1 || consensus_model == ConsensusModel::MinPeers(1) {
							let mut queue = self.queue.lock().await;
							let entry = queue.entry(replica_network.clone()).or_default();

							// Insert into queue
							entry.insert(data);
						} else {
							// Get message ID
							let message_id = data.message_id.clone();

							// Insert data into queue. Confirmation count is already 1
							temp_queue.insert(data.message_id.clone(), data);

							// Start the strong consistency synchronization algorithm:
							// broadcast the just-received message to peers to increase its
							// confirmation count. Only the message ID is broadcast
							let message = vec![
								Core::STRONG_CONSISTENCY_FLAG.as_bytes().to_vec(), /* Strong Consistency Sync Gossip Flag */
								replica_network.clone().into(),                    /* Replica network */
								message_id.as_bytes().into(),                      /* Message id */
							];

							// Prepare a gossip request
							let gossip_request = AppData::GossipsubBroadcastMessage {
								topic: replica_network.into(),
								message,
							};

							// Gossip data to replica nodes
							let _ = core.query_network(gossip_request).await;
						}
					},
				}
			},
		}
	}

	/// Pop the front (earliest) data from the queue.
	pub async fn pop_front(&self, core: Core, replica_network: &str) -> Option<ReplBufferData> {
		// Lock the queue and extract the replica network's queue
		let first_data = {
			let mut queue = self.queue.lock().await;

			if let Some(queue) = queue.get_mut(replica_network) {
				if let Some(first) = queue.iter().next().cloned() {
					// Remove the front element from the queue
					queue.remove(&first);
					Some(first)
				} else {
					None
				}
			} else {
				None
			}
		};

		// If no data to process, return early
		let first = first_data?;

		// Lock replication state to update the Lamport clock
		{
			let mut cfg = core.network_info.replication.state.lock().await;

			let entry = cfg
				.entry(replica_network.to_owned())
				.or_insert_with(|| ReplConfigData {
					lamport_clock: 0,
					last_clock: first.lamport_clock,
					nodes: Default::default(),
				});

			// Update the clock
			entry.last_clock = first.lamport_clock;
		}

		Some(first)
	}

	/// Handle an incoming confirmation for data awaiting consensus, moving it to the
	/// public queue once enough peers have confirmed it.
	pub async fn handle_data_confirmation(
		&self,
		mut core: Core,
		replica_network: String,
		message_id: String,
	) {
		// Determine the number of peers required for consensus
		let peers_count = match self.config {
			ReplNetworkConfig::Custom {
				consistency_model, ..
			} => match consistency_model {
				ConsistencyModel::Eventual => 0,
				ConsistencyModel::Strong(consensus_model) => match consensus_model {
					ConsensusModel::All => {
						// Get the total number of replica peers
						core.replica_peers(&replica_network).await.len() as u64
					},
					ConsensusModel::MinPeers(required_peers) => required_peers,
				},
			},
			ReplNetworkConfig::Default => 0,
		};
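
		// For example, with `ConsensusModel::All` and four replica peers, `peers_count`
		// is 4, so the data below is moved to the public queue only after four
		// confirmations have been recorded.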

		// Update the confirmation count while holding the lock minimally
		let is_fully_confirmed = {
			let mut flag = false;
			let mut temporary_queue = self.temporary_queue.lock().await;
			if let Some(temp_queue) = temporary_queue.get_mut(&replica_network) {
				if let Some(data_entry) = temp_queue.get_mut(&message_id) {
					// A missing count defaults to 1: the sender's own confirmation
					let confirmations = data_entry.confirmations.unwrap_or(1);
					if confirmations < peers_count {
						// Increment confirmation count
						data_entry.confirmations = Some(confirmations + 1);
					}
					// Check if confirmations meet required peers
					flag = peers_count != 0 && data_entry.confirmations == Some(peers_count);
				}
			}

			flag
		};

		// If fully confirmed, move the data to the public queue
		if is_fully_confirmed {
			let mut public_queue = self.queue.lock().await;
			let public_queue = public_queue
				.entry(replica_network.clone())
				.or_insert_with(BTreeSet::new);

			// Clean up expired or excess entries
			if let ReplNetworkConfig::Custom {
				queue_length,
				expiry_time,
				..
			} = self.config
			{
				// Remove the oldest items if the queue exceeds capacity, expired items first
				if public_queue.len() as u64 >= queue_length {
					let current_time = SystemTime::now()
						.duration_since(SystemTime::UNIX_EPOCH)
						.unwrap()
						.as_secs();

					// Remove expired items
					if let Some(expiry_time) = expiry_time {
						public_queue.retain(|entry| {
							current_time.saturating_sub(entry.outgoing_timestamp) < expiry_time
						});
					}
				}

				// If no expired content, or expiry is disabled, remove the first queue element
				while public_queue.len() as u64 >= queue_length {
					if let Some(first) = public_queue.iter().next().cloned() {
						public_queue.remove(&first);
					}
				}
			}

			// Move the confirmed entry to the public queue
			let mut temporary_queue = self.temporary_queue.lock().await;
			if let Some(temp_queue) = temporary_queue.get_mut(&replica_network) {
				if let Some(data_entry) = temp_queue.remove(&message_id) {
					public_queue.insert(data_entry);
				}
			}
		}
	}

	/// Synchronize the data in the buffer queue using eventual consistency.
	pub async fn sync_with_eventual_consistency(&self, core: Core, repl_network: String) {
		loop {
			let repl_network = repl_network.clone();
			let mut core = core.clone();

			// Get the configured aging period
			let data_aging_time = match self.config {
				ReplNetworkConfig::Default => Self::DATA_AGING_PERIOD,
				ReplNetworkConfig::Custom {
					data_aging_period, ..
				} => data_aging_period,
			};

			// Fetch the local data state while holding the lock minimally
			let local_data_state = {
				let queue = self.queue.lock().await;
				queue.get(&repl_network).cloned()
			};

			if let Some(local_data_state) = local_data_state {
				// Filter data outside the lock
				let local_data = local_data_state
					.iter()
					.filter(|&d| {
						util::get_unix_timestamp().saturating_sub(d.incoming_timestamp)
							> data_aging_time
					})
					.cloned()
					.collect::<BTreeSet<_>>();

				// Extract the bounding Lamport clocks
				let (min_clock, max_clock) =
					if let (Some(first), Some(last)) = (local_data.first(), local_data.last()) {
						(first.lamport_clock, last.lamport_clock)
					} else {
						// Default values if no data is present
						(0, 0)
					};

				// Extract message IDs for synchronization
				let mut message_ids = local_data
					.iter()
					.map(|data| {
						// Make the ID a concatenation of the message ID and its original
						// publishing peer
						let id = data.message_id.clone()
							+ Core::ENTRY_DELIMITER
							+ &data.sender.to_string();
						id.into()
					})
					.collect::<ByteVector>();

				// Prepare the gossip message
				let mut message = vec![
					// Eventual Consistency Sync Gossip Flag
					Core::EVENTUAL_CONSISTENCY_FLAG.as_bytes().to_vec(),
					// Node's Peer ID
					core.peer_id().to_string().into(),
					repl_network.clone().into(),
					min_clock.to_string().into(),
					max_clock.to_string().into(),
				];

				// Append the message IDs
				message.append(&mut message_ids);

				// Broadcast the gossip request
				let gossip_request = AppData::GossipsubBroadcastMessage {
					topic: repl_network.into(),
					message,
				};

				let _ = core.query_network(gossip_request).await;
			}

			// Wait for a defined duration before the next sync
			#[cfg(feature = "tokio-runtime")]
			tokio::time::sleep(Duration::from_secs(Self::SYNC_WAIT_TIME)).await;

			#[cfg(feature = "async-std-runtime")]
			async_std::task::sleep(Duration::from_secs(Self::SYNC_WAIT_TIME)).await;
		}
	}

	/// Synchronize an incoming buffer image from a replica node with the local buffer image.
	pub async fn sync_buffer_image(
		&self,
		mut core: Core,
		repl_peer_id: PeerIdString,
		repl_network: String,
		lamports_clock_bound: (u64, u64),
		replica_data_state: StringVector,
	) {
		// Synchronization occurs only when the clock of the last consumed data is less
		// than the upper clock sync bound
		let state = core.network_info.replication.state.lock().await;
		if let Some(state) = state.get(&repl_network) {
			if state.last_clock >= lamports_clock_bound.1 {
				return;
			}
		}

		// Release the lock on the replication state
		drop(state);

		// Convert the replica data state into a set outside the mutex lock.
		// Also filter out data that we published ourselves. This is done using the
		// message ID, since under gossipsub, messageId = (publishing peerId + nonce).
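		// Each incoming ID has the form "<message_id><ENTRY_DELIMITER><publishing_peer_id>".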
		let replica_buffer_state = replica_data_state
			.into_iter()
			.filter(|id| !id.contains(&core.peer_id().to_string()))
			.map(|id| {
				// Extract the message ID
				let msg_id = id.split(Core::ENTRY_DELIMITER).collect::<Vec<_>>()[0];
				msg_id.into()
			})
			.collect::<BTreeSet<_>>();

		// Extract the local buffer state and filter it while keeping the mutex lock
		// duration minimal
		let mut missing_msgs = {
			let mut queue = self.queue.lock().await;
			if let Some(local_state) = queue.get_mut(&repl_network) {
				let local_buffer_state = local_state
					.iter()
					.filter(|data| {
						data.lamport_clock >= lamports_clock_bound.0
							&& data.lamport_clock <= lamports_clock_bound.1
					})
					.map(|data| data.message_id.clone())
					.collect::<BTreeSet<_>>();

				// Extract messages missing from our local buffer
				replica_buffer_state
					.difference(&local_buffer_state)
					.cloned()
					.map(|id| id.into())
					.collect::<ByteVector>()
			} else {
				return; // If the network state doesn't exist, exit early
			}
		};

		if !missing_msgs.is_empty() {
			// Prepare an RPC fetch request for the missing messages
			if let Ok(repl_peer_id) = repl_peer_id.parse::<PeerId>() {
				let mut rpc_data: ByteVector = vec![
					Core::RPC_SYNC_PULL_FLAG.into(), // RPC sync pull flag
					repl_network.clone().into(),     // Replica network
				];

				// Append the missing message IDs to the request data
				rpc_data.append(&mut missing_msgs);

				// Prepare an RPC to ask the replica node for the missing data
				let fetch_request = AppData::SendRpc {
					keys: rpc_data,
					peer: repl_peer_id,
				};

				// Send the fetch request
				if let Ok(response) = core.query_network(fetch_request).await {
					if let AppResponse::SendRpc(messages) = response {
						// Parse response
						let response = util::unmarshal_messages(messages);

						// Re-lock the mutex only for inserting new messages
						let mut queue = self.queue.lock().await;
						if let Some(local_state) = queue.get_mut(&repl_network) {
							for missing_msg in response {
								local_state.insert(missing_msg);
							}
						}
					}
				}
			}
		}
	}

	/// Pull and return missing data requested by a replica node.
	pub async fn pull_missing_data(
		&self,
		repl_network: String,
		message_ids: &[Vec<u8>],
	) -> ByteVector {
		// Fetch the local state from the queue with a minimal lock
		let local_state = {
			let queue = self.queue.lock().await;
			queue.get(&repl_network).cloned()
		};

		// If the local state exists, process the message retrieval
		if let Some(local_state) = local_state {
			// Check if it is a clone request
			let requested_msgs = if message_ids[0].is_empty() {
				// Retrieve all messages in the buffer
				local_state.iter().collect::<Vec<_>>()
			} else {
				// Retrieve messages that match the requested message IDs
				local_state
					.iter()
					.filter(|&data| message_ids.contains(&data.message_id.as_bytes().to_vec()))
					.collect::<Vec<_>>()
			};

			// Prepare the result buffer
			let mut result = Vec::new();

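			// Each serialized entry has the layout (field delimiter shown symbolically):
			//   data_1 $$ data_2 ... <FIELD_DELIMITER> lamport_clock <FIELD_DELIMITER>
			//   outgoing_timestamp <FIELD_DELIMITER> incoming_timestamp <FIELD_DELIMITER>
			//   message_id <FIELD_DELIMITER> sender_base58
			// Entries are joined by `ENTRY_DELIMITER`.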
			for msg in requested_msgs {
				// Serialize the `data` field (Vec<String>) into a single string, separated by
				// `$$`
				let joined_data = msg.data.join(Core::DATA_DELIMITER);

				// Serialize individual fields, excluding `confirmations`
				let mut entry = Vec::new();
				entry.extend_from_slice(joined_data.as_bytes());
				entry.extend_from_slice(Core::FIELD_DELIMITER.to_string().as_bytes());
				entry.extend_from_slice(msg.lamport_clock.to_string().as_bytes());
				entry.extend_from_slice(Core::FIELD_DELIMITER.to_string().as_bytes());
				entry.extend_from_slice(msg.outgoing_timestamp.to_string().as_bytes());
				entry.extend_from_slice(Core::FIELD_DELIMITER.to_string().as_bytes());
				entry.extend_from_slice(msg.incoming_timestamp.to_string().as_bytes());
				entry.extend_from_slice(Core::FIELD_DELIMITER.to_string().as_bytes());
				entry.extend_from_slice(msg.message_id.as_bytes());
				entry.extend_from_slice(Core::FIELD_DELIMITER.to_string().as_bytes());
				entry.extend_from_slice(msg.sender.to_base58().as_bytes());

				// Append the entry to the result, separated by `ENTRY_DELIMITER`
				if !result.is_empty() {
					result.extend_from_slice(Core::ENTRY_DELIMITER.to_string().as_bytes());
				}
				result.extend(entry);
			}

			return vec![result];
		}

		// Default empty result if no local state is found.
		Default::default()
	}

	/// Replicate and populate the buffer with a replica's state.
	pub async fn replicate_buffer(
		&self,
		mut core: Core,
		repl_network: String,
		replica_node: PeerId,
	) -> Result<(), NetworkError> {
		// Send an RPC to the node to retrieve its buffer image
		let rpc_data: ByteVector = vec![
			// RPC buffer copy flag. It is the same as the sync pull flag, but with an
			// empty message ID vector.
			Core::RPC_SYNC_PULL_FLAG.into(),
			repl_network.clone().into(), // Replica network
			vec![],                      // Empty vector indicating a total PULL
		];

		// Prepare an RPC to ask the replica node for its data
		let fetch_request = AppData::SendRpc {
			keys: rpc_data,
			peer: replica_node,
		};

		// Try to query the replica node and insert the data received into the buffer
		let mut queue = self.queue.lock().await;
		match queue.get_mut(&repl_network) {
			Some(local_state) => {
				// Send the fetch request
				match core.query_network(fetch_request).await? {
					AppResponse::SendRpc(messages) => {
						// Parse response
						let response = util::unmarshal_messages(messages);
						// Insert into the data buffer queue
						for missing_msg in response {
							local_state.insert(missing_msg);
						}

						Ok(())
					},
					AppResponse::Error(err) => Err(err),
					_ => Err(NetworkError::RpcDataFetchError),
				}
			},
			None => Err(NetworkError::MissingReplNetwork),
		}
	}
}

#[cfg(test)]
mod tests {

	// use libp2p::dns::tokio;
	use super::*;

	// Define custom ports for testing
	const CUSTOM_TCP_PORT: Port = 49666;
	const CUSTOM_UDP_PORT: Port = 49852;

	// Set up a node using the default config
	pub async fn setup_node(ports: (Port, Port)) -> Core {
		let config = BootstrapConfig::default()
			.with_tcp(ports.0)
			.with_udp(ports.1);

		// Set up the network
		CoreBuilder::with_config(config).build().await.unwrap()
	}

	#[test]
	fn test_initialization_with_default_config() {
		let buffer = ReplicaBufferQueue::new(ReplNetworkConfig::Default);

		assert_eq!(buffer.consistency_model(), ConsistencyModel::Eventual);
	}

	#[test]
	fn test_initialization_with_custom_config() {
		let config = ReplNetworkConfig::Custom {
			queue_length: 200,
			expiry_time: Some(120),
			sync_wait_time: 10,
			consistency_model: ConsistencyModel::Strong(ConsensusModel::All),
			data_aging_period: 15,
		};
		let buffer = ReplicaBufferQueue::new(config);

		// Verify the consistency model
		assert_eq!(
			buffer.consistency_model(),
			ConsistencyModel::Strong(ConsensusModel::All)
		);

		// Verify the remaining configuration fields
		match buffer.config {
			ReplNetworkConfig::Custom {
				queue_length,
				expiry_time,
				sync_wait_time,
				data_aging_period,
				..
			} => {
				assert_eq!(queue_length, 200);
				assert_eq!(expiry_time, Some(120));
				assert_eq!(sync_wait_time, 10);
				assert_eq!(data_aging_period, 15);
			},
			_ => panic!("Custom configuration not initialized correctly"),
		}
	}

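	#[test]
	fn test_buffer_data_ordering() {
		// A small sanity check of the `Ord` impl (field values are illustrative):
		// entries are ordered by Lamport clock first, then by message ID.
		let a = ReplBufferData {
			data: vec!["Data 1".into()],
			lamport_clock: 1,
			outgoing_timestamp: util::get_unix_timestamp(),
			incoming_timestamp: util::get_unix_timestamp(),
			message_id: "b".into(),
			sender: PeerId::random(),
			confirmations: None,
		};
		let b = ReplBufferData {
			lamport_clock: 2,
			message_id: "a".into(),
			..a.clone()
		};

		// Clock 1 sorts before clock 2 even though "b" > "a" lexicographically
		assert!(a < b);

		// Entries with equal clocks fall back to message ID ordering
		let c = ReplBufferData {
			lamport_clock: 1,
			message_id: "a".into(),
			..a.clone()
		};
		assert!(c < a);
	}
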
	// -- Buffer Queue Tests --

	#[test]
	fn test_buffer_overflow_expiry_behavior() {
		tokio::runtime::Runtime::new().unwrap().block_on(async {
			let expiry_period: u64 = 2;

			let config = ReplNetworkConfig::Custom {
				queue_length: 4,
				expiry_time: Some(expiry_period), // Set a very short expiry for testing
				sync_wait_time: 10,
				consistency_model: ConsistencyModel::Eventual,
				data_aging_period: 10,
			};

			let network = setup_node((CUSTOM_TCP_PORT, CUSTOM_UDP_PORT)).await;
			let buffer = ReplicaBufferQueue::new(config);

			// Fill up the buffer
			for clock in 1..5 {
				let data = ReplBufferData {
					data: vec!["Data 1".into()],
					lamport_clock: clock,
					outgoing_timestamp: util::get_unix_timestamp(),
					incoming_timestamp: util::get_unix_timestamp(),
					message_id: "msg1".into(),
					sender: PeerId::random(),
					confirmations: None,
				};

				buffer
					.push(network.clone(), "network1".into(), data.clone())
					.await;
			}

			// Check that the first data item has a Lamport clock of 1
			assert_eq!(
				buffer
					.pop_front(network.clone(), "network1")
					.await
					.unwrap()
					.lamport_clock,
				1
			);

			tokio::time::sleep(std::time::Duration::from_secs(expiry_period)).await; // Wait for expiry

			// Buffer length should be 3 now
			assert_eq!(buffer.queue.lock().await.get("network1").unwrap().len(), 3);

			// Fill up the buffer
			buffer
				.push(
					network.clone(),
					"network1".into(),
					ReplBufferData {
						data: vec!["Data 1".into()],
						lamport_clock: 6,
						outgoing_timestamp: util::get_unix_timestamp(),
						incoming_timestamp: util::get_unix_timestamp(),
						message_id: "msg1".into(),
						sender: PeerId::random(),
						confirmations: None,
					},
				)
				.await;

			// Verify that the buffer length is now 4
			assert_eq!(buffer.queue.lock().await.get("network1").unwrap().len(), 4);

			// Overflow the buffer
			buffer
				.push(
					network.clone(),
					"network1".into(),
					ReplBufferData {
						data: vec!["Data 1".into()],
						lamport_clock: 42,
						outgoing_timestamp: util::get_unix_timestamp(),
						incoming_timestamp: util::get_unix_timestamp(),
						message_id: "msg1".into(),
						sender: PeerId::random(),
						confirmations: None,
					},
				)
				.await;

			// We expect 6 to be the first element and 42 the second, as they have not expired
			assert_eq!(
				buffer
					.pop_front(network.clone(), "network1")
					.await
					.unwrap()
					.lamport_clock,
				6
			);
			assert_eq!(
				buffer
					.pop_front(network.clone(), "network1")
					.await
					.unwrap()
					.lamport_clock,
				42
			);
		});
	}

	#[test]
	fn test_buffer_overflow_no_expiry_behavior() {
		tokio::runtime::Runtime::new().unwrap().block_on(async {
			let config = ReplNetworkConfig::Custom {
				queue_length: 4,
				expiry_time: None, // Disable aging
				sync_wait_time: 10,
				consistency_model: ConsistencyModel::Eventual,
				data_aging_period: 10,
			};

			let network = setup_node((15555, 6666)).await;
			let buffer = ReplicaBufferQueue::new(config);

			for clock in 1..5 {
				let data = ReplBufferData {
					data: vec!["Data 1".into()],
					lamport_clock: clock,
					outgoing_timestamp: util::get_unix_timestamp(),
					incoming_timestamp: util::get_unix_timestamp(),
					message_id: "msg1".into(),
					sender: PeerId::random(),
					confirmations: None,
				};

				buffer
					.push(network.clone(), "network1".into(), data.clone())
					.await;
			}

			// Check that the first data item has a Lamport clock of 1
			assert_eq!(
				buffer
					.pop_front(network.clone(), "network1")
					.await
					.unwrap()
					.lamport_clock,
				1
			);

			buffer
				.push(
					network.clone(),
					"network1".into(),
					ReplBufferData {
						data: vec!["Data 1".into()],
						lamport_clock: 6,
						outgoing_timestamp: util::get_unix_timestamp(),
						incoming_timestamp: util::get_unix_timestamp(),
						message_id: "msg1".into(),
						sender: PeerId::random(),
						confirmations: None,
					},
				)
				.await;

			// Check that the next data items have Lamport clocks of 2 and 3, as expected
			assert_eq!(
				buffer
					.pop_front(network.clone(), "network1")
					.await
					.unwrap()
					.lamport_clock,
				2
			);
			assert_eq!(
				buffer
					.pop_front(network.clone(), "network1")
					.await
					.unwrap()
					.lamport_clock,
				3
			);
		});
	}

	#[test]
	fn test_pop_from_empty_buffer() {
		tokio::runtime::Runtime::new().unwrap().block_on(async {
			let config = ReplNetworkConfig::Default;
			let buffer = ReplicaBufferQueue::new(config);

			let network = setup_node((15551, 6661)).await;

			let result = buffer.pop_front(network.clone(), "network1").await;
			assert!(result.is_none());
		});
	}
}