1#![doc = include_str!("../../doc/core/NetworkBuilder.md")]
7#![doc = include_str!("../../doc/core/ApplicationInteraction.md")]
8#![doc = include_str!("../../doc/core/EventHandling.md")]
9#![doc = include_str!("../../doc/core/Replication.md")]
10
11use std::{
12 cmp,
13 collections::{vec_deque::IntoIter, BTreeSet, HashMap, HashSet},
14 fs,
15 net::IpAddr,
16 num::NonZeroU32,
17 path::Path,
18 sync::Arc,
19 time::Duration,
20};
21
22use base58::FromBase58;
23
24use futures::{
25 channel::mpsc::{self, Receiver, Sender},
26 select, SinkExt, StreamExt,
27};
28use libp2p::{
29 gossipsub::{self, IdentTopic, TopicHash},
30 identify::{self},
31 kad::{self, store::MemoryStore, Mode, Record, RecordKey},
32 multiaddr::Protocol,
33 noise,
34 ping::{self, Failure},
35 request_response::{self, cbor::Behaviour, ProtocolSupport},
36 swarm::{NetworkBehaviour, SwarmEvent},
37 tcp, tls, yamux, Multiaddr, StreamProtocol, Swarm, SwarmBuilder,
38};
39use replication::{
40 ConsistencyModel, ReplBufferData, ReplConfigData, ReplInfo, ReplNetworkConfig,
41 ReplicaBufferQueue,
42};
43
44use self::{
45 gossipsub_cfg::{Blacklist, GossipsubConfig, GossipsubInfo},
46 ping_config::*,
47 sharding::{DefaultShardStorage, ShardStorage, ShardingInfo},
48};
49
50use super::*;
51use crate::{setup::BootstrapConfig, util::*};
52
53#[cfg(feature = "async-std-runtime")]
54use async_std::sync::Mutex;
55
56#[cfg(feature = "tokio-runtime")]
57use tokio::sync::Mutex;
58
59pub(crate) mod prelude;
60pub use prelude::*;
61pub mod replication;
62pub mod sharding;
63
64#[cfg(test)]
65mod tests;
66
67#[derive(NetworkBehaviour)]
70#[behaviour(to_swarm = "CoreEvent")]
71struct CoreBehaviour {
72 request_response: request_response::cbor::Behaviour<Rpc, Rpc>,
73 kademlia: kad::Behaviour<MemoryStore>,
74 ping: ping::Behaviour,
75 identify: identify::Behaviour,
76 gossipsub: gossipsub::Behaviour,
77}
78
79#[derive(Debug)]
81enum CoreEvent {
82 Ping(ping::Event),
83 RequestResponse(request_response::Event<Rpc, Rpc>),
84 Kademlia(kad::Event),
85 Identify(identify::Event),
86 Gossipsub(gossipsub::Event),
87}
88
89impl From<ping::Event> for CoreEvent {
91 fn from(event: ping::Event) -> Self {
92 CoreEvent::Ping(event)
93 }
94}
95
96impl From<kad::Event> for CoreEvent {
98 fn from(event: kad::Event) -> Self {
99 CoreEvent::Kademlia(event)
100 }
101}
102
103impl From<identify::Event> for CoreEvent {
105 fn from(event: identify::Event) -> Self {
106 CoreEvent::Identify(event)
107 }
108}
109
110impl From<request_response::Event<Rpc, Rpc>> for CoreEvent {
112 fn from(event: request_response::Event<Rpc, Rpc>) -> Self {
113 CoreEvent::RequestResponse(event)
114 }
115}
116
117impl From<gossipsub::Event> for CoreEvent {
119 fn from(event: gossipsub::Event) -> Self {
120 CoreEvent::Gossipsub(event)
121 }
122}
123
124pub struct CoreBuilder {
126 network_id: StreamProtocol,
128 keypair: Keypair,
130 tcp_udp_port: (Port, Port),
132 boot_nodes: HashMap<PeerIdString, MultiaddrString>,
134 blacklist: Blacklist,
136 stream_size: usize,
139 ip_address: IpAddr,
141 keep_alive_duration: Seconds,
143 transport: TransportOpts,
146 ping: (ping::Behaviour, PingErrorPolicy),
148 kademlia: kad::Behaviour<kad::store::MemoryStore>,
150 identify: identify::Behaviour,
152 request_response: (Behaviour<Rpc, Rpc>, fn(RpcData) -> RpcData),
155 gossipsub: (
158 gossipsub::Behaviour,
159 fn(PeerId, MessageId, Option<PeerId>, String, Vec<String>) -> bool,
160 ),
161 replication_cfg: ReplNetworkConfig,
163 sharding: ShardingInfo,
166}
167
168impl CoreBuilder {
169 pub fn with_config(config: BootstrapConfig) -> Self {
172 let network_id = DEFAULT_NETWORK_ID;
174
175 let default_transport = TransportOpts::TcpQuic {
177 tcp_config: TcpConfig::Default,
178 };
179
180 let peer_id = config.keypair().public().to_peer_id();
182
183 let mut cfg = kad::Config::default();
185 cfg.set_protocol_names(vec![StreamProtocol::new(network_id)]);
186 let kademlia = kad::Behaviour::new(peer_id, kad::store::MemoryStore::new(peer_id));
187
188 let cfg = identify::Config::new(network_id.to_owned(), config.keypair().public())
190 .with_push_listen_addr_updates(true);
191 let identify = identify::Behaviour::new(cfg);
192
193 let request_response_behaviour = Behaviour::new(
195 [(StreamProtocol::new(network_id), ProtocolSupport::Full)],
196 request_response::Config::default(),
197 );
198
199 let cfg = gossipsub::Config::default();
201 let gossipsub_behaviour = gossipsub::Behaviour::new(
202 gossipsub::MessageAuthenticity::Signed(config.keypair()),
203 cfg,
204 )
205 .map_err(|_| SwarmNlError::GossipConfigError)
206 .unwrap();
207
208 let gossip_filter_fn = |_, _, _, _, _| true;
211
212 let rpc_handler_fn = |incoming_data: RpcData| incoming_data;
215
216 CoreBuilder {
218 network_id: StreamProtocol::new(network_id),
219 keypair: config.keypair(),
220 tcp_udp_port: config.ports(),
221 boot_nodes: config.bootnodes(),
222 blacklist: config.blacklist(),
223 stream_size: usize::MAX,
224 ip_address: IpAddr::V4(DEFAULT_IP_ADDRESS),
226 keep_alive_duration: DEFAULT_KEEP_ALIVE_DURATION,
227 transport: default_transport,
228 ping: (
230 Default::default(),
231 PingErrorPolicy::DisconnectAfterMaxTimeouts(20),
232 ),
233 kademlia,
234 identify,
235 request_response: (request_response_behaviour, rpc_handler_fn),
236 gossipsub: (gossipsub_behaviour, gossip_filter_fn),
237 replication_cfg: ReplNetworkConfig::Default,
238 sharding: ShardingInfo {
239 id: Default::default(),
240 local_storage: Arc::new(Mutex::new(DefaultShardStorage)),
241 state: Default::default(),
242 },
243 }
244 }
245
246 pub fn with_network_id(self, protocol: String) -> Self {
251 if protocol.len() > MIN_NETWORK_ID_LENGTH.into() && protocol.starts_with("/") {
252 CoreBuilder {
253 network_id: StreamProtocol::try_from_owned(protocol.clone())
254 .map_err(|_| SwarmNlError::NetworkIdParseError(protocol))
255 .unwrap(),
256 ..self
257 }
258 } else {
259 panic!("Could not parse provided network id");
260 }
261 }
262
263 pub fn listen_on(self, ip_address: IpAddr) -> Self {
268 CoreBuilder { ip_address, ..self }
269 }
270
271 pub fn with_idle_connection_timeout(self, keep_alive_duration: Seconds) -> Self {
273 CoreBuilder {
274 keep_alive_duration,
275 ..self
276 }
277 }
278
279 pub fn with_stream_size(self, size: usize) -> Self {
283 CoreBuilder {
284 stream_size: size,
285 ..self
286 }
287 }
288
289 pub fn with_ping(self, config: PingConfig) -> Self {
291 CoreBuilder {
293 ping: (
294 ping::Behaviour::new(
295 ping::Config::new()
296 .with_interval(config.interval)
297 .with_timeout(config.timeout),
298 ),
299 config.err_policy,
300 ),
301 ..self
302 }
303 }
304
305 pub fn with_replication(mut self, repl_cfg: ReplNetworkConfig) -> Self {
307 self.replication_cfg = repl_cfg;
308 CoreBuilder { ..self }
309 }
310
311 pub fn with_sharding<T: ShardStorage + 'static>(
313 self,
314 network_id: String,
315 local_shard_storage: Arc<Mutex<T>>,
316 ) -> Self {
317 CoreBuilder {
318 sharding: ShardingInfo {
319 id: network_id,
320 local_storage: local_shard_storage,
321 state: Default::default(),
322 },
323 ..self
324 }
325 }
326
327 pub fn with_rpc(self, config: RpcConfig, handler: fn(RpcData) -> RpcData) -> Self {
329 CoreBuilder {
331 request_response: (
332 match config {
333 RpcConfig::Default => self.request_response.0,
334 RpcConfig::Custom {
335 timeout,
336 max_concurrent_streams,
337 } => Behaviour::new(
338 [(self.network_id.clone(), ProtocolSupport::Full)],
339 request_response::Config::default()
340 .with_request_timeout(timeout)
341 .with_max_concurrent_streams(max_concurrent_streams),
342 ),
343 },
344 handler,
345 ),
346 ..self
347 }
348 }
349
350 pub fn with_kademlia(self, config: kad::Config) -> Self {
352 let peer_id = self.keypair.public().to_peer_id();
354 let store = kad::store::MemoryStore::new(peer_id);
355 let kademlia = kad::Behaviour::with_config(peer_id, store, config);
356
357 CoreBuilder { kademlia, ..self }
358 }
359
360 pub fn with_gossipsub(
366 self,
367 config: GossipsubConfig,
368 filter_fn: fn(PeerId, MessageId, Option<PeerId>, String, Vec<String>) -> bool,
369 ) -> Self {
370 let behaviour = match config {
371 GossipsubConfig::Default => self.gossipsub.0,
372 GossipsubConfig::Custom { config, auth } => gossipsub::Behaviour::new(auth, config)
373 .map_err(|_| SwarmNlError::GossipConfigError)
374 .unwrap(),
375 };
376
377 CoreBuilder {
378 gossipsub: (behaviour, filter_fn),
379 ..self
380 }
381 }
382
383 pub fn with_transports(self, transport: TransportOpts) -> Self {
385 CoreBuilder { transport, ..self }
386 }
387
388 pub fn network_id(&self) -> String {
390 self.network_id.to_string()
391 }
392
393 pub async fn build(self) -> SwarmNlResult<Core> {
400 #[cfg(feature = "async-std-runtime")]
401 let mut swarm = {
402 let swarm_builder: SwarmBuilder<_, _> = match self.transport {
404 TransportOpts::TcpQuic { tcp_config } => match tcp_config {
405 TcpConfig::Default => {
406 libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
407 .with_async_std()
408 .with_tcp(
409 tcp::Config::default(),
410 (tls::Config::new, noise::Config::new),
411 yamux::Config::default,
412 )
413 .map_err(|_| {
414 SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
415 tcp_config: TcpConfig::Default,
416 })
417 })?
418 .with_quic()
419 .with_dns()
420 .await
421 .map_err(|_| SwarmNlError::DNSConfigError)?
422 },
423 TcpConfig::Custom {
424 ttl,
425 nodelay,
426 backlog,
427 } => {
428 let tcp_config = tcp::Config::default()
429 .ttl(ttl)
430 .nodelay(nodelay)
431 .listen_backlog(backlog);
432
433 libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
434 .with_async_std()
435 .with_tcp(
436 tcp_config,
437 (tls::Config::new, noise::Config::new),
438 yamux::Config::default,
439 )
440 .map_err(|_| {
441 SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
442 tcp_config: TcpConfig::Custom {
443 ttl,
444 nodelay,
445 backlog,
446 },
447 })
448 })?
449 .with_quic()
450 .with_dns()
451 .await
452 .map_err(|_| SwarmNlError::DNSConfigError)?
453 },
454 },
455 };
456
457 swarm_builder
459 .with_behaviour(|_| CoreBehaviour {
460 ping: self.ping.0,
461 kademlia: self.kademlia,
462 identify: self.identify,
463 request_response: self.request_response.0,
464 gossipsub: self.gossipsub.0,
465 })
466 .map_err(|_| SwarmNlError::ProtocolConfigError)?
467 .with_swarm_config(|cfg| {
468 cfg.with_idle_connection_timeout(Duration::from_secs(self.keep_alive_duration))
469 })
470 .build()
471 };
472
473 #[cfg(feature = "tokio-runtime")]
474 let mut swarm = {
475 let swarm_builder: SwarmBuilder<_, _> = match self.transport {
476 TransportOpts::TcpQuic { tcp_config } => match tcp_config {
477 TcpConfig::Default => {
478 libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
479 .with_tokio()
480 .with_tcp(
481 tcp::Config::default(),
482 (tls::Config::new, noise::Config::new),
483 yamux::Config::default,
484 )
485 .map_err(|_| {
486 SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
487 tcp_config: TcpConfig::Default,
488 })
489 })?
490 .with_quic()
491 },
492 TcpConfig::Custom {
493 ttl,
494 nodelay,
495 backlog,
496 } => {
497 let tcp_config = tcp::Config::default()
498 .ttl(ttl)
499 .nodelay(nodelay)
500 .listen_backlog(backlog);
501
502 libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
503 .with_tokio()
504 .with_tcp(
505 tcp_config,
506 (tls::Config::new, noise::Config::new),
507 yamux::Config::default,
508 )
509 .map_err(|_| {
510 SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
511 tcp_config: TcpConfig::Custom {
512 ttl,
513 nodelay,
514 backlog,
515 },
516 })
517 })?
518 .with_quic()
519 },
520 },
521 };
522
523 swarm_builder
525 .with_behaviour(|_| CoreBehaviour {
526 ping: self.ping.0,
527 kademlia: self.kademlia,
528 identify: self.identify,
529 request_response: self.request_response.0,
530 gossipsub: self.gossipsub.0,
531 })
532 .map_err(|_| SwarmNlError::ProtocolConfigError)?
533 .with_swarm_config(|cfg| {
534 cfg.with_idle_connection_timeout(Duration::from_secs(self.keep_alive_duration))
535 })
536 .build()
537 };
538
539 match self.transport {
543 TransportOpts::TcpQuic { tcp_config: _ } => {
545 let listen_addr_tcp = Multiaddr::empty()
547 .with(match self.ip_address {
548 IpAddr::V4(address) => Protocol::from(address),
549 IpAddr::V6(address) => Protocol::from(address),
550 })
551 .with(Protocol::Tcp(self.tcp_udp_port.0));
552
553 let listen_addr_quic = Multiaddr::empty()
555 .with(match self.ip_address {
556 IpAddr::V4(address) => Protocol::from(address),
557 IpAddr::V6(address) => Protocol::from(address),
558 })
559 .with(Protocol::Udp(self.tcp_udp_port.1))
560 .with(Protocol::QuicV1);
561
562 #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
564 swarm.listen_on(listen_addr_tcp.clone()).map_err(|_| {
565 SwarmNlError::MultiaddressListenError(listen_addr_tcp.to_string())
566 })?;
567
568 #[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
569 swarm.listen_on(listen_addr_quic.clone()).map_err(|_| {
570 SwarmNlError::MultiaddressListenError(listen_addr_quic.to_string())
571 })?;
572 },
573 }
574
575 for peer_info in self.boot_nodes {
577 if let Some(peer_id) = string_to_peer_id(&peer_info.0) {
579 if let Ok(multiaddr) = peer_info.1.parse::<Multiaddr>() {
581 if !self.blacklist.list.iter().any(|&id| id == peer_id) {
583 swarm
584 .behaviour_mut()
585 .kademlia
586 .add_address(&peer_id, multiaddr.clone());
587
588 println!("Dailing {}", multiaddr);
589
590 swarm
592 .dial(multiaddr.clone().with(Protocol::P2p(peer_id)))
593 .map_err(|_| {
594 SwarmNlError::RemotePeerDialError(multiaddr.to_string())
595 })?;
596 }
597 }
598 }
599 }
600
601 let _ = swarm.behaviour_mut().kademlia.bootstrap();
603
604 swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server));
606
607 for peer_id in &self.blacklist.list {
609 swarm.behaviour_mut().gossipsub.blacklist_peer(peer_id);
610 }
611
612 let (application_sender, network_receiver) =
619 mpsc::channel::<StreamData>(STREAM_BUFFER_CAPACITY);
620 let (network_sender, application_receiver) =
621 mpsc::channel::<StreamData>(STREAM_BUFFER_CAPACITY);
622
623 let peer_id = self.keypair.public().to_peer_id();
628
629 let mut timeouts = HashMap::<PeerId, u16>::new();
631 timeouts.insert(peer_id.clone(), 0);
632
633 let mut outbound_errors = HashMap::<PeerId, u16>::new();
635 outbound_errors.insert(peer_id.clone(), 0);
636
637 let manager = PingManager {
639 timeouts,
640 outbound_errors,
641 };
642
643 let ping_info = PingInfo {
645 policy: self.ping.1,
646 manager,
647 };
648
649 let gossip_info = GossipsubInfo {
651 blacklist: self.blacklist,
652 };
653
654 let repl_info = ReplInfo {
656 state: Arc::new(Mutex::new(Default::default())),
657 };
658
659 let stream_id = StreamId::new();
661 let stream_request_buffer =
662 Arc::new(Mutex::new(StreamRequestBuffer::new(self.stream_size)));
663 let stream_response_buffer =
664 Arc::new(Mutex::new(StreamResponseBuffer::new(self.stream_size)));
665
666 let network_info = NetworkInfo {
668 id: self.network_id,
669 ping: ping_info,
670 gossipsub: gossip_info,
671 rpc_handler_fn: self.request_response.1,
672 gossip_filter_fn: self.gossipsub.1,
673 replication: repl_info,
674 sharding: self.sharding.clone(),
675 };
676
677 let network_core = Core {
679 keypair: self.keypair,
680 application_sender,
681 stream_request_buffer: stream_request_buffer.clone(),
682 stream_response_buffer: stream_response_buffer.clone(),
683 current_stream_id: Arc::new(Mutex::new(stream_id)),
684 event_queue: DataQueue::new(),
686 replica_buffer: Arc::new(ReplicaBufferQueue::new(self.replication_cfg.clone())),
687 network_info,
688 };
689
690 if !self.sharding.id.is_empty() {
692 let mut core = network_core.clone();
694 #[cfg(feature = "async-std-runtime")]
695 async_std::task::spawn(
696 async move { core.init_sharding(self.sharding.id.clone()).await },
697 );
698
699 #[cfg(feature = "tokio-runtime")]
700 tokio::task::spawn(async move { core.init_sharding(self.sharding.id.clone()).await });
701 }
702
703 #[cfg(feature = "async-std-runtime")]
705 async_std::task::spawn(Core::handle_async_operations(
706 swarm,
707 network_sender,
708 network_receiver,
709 network_core.clone(),
710 ));
711
712 #[cfg(feature = "tokio-runtime")]
714 tokio::task::spawn(Core::handle_async_operations(
715 swarm,
716 network_sender,
717 network_receiver,
718 network_core.clone(),
719 ));
720
721 #[cfg(feature = "async-std-runtime")]
723 async_std::task::spawn(Core::handle_network_response(
724 application_receiver,
725 network_core.clone(),
726 ));
727
728 #[cfg(feature = "tokio-runtime")]
730 tokio::task::spawn(Core::handle_network_response(
731 application_receiver,
732 network_core.clone(),
733 ));
734
735 #[cfg(feature = "async-std-runtime")]
737 async_std::task::sleep(Duration::from_secs(BOOT_WAIT_TIME)).await;
738
739 #[cfg(feature = "tokio-runtime")]
741 tokio::time::sleep(Duration::from_secs(BOOT_WAIT_TIME)).await;
742
743 Ok(network_core)
744 }
745}
746
747#[derive(Clone)]
749pub struct Core {
750 keypair: Keypair,
751 application_sender: Sender<StreamData>,
754 stream_response_buffer: Arc<Mutex<StreamResponseBuffer>>,
763 stream_request_buffer: Arc<Mutex<StreamRequestBuffer>>,
765 current_stream_id: Arc<Mutex<StreamId>>,
767 event_queue: DataQueue<NetworkEvent>,
769 replica_buffer: Arc<ReplicaBufferQueue>,
771 network_info: NetworkInfo,
773}
774
775impl Core {
776 pub const GOSSIP_MESSAGE_SEPARATOR: &'static str = "~~##~~";
778
779 pub const REPL_GOSSIP_FLAG: &'static str = "REPL_GOSSIP_FLAG__@@";
782
783 pub const RPC_DATA_FORWARDING_FLAG: &'static str = "RPC_DATA_FORWARDING_FLAG__@@";
786
787 pub const STRONG_CONSISTENCY_FLAG: &'static str = "STRONG_CON__@@";
791
792 pub const EVENTUAL_CONSISTENCY_FLAG: &'static str = "EVENTUAL_CON_@@";
795
796 pub const RPC_SYNC_PULL_FLAG: &'static str = "RPC_SYNC_PULL_FLAG__@@";
799
800 pub const SHARD_RPC_SYNC_FLAG: &'static str = "SHARD_RPC_SYNC_FLAG__@@";
802
803 pub const SHARD_GOSSIP_JOIN_FLAG: &'static str = "SHARD_GOSSIP_JOIN_FLAG__@@";
805
806 pub const SHARD_GOSSIP_EXIT_FLAG: &'static str = "SHARD_GOSSIP_EXIT_FLAG__@@";
808
809 pub const SHARD_RPC_REQUEST_FLAG: &'static str = "SHARD_RPC_REQUEST_FLAG__@@";
811
812 pub const FIELD_DELIMITER: &'static str = "_@_";
814
815 pub const ENTRY_DELIMITER: &'static str = "@@@";
817
818 pub const DATA_DELIMITER: &'static str = "$$";
820
821 pub fn save_keypair_offline<T: AsRef<Path> + ?Sized>(&self, config_file_path: &T) -> bool {
827 if let Err(_) = fs::metadata(config_file_path) {
829 fs::File::create(config_file_path).expect("could not create config file");
830 }
831
832 if KeyType::RSA != self.keypair.key_type() {
834 if let Ok(protobuf_keypair) = self.keypair.to_protobuf_encoding() {
835 return util::write_config(
837 "auth",
838 "protobuf_keypair",
839 &format!("{:?}", protobuf_keypair),
840 config_file_path,
841 ) && util::write_config(
842 "auth",
843 "Crypto",
844 &format!("{}", self.keypair.key_type()),
845 config_file_path,
846 );
847 }
848 }
849
850 false
851 }
852
853 pub fn peer_id(&self) -> PeerId {
855 self.keypair.public().to_peer_id()
856 }
857
858 pub async fn events(&mut self) -> IntoIter<NetworkEvent> {
860 let events = self.event_queue.into_inner().await.into_iter();
861
862 self.event_queue.drain().await;
864 events
865 }
866
867 pub async fn next_event(&mut self) -> Option<NetworkEvent> {
869 self.event_queue.pop().await
870 }
871
872 pub async fn replica_peers(&mut self, replica_network: &str) -> Vec<PeerId> {
874 let mut peers = Vec::new();
875
876 let request = AppData::GossipsubGetInfo;
878 if let Ok(response) = self.query_network(request).await {
879 if let AppResponse::GossipsubGetInfo { mesh_peers, .. } = response {
880 for (peer_id, networks) in mesh_peers {
881 if networks.contains(&replica_network.to_string()) {
882 peers.push(peer_id);
883 }
884 }
885 }
886 }
887 peers
888 }
889
890 pub async fn send_to_network(&mut self, app_request: AppData) -> Option<StreamId> {
894 let stream_id = StreamId::next(*self.current_stream_id.lock().await);
896 let request = StreamData::FromApplication(stream_id, app_request.clone());
897
898 match app_request {
900 AppData::KademliaDeleteRecord { .. } | AppData::KademliaStopProviding { .. } => {
902 let _ = self.application_sender.send(request).await;
904 return None;
905 },
906 _ => {
908 let mut stream_request_buffer = self.stream_request_buffer.lock().await;
910
911 if !stream_request_buffer.insert(stream_id) {
913 return None;
915 }
916
917 if let Ok(_) = self.application_sender.send(request).await {
919 *self.current_stream_id.lock().await = stream_id;
921 return Some(stream_id);
922 } else {
923 return None;
924 }
925 },
926 }
927 }
928
929 pub async fn recv_from_network(&mut self, stream_id: StreamId) -> NetworkResult<AppResponse> {
933 #[cfg(feature = "async-std-runtime")]
934 {
935 let channel = self.clone();
936 let response_handler = async_std::task::spawn(async move {
937 let mut loop_count = 0;
938 loop {
939 let mut buffer_guard = channel.stream_response_buffer.lock().await;
941
942 if let Some(result) = buffer_guard.remove(&stream_id) {
944 return Ok(result);
945 }
946
947 if loop_count < 10 {
949 loop_count += 1;
950 } else {
951 return Err(NetworkError::NetworkReadTimeout);
952 }
953
954 async_std::task::sleep(Duration::from_secs(TASK_SLEEP_DURATION)).await;
956 }
957 });
958
959 match response_handler.await {
961 Ok(result) => result,
962 Err(_) => Err(NetworkError::NetworkReadTimeout),
963 }
964 }
965
966 #[cfg(feature = "tokio-runtime")]
967 {
968 let channel = self.clone();
969 let response_handler = tokio::task::spawn(async move {
970 let mut loop_count = 0;
971 loop {
972 let mut buffer_guard = channel.stream_response_buffer.lock().await;
974
975 if let Some(result) = buffer_guard.remove(&stream_id) {
977 return Ok(result);
978 }
979
980 if loop_count < 10 {
982 loop_count += 1;
983 } else {
984 return Err(NetworkError::NetworkReadTimeout);
985 }
986
987 tokio::time::sleep(Duration::from_secs(TASK_SLEEP_DURATION)).await;
989 }
990 });
991
992 match response_handler.await {
994 Ok(result) => result?,
995 Err(_) => Err(NetworkError::NetworkReadTimeout),
996 }
997 }
998 }
999
1000 pub async fn query_network(&mut self, request: AppData) -> NetworkResult<AppResponse> {
1008 if let Some(stream_id) = self.send_to_network(request).await {
1010 self.recv_from_network(stream_id).await
1012 } else {
1013 Err(NetworkError::StreamBufferOverflow)
1014 }
1015 }
1016
1017 async fn init_sharding(&mut self, network_id: String) {
1019 let gossip_request = AppData::GossipsubJoinNetwork(network_id.clone());
1022 let _ = self.query_network(gossip_request).await;
1023 }
1024
1025 async fn update_shard_state(&mut self, peer: PeerId, shard_id: ShardId, join: bool) {
1028 let mut shard_state = self.network_info.sharding.state.lock().await;
1030 let shard_entry = shard_state
1031 .entry(shard_id.clone())
1032 .or_insert(Default::default());
1033
1034 if join {
1036 shard_entry.insert(peer);
1037 } else {
1038 shard_entry.retain(|entry| entry != &peer);
1040
1041 if shard_entry.is_empty() {
1043 shard_state.remove(&shard_id.to_string());
1044 }
1045 }
1046 }
1047
1048 async fn publish_shard_state(&mut self, peer: PeerId) {
1051 let shard_state = self.network_info.sharding.state.lock().await.clone();
1053
1054 let bytes = shard_image_to_bytes(shard_state).unwrap_or_default();
1055 let message = vec![
1056 Core::SHARD_RPC_SYNC_FLAG.as_bytes().to_vec(), bytes, ];
1059
1060 let rpc_request = AppData::SendRpc {
1062 keys: message,
1063 peer,
1064 };
1065
1066 let _ = self.query_network(rpc_request).await;
1067 }
1068
1069 async fn handle_incoming_repl_data(&mut self, repl_network: String, repl_data: ReplBufferData) {
1072 let replica_data = repl_data.clone();
1075 self.event_queue
1076 .push(NetworkEvent::ReplicaDataIncoming {
1077 data: replica_data.data,
1078 network: repl_network.clone(),
1079 outgoing_timestamp: replica_data.outgoing_timestamp,
1080 incoming_timestamp: replica_data.incoming_timestamp,
1081 message_id: replica_data.message_id,
1082 source: replica_data.sender,
1083 })
1084 .await;
1085
1086 if let Some(repl_network_data) = self
1088 .network_info
1089 .replication
1090 .state
1091 .lock()
1092 .await
1093 .get_mut(&repl_network)
1094 {
1095 (*repl_network_data).lamport_clock =
1097 cmp::max(repl_network_data.lamport_clock, repl_data.lamport_clock)
1098 .saturating_add(1);
1099
1100 self.replica_buffer
1102 .push(self.clone(), repl_network, repl_data)
1103 .await;
1104 }
1105 }
1106
1107 async fn handle_incoming_shard_data(
1110 &mut self,
1111 shard_id: String,
1112 source: PeerId,
1113 incoming_data: ByteVector,
1114 ) {
1115 self.event_queue
1117 .push(NetworkEvent::IncomingForwardedData {
1118 data: byte_vec_to_string_vec(incoming_data.clone()),
1119 source,
1120 })
1121 .await;
1122
1123 let _ = self.replicate(incoming_data, &shard_id).await;
1125 }
1126
1127 pub async fn consume_repl_data(&mut self, replica_network: &str) -> Option<ReplBufferData> {
1129 self.replica_buffer
1130 .pop_front(self.clone(), replica_network)
1131 .await
1132 }
1133
1134 pub async fn join_repl_network(&mut self, repl_network: String) -> NetworkResult<()> {
1140 let mut cfg = self.network_info.replication.state.lock().await;
1142 cfg.entry(repl_network.clone()).or_insert(ReplConfigData {
1143 lamport_clock: 0,
1144 last_clock: 0,
1145 nodes: Default::default(),
1146 });
1147
1148 self.replica_buffer.init(repl_network.clone()).await;
1150
1151 drop(cfg);
1153
1154 let gossip_request = AppData::GossipsubJoinNetwork(repl_network.clone());
1156 let _ = self.query_network(gossip_request).await?;
1157
1158 if let ConsistencyModel::Eventual = self.replica_buffer.consistency_model() {
1160 let core = self.clone();
1162 let network = repl_network.clone();
1163 #[cfg(feature = "tokio-runtime")]
1164 tokio::task::spawn(async move {
1165 let buffer = core.replica_buffer.clone();
1166 buffer.sync_with_eventual_consistency(core, network).await;
1167 });
1168
1169 #[cfg(feature = "async-std-runtime")]
1170 async_std::task::spawn(async move {
1171 let buffer = core.replica_buffer.clone();
1172 buffer.sync_with_eventual_consistency(core, network).await;
1173 });
1174 }
1175
1176 Ok(())
1177 }
1178
1179 pub async fn leave_repl_network(&mut self, repl_network: String) -> NetworkResult<AppResponse> {
1182 let gossip_request = AppData::GossipsubExitNetwork(repl_network.clone());
1184 self.query_network(gossip_request).await
1185 }
1186
1187 pub async fn replicate_buffer(
1190 &self,
1191 repl_network: String,
1192 replica_node: PeerId,
1193 ) -> Result<(), NetworkError> {
1194 if self
1196 .network_info
1197 .replication
1198 .state
1199 .lock()
1200 .await
1201 .contains_key(&repl_network)
1202 {
1203 self.replica_buffer
1205 .replicate_buffer(self.clone(), repl_network, replica_node)
1206 .await
1207 } else {
1208 Err(NetworkError::MissingReplNetwork)
1209 }
1210 }
1211
1212 pub async fn replicate(
1214 &mut self,
1215 mut replica_data: ByteVector,
1216 replica_network: &str,
1217 ) -> NetworkResult<()> {
1218 let replica_network_data = {
1220 let mut state = self.network_info.replication.state.lock().await;
1221 if let Some(data) = state.get_mut(replica_network) {
1222 data.lamport_clock = data.lamport_clock.saturating_add(1);
1224 data.clone()
1225 } else {
1226 return Err(NetworkError::MissingReplNetwork);
1227 }
1228 };
1229
1230 let mut message = vec![
1232 Core::REPL_GOSSIP_FLAG.as_bytes().to_vec(), get_unix_timestamp().to_string().into(), replica_network_data.lamport_clock.to_string().into(), replica_network.to_owned().into(), ];
1237 message.append(&mut replica_data);
1238
1239 let gossip_request = AppData::GossipsubBroadcastMessage {
1241 topic: replica_network.to_owned(),
1242 message,
1243 };
1244
1245 self.query_network(gossip_request).await?;
1247
1248 Ok(())
1249 }
1250
1251 async fn handle_network_response(mut receiver: Receiver<StreamData>, network_core: Core) {
1254 loop {
1255 select! {
1256 response = receiver.select_next_some() => {
1257 let mut buffer_guard = network_core.stream_response_buffer.lock().await;
1259 match response {
1260 StreamData::ToApplication(stream_id, response) => match response {
1262 AppResponse::Error(error) => buffer_guard.insert(stream_id, Err(error)),
1264 res @ AppResponse::Echo(..) => buffer_guard.insert(stream_id, Ok(res)),
1266 res @ AppResponse::DailPeerSuccess(..) => buffer_guard.insert(stream_id, Ok(res)),
1267 res @ AppResponse::KademliaStoreRecordSuccess => buffer_guard.insert(stream_id, Ok(res)),
1268 res @ AppResponse::KademliaLookupSuccess(..) => buffer_guard.insert(stream_id, Ok(res)),
1269 res @ AppResponse::KademliaGetProviders{..} => buffer_guard.insert(stream_id, Ok(res)),
1270 res @ AppResponse::KademliaNoProvidersFound => buffer_guard.insert(stream_id, Ok(res)),
1271 res @ AppResponse::KademliaGetRoutingTableInfo { .. } => buffer_guard.insert(stream_id, Ok(res)),
1272 res @ AppResponse::SendRpc(..) => buffer_guard.insert(stream_id, Ok(res)),
1273 res @ AppResponse::GetNetworkInfo{..} => buffer_guard.insert(stream_id, Ok(res)),
1274 res @ AppResponse::GossipsubBroadcastSuccess => buffer_guard.insert(stream_id, Ok(res)),
1275 res @ AppResponse::GossipsubJoinSuccess => buffer_guard.insert(stream_id, Ok(res)),
1276 res @ AppResponse::GossipsubExitSuccess => buffer_guard.insert(stream_id, Ok(res)),
1277 res @ AppResponse::GossipsubBlacklistSuccess => buffer_guard.insert(stream_id, Ok(res)),
1278 res @ AppResponse::GossipsubGetInfo{..} => buffer_guard.insert(stream_id, Ok(res)),
1279 },
1280 _ => false
1281 };
1282 }
1283 }
1284 }
1285 }
1286
1287 async fn handle_async_operations(
1294 mut swarm: Swarm<CoreBehaviour>,
1295 mut network_sender: Sender<StreamData>,
1296 mut receiver: Receiver<StreamData>,
1297 mut network_core: Core,
1298 ) {
1299 let data_queue_1 = DataQueue::new();
1301 let data_queue_2 = DataQueue::new();
1302 let data_queue_3 = DataQueue::new();
1303 let data_queue_4 = DataQueue::new();
1304
1305 let mut network_info = network_core.network_info.clone();
1307
1308 loop {
1310 select! {
1311 stream_data = receiver.next() => {
1313 match stream_data {
1314 Some(incoming_data) => {
1315 match incoming_data {
1316 StreamData::FromApplication(stream_id, app_data) => {
1317 let stream_id = stream_id;
1319 match app_data {
1320 AppData::Echo(message) => {
1322 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Echo(message))).await;
1324 },
1325 AppData::DailPeer(peer_id, multiaddr) => {
1326 if let Ok(multiaddr) = multiaddr.parse::<Multiaddr>() {
1327 swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr.clone());
1329 if let Ok(_) = swarm.dial(multiaddr.clone().with(Protocol::P2p(peer_id))) {
1330 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::DailPeerSuccess(multiaddr.to_string()))).await;
1332 } else {
1333 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::DailPeerError))).await;
1335 }
1336 }
1337 },
1338 AppData::KademliaStoreRecord { key, value, expiration_time, explicit_peers } => {
1340 let mut record = Record::new(key.clone(), value);
1342
1343
1344 record.expires = expiration_time;
1346
1347
1348 if let Ok(_) = swarm.behaviour_mut().kademlia.put_record(record.clone(), kad::Quorum::One) {
1350 let _ = swarm.behaviour_mut().kademlia.start_providing(RecordKey::new(&key));
1352
1353
1354 data_queue_1.push(stream_id).await;
1356
1357
1358 if let Some(explicit_peers) = explicit_peers {
1360 let peers = explicit_peers.iter().map(|peer_id_string| {
1362 PeerId::from_bytes(&peer_id_string.from_base58().unwrap_or_default())
1363 }).filter_map(Result::ok).collect::<Vec<_>>();
1364 swarm.behaviour_mut().kademlia.put_record_to(record, peers.into_iter(), kad::Quorum::One);
1365 }
1366 } else {
1367 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadStoreRecordError(key)))).await;
1369 }
1370 },
1371 AppData::KademliaLookupRecord { key } => {
1373 let _ = swarm.behaviour_mut().kademlia.get_record(key.clone().into());
1374
1375
1376 data_queue_2.push(stream_id).await;
1378 },
1379 AppData::KademliaGetProviders { key } => {
1381 swarm.behaviour_mut().kademlia.get_providers(key.clone().into());
1382
1383
1384 data_queue_3.push(stream_id).await;
1386 }
1387 AppData::KademliaStopProviding { key } => {
1389 swarm.behaviour_mut().kademlia.stop_providing(&key.into());
1390 }
1391 AppData::KademliaDeleteRecord { key } => {
1393 swarm.behaviour_mut().kademlia.remove_record(&key.into());
1394 }
1395 AppData::KademliaGetRoutingTableInfo => {
1397 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaGetRoutingTableInfo{protocol_id: network_info.id.to_string()})).await;
1399 },
1400 AppData::SendRpc { keys, peer } => {
1402 let rpc = Rpc::ReqResponse { data: keys.clone() };
1404
1405
1406 let _ = swarm
1408 .behaviour_mut()
1409 .request_response
1410 .send_request(&peer, rpc);
1411
1412
1413 data_queue_4.push(stream_id).await;
1415 },
1416 AppData::GetNetworkInfo => {
1418 let connected_peers = swarm.connected_peers().map(|peer| peer.to_owned()).collect::<Vec<_>>();
1420
1421 let external_addresses = swarm.listeners().map(|multiaddr| multiaddr.to_string()).collect::<Vec<_>>();
1423
1424 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GetNetworkInfo { peer_id: swarm.local_peer_id().clone(), connected_peers, external_addresses })).await;
1426 },
1427 AppData::GossipsubBroadcastMessage { message, topic } => {
1429 let topic_hash = TopicHash::from_raw(topic);
1431
1432 let message = message.join(Core::GOSSIP_MESSAGE_SEPARATOR.as_bytes());
1434
1435 let is_subscribed = swarm.behaviour().gossipsub.mesh_peers(&topic_hash).any(|peer| peer == swarm.local_peer_id());
1437
1438 if swarm
1440 .behaviour_mut().gossipsub
1441 .publish(topic_hash, message).is_ok() && !is_subscribed {
1442 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubBroadcastSuccess)).await;
1444 } else {
1445 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::GossipsubBroadcastMessageError))).await;
1447 }
1448 },
1449 AppData::GossipsubJoinNetwork(topic) => {
1451 let topic = IdentTopic::new(topic);
1453
1454 if swarm.behaviour_mut().gossipsub.subscribe(&topic).is_ok() {
1456 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubJoinSuccess)).await;
1458 } else {
1459 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::GossipsubJoinNetworkError))).await;
1461 }
1462 },
1463 AppData::GossipsubGetInfo => {
1465 let subscribed_topics = swarm.behaviour().gossipsub.topics().map(|topic| topic.clone().into_string()).collect::<Vec<_>>();
1467
1468 let mesh_peers = swarm.behaviour().gossipsub.all_peers().map(|(peer, topics)| {
1470 (peer.to_owned(), topics.iter().map(|&t| t.clone().as_str().to_owned()).collect::<Vec<_>>())
1471 }).collect::<Vec<_>>();
1472
1473 let blacklist = network_info.gossipsub.blacklist.into_inner();
1475
1476 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubGetInfo { topics: subscribed_topics, mesh_peers, blacklist })).await;
1478 },
1479 AppData::GossipsubExitNetwork(topic) => {
1481 let topic = IdentTopic::new(topic);
1483
1484 if swarm.behaviour_mut().gossipsub.unsubscribe(&topic).is_ok() {
1486 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubExitSuccess)).await;
1488 } else {
1489 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::GossipsubJoinNetworkError))).await;
1491 }
1492 }
1493 AppData::GossipsubBlacklistPeer(peer) => {
1495 swarm.behaviour_mut().gossipsub.blacklist_peer(&peer);
1497
1498 network_info.gossipsub.blacklist.list.insert(peer);
1500
1501 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubBlacklistSuccess)).await;
1503 },
1504 AppData::GossipsubFilterBlacklist(peer) => {
1506 swarm.behaviour_mut().gossipsub.remove_blacklisted_peer(&peer);
1508
1509 network_info.gossipsub.blacklist.list.remove(&peer);
1511
1512 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubBlacklistSuccess)).await;
1514 },
1515 }
1516 }
1517 _ => {}
1518 }
1519 },
1520 _ => {}
1521 }
1522 },
1523 swarm_event = swarm.next() => {
1524 match swarm_event {
1525 Some(event) => {
1526 match event {
1527 SwarmEvent::NewListenAddr {
1528 listener_id,
1529 address,
1530 } => {
1531 network_core.event_queue.push(NetworkEvent::NewListenAddr{
1533 local_peer_id: swarm.local_peer_id().to_owned(),
1534 listener_id,
1535 address
1536 }).await;
1537 }
1538 SwarmEvent::Behaviour(event) => match event {
1539 CoreEvent::Ping(ping::Event {
1541 peer,
1542 connection: _,
1543 result,
1544 }) => {
1545 match result {
1546 Ok(duration) => {
1548 if let Some(err_count) =
1553 network_info.ping.manager.outbound_errors.get(&peer)
1554 {
1555 let new_err_count = (err_count / 2) as u16;
1556 network_info
1557 .ping
1558 .manager
1559 .outbound_errors
1560 .insert(peer, new_err_count);
1561 }
1562
1563 if let Some(timeout_err_count) =
1565 network_info.ping.manager.timeouts.get(&peer)
1566 {
1567 let new_err_count = (timeout_err_count / 2) as u16;
1568 network_info
1569 .ping
1570 .manager
1571 .timeouts
1572 .insert(peer, new_err_count);
1573 }
1574
1575 network_core.event_queue.push(NetworkEvent::OutboundPingSuccess{
1577 peer_id: peer,
1578 duration
1579 }).await;
1580 }
1581 Err(err_type) => {
1583 match network_info.ping.policy {
1585 PingErrorPolicy::NoDisconnect => {
1586 }
1588 PingErrorPolicy::DisconnectAfterMaxErrors(max_errors) => {
1589 let err_count = network_info
1593 .ping
1594 .manager
1595 .outbound_errors
1596 .entry(peer)
1597 .or_insert(0);
1598
1599 if *err_count != max_errors {
1600 let _ = swarm.disconnect_peer_id(peer);
1602
1603 network_info
1605 .ping
1606 .manager
1607 .outbound_errors
1608 .remove(&peer);
1609 } else {
1610 *err_count += 1;
1612 }
1613 }
1614 PingErrorPolicy::DisconnectAfterMaxTimeouts(
1615 max_timeout_errors,
1616 ) => {
1617 if let Failure::Timeout = err_type {
1621 let err_count = network_info
1623 .ping
1624 .manager
1625 .timeouts
1626 .entry(peer)
1627 .or_insert(0);
1628
1629 if *err_count != max_timeout_errors {
1630 let _ = swarm.disconnect_peer_id(peer);
1632
1633 network_info
1635 .ping
1636 .manager
1637 .timeouts
1638 .remove(&peer);
1639 } else {
1640 *err_count += 1;
1642 }
1643 }
1644 }
1645 }
1646
1647 network_core.event_queue.push(NetworkEvent::OutboundPingError{
1649 peer_id: peer
1650 }).await;
1651 }
1652 }
1653 }
1654 CoreEvent::Kademlia(event) => match event {
1656 kad::Event::OutboundQueryProgressed { result, .. } => match result {
1657 kad::QueryResult::GetProviders(Ok(success)) => {
1658 match success {
1659 kad::GetProvidersOk::FoundProviders { key, providers, .. } => {
1660 let peer_id_strings = providers.iter().map(|peer_id| {
1662 peer_id.to_base58()
1663 }).collect::<Vec<_>>();
1664
1665 if let Some(stream_id) = data_queue_3.pop().await {
1667 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaGetProviders{ key: key.to_vec(), providers: peer_id_strings })).await;
1669 }
1670 },
1671 kad::GetProvidersOk::FinishedWithNoAdditionalRecord { .. } => {
1673 if let Some(stream_id) = data_queue_3.pop().await {
1675 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaNoProvidersFound)).await;
1677 }
1678 }
1679 }
1680 },
1681
1682 kad::QueryResult::GetProviders(Err(_)) => {
1683 if let Some(stream_id) = data_queue_3.pop().await {
1685 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaNoProvidersFound)).await;
1687 }
1688 },
1689 kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(
1690 kad::PeerRecord { record:kad::Record{ value, .. }, .. },
1691 ))) => {
1692 if let Some(stream_id) = data_queue_2.pop().await {
1694 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaLookupSuccess(value))).await;
1696 }
1697 }
1698 kad::QueryResult::GetRecord(Ok(_)) => {
1699 if let Some(stream_id) = data_queue_2.pop().await {
1701 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadFetchRecordError(vec![])))).await;
1703 }
1704 },
1705 kad::QueryResult::GetRecord(Err(e)) => {
1706 let key = match e {
1707 kad::GetRecordError::NotFound { key, .. } => key,
1708 kad::GetRecordError::QuorumFailed { key, .. } => key,
1709 kad::GetRecordError::Timeout { key } => key,
1710 };
1711
1712 if let Some(stream_id) = data_queue_2.pop().await {
1714 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadFetchRecordError(key.to_vec())))).await;
1716 }
1717 }
1718 kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
1719 if let Some(stream_id) = data_queue_1.pop().await {
1721 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaStoreRecordSuccess)).await;
1723 }
1724
1725 network_core.event_queue.push(NetworkEvent::KademliaPutRecordSuccess{
1727 key: key.to_vec()
1728 }).await;
1729 }
1730 kad::QueryResult::PutRecord(Err(e)) => {
1731 let key = match e {
1732 kad::PutRecordError::QuorumFailed { key, .. } => key,
1733 kad::PutRecordError::Timeout { key, .. } => key,
1734 };
1735
1736 if let Some(stream_id) = data_queue_1.pop().await {
1737 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadStoreRecordError(key.to_vec())))).await;
1739 }
1740
1741 network_core.event_queue.push(NetworkEvent::KademliaPutRecordError).await;
1743 }
1744 kad::QueryResult::StartProviding(Ok(kad::AddProviderOk {
1745 key,
1746 })) => {
1747 network_core.event_queue.push(NetworkEvent::KademliaStartProvidingSuccess{
1749 key: key.to_vec()
1750 }).await;
1751 }
1752 kad::QueryResult::StartProviding(Err(_)) => {
1753 network_core.event_queue.push(NetworkEvent::KademliaStartProvidingError).await;
1755 }
1756 _ => {}
1757 }
1758 kad::Event::RoutingUpdated { peer, .. } => {
1759 network_core.event_queue.push(NetworkEvent::RoutingTableUpdated{
1761 peer_id: peer
1762 }).await;
1763 }
1764 _ => {}
1766 },
1767 CoreEvent::Identify(event) => match event {
1769 identify::Event::Received { peer_id, info } => {
1770 network_core.event_queue.push(NetworkEvent::IdentifyInfoReceived {
1772 peer_id,
1773 info: IdentifyInfo {
1774 public_key: info.public_key,
1775 listen_addrs: info.listen_addrs.clone(),
1776 protocols: info.protocols,
1777 observed_addr: info.observed_addr
1778 }
1779 }).await;
1780
1781 if info.protocol_version != network_info.id.as_ref() {
1783 let _ = swarm.disconnect_peer_id(peer_id);
1785 } else {
1786 let _ = swarm.behaviour_mut().kademlia.add_address(&peer_id, info.listen_addrs[0].clone());
1788 }
1789 }
1790 _ => {}
1792 },
1793 CoreEvent::RequestResponse(event) => match event {
1795 request_response::Event::Message { peer, message } => match message {
1796 request_response::Message::Request { request_id: _, request, channel } => {
1798 match request {
1800 Rpc::ReqResponse { data } => {
1801 let byte_str = String::from_utf8_lossy(&data[0]);
1802 match byte_str.as_ref() {
1803 Core::RPC_SYNC_PULL_FLAG => {
1805 let repl_network = String::from_utf8(data[1].clone()).unwrap_or_default();
1807
1808 let requested_msgs = network_core.replica_buffer.pull_missing_data(repl_network, &data[2..]).await;
1810
1811 let _ = swarm.behaviour_mut().request_response.send_response(channel, Rpc::ReqResponse { data: requested_msgs });
1813 }
1814 Core::SHARD_RPC_SYNC_FLAG => {
1816 let incoming_state = bytes_to_shard_image(data[1].clone());
1818
1819 let mut current_shard_state = network_core.network_info.sharding.state.lock().await;
1821 merge_shard_states(&mut current_shard_state, incoming_state);
1822
1823 let _ = swarm.behaviour_mut().request_response.send_response(channel, Rpc::ReqResponse { data: Default::default() });
1825 }
1826 Core::RPC_DATA_FORWARDING_FLAG => {
1828 let _ = swarm.behaviour_mut().request_response.send_response(channel, Rpc::ReqResponse { data: Default::default() });
1830
1831 let shard_id = String::from_utf8_lossy(&data[1]).to_string();
1833 let mut core = network_core.clone();
1834 let incoming_data: ByteVector = data[2..].into();
1835
1836 #[cfg(feature = "tokio-runtime")]
1837 tokio::task::spawn(async move {
1838 let _ = core.handle_incoming_shard_data(shard_id, peer, incoming_data).await;
1839 });
1840
1841 #[cfg(feature = "async-std-runtime")]
1842 async_std::task::spawn(async move {
1843 let _ = core.handle_incoming_shard_data(shard_id, peer, incoming_data).await;
1844 });
1845 }
1846 Core::SHARD_RPC_REQUEST_FLAG => {
1848 let response_data = network_info.sharding.local_storage.lock().await.fetch_data(data[1..].into());
1850 let _ = swarm.behaviour_mut().request_response.send_response(channel, Rpc::ReqResponse { data: response_data });
1852 }
1853 _ => {
1855 let response_data = (network_info.rpc_handler_fn)(data);
1857 let _ = swarm.behaviour_mut().request_response.send_response(channel, Rpc::ReqResponse { data: response_data });
1859 }
1860 }
1861 }
1862 }
1863 },
1864 request_response::Message::Response { response, .. } => {
1866 if let Some(stream_id) = data_queue_4.pop().await {
1868 match response {
1869 Rpc::ReqResponse { data } => {
1870 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::SendRpc(data))).await;
1872 },
1873 }
1874 }
1875 },
1876 },
1877 request_response::Event::OutboundFailure { .. } => {
1878 if let Some(stream_id) = data_queue_4.pop().await {
1880 let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::RpcDataFetchError))).await;
1882 }
1883 },
1884 _ => {}
1885 },
1886 CoreEvent::Gossipsub(event) => match event {
1888 gossipsub::Event::Message { propagation_source, message_id, message } => {
1890 let data_string = String::from_utf8_lossy(&message.data).to_string();
1893 let gossip_data = data_string.split(Core::GOSSIP_MESSAGE_SEPARATOR).map(|msg| msg.to_string()).collect::<Vec<_>>();
1894 match gossip_data[0].as_str() {
1895 Core::REPL_GOSSIP_FLAG => {
1897 let queue_data = ReplBufferData {
1899 data: gossip_data[4..].to_owned(),
1900 lamport_clock: gossip_data[2].parse::<u64>().unwrap_or(0), outgoing_timestamp: gossip_data[1].parse::<u64>().unwrap_or(0),
1902 incoming_timestamp: get_unix_timestamp(),
1903 message_id: message_id.to_string(),
1904 sender: if let Some(peer) = message.source { peer.clone() } else { propagation_source.clone() },
1905 confirmations: if network_core.replica_buffer.consistency_model() == ConsistencyModel::Eventual {
1906 None
1908 } else {
1909 Some(1)
1911 }
1912 };
1913
1914 let mut core = network_core.clone();
1916 let queue_data = queue_data.clone();
1917 let data = gossip_data[3].clone().into();
1918
1919 #[cfg(feature = "tokio-runtime")]
1920 tokio::task::spawn(async move {
1921 let _ = core.handle_incoming_repl_data(data, queue_data).await;
1922 });
1923
1924 #[cfg(feature = "async-std-runtime")]
1925 async_std::task::spawn(async move {
1926 let _ = core.handle_incoming_repl_data(data, queue_data).await;
1927 });
1928 },
1929 Core::STRONG_CONSISTENCY_FLAG => {
1931 let core = network_core.clone();
1933 let data = gossip_data[2].clone().into();
1934 let network = gossip_data[1].to_owned();
1935
1936 #[cfg(feature = "tokio-runtime")]
1937 tokio::task::spawn(async move {
1938 let _ = core.replica_buffer.handle_data_confirmation(core.clone(), network, data).await;
1939 });
1940
1941 #[cfg(feature = "async-std-runtime")]
1942 async_std::task::spawn(async move {
1943 let _ = core.replica_buffer.handle_data_confirmation(core.clone(), network, data).await;
1944 });
1945 },
1946 Core::EVENTUAL_CONSISTENCY_FLAG => {
1948 let min_clock = gossip_data[3].parse::<u64>().unwrap_or_default();
1950 let max_clock = gossip_data[4].parse::<u64>().unwrap_or_default();
1952
1953 let core = network_core.clone();
1955 let repl_peer_id = gossip_data[1].clone();
1956 let repl_network = gossip_data[2].clone();
1957 let replica_data_state = gossip_data[5..].to_owned();
1958
1959 #[cfg(feature = "tokio-runtime")]
1960 tokio::task::spawn(async move {
1961 core.replica_buffer.sync_buffer_image(core.clone(), repl_peer_id, repl_network, (min_clock, max_clock), replica_data_state).await;
1962 });
1963
1964 #[cfg(feature = "async-std-runtime")]
1965 async_std::task::spawn(async move {
1966 core.replica_buffer.sync_buffer_image(core.clone(), repl_peer_id, repl_network, (min_clock, max_clock), replica_data_state).await;
1967 });
1968 }
1969 Core::SHARD_GOSSIP_JOIN_FLAG => {
1971 if let Ok(peer_id) = gossip_data[1].parse::<PeerId>() {
1973 let mut core = network_core.clone();
1975
1976 #[cfg(feature = "tokio-runtime")]
1977 tokio::task::spawn(async move {
1978 core.publish_shard_state(peer_id).await;
1979 });
1980
1981 #[cfg(feature = "async-std-runtime")]
1982 async_std::task::spawn(async move {
1983 core.publish_shard_state(peer_id).await;
1984 });
1985
1986 let _ = network_core.update_shard_state(peer_id, gossip_data[2].clone(), true ).await;
1988 }
1989 }
1990 Core::SHARD_GOSSIP_EXIT_FLAG => {
1992 if let Ok(peer_id) = gossip_data[1].parse::<PeerId>() {
1994 let _ = network_core.update_shard_state(peer_id, gossip_data[2].clone(), false ).await;
1995 }
1996 }
1997 _ => {
1999 if (network_info.gossip_filter_fn)(propagation_source.clone(), message_id, message.source, message.topic.to_string(), gossip_data.clone()) {
2001 network_core.event_queue.push(NetworkEvent::GossipsubIncomingMessageHandled { source: propagation_source, data: gossip_data }).await;
2003 }
2004 }
2006 }
2007 },
2008 gossipsub::Event::Subscribed { peer_id, topic } => {
2010 network_core.event_queue.push(NetworkEvent::GossipsubSubscribeMessageReceived { peer_id, topic: topic.to_string() }).await;
2012 },
2013 gossipsub::Event::Unsubscribed { peer_id, topic } => {
2015 network_core.event_queue.push(NetworkEvent::GossipsubUnsubscribeMessageReceived { peer_id, topic: topic.to_string() }).await;
2017 },
2018 _ => {},
2019 }
2020 },
2021 SwarmEvent::ConnectionEstablished {
2022 peer_id,
2023 connection_id,
2024 endpoint,
2025 num_established,
2026 concurrent_dial_errors: _,
2027 established_in,
2028 } => {
2029 if let ConnectedPoint::Listener { send_back_addr, .. } = endpoint.clone() {
2032 let _ = swarm.behaviour_mut().kademlia.add_address(&peer_id, send_back_addr);
2034 }
2035 network_core.event_queue.push(NetworkEvent::ConnectionEstablished {
2037 peer_id,
2038 connection_id,
2039 endpoint,
2040 num_established,
2041 established_in,
2042 }).await;
2043 }
2044 SwarmEvent::ConnectionClosed {
2045 peer_id,
2046 connection_id,
2047 endpoint,
2048 num_established,
2049 cause: _,
2050 } => {
2051 network_core.event_queue.push(NetworkEvent::ConnectionClosed {
2053 peer_id,
2054 connection_id,
2055 endpoint,
2056 num_established
2057 }).await;
2058 }
2059 SwarmEvent::ExpiredListenAddr {
2060 listener_id,
2061 address,
2062 } => {
2063 network_core.event_queue.push(NetworkEvent::ExpiredListenAddr {
2065 listener_id,
2066 address
2067 }).await;
2068 }
2069 SwarmEvent::ListenerClosed {
2070 listener_id,
2071 addresses,
2072 reason: _,
2073 } => {
2074 network_core.event_queue.push(NetworkEvent::ListenerClosed {
2076 listener_id,
2077 addresses
2078 }).await;
2079 }
2080 SwarmEvent::ListenerError {
2081 listener_id,
2082 error: _,
2083 } => {
2084 network_core.event_queue.push(NetworkEvent::ListenerError {
2086 listener_id,
2087 }).await;
2088 }
2089 SwarmEvent::Dialing {
2090 peer_id,
2091 connection_id,
2092 } => {
2093 network_core.event_queue.push(NetworkEvent::Dialing { peer_id, connection_id }).await;
2095 }
2096 SwarmEvent::NewExternalAddrCandidate { address } => {
2097 network_core.event_queue.push(NetworkEvent::NewExternalAddrCandidate { address }).await;
2099 }
2100 SwarmEvent::ExternalAddrConfirmed { address } => {
2101 network_core.event_queue.push(NetworkEvent::ExternalAddrConfirmed { address }).await;
2103 }
2104 SwarmEvent::ExternalAddrExpired { address } => {
2105 network_core.event_queue.push(NetworkEvent::ExternalAddrExpired { address }).await;
2107 }
2108 SwarmEvent::IncomingConnection {
2109 connection_id,
2110 local_addr,
2111 send_back_addr,
2112 } => {
2113 network_core.event_queue.push(NetworkEvent::IncomingConnection { connection_id, local_addr, send_back_addr }).await;
2115 }
2116 SwarmEvent::IncomingConnectionError {
2117 connection_id,
2118 local_addr,
2119 send_back_addr,
2120 error: _,
2121 } => {
2122 network_core.event_queue.push(NetworkEvent::IncomingConnectionError { connection_id, local_addr, send_back_addr }).await;
2124 }
2125 SwarmEvent::OutgoingConnectionError {
2126 connection_id,
2127 peer_id,
2128 error: _,
2129 } => {
2130 network_core.event_queue.push(NetworkEvent::OutgoingConnectionError { connection_id, peer_id }).await;
2132 }
2133 _ => {},
2134 }
2135 },
2136 _ => {}
2137 }
2138 }
2139 }
2140 }
2141 }
2142}