swarm_nl/core/
mod.rs

// Copyright 2024 Algorealm, Inc.
// Apache 2.0 License

//! Core data structures and protocol implementations for building a swarm.

#![doc = include_str!("../../doc/core/NetworkBuilder.md")]
#![doc = include_str!("../../doc/core/ApplicationInteraction.md")]
#![doc = include_str!("../../doc/core/EventHandling.md")]
#![doc = include_str!("../../doc/core/Replication.md")]

use std::{
	cmp,
	collections::{vec_deque::IntoIter, BTreeSet, HashMap, HashSet},
	fs,
	net::IpAddr,
	num::NonZeroU32,
	path::Path,
	sync::Arc,
	time::Duration,
};

use base58::FromBase58;

use futures::{
	channel::mpsc::{self, Receiver, Sender},
	select, SinkExt, StreamExt,
};
use libp2p::{
	gossipsub::{self, IdentTopic, TopicHash},
	identify::{self},
	kad::{self, store::MemoryStore, Mode, Record, RecordKey},
	multiaddr::Protocol,
	noise,
	ping::{self, Failure},
	request_response::{self, cbor::Behaviour, ProtocolSupport},
	swarm::{NetworkBehaviour, SwarmEvent},
	tcp, tls, yamux, Multiaddr, StreamProtocol, Swarm, SwarmBuilder,
};
use replication::{
	ConsistencyModel, ReplBufferData, ReplConfigData, ReplInfo, ReplNetworkConfig,
	ReplicaBufferQueue,
};

use self::{
	gossipsub_cfg::{Blacklist, GossipsubConfig, GossipsubInfo},
	ping_config::*,
	sharding::{DefaultShardStorage, ShardStorage, ShardingInfo},
};

use super::*;
use crate::{setup::BootstrapConfig, util::*};

#[cfg(feature = "async-std-runtime")]
use async_std::sync::Mutex;

#[cfg(feature = "tokio-runtime")]
use tokio::sync::Mutex;

pub(crate) mod prelude;
pub use prelude::*;
pub mod replication;
pub mod sharding;

#[cfg(test)]
mod tests;

/// The core behaviour of the node, composing the various protocols
/// we support.
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "CoreEvent")]
struct CoreBehaviour {
	request_response: request_response::cbor::Behaviour<Rpc, Rpc>,
	kademlia: kad::Behaviour<MemoryStore>,
	ping: ping::Behaviour,
	identify: identify::Behaviour,
	gossipsub: gossipsub::Behaviour,
}

/// Network events generated as a result of supported and configured [`NetworkBehaviour`]'s
#[derive(Debug)]
enum CoreEvent {
	Ping(ping::Event),
	RequestResponse(request_response::Event<Rpc, Rpc>),
	Kademlia(kad::Event),
	Identify(identify::Event),
	Gossipsub(gossipsub::Event),
}

/// Implement ping events for [`CoreEvent`].
impl From<ping::Event> for CoreEvent {
	fn from(event: ping::Event) -> Self {
		CoreEvent::Ping(event)
	}
}

/// Implement kademlia events for [`CoreEvent`].
impl From<kad::Event> for CoreEvent {
	fn from(event: kad::Event) -> Self {
		CoreEvent::Kademlia(event)
	}
}

/// Implement identify events for [`CoreEvent`].
impl From<identify::Event> for CoreEvent {
	fn from(event: identify::Event) -> Self {
		CoreEvent::Identify(event)
	}
}

/// Implement request_response events for [`CoreEvent`].
impl From<request_response::Event<Rpc, Rpc>> for CoreEvent {
	fn from(event: request_response::Event<Rpc, Rpc>) -> Self {
		CoreEvent::RequestResponse(event)
	}
}

/// Implement gossipsub events for [`CoreEvent`].
impl From<gossipsub::Event> for CoreEvent {
	fn from(event: gossipsub::Event) -> Self {
		CoreEvent::Gossipsub(event)
	}
}

/// Structure containing necessary data to build [`Core`].
pub struct CoreBuilder {
	/// The network ID of the network.
	network_id: StreamProtocol,
	/// The cryptographic keypair of the node.
	keypair: Keypair,
	/// The TCP and UDP ports to listen on.
	tcp_udp_port: (Port, Port),
	/// The bootnodes to connect to.
	boot_nodes: HashMap<PeerIdString, MultiaddrString>,
	/// The blacklist of peers to ignore.
	blacklist: Blacklist,
	/// The size of the stream buffers to use to track application requests to the network layer
	/// internally.
	stream_size: usize,
	/// The IP address to listen on.
	ip_address: IpAddr,
	/// Connection keep-alive duration while idle.
	keep_alive_duration: Seconds,
	/// The transport protocols being used.
	/// TODO: This can be a collection in the future to support additive transports.
	transport: TransportOpts,
	/// The `Behaviour` of the `Ping` protocol.
	ping: (ping::Behaviour, PingErrorPolicy),
	/// The `Behaviour` of the `Kademlia` protocol.
	kademlia: kad::Behaviour<kad::store::MemoryStore>,
	/// The `Behaviour` of the `Identify` protocol.
	identify: identify::Behaviour,
	/// The `Behaviour` of the `Request-Response` protocol. The second field value is the function
	/// to handle an incoming request from a peer.
	request_response: (Behaviour<Rpc, Rpc>, fn(RpcData) -> RpcData),
	/// The `Behaviour` of the `GossipSub` protocol. The second tuple value is a filter function
	/// that filters incoming gossip messages before passing them to the application.
	gossipsub: (
		gossipsub::Behaviour,
		fn(PeerId, MessageId, Option<PeerId>, String, Vec<String>) -> bool,
	),
	/// The network data for replication operations.
	replication_cfg: ReplNetworkConfig,
	/// The name of the entire shard network. This is important for quick broadcasting of changes
	/// in the shard network as a whole.
	sharding: ShardingInfo,
}

impl CoreBuilder {
	/// Return a [`CoreBuilder`] struct configured with [`BootstrapConfig`] and default values.
	/// Here, it is certain that [`BootstrapConfig`] contains valid data.
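	///
	/// # Example
	///
	/// A minimal sketch, assuming `bootstrap_config` is a valid [`BootstrapConfig`]
	/// constructed by the application:
	///
	/// ```ignore
	/// let builder = CoreBuilder::with_config(bootstrap_config);
	/// ```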
	pub fn with_config(config: BootstrapConfig) -> Self {
		// The default network id
		let network_id = DEFAULT_NETWORK_ID;

		// The default transports (TCP/IP and QUIC)
		let default_transport = TransportOpts::TcpQuic {
			tcp_config: TcpConfig::Default,
		};

		// The peer ID of the node
		let peer_id = config.keypair().public().to_peer_id();

		// Set up default config for Kademlia
		let mut cfg = kad::Config::default();
		cfg.set_protocol_names(vec![StreamProtocol::new(network_id)]);
		let kademlia =
			kad::Behaviour::with_config(peer_id, kad::store::MemoryStore::new(peer_id), cfg);

		// Set up default config for Identify
		let cfg = identify::Config::new(network_id.to_owned(), config.keypair().public())
			.with_push_listen_addr_updates(true);
		let identify = identify::Behaviour::new(cfg);

		// Set up default config for Request-Response
		let request_response_behaviour = Behaviour::new(
			[(StreamProtocol::new(network_id), ProtocolSupport::Full)],
			request_response::Config::default(),
		);

		// Set up default config for gossiping
		let cfg = gossipsub::Config::default();
		let gossipsub_behaviour = gossipsub::Behaviour::new(
			gossipsub::MessageAuthenticity::Signed(config.keypair()),
			cfg,
		)
		.map_err(|_| SwarmNlError::GossipConfigError)
		.unwrap();

		// The filter function to handle incoming gossip data.
		// The default behaviour is to allow all incoming messages.
		let gossip_filter_fn = |_, _, _, _, _| true;

		// Set up default config for RPC handling.
		// The incoming RPC will simply be forwarded back to its sender.
		let rpc_handler_fn = |incoming_data: RpcData| incoming_data;

		// Initialize struct with information from `BootstrapConfig`
		CoreBuilder {
			network_id: StreamProtocol::new(network_id),
			keypair: config.keypair(),
			tcp_udp_port: config.ports(),
			boot_nodes: config.bootnodes(),
			blacklist: config.blacklist(),
			stream_size: usize::MAX,
			// Default is to listen on all interfaces (ipv4).
			ip_address: IpAddr::V4(DEFAULT_IP_ADDRESS),
			keep_alive_duration: DEFAULT_KEEP_ALIVE_DURATION,
			transport: default_transport,
			// The peer will be disconnected after 20 successive timeout errors are recorded
			ping: (
				Default::default(),
				PingErrorPolicy::DisconnectAfterMaxTimeouts(20),
			),
			kademlia,
			identify,
			request_response: (request_response_behaviour, rpc_handler_fn),
			gossipsub: (gossipsub_behaviour, gossip_filter_fn),
			replication_cfg: ReplNetworkConfig::Default,
			sharding: ShardingInfo {
				id: Default::default(),
				local_storage: Arc::new(Mutex::new(DefaultShardStorage)),
				state: Default::default(),
			},
		}
	}

	/// Explicitly configure the network (protocol) id.
	///
	/// Note that it must be of the format "/protocol-name/version", otherwise this function
	/// panics. See: [`DEFAULT_NETWORK_ID`].
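	///
	/// # Example
	///
	/// A minimal sketch, assuming `builder` is a [`CoreBuilder`]:
	///
	/// ```ignore
	/// let builder = builder.with_network_id("/my-protocol/1.0".to_string());
	/// ```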
	pub fn with_network_id(self, protocol: String) -> Self {
		if protocol.len() > MIN_NETWORK_ID_LENGTH.into() && protocol.starts_with("/") {
			CoreBuilder {
				network_id: StreamProtocol::try_from_owned(protocol.clone())
					.map_err(|_| SwarmNlError::NetworkIdParseError(protocol))
					.unwrap(),
				..self
			}
		} else {
			panic!("Could not parse provided network id");
		}
	}

	/// Configure the IP address to listen on.
	///
	/// If none is specified, the default value is `Ipv4Addr::new(0, 0, 0, 0)`. See:
	/// [`DEFAULT_IP_ADDRESS`].
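	///
	/// # Example
	///
	/// A minimal sketch, assuming `builder` is a [`CoreBuilder`]:
	///
	/// ```ignore
	/// use std::net::{IpAddr, Ipv4Addr};
	///
	/// let builder = builder.listen_on(IpAddr::V4(Ipv4Addr::LOCALHOST));
	/// ```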
	pub fn listen_on(self, ip_address: IpAddr) -> Self {
		CoreBuilder { ip_address, ..self }
	}

	/// Configure how long to keep a connection alive (in seconds) once it is idling.
	pub fn with_idle_connection_timeout(self, keep_alive_duration: Seconds) -> Self {
		CoreBuilder {
			keep_alive_duration,
			..self
		}
	}

	/// Configure the size of the stream buffers to use to track application requests to the network
	/// layer internally. This should be as large as possible to prevent dropping of requests to
	/// the network layer. Defaults to [`usize::MAX`].
	pub fn with_stream_size(self, size: usize) -> Self {
		CoreBuilder {
			stream_size: size,
			..self
		}
	}

	/// Configure the `Ping` protocol for the network.
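	///
	/// # Example
	///
	/// A minimal sketch, assuming `builder` is a [`CoreBuilder`] and that [`PingConfig`]'s
	/// fields can be set directly, as they are read in this module:
	///
	/// ```ignore
	/// use std::time::Duration;
	///
	/// let builder = builder.with_ping(PingConfig {
	/// 	interval: Duration::from_secs(30),
	/// 	timeout: Duration::from_secs(10),
	/// 	err_policy: PingErrorPolicy::DisconnectAfterMaxTimeouts(10),
	/// });
	/// ```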
	pub fn with_ping(self, config: PingConfig) -> Self {
		// Set the ping protocol
		CoreBuilder {
			ping: (
				ping::Behaviour::new(
					ping::Config::new()
						.with_interval(config.interval)
						.with_timeout(config.timeout),
				),
				config.err_policy,
			),
			..self
		}
	}

	/// Configure the `Replication` protocol for the network.
	pub fn with_replication(mut self, repl_cfg: ReplNetworkConfig) -> Self {
		self.replication_cfg = repl_cfg;
		CoreBuilder { ..self }
	}

	/// Configure the `Sharding` protocol for the network.
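	///
	/// # Example
	///
	/// A minimal sketch using the bundled [`DefaultShardStorage`], assuming `builder` is a
	/// [`CoreBuilder`]:
	///
	/// ```ignore
	/// let builder = builder.with_sharding(
	/// 	"my-shard-network".to_string(),
	/// 	Arc::new(Mutex::new(DefaultShardStorage)),
	/// );
	/// ```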
	pub fn with_sharding<T: ShardStorage + 'static>(
		self,
		network_id: String,
		local_shard_storage: Arc<Mutex<T>>,
	) -> Self {
		CoreBuilder {
			sharding: ShardingInfo {
				id: network_id,
				local_storage: local_shard_storage,
				state: Default::default(),
			},
			..self
		}
	}

	/// Configure the RPC protocol for the network.
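	///
	/// # Example
	///
	/// A minimal sketch with a custom config and an echo handler, assuming `builder` is a
	/// [`CoreBuilder`]:
	///
	/// ```ignore
	/// use std::time::Duration;
	///
	/// // The handler signature is `fn(RpcData) -> RpcData`; this one echoes the data back
	/// fn rpc_handler(incoming_data: RpcData) -> RpcData {
	/// 	incoming_data
	/// }
	///
	/// let builder = builder.with_rpc(
	/// 	RpcConfig::Custom {
	/// 		timeout: Duration::from_secs(60),
	/// 		max_concurrent_streams: 100,
	/// 	},
	/// 	rpc_handler,
	/// );
	/// ```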
	pub fn with_rpc(self, config: RpcConfig, handler: fn(RpcData) -> RpcData) -> Self {
		// Set the request-response protocol
		CoreBuilder {
			request_response: (
				match config {
					RpcConfig::Default => self.request_response.0,
					RpcConfig::Custom {
						timeout,
						max_concurrent_streams,
					} => Behaviour::new(
						[(self.network_id.clone(), ProtocolSupport::Full)],
						request_response::Config::default()
							.with_request_timeout(timeout)
							.with_max_concurrent_streams(max_concurrent_streams),
					),
				},
				handler,
			),
			..self
		}
	}

	/// Configure the `Kademlia` protocol for the network.
	pub fn with_kademlia(self, config: kad::Config) -> Self {
		// PeerId
		let peer_id = self.keypair.public().to_peer_id();
		let store = kad::store::MemoryStore::new(peer_id);
		let kademlia = kad::Behaviour::with_config(peer_id, store, config);

		CoreBuilder { kademlia, ..self }
	}

	/// Configure the `Gossipsub` protocol for the network.
	///
	/// # Panics
	///
	/// This function panics if `Gossipsub` cannot be configured properly.
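	///
	/// # Example
	///
	/// A minimal sketch using the default config and a filter that admits every incoming
	/// message, assuming `builder` is a [`CoreBuilder`]:
	///
	/// ```ignore
	/// fn gossip_filter(
	/// 	_propagation_source: PeerId,
	/// 	_message_id: MessageId,
	/// 	_source: Option<PeerId>,
	/// 	_topic: String,
	/// 	_data: Vec<String>,
	/// ) -> bool {
	/// 	true
	/// }
	///
	/// let builder = builder.with_gossipsub(GossipsubConfig::Default, gossip_filter);
	/// ```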
	pub fn with_gossipsub(
		self,
		config: GossipsubConfig,
		filter_fn: fn(PeerId, MessageId, Option<PeerId>, String, Vec<String>) -> bool,
	) -> Self {
		let behaviour = match config {
			GossipsubConfig::Default => self.gossipsub.0,
			GossipsubConfig::Custom { config, auth } => gossipsub::Behaviour::new(auth, config)
				.map_err(|_| SwarmNlError::GossipConfigError)
				.unwrap(),
		};

		CoreBuilder {
			gossipsub: (behaviour, filter_fn),
			..self
		}
	}

	/// Configure the transports to support.
	pub fn with_transports(self, transport: TransportOpts) -> Self {
		CoreBuilder { transport, ..self }
	}

	/// Return the id of the network.
	pub fn network_id(&self) -> String {
		self.network_id.to_string()
	}

	/// Build the [`Core`] data structure.
	///
	/// Handles the configuration of the libp2p Swarm structure and the selected transport
	/// protocols, behaviours and node identity for tokio and async-std runtimes. The Swarm is
	/// wrapped in the Core construct which serves as the interface to interact with the internal
	/// networking layer.
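	///
	/// # Example
	///
	/// A minimal sketch of a complete build, assuming `bootstrap_config` is a valid
	/// [`BootstrapConfig`]:
	///
	/// ```ignore
	/// let node = CoreBuilder::with_config(bootstrap_config)
	/// 	.with_network_id("/my-protocol/1.0".to_string())
	/// 	.build()
	/// 	.await
	/// 	.unwrap();
	/// ```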
	pub async fn build(self) -> SwarmNlResult<Core> {
		#[cfg(feature = "async-std-runtime")]
		let mut swarm = {
			// Configure transports for default and custom configurations
			let swarm_builder: SwarmBuilder<_, _> = match self.transport {
				TransportOpts::TcpQuic { tcp_config } => match tcp_config {
					TcpConfig::Default => {
						libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
							.with_async_std()
							.with_tcp(
								tcp::Config::default(),
								(tls::Config::new, noise::Config::new),
								yamux::Config::default,
							)
							.map_err(|_| {
								SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
									tcp_config: TcpConfig::Default,
								})
							})?
							.with_quic()
							.with_dns()
							.await
							.map_err(|_| SwarmNlError::DNSConfigError)?
					},
					TcpConfig::Custom {
						ttl,
						nodelay,
						backlog,
					} => {
						let tcp_config = tcp::Config::default()
							.ttl(ttl)
							.nodelay(nodelay)
							.listen_backlog(backlog);

						libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
							.with_async_std()
							.with_tcp(
								tcp_config,
								(tls::Config::new, noise::Config::new),
								yamux::Config::default,
							)
							.map_err(|_| {
								SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
									tcp_config: TcpConfig::Custom {
										ttl,
										nodelay,
										backlog,
									},
								})
							})?
							.with_quic()
							.with_dns()
							.await
							.map_err(|_| SwarmNlError::DNSConfigError)?
					},
				},
			};

			// Configure the selected protocols and their corresponding behaviours
			swarm_builder
				.with_behaviour(|_| CoreBehaviour {
					ping: self.ping.0,
					kademlia: self.kademlia,
					identify: self.identify,
					request_response: self.request_response.0,
					gossipsub: self.gossipsub.0,
				})
				.map_err(|_| SwarmNlError::ProtocolConfigError)?
				.with_swarm_config(|cfg| {
					cfg.with_idle_connection_timeout(Duration::from_secs(self.keep_alive_duration))
				})
				.build()
		};

		#[cfg(feature = "tokio-runtime")]
		let mut swarm = {
			let swarm_builder: SwarmBuilder<_, _> = match self.transport {
				TransportOpts::TcpQuic { tcp_config } => match tcp_config {
					TcpConfig::Default => {
						libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
							.with_tokio()
							.with_tcp(
								tcp::Config::default(),
								(tls::Config::new, noise::Config::new),
								yamux::Config::default,
							)
							.map_err(|_| {
								SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
									tcp_config: TcpConfig::Default,
								})
							})?
							.with_quic()
					},
					TcpConfig::Custom {
						ttl,
						nodelay,
						backlog,
					} => {
						let tcp_config = tcp::Config::default()
							.ttl(ttl)
							.nodelay(nodelay)
							.listen_backlog(backlog);

						libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
							.with_tokio()
							.with_tcp(
								tcp_config,
								(tls::Config::new, noise::Config::new),
								yamux::Config::default,
							)
							.map_err(|_| {
								SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
									tcp_config: TcpConfig::Custom {
										ttl,
										nodelay,
										backlog,
									},
								})
							})?
							.with_quic()
					},
				},
			};

			// Configure the selected protocols and their corresponding behaviours
			swarm_builder
				.with_behaviour(|_| CoreBehaviour {
					ping: self.ping.0,
					kademlia: self.kademlia,
					identify: self.identify,
					request_response: self.request_response.0,
					gossipsub: self.gossipsub.0,
				})
				.map_err(|_| SwarmNlError::ProtocolConfigError)?
				.with_swarm_config(|cfg| {
					cfg.with_idle_connection_timeout(Duration::from_secs(self.keep_alive_duration))
				})
				.build()
		};

		// Configure the transport multiaddress and begin listening.
		// It can handle multiple future transports based on configuration, e.g., in the future,
		// WebRTC.
		match self.transport {
			// TCP/IP and QUIC
			TransportOpts::TcpQuic { tcp_config: _ } => {
				// Configure TCP/IP multiaddress
				let listen_addr_tcp = Multiaddr::empty()
					.with(match self.ip_address {
						IpAddr::V4(address) => Protocol::from(address),
						IpAddr::V6(address) => Protocol::from(address),
					})
					.with(Protocol::Tcp(self.tcp_udp_port.0));

				// Configure QUIC multiaddress
				let listen_addr_quic = Multiaddr::empty()
					.with(match self.ip_address {
						IpAddr::V4(address) => Protocol::from(address),
						IpAddr::V6(address) => Protocol::from(address),
					})
					.with(Protocol::Udp(self.tcp_udp_port.1))
					.with(Protocol::QuicV1);

				// Begin listening
				#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
				swarm.listen_on(listen_addr_tcp.clone()).map_err(|_| {
					SwarmNlError::MultiaddressListenError(listen_addr_tcp.to_string())
				})?;

				#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
				swarm.listen_on(listen_addr_quic.clone()).map_err(|_| {
					SwarmNlError::MultiaddressListenError(listen_addr_quic.to_string())
				})?;
			},
		}

		// Add bootnodes to local routing table, if any
		for peer_info in self.boot_nodes {
			// PeerId
			if let Some(peer_id) = string_to_peer_id(&peer_info.0) {
				// Multiaddress
				if let Ok(multiaddr) = peer_info.1.parse::<Multiaddr>() {
					// Make sure the peer is not a part of our blacklist
					if !self.blacklist.list.iter().any(|&id| id == peer_id) {
						swarm
							.behaviour_mut()
							.kademlia
							.add_address(&peer_id, multiaddr.clone());

						println!("Dialing {}", multiaddr);

						// Dial them
						swarm
							.dial(multiaddr.clone().with(Protocol::P2p(peer_id)))
							.map_err(|_| {
								SwarmNlError::RemotePeerDialError(multiaddr.to_string())
							})?;
					}
				}
			}
		}

		// Begin DHT bootstrap, hopefully bootnodes were supplied
		let _ = swarm.behaviour_mut().kademlia.bootstrap();

		// Set node as SERVER
		swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server));

		// Register and inform swarm of our blacklist
		for peer_id in &self.blacklist.list {
			swarm.behaviour_mut().gossipsub.blacklist_peer(peer_id);
		}

		// There must be a way for the application to communicate with the underlying networking
		// core. This will involve accepting and pushing data to the application layer.
		// Two streams will be opened: The first mpsc stream will allow SwarmNL to push data to
		// the application and the application will consume it (single consumer). The second
		// stream will have SwarmNL (being the consumer) receive data and commands from multiple
		// areas in the application.
		let (application_sender, network_receiver) =
			mpsc::channel::<StreamData>(STREAM_BUFFER_CAPACITY);
		let (network_sender, application_receiver) =
			mpsc::channel::<StreamData>(STREAM_BUFFER_CAPACITY);

		// Set up the ping network info.
		// `PeerId` does not implement `Default` so we will add the peer ID of this node as seed
		// and set the count to 0. The count can NEVER increase because we cannot `Ping`
		// ourselves.
		let peer_id = self.keypair.public().to_peer_id();

		// Timeouts
		let mut timeouts = HashMap::<PeerId, u16>::new();
		timeouts.insert(peer_id.clone(), 0);

		// Outbound errors
		let mut outbound_errors = HashMap::<PeerId, u16>::new();
		outbound_errors.insert(peer_id.clone(), 0);

		// Ping manager
		let manager = PingManager {
			timeouts,
			outbound_errors,
		};

		// Set up Ping network information
		let ping_info = PingInfo {
			policy: self.ping.1,
			manager,
		};

		// Set up Gossipsub network information
		let gossip_info = GossipsubInfo {
			blacklist: self.blacklist,
		};

		// Set up Replication information
		let repl_info = ReplInfo {
			state: Arc::new(Mutex::new(Default::default())),
		};

		// Initial stream id
		let stream_id = StreamId::new();
		let stream_request_buffer =
			Arc::new(Mutex::new(StreamRequestBuffer::new(self.stream_size)));
		let stream_response_buffer =
			Arc::new(Mutex::new(StreamResponseBuffer::new(self.stream_size)));

		// Aggregate the useful network information
		let network_info = NetworkInfo {
			id: self.network_id,
			ping: ping_info,
			gossipsub: gossip_info,
			rpc_handler_fn: self.request_response.1,
			gossip_filter_fn: self.gossipsub.1,
			replication: repl_info,
			sharding: self.sharding.clone(),
		};

		// Build the network core
		let network_core = Core {
			keypair: self.keypair,
			application_sender,
			stream_request_buffer: stream_request_buffer.clone(),
			stream_response_buffer: stream_response_buffer.clone(),
			current_stream_id: Arc::new(Mutex::new(stream_id)),
			// Initialize an empty event queue
			event_queue: DataQueue::new(),
			replica_buffer: Arc::new(ReplicaBufferQueue::new(self.replication_cfg.clone())),
			network_info,
		};

		// Check if sharding is configured
		if !self.sharding.id.is_empty() {
			// Spin up task to init the network
			let mut core = network_core.clone();
			#[cfg(feature = "async-std-runtime")]
			async_std::task::spawn(
				async move { core.init_sharding(self.sharding.id.clone()).await },
			);

			#[cfg(feature = "tokio-runtime")]
			tokio::task::spawn(async move { core.init_sharding(self.sharding.id.clone()).await });
		}

		// Spin up task to handle async operations and data on the network
		#[cfg(feature = "async-std-runtime")]
		async_std::task::spawn(Core::handle_async_operations(
			swarm,
			network_sender,
			network_receiver,
			network_core.clone(),
		));

		// Spin up task to handle async operations and data on the network
		#[cfg(feature = "tokio-runtime")]
		tokio::task::spawn(Core::handle_async_operations(
			swarm,
			network_sender,
			network_receiver,
			network_core.clone(),
		));

		// Spin up task to listen for responses from the network layer
		#[cfg(feature = "async-std-runtime")]
		async_std::task::spawn(Core::handle_network_response(
			application_receiver,
			network_core.clone(),
		));

		// Spin up task to listen for responses from the network layer
		#[cfg(feature = "tokio-runtime")]
		tokio::task::spawn(Core::handle_network_response(
			application_receiver,
			network_core.clone(),
		));

		// Wait for a few seconds before passing control to the application
		#[cfg(feature = "async-std-runtime")]
		async_std::task::sleep(Duration::from_secs(BOOT_WAIT_TIME)).await;

		// Wait for a few seconds before passing control to the application
		#[cfg(feature = "tokio-runtime")]
		tokio::time::sleep(Duration::from_secs(BOOT_WAIT_TIME)).await;

		Ok(network_core)
	}
}

/// The core interface for the application layer to interface with the networking layer.
#[derive(Clone)]
pub struct Core {
	keypair: Keypair,
	/// The producing end of the stream that sends data to the network layer from the
	/// application.
	application_sender: Sender<StreamData>,
	// The consuming end of the stream that receives data from the network layer.
	// application_receiver: Receiver<StreamData>,
	// The producing end of the stream that sends data from the network layer to the application.
	// network_sender: Sender<StreamData>,
	/// This serves as a buffer for the results of the requests to the network layer.
	/// With this, applications can make async requests and fetch their results at a later time
	/// without waiting. This is made possible by storing a [`StreamId`] for a particular stream
	/// request.
	stream_response_buffer: Arc<Mutex<StreamResponseBuffer>>,
	/// Store a [`StreamId`] representing a network request
	stream_request_buffer: Arc<Mutex<StreamRequestBuffer>>,
	/// Current stream id. Useful for opening new streams, we just have to bump the number by 1
	current_stream_id: Arc<Mutex<StreamId>>,
	/// The network event queue
	event_queue: DataQueue<NetworkEvent>,
	/// The internal buffer storing incoming replicated content before it is expired or consumed
	replica_buffer: Arc<ReplicaBufferQueue>,
	/// Important information about the network
	network_info: NetworkInfo,
}

impl Core {
	/// The delimiter that separates the messages to gossip.
	pub const GOSSIP_MESSAGE_SEPARATOR: &'static str = "~~##~~";

	/// The gossip flag to indicate that an incoming gossipsub message is actually data sent for
	/// replication.
	pub const REPL_GOSSIP_FLAG: &'static str = "REPL_GOSSIP_FLAG__@@";

	/// The RPC flag to indicate that an incoming message is data that has been forwarded to the
	/// node because it is a member of the logical shard to store the data.
	pub const RPC_DATA_FORWARDING_FLAG: &'static str = "RPC_DATA_FORWARDING_FLAG__@@";

	/// The gossip flag to indicate that an incoming (or outgoing) gossipsub message is a part of
	/// the strong consistency algorithm, intending to increase the confirmation count of a
	/// particular data item in the replica's temporary buffer.
	pub const STRONG_CONSISTENCY_FLAG: &'static str = "STRONG_CON__@@";

	/// The gossip flag to indicate that an incoming (or outgoing) gossipsub message is a part of
	/// the eventual consistency algorithm seeking to synchronize data across nodes.
	pub const EVENTUAL_CONSISTENCY_FLAG: &'static str = "EVENTUAL_CON_@@";

	/// The RPC flag to pull missing data from a replica node for eventual consistency
	/// synchronization.
	pub const RPC_SYNC_PULL_FLAG: &'static str = "RPC_SYNC_PULL_FLAG__@@";

	/// The RPC flag to update the shard network state of a joining node.
	pub const SHARD_RPC_SYNC_FLAG: &'static str = "SHARD_RPC_SYNC_FLAG__@@";

	/// The sharding gossip flag to indicate that a node has joined a shard network.
	pub const SHARD_GOSSIP_JOIN_FLAG: &'static str = "SHARD_GOSSIP_JOIN_FLAG__@@";

	/// The sharding gossip flag to indicate that a node has exited a shard network.
	pub const SHARD_GOSSIP_EXIT_FLAG: &'static str = "SHARD_GOSSIP_EXIT_FLAG__@@";

	/// The RPC flag to request data from a node in a logical shard.
	pub const SHARD_RPC_REQUEST_FLAG: &'static str = "SHARD_RPC_REQUEST_FLAG__@@";

	/// The delimiter between the data fields of an entry in a dataset requested by a replica peer.
	pub const FIELD_DELIMITER: &'static str = "_@_";

	/// The delimiter between the data entries that have been requested by a replica peer.
	pub const ENTRY_DELIMITER: &'static str = "@@@";

	/// The delimiter to separate messages during RPC data marshalling.
	pub const DATA_DELIMITER: &'static str = "$$";

	/// Serialize the keypair to protobuf format and write it to a config file on disk. This could
	/// be useful for saving a keypair for future use when going offline.
	///
	/// It returns a boolean to indicate the success of the operation. Only key types other than
	/// RSA can be serialized to protobuf format, and only a single keypair can be saved at a time.
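	///
	/// # Example
	///
	/// A minimal sketch, assuming `node` is a [`Core`] and the path (here a hypothetical
	/// `config.ini`) is writable:
	///
	/// ```ignore
	/// if node.save_keypair_offline("config.ini") {
	/// 	println!("Keypair saved to disk");
	/// }
	/// ```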
	pub fn save_keypair_offline<T: AsRef<Path> + ?Sized>(&self, config_file_path: &T) -> bool {
		// Check if the file exists, and create one if not
		if let Err(_) = fs::metadata(config_file_path) {
			fs::File::create(config_file_path).expect("could not create config file");
		}

		// Check if key type is something other than RSA
		if KeyType::RSA != self.keypair.key_type() {
			if let Ok(protobuf_keypair) = self.keypair.to_protobuf_encoding() {
				// Write key type and serialized array key to config file
				return util::write_config(
					"auth",
					"protobuf_keypair",
					&format!("{:?}", protobuf_keypair),
					config_file_path,
				) && util::write_config(
					"auth",
					"Crypto",
					&format!("{}", self.keypair.key_type()),
					config_file_path,
				);
			}
		}

		false
	}

	/// Return the node's `PeerId`.
	pub fn peer_id(&self) -> PeerId {
		self.keypair.public().to_peer_id()
	}

	/// Return an iterator to the buffered network layer events and consume them.
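	///
	/// # Example
	///
	/// A minimal sketch of draining the queue, assuming `node` is a [`Core`]:
	///
	/// ```ignore
	/// for event in node.events().await {
	/// 	if let NetworkEvent::NewListenAddr { address, .. } = event {
	/// 		println!("Listening on {}", address);
	/// 	}
	/// }
	/// ```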
	pub async fn events(&mut self) -> IntoIter<NetworkEvent> {
		let events = self.event_queue.into_inner().await.into_iter();

		// Clear all buffered events
		self.event_queue.drain().await;
		events
	}

	/// Return the next event in the network event queue.
	pub async fn next_event(&mut self) -> Option<NetworkEvent> {
		self.event_queue.pop().await
	}

	/// Return the replica peers in a network, excluding the node itself.
	pub async fn replica_peers(&mut self, replica_network: &str) -> Vec<PeerId> {
		let mut peers = Vec::new();

		// Check gossip group
		let request = AppData::GossipsubGetInfo;
		if let Ok(response) = self.query_network(request).await {
			if let AppResponse::GossipsubGetInfo { mesh_peers, .. } = response {
				for (peer_id, networks) in mesh_peers {
					if networks.contains(&replica_network.to_string()) {
						peers.push(peer_id);
					}
				}
			}
		}
		peers
	}

	/// Send data to the network layer and receive a unique `StreamId` to track the request.
	///
	/// If the internal stream buffer is full, `None` will be returned.
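	///
	/// # Example
	///
	/// A minimal sketch pairing this with [`Core::recv_from_network()`], assuming `node` is a
	/// [`Core`] and that `AppData::Echo` carries a `String`:
	///
	/// ```ignore
	/// let request = AppData::Echo("hello".to_string());
	/// if let Some(stream_id) = node.send_to_network(request).await {
	/// 	// ... do other work, then fetch the response later
	/// 	let response = node.recv_from_network(stream_id).await;
	/// }
	/// ```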
	pub async fn send_to_network(&mut self, app_request: AppData) -> Option<StreamId> {
		// Generate stream id
		let stream_id = StreamId::next(*self.current_stream_id.lock().await);
		let request = StreamData::FromApplication(stream_id, app_request.clone());

		// Only a few requests should be tracked internally
		match app_request {
			// Doesn't need any tracking
			AppData::KademliaDeleteRecord { .. } | AppData::KademliaStopProviding { .. } => {
				// Send request
				let _ = self.application_sender.send(request).await;
				return None;
			},
			// Okay with the rest
			_ => {
				// Acquire lock on stream_request_buffer
				let mut stream_request_buffer = self.stream_request_buffer.lock().await;

				// Add to request buffer
				if !stream_request_buffer.insert(stream_id) {
					// Buffer appears to be full
					return None;
				}

				// Send request
				if let Ok(_) = self.application_sender.send(request).await {
					// Store latest stream id
					*self.current_stream_id.lock().await = stream_id;
					return Some(stream_id);
				} else {
					return None;
				}
			},
		}
	}

	/// Explicitly retrieve the response to a request sent to the network layer.
	/// This function is decoupled from the [`Core::send_to_network()`] method so as to prevent
	/// blocking until the response is returned.
	pub async fn recv_from_network(&mut self, stream_id: StreamId) -> NetworkResult<AppResponse> {
		#[cfg(feature = "async-std-runtime")]
		{
			let channel = self.clone();
			let response_handler = async_std::task::spawn(async move {
				let mut loop_count = 0;
				loop {
					// Attempt to acquire the lock without blocking
					let mut buffer_guard = channel.stream_response_buffer.lock().await;

					// Check if the value is available in the response buffer
					if let Some(result) = buffer_guard.remove(&stream_id) {
						return Ok(result);
					}

					// Timeout after 10 trials
					if loop_count < 10 {
						loop_count += 1;
					} else {
						return Err(NetworkError::NetworkReadTimeout);
					}

					// Response has not arrived, sleep and retry
					async_std::task::sleep(Duration::from_secs(TASK_SLEEP_DURATION)).await;
				}
			});

			// Wait for the spawned task to complete
			match response_handler.await {
				Ok(result) => result,
				Err(_) => Err(NetworkError::NetworkReadTimeout),
			}
		}

		#[cfg(feature = "tokio-runtime")]
		{
			let channel = self.clone();
			let response_handler = tokio::task::spawn(async move {
				let mut loop_count = 0;
				loop {
					// Attempt to acquire the lock without blocking
					let mut buffer_guard = channel.stream_response_buffer.lock().await;

					// Check if the value is available in the response buffer
					if let Some(result) = buffer_guard.remove(&stream_id) {
						return Ok(result);
					}

					// Timeout after 10 trials
					if loop_count < 10 {
						loop_count += 1;
					} else {
						return Err(NetworkError::NetworkReadTimeout);
					}

					// Response has not arrived, sleep and retry
					tokio::time::sleep(Duration::from_secs(TASK_SLEEP_DURATION)).await;
				}
			});

			// Wait for the spawned task to complete
			match response_handler.await {
				Ok(result) => result?,
				Err(_) => Err(NetworkError::NetworkReadTimeout),
			}
		}
	}

	/// Perform an atomic `send` and `receive` to and from the network layer. This function is
	/// atomic and blocks until the result of the request is returned from the network layer.
	///
	/// This function should mostly be used when the result of the request is needed immediately
	/// and some delay can be tolerated. It will still time out if the delay exceeds the
	/// configured period.
	///
	/// If the internal buffer is full, it will return an error.
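	///
	/// # Example
	///
	/// A minimal sketch, assuming `node` is a [`Core`] and that `AppData::Echo` carries a
	/// `String`:
	///
	/// ```ignore
	/// match node.query_network(AppData::Echo("hello".to_string())).await {
	/// 	Ok(AppResponse::Echo(message)) => println!("Got back: {}", message),
	/// 	Ok(_) => {},
	/// 	Err(e) => eprintln!("Network error: {:?}", e),
	/// }
	/// ```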
	pub async fn query_network(&mut self, request: AppData) -> NetworkResult<AppResponse> {
		// Send request
		if let Some(stream_id) = self.send_to_network(request).await {
			// Wait to receive response from the network
			self.recv_from_network(stream_id).await
		} else {
			Err(NetworkError::StreamBufferOverflow)
		}
	}

	/// Set up the necessary preliminaries for the sharding protocol.
	async fn init_sharding(&mut self, network_id: String) {
		// We will set up the underlying gossip group that all nodes in the network must be a part
		// of, to keep the state of the network consistent
		let gossip_request = AppData::GossipsubJoinNetwork(network_id.clone());
		let _ = self.query_network(gossip_request).await;
	}

	/// Update the state of the shard network. This is relevant when nodes join and leave the shard
	/// network.
	async fn update_shard_state(&mut self, peer: PeerId, shard_id: ShardId, join: bool) {
		// Update state
		let mut shard_state = self.network_info.sharding.state.lock().await;
		let shard_entry = shard_state
			.entry(shard_id.clone())
			.or_insert(Default::default());

		// If the node is joining
		if join {
			shard_entry.insert(peer);
		} else {
			// Update shard state to reflect exit
			shard_entry.retain(|entry| entry != &peer);

			// If the last node has exited the shard, dissolve it
			if shard_entry.is_empty() {
				shard_state.remove(&shard_id.to_string());
			}
		}
	}

	/// Publish the current shard state of the network to the new peer just joining. This will
	/// enable the node to have a current view of the network.
	async fn publish_shard_state(&mut self, peer: PeerId) {
		// Marshall the local state into a byte vector
		let shard_state = self.network_info.sharding.state.lock().await.clone();

		let bytes = shard_image_to_bytes(shard_state).unwrap_or_default();
		let message = vec![
			Core::SHARD_RPC_SYNC_FLAG.as_bytes().to_vec(), // Flag to indicate a sync request
			bytes,                                         // Network state
		];

		// Send the RPC request.
		let rpc_request = AppData::SendRpc {
			keys: message,
			peer,
		};

		let _ = self.query_network(rpc_request).await;
	}

	/// Handle incoming replicated data.
	/// The first element of the incoming data vector contains the name of the replica network.
	async fn handle_incoming_repl_data(&mut self, repl_network: String, repl_data: ReplBufferData) {
		// First, we generate an event announcing the arrival of some replicated data.
		// Application developers can listen for this
		let replica_data = repl_data.clone();
		self.event_queue
			.push(NetworkEvent::ReplicaDataIncoming {
				data: replica_data.data,
				network: repl_network.clone(),
				outgoing_timestamp: replica_data.outgoing_timestamp,
				incoming_timestamp: replica_data.incoming_timestamp,
				message_id: replica_data.message_id,
				source: replica_data.sender,
			})
			.await;

		// Compare and increment the Lamport clock for the replica node
		if let Some(repl_network_data) = self
			.network_info
			.replication
			.state
			.lock()
			.await
			.get_mut(&repl_network)
		{
			// Update clock
			(*repl_network_data).lamport_clock =
				cmp::max(repl_network_data.lamport_clock, repl_data.lamport_clock)
					.saturating_add(1);

			// Then push into buffer queue
			self.replica_buffer
				.push(self.clone(), repl_network, repl_data)
				.await;
		}
	}

	/// Handle incoming shard data. We will not be doing any internal buffering as the data will
	/// be exposed as an event.
	async fn handle_incoming_shard_data(
		&mut self,
		shard_id: String,
		source: PeerId,
		incoming_data: ByteVector,
	) {
		// Push into event queue
		self.event_queue
			.push(NetworkEvent::IncomingForwardedData {
				data: byte_vec_to_string_vec(incoming_data.clone()),
				source,
			})
			.await;

		// Notify other nodes in the shard
		let _ = self.replicate(incoming_data, &shard_id).await;
	}

	/// Consume data in the replication buffer.
	pub async fn consume_repl_data(&mut self, replica_network: &str) -> Option<ReplBufferData> {
		self.replica_buffer
			.pop_front(self.clone(), replica_network)
			.await
	}

	/// Join a replica network and get up to speed with the current network data state.
	///
	/// If the consistency model is eventual, the node's buffer will almost immediately be up to
	/// date. But if the consistency model is strong, [`Core::replicate_buffer`] must be called to
	/// update the buffer.
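	///
	/// # Example
	///
	/// A minimal sketch, assuming `node` is a [`Core`] and the surrounding function returns a
	/// [`NetworkResult`]:
	///
	/// ```ignore
	/// node.join_repl_network("my-replica-network".to_string()).await?;
	///
	/// // Consume replicated data as it arrives
	/// if let Some(data) = node.consume_repl_data("my-replica-network").await {
	/// 	println!("Incoming replica data: {:?}", data.data);
	/// }
	/// ```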
	pub async fn join_repl_network(&mut self, repl_network: String) -> NetworkResult<()> {
		// Set up replica network config
		let mut cfg = self.network_info.replication.state.lock().await;
		cfg.entry(repl_network.clone()).or_insert(ReplConfigData {
			lamport_clock: 0,
			last_clock: 0,
			nodes: Default::default(),
		});

		// Initialize replica buffers
		self.replica_buffer.init(repl_network.clone()).await;

		// Free `Core`
		drop(cfg);

		// Join the replication (gossip) network
		let gossip_request = AppData::GossipsubJoinNetwork(repl_network.clone());
		let _ = self.query_network(gossip_request).await?;

		// Check if the consistency model is eventual
		if let ConsistencyModel::Eventual = self.replica_buffer.consistency_model() {
			// Spin up task to ensure data consistency across the network
			let core = self.clone();
			let network = repl_network.clone();
			#[cfg(feature = "tokio-runtime")]
			tokio::task::spawn(async move {
				let buffer = core.replica_buffer.clone();
				buffer.sync_with_eventual_consistency(core, network).await;
			});

			#[cfg(feature = "async-std-runtime")]
			async_std::task::spawn(async move {
				let buffer = core.replica_buffer.clone();
				buffer.sync_with_eventual_consistency(core, network).await;
			});
		}

		Ok(())
	}

	/// Leave a replica network. The messages on the internal replica queue are not discarded, so
	/// as to aid speedy recovery in case of reconnection.
	pub async fn leave_repl_network(&mut self, repl_network: String) -> NetworkResult<AppResponse> {
		// Leave the replication (gossip) network
		let gossip_request = AppData::GossipsubExitNetwork(repl_network.clone());
		self.query_network(gossip_request).await
	}

	/// Clone a replica node's current buffer image. This is necessary in case of
	/// joining a replica network with a strong consistency model.
	pub async fn replicate_buffer(
		&self,
		repl_network: String,
		replica_node: PeerId,
	) -> Result<(), NetworkError> {
		// First make sure we are a part of the replica network
		if self
			.network_info
			.replication
			.state
			.lock()
			.await
			.contains_key(&repl_network)
		{
			// Populate buffer
			self.replica_buffer
				.replicate_buffer(self.clone(), repl_network, replica_node)
				.await
		} else {
			Err(NetworkError::MissingReplNetwork)
		}
	}

	/// Send data to replica nodes.
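	///
	/// # Example
	///
	/// A minimal sketch, assuming `node` is a [`Core`] that has already joined the replica
	/// network and that `ByteVector` is a vector of byte vectors:
	///
	/// ```ignore
	/// let payload = vec!["some data".as_bytes().to_vec()];
	/// node.replicate(payload, "my-replica-network").await?;
	/// ```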
	pub async fn replicate(
		&mut self,
		mut replica_data: ByteVector,
		replica_network: &str,
	) -> NetworkResult<()> {
		// Extract the replica network data with minimal lock time
		let replica_network_data = {
			let mut state = self.network_info.replication.state.lock().await;
			if let Some(data) = state.get_mut(replica_network) {
				// Increase the clock atomically before releasing the lock
				data.lamport_clock = data.lamport_clock.saturating_add(1);
				data.clone()
			} else {
				return Err(NetworkError::MissingReplNetwork);
			}
		};

		// Prepare the replication message
		let mut message = vec![
			Core::REPL_GOSSIP_FLAG.as_bytes().to_vec(), // Replica Gossip Flag
			get_unix_timestamp().to_string().into(),    // Timestamp
			replica_network_data.lamport_clock.to_string().into(), // Clock
			replica_network.to_owned().into(),          // Replica network
		];
		message.append(&mut replica_data);

		// Prepare a gossip request
		let gossip_request = AppData::GossipsubBroadcastMessage {
			topic: replica_network.to_owned(),
			message,
		};

		// Gossip data to replica nodes
		self.query_network(gossip_request).await?;

		Ok(())
	}

	/// Handle the responses coming from the network layer. This is usually as a result of a
	/// request from the application layer.
	async fn handle_network_response(mut receiver: Receiver<StreamData>, network_core: Core) {
		loop {
			select! {
				response = receiver.select_next_some() => {
					// Acquire mutex to write to buffer
					let mut buffer_guard = network_core.stream_response_buffer.lock().await;
					match response {
						// Send response to request operations specified by the application layer
						StreamData::ToApplication(stream_id, response) => match response {
							// Error
							AppResponse::Error(error) => buffer_guard.insert(stream_id, Err(error)),
							// Success
							res @ AppResponse::Echo(..) => buffer_guard.insert(stream_id, Ok(res)),
							res @ AppResponse::DailPeerSuccess(..) => buffer_guard.insert(stream_id, Ok(res)),
							res @ AppResponse::KademliaStoreRecordSuccess => buffer_guard.insert(stream_id, Ok(res)),
							res @ AppResponse::KademliaLookupSuccess(..) => buffer_guard.insert(stream_id, Ok(res)),
							res @ AppResponse::KademliaGetProviders{..} => buffer_guard.insert(stream_id, Ok(res)),
							res @ AppResponse::KademliaNoProvidersFound => buffer_guard.insert(stream_id, Ok(res)),
							res @ AppResponse::KademliaGetRoutingTableInfo { .. } => buffer_guard.insert(stream_id, Ok(res)),
							res @ AppResponse::SendRpc(..) => buffer_guard.insert(stream_id, Ok(res)),
							res @ AppResponse::GetNetworkInfo{..} => buffer_guard.insert(stream_id, Ok(res)),
							res @ AppResponse::GossipsubBroadcastSuccess => buffer_guard.insert(stream_id, Ok(res)),
							res @ AppResponse::GossipsubJoinSuccess => buffer_guard.insert(stream_id, Ok(res)),
							res @ AppResponse::GossipsubExitSuccess => buffer_guard.insert(stream_id, Ok(res)),
							res @ AppResponse::GossipsubBlacklistSuccess => buffer_guard.insert(stream_id, Ok(res)),
							res @ AppResponse::GossipsubGetInfo{..} => buffer_guard.insert(stream_id, Ok(res)),
						},
						_ => false
					};
				}
			}
		}
	}

	/// Handle async operations, which basically involves handling two major data sources:
	///
	/// - Streams coming from the application layer.
	/// - Events generated by (libp2p) network activities.
	///
	/// Important information is sent to the application layer over a (mpsc) stream.
	async fn handle_async_operations(
		mut swarm: Swarm<CoreBehaviour>,
		mut network_sender: Sender<StreamData>,
		mut receiver: Receiver<StreamData>,
		mut network_core: Core,
	) {
		// Network queue that tracks the execution of application requests in the network layer.
		let data_queue_1 = DataQueue::new();
		let data_queue_2 = DataQueue::new();
		let data_queue_3 = DataQueue::new();
		let data_queue_4 = DataQueue::new();

		// Network information
		let mut network_info = network_core.network_info.clone();

		// Loop to handle incoming application streams indefinitely
		loop {
1310			select! {
1311					// Handle incoming stream data
1312					stream_data = receiver.next() => {
1313						match stream_data {
1314							Some(incoming_data) => {
1315								match incoming_data {
1316									StreamData::FromApplication(stream_id, app_data) => {
1317										// Trackable stream id
1318										let stream_id = stream_id;
1319										match app_data {
1320											// Put back into the stream what we read from it
1321											AppData::Echo(message) => {
1322												// Send the response back to the application layer
1323												let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Echo(message))).await;
1324											},
1325											AppData::DailPeer(peer_id, multiaddr) => {
1326												if let Ok(multiaddr) = multiaddr.parse::<Multiaddr>() {
1327													// Add to routing table
1328													swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr.clone());
1329													if let Ok(_) = swarm.dial(multiaddr.clone().with(Protocol::P2p(peer_id))) {
1330														// Send the response back to the application layer
1331														let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::DailPeerSuccess(multiaddr.to_string()))).await;
1332													} else {
1333														// Return error
1334														let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::DailPeerError))).await;
1335													}
1336												}
1337											},
1338											// Store a value in the DHT and (optionally) on explicit specific peers
1339											AppData::KademliaStoreRecord { key, value, expiration_time, explicit_peers } => {
1340												// Create a kad record
1341												let mut record = Record::new(key.clone(), value);
1342
1343
1344												// Set (optional) expiration time
1345												record.expires = expiration_time;
1346
1347
1348												// Insert into DHT
1349												if let Ok(_) = swarm.behaviour_mut().kademlia.put_record(record.clone(), kad::Quorum::One) {
1350													// The node automatically becomes a provider in the network
1351													let _ = swarm.behaviour_mut().kademlia.start_providing(RecordKey::new(&key));
1352
1353
1354													// Send streamId to libp2p events, to track response
1355													data_queue_1.push(stream_id).await;
1356
1357
1358													// Cache record on peers explicitly (if specified)
1359													if let Some(explicit_peers) = explicit_peers {
1360														// Extract PeerIds
1361														let peers = explicit_peers.iter().map(|peer_id_string| {
1362															PeerId::from_bytes(&peer_id_string.from_base58().unwrap_or_default())
1363														}).filter_map(Result::ok).collect::<Vec<_>>();
1364														swarm.behaviour_mut().kademlia.put_record_to(record, peers.into_iter(), kad::Quorum::One);
1365													}
1366												} else {
1367													// Return error
1368													let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadStoreRecordError(key)))).await;
1369												}
1370											},
1371											// Perform a lookup in the DHT
1372											AppData::KademliaLookupRecord { key } => {
1373												let _ = swarm.behaviour_mut().kademlia.get_record(key.clone().into());
1374
1375
1376												// Send streamId to libp2p events, to track response
1377												data_queue_2.push(stream_id).await;
1378											},
1379											// Perform a lookup of peers that store a record
1380											AppData::KademliaGetProviders { key } => {
1381												swarm.behaviour_mut().kademlia.get_providers(key.clone().into());
1382
1383
1384												// Send streamId to libp2p events, to track response
1385												data_queue_3.push(stream_id).await;
1386											}
1387											// Stop providing a record on the network
1388											AppData::KademliaStopProviding { key } => {
1389												swarm.behaviour_mut().kademlia.stop_providing(&key.into());
1390											}
1391											// Remove record from local store
1392											AppData::KademliaDeleteRecord { key } => {
1393												swarm.behaviour_mut().kademlia.remove_record(&key.into());
1394											}
1395											// Return important routing table info. We could return kbuckets depending on needs, for now it's just the network ID.
1396											AppData::KademliaGetRoutingTableInfo => {
1397												// Send the response back to the application layer
1398												let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaGetRoutingTableInfo{protocol_id: network_info.id.to_string()})).await;
1399											},
1400											// Fetch data quickly from a peer over the network
1401											AppData::SendRpc { keys, peer } => {
1402												// Construct the RPC object
1403												let rpc = Rpc::ReqResponse { data: keys.clone() };
1404
1405
1406												// Inform the swarm to make the request
1407												let _ = swarm
1408													.behaviour_mut()
1409													.request_response
1410													.send_request(&peer, rpc);
1411
1412
1413												// Send streamId to libp2p events, to track response
1414												data_queue_4.push(stream_id).await;
1415											},
1416											// Return important information about the node
1417											AppData::GetNetworkInfo => {
1418												// Connected peers
1419												let connected_peers = swarm.connected_peers().map(|peer| peer.to_owned()).collect::<Vec<_>>();
1420
1421												// External Addresses
1422												let external_addresses = swarm.listeners().map(|multiaddr| multiaddr.to_string()).collect::<Vec<_>>();
1423
1424												// Send the response back to the application layer
1425												let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GetNetworkInfo { peer_id: swarm.local_peer_id().clone(), connected_peers, external_addresses })).await;
1426											},
1427											// Send gossip message to peers
1428											AppData::GossipsubBroadcastMessage { message, topic } => {
1429												// Get the topic hash
1430												let topic_hash = TopicHash::from_raw(topic);
1431
1432												// Marshall message into a single string
1433												let message = message.join(Core::GOSSIP_MESSAGE_SEPARATOR.as_bytes());
1434
1435												// Check if we're already subscribed to the topic
1436												let is_subscribed = swarm.behaviour().gossipsub.mesh_peers(&topic_hash).any(|peer| peer == swarm.local_peer_id());
1437
1438												// Gossip
1439												if swarm
1440													.behaviour_mut().gossipsub
1441													.publish(topic_hash, message).is_ok() && !is_subscribed {
1442														// Send the response back to the application layer
1443														let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubBroadcastSuccess)).await;
1444												} else {
1445													// Return error
1446													let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::GossipsubBroadcastMessageError))).await;
1447												}
1448											},
1449											// Join a mesh network
1450											AppData::GossipsubJoinNetwork(topic) => {
1451												// Create a new topic
1452												let topic = IdentTopic::new(topic);
1453
1454												// Subscribe
1455												if swarm.behaviour_mut().gossipsub.subscribe(&topic).is_ok() {
1456													// Send the response back to the application layer
1457													let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubJoinSuccess)).await;
1458												} else {
1459													// Return error
1460													let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::GossipsubJoinNetworkError))).await;
1461												}
1462											},
1463											// Get information concerning our gossiping
1464											AppData::GossipsubGetInfo => {
1465												// Topics we're subscribed to
1466												let subscribed_topics = swarm.behaviour().gossipsub.topics().map(|topic| topic.clone().into_string()).collect::<Vec<_>>();
1467
1468												// Peers we know and the topics they are subscribed too
1469												let mesh_peers = swarm.behaviour().gossipsub.all_peers().map(|(peer, topics)| {
1470													(peer.to_owned(), topics.iter().map(|&t| t.clone().as_str().to_owned()).collect::<Vec<_>>())
1471												}).collect::<Vec<_>>();
1472
1473												// Retrieve blacklist
1474												let blacklist = network_info.gossipsub.blacklist.into_inner();
1475
1476												// Send the response back to the application layer
1477												let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubGetInfo { topics: subscribed_topics, mesh_peers, blacklist })).await;
1478											},
1479											// Exit a network we're a part of
1480											AppData::GossipsubExitNetwork(topic) => {
1481												// Create a new topic
1482												let topic = IdentTopic::new(topic);
1483
												// Unsubscribe
1485												if swarm.behaviour_mut().gossipsub.unsubscribe(&topic).is_ok() {
1486													// Send the response back to the application layer
1487													let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubExitSuccess)).await;
1488												} else {
1489													// Return error
1490													let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::GossipsubJoinNetworkError))).await;
1491												}
1492											}
1493											// Blacklist a peer explicitly
1494											AppData::GossipsubBlacklistPeer(peer) => {
												// Add the peer to gossipsub's internal blacklist
												swarm.behaviour_mut().gossipsub.blacklist_peer(&peer);

												// Record the peer in our local blacklist state
												network_info.gossipsub.blacklist.list.insert(peer);
1500
1501												// Send the response back to the application layer
1502												let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubBlacklistSuccess)).await;
1503											},
1504											// Remove a peer from the blacklist
1505											AppData::GossipsubFilterBlacklist(peer) => {
												// Remove the peer from gossipsub's internal blacklist
												swarm.behaviour_mut().gossipsub.remove_blacklisted_peer(&peer);

												// Remove the peer from our local blacklist state
												network_info.gossipsub.blacklist.list.remove(&peer);
1511
1512												// Send the response back to the application layer
1513												let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubBlacklistSuccess)).await;
1514											},
1515										}
1516									}
1517									_ => {}
1518								}
1519							},
1520							_ => {}
1521						}
1522					},
1523					swarm_event = swarm.next() => {
1524						match swarm_event {
1525							Some(event) => {
1526								match event {
1527								SwarmEvent::NewListenAddr {
1528									listener_id,
1529									address,
1530								} => {
1531									// Append to network event queue
1532									network_core.event_queue.push(NetworkEvent::NewListenAddr{
1533										local_peer_id: swarm.local_peer_id().to_owned(),
1534										listener_id,
1535										address
1536									}).await;
1537								}
1538								SwarmEvent::Behaviour(event) => match event {
1539									// Ping
1540									CoreEvent::Ping(ping::Event {
1541										peer,
1542										connection: _,
1543										result,
1544									}) => {
1545										match result {
											// Outbound ping success
1547											Ok(duration) => {
												// In handling the ping error policies, we only bump up an error count on CONSECUTIVE failures.
												// Once the peer becomes responsive again, its recorded error count decays by 50% on every success until it reaches zero.
1550
1551												// Enforce a 50% decay on the count of outbound errors
1552												if let Some(err_count) =
1553													network_info.ping.manager.outbound_errors.get(&peer)
1554												{
1555													let new_err_count = (err_count / 2) as u16;
1556													network_info
1557														.ping
1558														.manager
1559														.outbound_errors
1560														.insert(peer, new_err_count);
1561												}
1562
												// Enforce a 50% decay on the count of timeout errors
1564												if let Some(timeout_err_count) =
1565													network_info.ping.manager.timeouts.get(&peer)
1566												{
1567													let new_err_count = (timeout_err_count / 2) as u16;
1568													network_info
1569														.ping
1570														.manager
1571														.timeouts
1572														.insert(peer, new_err_count);
1573												}
1574
1575												// Append to network event queue
1576												network_core.event_queue.push(NetworkEvent::OutboundPingSuccess{
1577													peer_id: peer,
1578													duration
1579												}).await;
1580											}
1581											// Outbound ping failure
1582											Err(err_type) => {
1583												// Handle error by examining selected policy
1584												match network_info.ping.policy {
1585													PingErrorPolicy::NoDisconnect => {
1586														// Do nothing, we can't disconnect from peer under any circumstances
1587													}
1588													PingErrorPolicy::DisconnectAfterMaxErrors(max_errors) => {
														// Disconnect after we've recorded a certain number of consecutive errors
1590
1591														// Get peer entry for outbound errors or initialize peer
1592														let err_count = network_info
1593															.ping
1594															.manager
1595															.outbound_errors
1596															.entry(peer)
1597															.or_insert(0);
1598
														if *err_count >= max_errors {
															// Threshold reached, disconnect the peer
															let _ = swarm.disconnect_peer_id(peer);

															// Remove the entry to clear the peer's record in case it reconnects and becomes responsive
															network_info
																.ping
																.manager
																.outbound_errors
																.remove(&peer);
														} else {
															// Bump the count up
															*err_count += 1;
														}
1613													}
1614													PingErrorPolicy::DisconnectAfterMaxTimeouts(
1615														max_timeout_errors,
1616													) => {
														// Disconnect after we've recorded a certain number of consecutive TIMEOUT errors
1618
1619														// First make sure we're dealing with only the timeout errors
1620														if let Failure::Timeout = err_type {
1621															// Get peer entry for outbound errors or initialize peer
1622															let err_count = network_info
1623																.ping
1624																.manager
1625																.timeouts
1626																.entry(peer)
1627																.or_insert(0);
1628
															if *err_count >= max_timeout_errors {
																// Threshold reached, disconnect the peer
																let _ = swarm.disconnect_peer_id(peer);

																// Remove the entry to clear the peer's record in case it reconnects and becomes responsive
																network_info
																	.ping
																	.manager
																	.timeouts
																	.remove(&peer);
															} else {
																// Bump the count up
																*err_count += 1;
															}
1643														}
1644													}
1645												}
1646
1647												// Append to network event queue
1648												network_core.event_queue.push(NetworkEvent::OutboundPingError{
1649													peer_id: peer
1650												}).await;
1651											}
1652										}
1653									}
1654									// Kademlia
1655									CoreEvent::Kademlia(event) => match event {
1656										kad::Event::OutboundQueryProgressed { result, .. } => match result {
1657											kad::QueryResult::GetProviders(Ok(success)) => {
1658												match success {
1659													kad::GetProvidersOk::FoundProviders { key, providers, .. } => {
1660														// Stringify the PeerIds
1661														let peer_id_strings = providers.iter().map(|peer_id| {
1662															peer_id.to_base58()
1663														}).collect::<Vec<_>>();
1664
1665														// Receive data from our one-way channel
1666														if let Some(stream_id) = data_queue_3.pop().await {
1667															// Send the response back to the application layer
1668															let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaGetProviders{ key: key.to_vec(), providers: peer_id_strings })).await;
1669														}
1670													},
1671													// No providers found
1672													kad::GetProvidersOk::FinishedWithNoAdditionalRecord { .. } => {
1673														// Receive data from our one-way channel
1674														if let Some(stream_id) = data_queue_3.pop().await {
1675															// Send the response back to the application layer
1676															let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaNoProvidersFound)).await;
1677														}
1678													}
1679												}
1680											},
1681
1682											kad::QueryResult::GetProviders(Err(_)) => {
1683												// Receive data from our one-way channel
1684												if let Some(stream_id) = data_queue_3.pop().await {
1685													// Send the response back to the application layer
1686													let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaNoProvidersFound)).await;
1687												}
1688											},
1689											kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(
1690												kad::PeerRecord { record:kad::Record{ value, .. }, .. },
1691											))) => {
												// Receive data from our one-way channel
1693												if let Some(stream_id) = data_queue_2.pop().await {
1694													// Send the response back to the application layer
1695													let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaLookupSuccess(value))).await;
1696												}
1697											}
1698											kad::QueryResult::GetRecord(Ok(_)) => {
												// Receive data from our one-way channel
1700												if let Some(stream_id) = data_queue_2.pop().await {
1701													// Send the error back to the application layer
1702													let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadFetchRecordError(vec![])))).await;
1703												}
1704											},
1705											kad::QueryResult::GetRecord(Err(e)) => {
1706												let key = match e {
1707													kad::GetRecordError::NotFound { key, .. } => key,
1708													kad::GetRecordError::QuorumFailed { key, .. } => key,
1709													kad::GetRecordError::Timeout { key } => key,
1710												};
1711
												// Receive data from our one-way channel
1713												if let Some(stream_id) = data_queue_2.pop().await {
1714													// Send the error back to the application layer
1715													let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadFetchRecordError(key.to_vec())))).await;
1716												}
1717											}
1718											kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
1719												// Receive data from our one-way channel
1720												if let Some(stream_id) = data_queue_1.pop().await {
1721													// Send the response back to the application layer
1722													let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaStoreRecordSuccess)).await;
1723												}
1724
1725												// Append to network event queue
1726												network_core.event_queue.push(NetworkEvent::KademliaPutRecordSuccess{
1727													key: key.to_vec()
1728												}).await;
1729											}
1730											kad::QueryResult::PutRecord(Err(e)) => {
1731												let key = match e {
1732													kad::PutRecordError::QuorumFailed { key, .. } => key,
1733													kad::PutRecordError::Timeout { key, .. } => key,
1734												};
1735
1736												if let Some(stream_id) = data_queue_1.pop().await {
1737													// Send the error back to the application layer
1738													let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadStoreRecordError(key.to_vec())))).await;
1739												}
1740
1741												// Append to network event queue
1742												network_core.event_queue.push(NetworkEvent::KademliaPutRecordError).await;
1743											}
1744											kad::QueryResult::StartProviding(Ok(kad::AddProviderOk {
1745												key,
1746											})) => {
1747												// Append to network event queue
1748												network_core.event_queue.push(NetworkEvent::KademliaStartProvidingSuccess{
1749													key: key.to_vec()
1750												}).await;
1751											}
1752											kad::QueryResult::StartProviding(Err(_)) => {
1753												// Append to network event queue
1754												network_core.event_queue.push(NetworkEvent::KademliaStartProvidingError).await;
1755											}
1756											_ => {}
1757										}
1758										kad::Event::RoutingUpdated { peer, .. } => {
1759											// Append to network event queue
1760											network_core.event_queue.push(NetworkEvent::RoutingTableUpdated{
1761												peer_id: peer
1762											}).await;
1763										}
1764										// Other events we don't care about
1765										_ => {}
1766									},
1767									// Identify
1768									CoreEvent::Identify(event) => match event {
1769										identify::Event::Received { peer_id, info } => {
											// We just received `Identify` info from a peer
1771											network_core.event_queue.push(NetworkEvent::IdentifyInfoReceived {
1772												peer_id,
1773												info: IdentifyInfo {
1774													public_key: info.public_key,
1775													listen_addrs: info.listen_addrs.clone(),
1776													protocols: info.protocols,
1777													observed_addr: info.observed_addr
1778												}
1779											}).await;
1780
											// Disconnect from the peer if its network ID differs from ours
											if info.protocol_version != network_info.id.as_ref() {
												// Disconnect
												let _ = swarm.disconnect_peer_id(peer_id);
											} else {
												// Add the peer to the routing table if not already present, guarding against an empty address list
												if let Some(addr) = info.listen_addrs.first() {
													let _ = swarm.behaviour_mut().kademlia.add_address(&peer_id, addr.clone());
												}
											}
1789										}
1790										// Remaining `Identify` events are not actively handled
1791										_ => {}
1792									},
1793									// Request-response
1794									CoreEvent::RequestResponse(event) => match event {
1795										request_response::Event::Message { peer, message } => match message {
1796												// A request just came in
1797												request_response::Message::Request { request_id: _, request, channel } => {
1798													// Parse request
1799													match request {
1800														Rpc::ReqResponse { data } => {
1801															let byte_str = String::from_utf8_lossy(&data[0]);
1802															match byte_str.as_ref() {
1803																// It is a request to retrieve missing data the RPC sender node lacks
1804																Core::RPC_SYNC_PULL_FLAG => {
																	// Get the replica network that the requested data belongs to
1806																	let repl_network = String::from_utf8(data[1].clone()).unwrap_or_default();
1807
1808																	// Retrieve missing data from local data buffer
1809																	let requested_msgs = network_core.replica_buffer.pull_missing_data(repl_network, &data[2..]).await;
1810
1811																	// Send the response
1812																	let _ = swarm.behaviour_mut().request_response.send_response(channel, Rpc::ReqResponse { data: requested_msgs });
1813																}
																// It is a request to synchronize the sharded network state
1815																Core::SHARD_RPC_SYNC_FLAG => {
1816																	// Parse the incoming shard state
1817																	let incoming_state = bytes_to_shard_image(data[1].clone());
1818
1819																	// Merge the incoming state with local
1820																	let mut current_shard_state = network_core.network_info.sharding.state.lock().await;
1821																	merge_shard_states(&mut current_shard_state, incoming_state);
1822
1823																	// Send the response
1824																	let _ = swarm.behaviour_mut().request_response.send_response(channel, Rpc::ReqResponse { data: Default::default() });
1825																}
																// It is an incoming shard message forwarded from a peer not permitted to store the data
1827																Core::RPC_DATA_FORWARDING_FLAG => {
1828																	// Send the response, so as to return the RPC immediately
1829																	let _ = swarm.behaviour_mut().request_response.send_response(channel, Rpc::ReqResponse { data: Default::default() });
1830
1831																	// Handle incoming shard data
1832																	let shard_id = String::from_utf8_lossy(&data[1]).to_string();
1833																	let mut core = network_core.clone();
1834																	let incoming_data: ByteVector = data[2..].into();
1835
1836																	#[cfg(feature = "tokio-runtime")]
1837																	tokio::task::spawn(async move {
1838																		let _ = core.handle_incoming_shard_data(shard_id, peer, incoming_data).await;
1839																	});
1840
1841																	#[cfg(feature = "async-std-runtime")]
1842																	async_std::task::spawn(async move {
1843																		let _ = core.handle_incoming_shard_data(shard_id, peer, incoming_data).await;
1844																	});
1845																}
																// It is an incoming request for data held on this node as a member of a logical shard
1847																Core::SHARD_RPC_REQUEST_FLAG => {
1848																	// Pass request data to configured shard request handler
1849																	let response_data = network_info.sharding.local_storage.lock().await.fetch_data(data[1..].into());
1850																	// Send the response
1851																	let _ = swarm.behaviour_mut().request_response.send_response(channel, Rpc::ReqResponse { data: response_data });
1852																}
1853																// Normal RPC
1854																_ => {
1855																	// Pass request data to configured request handler
1856																	let response_data = (network_info.rpc_handler_fn)(data);
1857																	// Send the response
1858																	let _ = swarm.behaviour_mut().request_response.send_response(channel, Rpc::ReqResponse { data: response_data });
1859																}
1860															}
1861														}
1862													}
1863												},
1864												// We have a response message
1865												request_response::Message::Response { response, .. } => {
1866													// Receive data from our one-way channel
1867													if let Some(stream_id) = data_queue_4.pop().await {
1868														match response {
1869															Rpc::ReqResponse { data } => {
1870																// Send the response back to the application layer
1871																let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::SendRpc(data))).await;
1872															},
1873														}
1874													}
1875												},
1876										},
1877										request_response::Event::OutboundFailure { .. } => {
											// Receive data from our one-way channel
1879											if let Some(stream_id) = data_queue_4.pop().await {
1880												// Send the error back to the application layer
1881												let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::RpcDataFetchError))).await;
1882											}
1883										},
1884										_ => {}
1885									},
1886									// Gossipsub
1887									CoreEvent::Gossipsub(event) => match event {
										// We've received an inbound message
1889										gossipsub::Event::Message { propagation_source, message_id, message } => {
											// Break the data into its constituents. The payload was marshalled into a single message so that
											// multiple items could be gossiped to peers at once; here we split them back up and pass them on for handling.
1892											let data_string = String::from_utf8_lossy(&message.data).to_string();
1893											let gossip_data = data_string.split(Core::GOSSIP_MESSAGE_SEPARATOR).map(|msg| msg.to_string()).collect::<Vec<_>>();
1894											match gossip_data[0].as_str() {
1895												// It is an incoming replication message
1896												Core::REPL_GOSSIP_FLAG =>  {
1897													// Construct buffer data
1898													let queue_data = ReplBufferData {
1899														data: gossip_data[4..].to_owned(),
														lamport_clock: gossip_data[2].parse::<u64>().unwrap_or(0),	// Defaults to 0 on parse failure, so this cannot panic
1901														outgoing_timestamp: gossip_data[1].parse::<u64>().unwrap_or(0),
1902														incoming_timestamp: get_unix_timestamp(),
1903														message_id: message_id.to_string(),
1904														sender: if let Some(peer) = message.source { peer.clone() } else { propagation_source.clone() },
1905														confirmations: if network_core.replica_buffer.consistency_model() == ConsistencyModel::Eventual {
1906															// No confirmations needed for eventual consistency
1907															None
1908														} else {
1909															// Set count to 1
1910															Some(1)
1911														}
1912													};
1913
1914													// Handle incoming replicated data
1915													let mut core = network_core.clone();
1916													let queue_data = queue_data.clone();
1917													let data = gossip_data[3].clone().into();
1918
1919													#[cfg(feature = "tokio-runtime")]
1920													tokio::task::spawn(async move {
1921														let _ = core.handle_incoming_repl_data(data, queue_data).await;
1922													});
1923
1924													#[cfg(feature = "async-std-runtime")]
1925													async_std::task::spawn(async move {
1926														let _ = core.handle_incoming_repl_data(data, queue_data).await;
1927													});
1928												},
1929												// It is a broadcast from replica nodes to ensure strong consistency
1930												Core::STRONG_CONSISTENCY_FLAG => {
1931													// Handle incoming replicated data
1932													let core = network_core.clone();
1933													let data = gossip_data[2].clone().into();
1934													let network = gossip_data[1].to_owned();
1935
1936													#[cfg(feature = "tokio-runtime")]
1937													tokio::task::spawn(async move {
1938														let _ = core.replica_buffer.handle_data_confirmation(core.clone(), network, data).await;
1939													});
1940
1941													#[cfg(feature = "async-std-runtime")]
1942													async_std::task::spawn(async move {
1943														let _ = core.replica_buffer.handle_data_confirmation(core.clone(), network, data).await;
1944													});
1945												},
1946												// It is a broadcast from replica nodes to ensure eventual consistency
1947												Core::EVENTUAL_CONSISTENCY_FLAG => {
1948													// Lower bound of Lamport's Clock
1949													let min_clock = gossip_data[3].parse::<u64>().unwrap_or_default();
													// Upper bound of Lamport's Clock
1951													let max_clock = gossip_data[4].parse::<u64>().unwrap_or_default();
1952
1953													// Synchronize the incoming replica node's buffer image with the local buffer image
1954													let core = network_core.clone();
1955													let repl_peer_id = gossip_data[1].clone();
1956													let repl_network = gossip_data[2].clone();
1957													let replica_data_state = gossip_data[5..].to_owned();
1958
1959													#[cfg(feature = "tokio-runtime")]
1960													tokio::task::spawn(async move {
1961														core.replica_buffer.sync_buffer_image(core.clone(), repl_peer_id, repl_network, (min_clock, max_clock), replica_data_state).await;
1962													});
1963
1964													#[cfg(feature = "async-std-runtime")]
1965													async_std::task::spawn(async move {
1966														core.replica_buffer.sync_buffer_image(core.clone(), repl_peer_id, repl_network, (min_clock, max_clock), replica_data_state).await;
1967													});
1968												}
1969												// It is a broadcast to inform us about the addition of a new node to a shard network
1970												Core::SHARD_GOSSIP_JOIN_FLAG => {
1971													// Update sharded network state of remote node
1972													if let Ok(peer_id) = gossip_data[1].parse::<PeerId>() {
														// Send an RPC to the joining node to update its view of the sharded network state
1974														let mut core = network_core.clone();
1975
1976														#[cfg(feature = "tokio-runtime")]
1977														tokio::task::spawn(async move {
1978															core.publish_shard_state(peer_id).await;
1979														});
1980
1981														#[cfg(feature = "async-std-runtime")]
1982														async_std::task::spawn(async move {
1983															core.publish_shard_state(peer_id).await;
1984														});
1985
1986														// Update local state
1987														let _ = network_core.update_shard_state(peer_id, gossip_data[2].clone(), true /* join */).await;
1988													}
1989												}
1990												// It is a broadcast to inform us about the exit of a node from a shard network
1991												Core::SHARD_GOSSIP_EXIT_FLAG => {
													// Update the sharded network state
1993													if let Ok(peer_id) = gossip_data[1].parse::<PeerId>() {
1994														let _ = network_core.update_shard_state(peer_id, gossip_data[2].clone(), false /* exit */).await;
1995													}
1996												}
1997												// Normal gossip
1998												_ => {
													// First, run the configured application gossip filter
2000													if (network_info.gossip_filter_fn)(propagation_source.clone(), message_id, message.source, message.topic.to_string(), gossip_data.clone()) {
2001														// Append to network event queue
2002														network_core.event_queue.push(NetworkEvent::GossipsubIncomingMessageHandled { source: propagation_source, data: gossip_data }).await;
2003													}
													// If the filter returns false, the message is dropped
2005												}
2006											}
2007										},
2008										// A peer just subscribed
2009										gossipsub::Event::Subscribed { peer_id, topic } => {
2010											// Append to network event queue
2011											network_core.event_queue.push(NetworkEvent::GossipsubSubscribeMessageReceived { peer_id, topic: topic.to_string() }).await;
2012										},
2013										// A peer just unsubscribed
2014										gossipsub::Event::Unsubscribed { peer_id, topic } => {
2015											// Append to network event queue
2016											network_core.event_queue.push(NetworkEvent::GossipsubUnsubscribeMessageReceived { peer_id, topic: topic.to_string() }).await;
2017										},
2018										_ => {},
2019									}
2020								},
2021								SwarmEvent::ConnectionEstablished {
2022									peer_id,
2023									connection_id,
2024									endpoint,
2025									num_established,
2026									concurrent_dial_errors: _,
2027									established_in,
2028								} => {
									// Before a node dials a peer, it first adds the peer to its routing table.
									// To enable DHT operations, the listener must do the same when a new connection is established.
2031									if let ConnectedPoint::Listener { send_back_addr, .. } = endpoint.clone() {
2032										// Add peer to routing table
2033										let _ = swarm.behaviour_mut().kademlia.add_address(&peer_id, send_back_addr);
2034									}
2035									// Append to network event queue
2036									network_core.event_queue.push(NetworkEvent::ConnectionEstablished {
2037										peer_id,
2038										connection_id,
2039										endpoint,
2040										num_established,
2041										established_in,
2042									}).await;
2043								}
2044								SwarmEvent::ConnectionClosed {
2045									peer_id,
2046									connection_id,
2047									endpoint,
2048									num_established,
2049									cause: _,
2050								} => {
2051									// Append to network event queue
2052									network_core.event_queue.push(NetworkEvent::ConnectionClosed {
2053										peer_id,
2054										connection_id,
2055										endpoint,
2056										num_established
2057									}).await;
2058								}
2059								SwarmEvent::ExpiredListenAddr {
2060									listener_id,
2061									address,
2062								} => {
2063									// Append to network event queue
2064									network_core.event_queue.push(NetworkEvent::ExpiredListenAddr {
2065										listener_id,
2066										address
2067									}).await;
2068								}
2069								SwarmEvent::ListenerClosed {
2070									listener_id,
2071									addresses,
2072									reason: _,
2073								} => {
2074									// Append to network event queue
2075									network_core.event_queue.push(NetworkEvent::ListenerClosed {
2076										listener_id,
2077										addresses
2078									}).await;
2079								}
2080								SwarmEvent::ListenerError {
2081									listener_id,
2082									error: _,
2083								} => {
2084									// Append to network event queue
2085									network_core.event_queue.push(NetworkEvent::ListenerError {
2086										listener_id,
2087									}).await;
2088								}
2089								SwarmEvent::Dialing {
2090									peer_id,
2091									connection_id,
2092								} => {
2093									// Append to network event queue
2094									network_core.event_queue.push(NetworkEvent::Dialing { peer_id, connection_id }).await;
2095								}
2096								SwarmEvent::NewExternalAddrCandidate { address } => {
2097									// Append to network event queue
2098									network_core.event_queue.push(NetworkEvent::NewExternalAddrCandidate { address }).await;
2099								}
2100								SwarmEvent::ExternalAddrConfirmed { address } => {
2101									// Append to network event queue
2102									network_core.event_queue.push(NetworkEvent::ExternalAddrConfirmed { address }).await;
2103								}
2104								SwarmEvent::ExternalAddrExpired { address } => {
2105									// Append to network event queue
2106									network_core.event_queue.push(NetworkEvent::ExternalAddrExpired { address }).await;
2107								}
2108								SwarmEvent::IncomingConnection {
2109									connection_id,
2110									local_addr,
2111									send_back_addr,
2112								} => {
2113									// Append to network event queue
2114									network_core.event_queue.push(NetworkEvent::IncomingConnection { connection_id, local_addr, send_back_addr }).await;
2115								}
2116								SwarmEvent::IncomingConnectionError {
2117									connection_id,
2118									local_addr,
2119									send_back_addr,
2120									error: _,
2121								} => {
2122									// Append to network event queue
2123									network_core.event_queue.push(NetworkEvent::IncomingConnectionError { connection_id, local_addr, send_back_addr }).await;
2124								}
2125								SwarmEvent::OutgoingConnectionError {
2126									connection_id,
2127									peer_id,
2128									error: _,
2129								} => {
2130									// Append to network event queue
2131									network_core.event_queue.push(NetworkEvent::OutgoingConnectionError { connection_id,  peer_id }).await;
2132								}
2133								_ => {},
2134							}
2135						},
2136						_ => {}
2137					}
2138				}
2139			}
2140		}
2141	}
2142}