use super::*;
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc, time::SystemTime};

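/// Configuration and state data for a replica network.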
#[derive(Clone, Default, Debug)]
pub struct ReplConfigData {
	/// Lamport clock for the replica network.
	pub lamport_clock: Nonce,
	/// Clock of the last data item read from the public buffer.
	pub last_clock: Nonce,
	/// Nodes that are part of the replica network.
	pub nodes: HashMap<String, String>,
}

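/// Shared replication state, tracked per replica network.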
#[derive(Clone)]
pub struct ReplInfo {
	/// Replication state, keyed by replica network name.
	pub state: Arc<Mutex<HashMap<String, ReplConfigData>>>,
}

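/// The consistency model a replica network operates under.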
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConsistencyModel {
	/// Data is eventually consistent; nodes synchronize in the background to reconcile
	/// their buffers.
	Eventual,
	/// Data must be confirmed by peers, per the consensus model, before it becomes visible.
	Strong(ConsensusModel),
}

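/// How many peers must confirm a data item under strong consistency.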
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConsensusModel {
	/// Every peer in the replica network must confirm the data.
	All,
	/// A minimum number of peers must confirm the data.
	MinPeers(u64),
}

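/// Configuration for the replica buffer queue of a network.
///
/// A minimal sketch of building a custom configuration (the field values here are
/// illustrative assumptions mirroring the buffer's built-in constants, not prescribed
/// defaults):
///
/// ```ignore
/// let config = ReplNetworkConfig::Custom {
/// 	queue_length: 150,
/// 	expiry_time: Some(60),
/// 	sync_wait_time: 5,
/// 	consistency_model: ConsistencyModel::Eventual,
/// 	data_aging_period: 5,
/// };
/// ```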
#[derive(Clone, Debug)]
pub enum ReplNetworkConfig {
	/// A custom configuration.
	Custom {
		/// Maximum number of items the buffer queue can hold.
		queue_length: u64,
		/// Expiry time (in seconds) of buffered data, if data aging is enabled.
		expiry_time: Option<Seconds>,
		/// Time to wait (in seconds) between synchronization attempts.
		sync_wait_time: Seconds,
		/// The consistency model for data storage.
		consistency_model: ConsistencyModel,
		/// Minimum age (in seconds) data must reach before it is synchronized across the
		/// network.
		data_aging_period: Seconds,
	},
	/// The default configuration: eventual consistency, bounded by the buffer's built-in
	/// constants.
	Default,
}

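/// A data item in the replica buffer queue.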
#[derive(Clone, Debug)]
pub struct ReplBufferData {
	/// The raw incoming data.
	pub data: StringVector,
	/// Lamport clock associated with the data entry.
	pub lamport_clock: Nonce,
	/// Timestamp at which the message left the sending node.
	pub outgoing_timestamp: Seconds,
	/// Timestamp at which the message arrived at this node.
	pub incoming_timestamp: Seconds,
	/// Unique id of the message.
	pub message_id: String,
	/// The peer that sent the data.
	pub sender: PeerId,
	/// Number of confirmations received; only relevant under strong consistency.
	pub confirmations: Option<Nonce>,
}

impl Ord for ReplBufferData {
	fn cmp(&self, other: &Self) -> Ordering {
		// Order by Lamport clock first, then by message id to break ties.
		self.lamport_clock
			.cmp(&other.lamport_clock)
			.then_with(|| self.message_id.cmp(&other.message_id))
	}
}

impl PartialOrd for ReplBufferData {
	fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
		Some(self.cmp(other))
	}
}

impl Eq for ReplBufferData {}

impl PartialEq for ReplBufferData {
	fn eq(&self, other: &Self) -> bool {
		self.lamport_clock == other.lamport_clock && self.message_id == other.message_id
	}
}

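/// Buffer queue holding replicated data waiting to be consumed by the application layer.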
pub(crate) struct ReplicaBufferQueue {
	/// Configuration governing queue behaviour.
	config: ReplNetworkConfig,
	/// Holding area for strongly-consistent data awaiting peer confirmation, keyed by
	/// replica network, then by message id.
	temporary_queue: Mutex<BTreeMap<String, BTreeMap<String, ReplBufferData>>>,
	/// The public buffer exposed to the application, keyed by replica network.
	queue: Mutex<BTreeMap<String, BTreeSet<ReplBufferData>>>,
}

impl ReplicaBufferQueue {
	/// Maximum number of data items a queue can hold (used by the default configuration
	/// and to bound the temporary queue).
	const MAX_CAPACITY: u64 = 150;

	/// Expiry time (in seconds) of buffered data under the default configuration.
	const EXPIRY_TIME: Seconds = 60;

	/// Time to wait (in seconds) between synchronization attempts under the default
	/// configuration.
	const SYNC_WAIT_TIME: Seconds = 5;

	/// Minimum age (in seconds) data must reach before it is synchronized across the
	/// network under the default configuration.
	const DATA_AGING_PERIOD: Seconds = 5;

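	/// Create a new replica buffer queue.
	///
	/// A minimal usage sketch (`ReplNetworkConfig::Default` gives eventual consistency
	/// bounded by the constants above):
	///
	/// ```ignore
	/// let buffer = ReplicaBufferQueue::new(ReplNetworkConfig::Default);
	/// buffer.init("my_network".to_string()).await;
	/// ```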
	pub fn new(config: ReplNetworkConfig) -> Self {
		Self {
			config,
			temporary_queue: Mutex::new(Default::default()),
			queue: Mutex::new(Default::default()),
		}
	}

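	/// Return the configured consistency model of the queue.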
	pub fn consistency_model(&self) -> ConsistencyModel {
		match self.config {
			ReplNetworkConfig::Default => ConsistencyModel::Eventual,
			ReplNetworkConfig::Custom {
				consistency_model, ..
			} => consistency_model,
		}
	}

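	/// Initialize the buffer queues for a new replica network.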
	pub async fn init(&self, repl_network: String) {
		// Set up the public queue.
		let mut queue = self.queue.lock().await;
		queue.insert(repl_network.clone(), Default::default());

		// The temporary queue is only needed under strong consistency.
		if self.consistency_model() != ConsistencyModel::Eventual {
			let mut queue = self.temporary_queue.lock().await;
			queue.insert(repl_network.clone(), Default::default());
		}
	}

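	/// Push a new data item into the buffer.
	///
	/// Under eventual consistency the item goes straight into the public queue, with
	/// expired or oldest entries evicted when the queue is full. Under strong consistency
	/// the item is parked in the temporary queue and a confirmation request is broadcast
	/// to replica peers, unless consensus is trivially reached (a single peer suffices).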
	pub async fn push(&self, mut core: Core, replica_network: String, data: ReplBufferData) {
		match self.config {
			// Default config: eventual consistency, bounded by the built-in constants.
			ReplNetworkConfig::Default => {
				let mut queue = self.queue.lock().await;

				let queue = queue.entry(replica_network).or_default();

				// Evict expired entries (and, failing that, the oldest entry) until there
				// is room for the new data.
				while queue.len() as u64 >= Self::MAX_CAPACITY {
					let current_time = SystemTime::now()
						.duration_since(SystemTime::UNIX_EPOCH)
						.unwrap()
						.as_secs();
					let mut expired_items = Vec::new();

					for entry in queue.iter() {
						if current_time.saturating_sub(entry.outgoing_timestamp)
							>= Self::EXPIRY_TIME
						{
							expired_items.push(entry.clone());
						}
					}

					for expired in expired_items {
						queue.remove(&expired);
					}

					// If nothing has expired, remove the oldest entry.
					if queue.len() as u64 >= Self::MAX_CAPACITY {
						if let Some(first) = queue.iter().next().cloned() {
							queue.remove(&first);
						}
					}
				}

				queue.insert(data);
			},
			ReplNetworkConfig::Custom {
				queue_length,
				expiry_time,
				consistency_model,
				..
			} => {
				match consistency_model {
					ConsistencyModel::Eventual => {
						let mut queue = self.queue.lock().await;

						let queue = queue.entry(replica_network).or_default();

						while queue.len() as u64 >= queue_length {
							// Only evict by age if an expiry time is configured.
							if let Some(expiry_time) = expiry_time {
								let current_time = SystemTime::now()
									.duration_since(SystemTime::UNIX_EPOCH)
									.unwrap()
									.as_secs();
								let mut expired_items = Vec::new();

								for entry in queue.iter() {
									if current_time.saturating_sub(entry.outgoing_timestamp)
										>= expiry_time
									{
										expired_items.push(entry.clone());
									}
								}

								for expired in expired_items {
									queue.remove(&expired);
								}
							}

							// If the queue is still full, remove the oldest entry.
							if queue.len() as u64 >= queue_length {
								if let Some(first) = queue.iter().next().cloned() {
									queue.remove(&first);
								}
							}
						}

						queue.insert(data);
					},
					ConsistencyModel::Strong(consensus_model) => {
						let mut temp_queue = self.temporary_queue.lock().await;

						let temp_queue = temp_queue.entry(replica_network.clone()).or_default();

						// Bound the temporary queue as well.
						if temp_queue.len() as u64 >= Self::MAX_CAPACITY {
							if let Some(first_key) = temp_queue.keys().next().cloned() {
								temp_queue.remove(&first_key);
							}
						}

						let replica_peers = core.replica_peers(&replica_network).await.len();

						// If a single peer is enough for consensus, commit the data to the
						// public queue immediately.
						if replica_peers == 1 || consensus_model == ConsensusModel::MinPeers(1) {
							let mut queue = self.queue.lock().await;
							let entry = queue.entry(replica_network.clone()).or_default();

							entry.insert(data);
						} else {
							let message_id = data.message_id.clone();

							// Park the data until enough peers confirm it.
							temp_queue.insert(data.message_id.clone(), data);

							// Broadcast a confirmation request to replica peers.
							let message = vec![
								Core::STRONG_CONSISTENCY_FLAG.as_bytes().to_vec(),
								replica_network.clone().into(),
								message_id.as_bytes().into(),
							];

							let gossip_request = AppData::GossipsubBroadcastMessage {
								topic: replica_network.into(),
								message,
							};

							let _ = core.query_network(gossip_request).await;
						}
					},
				}
			},
		}
	}

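	/// Pop the smallest item (by Lamport clock, then message id) from the public queue of
	/// a replica network, recording its clock as the network's `last_clock`.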
	pub async fn pop_front(&self, core: Core, replica_network: &str) -> Option<ReplBufferData> {
		let first_data = {
			let mut queue = self.queue.lock().await;

			if let Some(queue) = queue.get_mut(replica_network) {
				if let Some(first) = queue.iter().next().cloned() {
					queue.remove(&first);
					Some(first)
				} else {
					None
				}
			} else {
				None
			}
		};

		let first = first_data?;

		// Record the clock of the data item just consumed.
		{
			let mut cfg = core.network_info.replication.state.lock().await;

			let entry = cfg
				.entry(replica_network.to_owned())
				.or_insert_with(|| ReplConfigData {
					lamport_clock: 0,
					last_clock: first.lamport_clock,
					nodes: Default::default(),
				});

			entry.last_clock = first.lamport_clock;
		}

		Some(first)
	}

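	/// Handle a confirmation received for a pending data item under strong consistency.
	///
	/// The confirmation count is incremented; once it reaches the number required by the
	/// consensus model, the item is moved from the temporary queue into the public queue.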
	pub async fn handle_data_confirmation(
		&self,
		mut core: Core,
		replica_network: String,
		message_id: String,
	) {
		// Determine how many peers must confirm the data.
		let peers_count = match self.config {
			ReplNetworkConfig::Custom {
				consistency_model, ..
			} => match consistency_model {
				ConsistencyModel::Eventual => 0,
				ConsistencyModel::Strong(consensus_model) => match consensus_model {
					ConsensusModel::All => {
						core.replica_peers(&replica_network).await.len() as u64
					},
					ConsensusModel::MinPeers(required_peers) => required_peers,
				},
			},
			ReplNetworkConfig::Default => 0,
		};

		let is_fully_confirmed = {
			let mut flag = false;
			let mut temporary_queue = self.temporary_queue.lock().await;
			if let Some(temp_queue) = temporary_queue.get_mut(&replica_network) {
				if let Some(data_entry) = temp_queue.get_mut(&message_id) {
					// Treat a missing confirmation count as 1 (the sender's own copy) so a
					// `None` value cannot cause a panic here.
					let confirmations = data_entry.confirmations.unwrap_or(1);
					if confirmations < peers_count {
						data_entry.confirmations = Some(confirmations + 1);
					}
					flag = peers_count != 0 && data_entry.confirmations == Some(peers_count);
				}
			}

			flag
		};

		if is_fully_confirmed {
			let mut public_queue = self.queue.lock().await;
			let public_queue = public_queue
				.entry(replica_network.clone())
				.or_insert_with(BTreeSet::new);

			// Make room in the public queue if necessary.
			if let ReplNetworkConfig::Custom {
				queue_length,
				expiry_time,
				..
			} = self.config
			{
				if public_queue.len() as u64 >= queue_length {
					let current_time = SystemTime::now()
						.duration_since(SystemTime::UNIX_EPOCH)
						.unwrap()
						.as_secs();

					// First drop expired entries, if an expiry time is configured.
					if let Some(expiry_time) = expiry_time {
						public_queue.retain(|entry| {
							current_time.saturating_sub(entry.outgoing_timestamp) < expiry_time
						});
					}
				}

				// Then evict the oldest entries until there is room.
				while public_queue.len() as u64 >= queue_length {
					if let Some(first) = public_queue.iter().next().cloned() {
						public_queue.remove(&first);
					}
				}
			}

			// Move the confirmed data from the temporary queue into the public queue.
			let mut temporary_queue = self.temporary_queue.lock().await;
			if let Some(temp_queue) = temporary_queue.get_mut(&replica_network) {
				if let Some(data_entry) = temp_queue.remove(&message_id) {
					public_queue.insert(data_entry);
				}
			}
		}
	}

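	/// Periodically broadcast this node's buffer state so replica peers can reconcile
	/// under eventual consistency.
	///
	/// Each round publishes the node's peer id, the replica network, the Lamport clock
	/// bounds of sufficiently aged local data and the ids of those messages, then sleeps
	/// for the sync wait time.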
	pub async fn sync_with_eventual_consistency(&self, core: Core, repl_network: String) {
		loop {
			let repl_network = repl_network.clone();
			let mut core = core.clone();

			let data_aging_time = match self.config {
				ReplNetworkConfig::Default => Self::DATA_AGING_PERIOD,
				ReplNetworkConfig::Custom {
					data_aging_period, ..
				} => data_aging_period,
			};

			let local_data_state = {
				let queue = self.queue.lock().await;
				queue.get(&repl_network).cloned()
			};

			if let Some(local_data_state) = local_data_state {
				// Only sync data that has aged past the configured aging period.
				let local_data = local_data_state
					.iter()
					.filter(|&d| {
						util::get_unix_timestamp().saturating_sub(d.incoming_timestamp)
							> data_aging_time
					})
					.cloned()
					.collect::<BTreeSet<_>>();

				// Lamport clock bounds of the aged local data.
				let (min_clock, max_clock) =
					if let (Some(first), Some(last)) = (local_data.first(), local_data.last()) {
						(first.lamport_clock, last.lamport_clock)
					} else {
						(0, 0)
					};

				// Encode each entry as "<message_id><ENTRY_DELIMITER><sender>".
				let mut message_ids = local_data
					.iter()
					.map(|data| {
						let id = data.message_id.clone()
							+ Core::ENTRY_DELIMITER
							+ &data.sender.to_string();
						id.into()
					})
					.collect::<ByteVector>();

				let mut message = vec![
					Core::EVENTUAL_CONSISTENCY_FLAG.as_bytes().to_vec(),
					core.peer_id().to_string().into(),
					repl_network.clone().into(),
					min_clock.to_string().into(),
					max_clock.to_string().into(),
				];

				message.append(&mut message_ids);

				let gossip_request = AppData::GossipsubBroadcastMessage {
					topic: repl_network.into(),
					message,
				};

				let _ = core.query_network(gossip_request).await;
			}

			// Respect a custom sync wait time if one was configured.
			let sync_wait_time = match self.config {
				ReplNetworkConfig::Default => Self::SYNC_WAIT_TIME,
				ReplNetworkConfig::Custom { sync_wait_time, .. } => sync_wait_time,
			};

			#[cfg(feature = "tokio-runtime")]
			tokio::time::sleep(Duration::from_secs(sync_wait_time)).await;

			#[cfg(feature = "async-std-runtime")]
			async_std::task::sleep(Duration::from_secs(sync_wait_time)).await;
		}
	}

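	/// Compare a peer's advertised buffer state against the local buffer and pull any
	/// messages this node is missing over RPC.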
	pub async fn sync_buffer_image(
		&self,
		mut core: Core,
		repl_peer_id: PeerIdString,
		repl_network: String,
		lamports_clock_bound: (u64, u64),
		replica_data_state: StringVector,
	) {
		// If our last consumed clock is already beyond the peer's upper bound, there is
		// nothing to pull.
		let state = core.network_info.replication.state.lock().await;
		if let Some(state) = state.get(&repl_network) {
			if state.last_clock >= lamports_clock_bound.1 {
				return;
			}
		}

		// Release the lock before doing further work.
		drop(state);

		// Extract the message ids the peer advertised, ignoring entries we sent ourselves.
		let replica_buffer_state = replica_data_state
			.into_iter()
			.filter(|id| !id.contains(&core.peer_id().to_string()))
			.map(|id| {
				let msg_id = id.split(Core::ENTRY_DELIMITER).collect::<Vec<_>>()[0];
				msg_id.into()
			})
			.collect::<BTreeSet<_>>();

		// Compute the ids the peer has that we do not.
		let mut missing_msgs = {
			let mut queue = self.queue.lock().await;
			if let Some(local_state) = queue.get_mut(&repl_network) {
				let local_buffer_state = local_state
					.iter()
					.filter(|data| {
						data.lamport_clock >= lamports_clock_bound.0
							&& data.lamport_clock <= lamports_clock_bound.1
					})
					.map(|data| data.message_id.clone())
					.collect::<BTreeSet<_>>();

				replica_buffer_state
					.difference(&local_buffer_state)
					.cloned()
					.map(|id| id.into())
					.collect::<ByteVector>()
			} else {
				return;
			}
		};

		if !missing_msgs.is_empty() {
			if let Ok(repl_peer_id) = repl_peer_id.parse::<PeerId>() {
				// Ask the peer for the missing messages over RPC.
				let mut rpc_data: ByteVector = vec![
					Core::RPC_SYNC_PULL_FLAG.into(),
					repl_network.clone().into(),
				];

				rpc_data.append(&mut missing_msgs);

				let fetch_request = AppData::SendRpc {
					keys: rpc_data,
					peer: repl_peer_id,
				};

				if let Ok(response) = core.query_network(fetch_request).await {
					if let AppResponse::SendRpc(messages) = response {
						let response = util::unmarshal_messages(messages);

						// Merge the fetched messages into the local buffer.
						let mut queue = self.queue.lock().await;
						if let Some(local_state) = queue.get_mut(&repl_network) {
							for missing_msg in response {
								local_state.insert(missing_msg);
							}
						}
					}
				}
			}
		}
	}

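	/// Serve a peer's pull request by returning the requested messages (or the whole
	/// buffer, if no ids were specified).
	///
	/// Each message is serialized as its fields joined by `FIELD_DELIMITER`, with entries
	/// separated by `ENTRY_DELIMITER`:
	///
	/// ```text
	/// data | lamport_clock | outgoing_timestamp | incoming_timestamp | message_id | sender
	/// ```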
	pub async fn pull_missing_data(
		&self,
		repl_network: String,
		message_ids: &[Vec<u8>],
	) -> ByteVector {
		let local_state = {
			let queue = self.queue.lock().await;
			queue.get(&repl_network).cloned()
		};

		if let Some(local_state) = local_state {
			// An empty id list means the peer wants the entire buffer.
			let requested_msgs = if message_ids.first().map_or(true, |id| id.is_empty()) {
				local_state.iter().collect::<Vec<_>>()
			} else {
				local_state
					.iter()
					.filter(|&data| message_ids.contains(&data.message_id.as_bytes().to_vec()))
					.collect::<Vec<_>>()
			};

			let mut result = Vec::new();

			for msg in requested_msgs {
				// Serialize the message fields, separated by the field delimiter.
				let joined_data = msg.data.join(Core::DATA_DELIMITER);

				let mut entry = Vec::new();
				entry.extend_from_slice(joined_data.as_bytes());
				entry.extend_from_slice(Core::FIELD_DELIMITER.to_string().as_bytes());
				entry.extend_from_slice(msg.lamport_clock.to_string().as_bytes());
				entry.extend_from_slice(Core::FIELD_DELIMITER.to_string().as_bytes());
				entry.extend_from_slice(msg.outgoing_timestamp.to_string().as_bytes());
				entry.extend_from_slice(Core::FIELD_DELIMITER.to_string().as_bytes());
				entry.extend_from_slice(msg.incoming_timestamp.to_string().as_bytes());
				entry.extend_from_slice(Core::FIELD_DELIMITER.to_string().as_bytes());
				entry.extend_from_slice(msg.message_id.as_bytes());
				entry.extend_from_slice(Core::FIELD_DELIMITER.to_string().as_bytes());
				entry.extend_from_slice(msg.sender.to_base58().as_bytes());

				// Separate entries with the entry delimiter.
				if !result.is_empty() {
					result.extend_from_slice(Core::ENTRY_DELIMITER.to_string().as_bytes());
				}
				result.extend(entry);
			}

			return vec![result];
		}

		Default::default()
	}

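	/// Replicate the entire buffer of a replica node into the local buffer, for example
	/// when joining an existing replica network.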
	pub async fn replicate_buffer(
		&self,
		mut core: Core,
		repl_network: String,
		replica_node: PeerId,
	) -> Result<(), NetworkError> {
		// An empty id list asks the peer for its entire buffer.
		let rpc_data: ByteVector = vec![
			Core::RPC_SYNC_PULL_FLAG.into(),
			repl_network.clone().into(),
			vec![],
		];

		let fetch_request = AppData::SendRpc {
			keys: rpc_data,
			peer: replica_node,
		};

		let mut queue = self.queue.lock().await;
		match queue.get_mut(&repl_network) {
			Some(local_state) => {
				match core.query_network(fetch_request).await? {
					AppResponse::SendRpc(messages) => {
						let response = util::unmarshal_messages(messages);
						for missing_msg in response {
							local_state.insert(missing_msg);
						}

						Ok(())
					},
					AppResponse::Error(err) => Err(err),
					_ => Err(NetworkError::RpcDataFetchError),
				}
			},
			None => Err(NetworkError::MissingReplNetwork),
		}
	}
}

#[cfg(test)]
mod tests {

	use super::*;

	// Ports for the test nodes.
	const CUSTOM_TCP_PORT: Port = 49666;
	const CUSTOM_UDP_PORT: Port = 49852;

	// Build a node with the default bootstrap config on the given TCP and UDP ports.
	pub async fn setup_node(ports: (Port, Port)) -> Core {
		let config = BootstrapConfig::default()
			.with_tcp(ports.0)
			.with_udp(ports.1);

		CoreBuilder::with_config(config).build().await.unwrap()
	}

	#[test]
	fn test_initialization_with_default_config() {
		let buffer = ReplicaBufferQueue::new(ReplNetworkConfig::Default);

		assert!(
			matches!(buffer.consistency_model(), ConsistencyModel::Eventual),
			"Consistency model not initialized correctly"
		);
	}

	#[test]
	fn test_initialization_with_custom_config() {
		let config = ReplNetworkConfig::Custom {
			queue_length: 200,
			expiry_time: Some(120),
			sync_wait_time: 10,
			consistency_model: ConsistencyModel::Strong(ConsensusModel::All),
			data_aging_period: 15,
		};
		let buffer = ReplicaBufferQueue::new(config);

		assert!(
			matches!(
				buffer.consistency_model(),
				ConsistencyModel::Strong(ConsensusModel::All)
			),
			"Consistency model not initialized correctly"
		);

		match buffer.config {
			ReplNetworkConfig::Custom {
				queue_length,
				expiry_time,
				sync_wait_time,
				data_aging_period,
				..
			} => {
				assert_eq!(queue_length, 200);
				assert_eq!(expiry_time, Some(120));
				assert_eq!(sync_wait_time, 10);
				assert_eq!(data_aging_period, 15);
			},
			_ => panic!("Custom config fields not initialized correctly"),
		}
	}

	#[test]
	fn test_buffer_overflow_expiry_behavior() {
		tokio::runtime::Runtime::new().unwrap().block_on(async {
			let expiry_period: u64 = 2;

			let config = ReplNetworkConfig::Custom {
				queue_length: 4,
				expiry_time: Some(expiry_period),
				sync_wait_time: 10,
				consistency_model: ConsistencyModel::Eventual,
				data_aging_period: 10,
			};

			let network = setup_node((CUSTOM_TCP_PORT, CUSTOM_UDP_PORT)).await;
			let buffer = ReplicaBufferQueue::new(config);

			// Fill the queue to capacity.
			for clock in 1..5 {
				let data = ReplBufferData {
					data: vec!["Data 1".into()],
					lamport_clock: clock,
					outgoing_timestamp: util::get_unix_timestamp(),
					incoming_timestamp: util::get_unix_timestamp(),
					message_id: "msg1".into(),
					sender: PeerId::random(),
					confirmations: None,
				};

				buffer
					.push(network.clone(), "network1".into(), data.clone())
					.await;
			}

			// The smallest Lamport clock is popped first.
			assert_eq!(
				buffer
					.pop_front(network.clone(), "network1")
					.await
					.unwrap()
					.lamport_clock,
				1
			);

			// Wait for the remaining items to pass their expiry period.
			tokio::time::sleep(std::time::Duration::from_secs(expiry_period)).await;
			assert_eq!(buffer.queue.lock().await.get("network1").unwrap().len(), 3);

			buffer
				.push(
					network.clone(),
					"network1".into(),
					ReplBufferData {
						data: vec!["Data 1".into()],
						lamport_clock: 6,
						outgoing_timestamp: util::get_unix_timestamp(),
						incoming_timestamp: util::get_unix_timestamp(),
						message_id: "msg1".into(),
						sender: PeerId::random(),
						confirmations: None,
					},
				)
				.await;

			assert_eq!(buffer.queue.lock().await.get("network1").unwrap().len(), 4);

			// This push overflows the queue, evicting the expired items.
			buffer
				.push(
					network.clone(),
					"network1".into(),
					ReplBufferData {
						data: vec!["Data 1".into()],
						lamport_clock: 42,
						outgoing_timestamp: util::get_unix_timestamp(),
						incoming_timestamp: util::get_unix_timestamp(),
						message_id: "msg1".into(),
						sender: PeerId::random(),
						confirmations: None,
					},
				)
				.await;

			// Only the fresh items remain.
			assert_eq!(
				buffer
					.pop_front(network.clone(), "network1")
					.await
					.unwrap()
					.lamport_clock,
				6
			);
			assert_eq!(
				buffer
					.pop_front(network.clone(), "network1")
					.await
					.unwrap()
					.lamport_clock,
				42
			);
		});
	}

	#[test]
	fn test_buffer_overflow_no_expiry_behavior() {
		tokio::runtime::Runtime::new().unwrap().block_on(async {
			let config = ReplNetworkConfig::Custom {
				queue_length: 4,
				expiry_time: None,
				sync_wait_time: 10,
				consistency_model: ConsistencyModel::Eventual,
				data_aging_period: 10,
			};

			let network = setup_node((15555, 6666)).await;
			let buffer = ReplicaBufferQueue::new(config);

			for clock in 1..5 {
				let data = ReplBufferData {
					data: vec!["Data 1".into()],
					lamport_clock: clock,
					outgoing_timestamp: util::get_unix_timestamp(),
					incoming_timestamp: util::get_unix_timestamp(),
					message_id: "msg1".into(),
					sender: PeerId::random(),
					confirmations: None,
				};

				buffer
					.push(network.clone(), "network1".into(), data.clone())
					.await;
			}

			assert_eq!(
				buffer
					.pop_front(network.clone(), "network1")
					.await
					.unwrap()
					.lamport_clock,
				1
			);

			buffer
				.push(
					network.clone(),
					"network1".into(),
					ReplBufferData {
						data: vec!["Data 1".into()],
						lamport_clock: 6,
						outgoing_timestamp: util::get_unix_timestamp(),
						incoming_timestamp: util::get_unix_timestamp(),
						message_id: "msg1".into(),
						sender: PeerId::random(),
						confirmations: None,
					},
				)
				.await;

			// Remaining items pop in Lamport clock order.
			assert_eq!(
				buffer
					.pop_front(network.clone(), "network1")
					.await
					.unwrap()
					.lamport_clock,
				2
			);
			assert_eq!(
				buffer
					.pop_front(network.clone(), "network1")
					.await
					.unwrap()
					.lamport_clock,
				3
			);
		});
	}

	#[test]
	fn test_pop_from_empty_buffer() {
		tokio::runtime::Runtime::new().unwrap().block_on(async {
			let config = ReplNetworkConfig::Default;
			let buffer = ReplicaBufferQueue::new(config);

			let network = setup_node((15551, 6661)).await;

			let result = buffer.pop_front(network.clone(), "network1").await;
			assert!(result.is_none());
		});
	}
}