#![doc = include_str!("../../doc/core/NetworkBuilder.md")]
#![doc = include_str!("../../doc/core/ApplicationInteraction.md")]
#![doc = include_str!("../../doc/core/EventHandling.md")]
#![doc = include_str!("../../doc/core/Replication.md")]
use std::{
cmp,
collections::{vec_deque::IntoIter, BTreeSet, HashMap, HashSet},
fs,
net::IpAddr,
num::NonZeroU32,
path::Path,
sync::Arc,
time::Duration,
};
use base58::FromBase58;
use futures::{
channel::mpsc::{self, Receiver, Sender},
select, SinkExt, StreamExt,
};
use libp2p::{
gossipsub::{self, IdentTopic, TopicHash},
identify::{self},
kad::{self, store::MemoryStore, Mode, Record, RecordKey},
multiaddr::Protocol,
noise,
ping::{self, Failure},
request_response::{self, cbor::Behaviour, ProtocolSupport},
swarm::{NetworkBehaviour, SwarmEvent},
tcp, tls, yamux, Multiaddr, StreamProtocol, Swarm, SwarmBuilder,
};
use replication::{
ConsistencyModel, ReplBufferData, ReplConfigData, ReplInfo, ReplNetworkConfig,
ReplicaBufferQueue,
};
use self::{
gossipsub_cfg::{Blacklist, GossipsubConfig, GossipsubInfo},
ping_config::*,
sharding::{DefaultShardStorage, ShardStorage, ShardingInfo},
};
use super::*;
use crate::{setup::BootstrapConfig, util::*};
#[cfg(feature = "async-std-runtime")]
use async_std::sync::Mutex;
#[cfg(feature = "tokio-runtime")]
use tokio::sync::Mutex;
pub(crate) mod prelude;
pub use prelude::*;
pub mod replication;
pub mod sharding;
#[cfg(test)]
mod tests;
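/// The composed libp2p behaviours that make up the network: request-response (RPC),
/// Kademlia DHT, ping, identify and gossipsub.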
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "CoreEvent")]
struct CoreBehaviour {
request_response: request_response::cbor::Behaviour<Rpc, Rpc>,
kademlia: kad::Behaviour<MemoryStore>,
ping: ping::Behaviour,
identify: identify::Behaviour,
gossipsub: gossipsub::Behaviour,
}
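/// Events produced by the composed behaviours and handled in the network event loop.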
#[derive(Debug)]
enum CoreEvent {
Ping(ping::Event),
RequestResponse(request_response::Event<Rpc, Rpc>),
Kademlia(kad::Event),
Identify(identify::Event),
Gossipsub(gossipsub::Event),
}
impl From<ping::Event> for CoreEvent {
fn from(event: ping::Event) -> Self {
CoreEvent::Ping(event)
}
}
impl From<kad::Event> for CoreEvent {
fn from(event: kad::Event) -> Self {
CoreEvent::Kademlia(event)
}
}
impl From<identify::Event> for CoreEvent {
fn from(event: identify::Event) -> Self {
CoreEvent::Identify(event)
}
}
impl From<request_response::Event<Rpc, Rpc>> for CoreEvent {
fn from(event: request_response::Event<Rpc, Rpc>) -> Self {
CoreEvent::RequestResponse(event)
}
}
impl From<gossipsub::Event> for CoreEvent {
fn from(event: gossipsub::Event) -> Self {
CoreEvent::Gossipsub(event)
}
}
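/// Builder used to configure the network layer and construct the [`Core`] interface.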
pub struct CoreBuilder {
network_id: StreamProtocol,
keypair: Keypair,
tcp_udp_port: (Port, Port),
boot_nodes: HashMap<PeerIdString, MultiaddrString>,
blacklist: Blacklist,
stream_size: usize,
ip_address: IpAddr,
keep_alive_duration: Seconds,
transport: TransportOpts,
ping: (ping::Behaviour, PingErrorPolicy),
kademlia: kad::Behaviour<kad::store::MemoryStore>,
identify: identify::Behaviour,
request_response: (Behaviour<Rpc, Rpc>, fn(RpcData) -> RpcData),
gossipsub: (
gossipsub::Behaviour,
fn(PeerId, MessageId, Option<PeerId>, String, Vec<String>) -> bool,
),
replication_cfg: ReplNetworkConfig,
sharding: ShardingInfo,
}
impl CoreBuilder {
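/// Create a new builder from a [`BootstrapConfig`], applying defaults for the transport,
/// ping, Kademlia, identify, request-response and gossipsub behaviours.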
pub fn with_config(config: BootstrapConfig) -> Self {
let network_id = DEFAULT_NETWORK_ID;
let default_transport = TransportOpts::TcpQuic {
tcp_config: TcpConfig::Default,
};
let peer_id = config.keypair().public().to_peer_id();
// Configure Kademlia to use the network's protocol id and build the behaviour with it
let mut cfg = kad::Config::default();
cfg.set_protocol_names(vec![StreamProtocol::new(network_id)]);
let store = kad::store::MemoryStore::new(peer_id);
let kademlia = kad::Behaviour::with_config(peer_id, store, cfg);
let cfg = identify::Config::new(network_id.to_owned(), config.keypair().public())
.with_push_listen_addr_updates(true);
let identify = identify::Behaviour::new(cfg);
let request_response_behaviour = Behaviour::new(
[(StreamProtocol::new(network_id), ProtocolSupport::Full)],
request_response::Config::default(),
);
let cfg = gossipsub::Config::default();
let gossipsub_behaviour = gossipsub::Behaviour::new(
gossipsub::MessageAuthenticity::Signed(config.keypair()),
cfg,
)
.map_err(|_| SwarmNlError::GossipConfigError)
.unwrap();
let gossip_filter_fn = |_, _, _, _, _| true;
let rpc_handler_fn = |incoming_data: RpcData| incoming_data;
CoreBuilder {
network_id: StreamProtocol::new(network_id),
keypair: config.keypair(),
tcp_udp_port: config.ports(),
boot_nodes: config.bootnodes(),
blacklist: config.blacklist(),
stream_size: usize::MAX,
ip_address: IpAddr::V4(DEFAULT_IP_ADDRESS),
keep_alive_duration: DEFAULT_KEEP_ALIVE_DURATION,
transport: default_transport,
ping: (
Default::default(),
PingErrorPolicy::DisconnectAfterMaxTimeouts(20),
),
kademlia,
identify,
request_response: (request_response_behaviour, rpc_handler_fn),
gossipsub: (gossipsub_behaviour, gossip_filter_fn),
replication_cfg: ReplNetworkConfig::Default,
sharding: ShardingInfo {
id: Default::default(),
local_storage: Arc::new(Mutex::new(DefaultShardStorage)),
state: Default::default(),
},
}
}
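/// Override the network (protocol) id. The protocol string must be longer than
/// `MIN_NETWORK_ID_LENGTH` and start with `/`; otherwise this function panics.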
pub fn with_network_id(self, protocol: String) -> Self {
if protocol.len() > MIN_NETWORK_ID_LENGTH.into() && protocol.starts_with("/") {
CoreBuilder {
network_id: StreamProtocol::try_from_owned(protocol.clone())
.map_err(|_| SwarmNlError::NetworkIdParseError(protocol))
.unwrap(),
..self
}
} else {
panic!("Could not parse provided network id");
}
}
pub fn listen_on(self, ip_address: IpAddr) -> Self {
CoreBuilder { ip_address, ..self }
}
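/// Set how long (in seconds) an idle connection is kept alive before it is closed.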
pub fn with_idle_connection_timeout(self, keep_alive_duration: Seconds) -> Self {
CoreBuilder {
keep_alive_duration,
..self
}
}
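/// Set the capacity of the internal stream request and response buffers.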
pub fn with_stream_size(self, size: usize) -> Self {
CoreBuilder {
stream_size: size,
..self
}
}
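/// Configure the ping protocol: the interval between pings, the timeout, and the error
/// policy applied when pings fail.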
pub fn with_ping(self, config: PingConfig) -> Self {
CoreBuilder {
ping: (
ping::Behaviour::new(
ping::Config::new()
.with_interval(config.interval)
.with_timeout(config.timeout),
),
config.err_policy,
),
..self
}
}
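/// Configure replication for the network (see [`ReplNetworkConfig`]).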
pub fn with_replication(mut self, repl_cfg: ReplNetworkConfig) -> Self {
self.replication_cfg = repl_cfg;
CoreBuilder { ..self }
}
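/// Configure sharding: the shard network id and the local storage used to serve
/// shard data requests.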
pub fn with_sharding<T: ShardStorage + 'static>(
self,
network_id: String,
local_shard_storage: Arc<Mutex<T>>,
) -> Self {
CoreBuilder {
sharding: ShardingInfo {
id: network_id,
local_storage: local_shard_storage,
state: Default::default(),
},
..self
}
}
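/// Configure the RPC (request-response) protocol and register the handler invoked on
/// incoming RPC data.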
pub fn with_rpc(self, config: RpcConfig, handler: fn(RpcData) -> RpcData) -> Self {
CoreBuilder {
request_response: (
match config {
RpcConfig::Default => self.request_response.0,
RpcConfig::Custom {
timeout,
max_concurrent_streams,
} => Behaviour::new(
[(self.network_id.clone(), ProtocolSupport::Full)],
request_response::Config::default()
.with_request_timeout(timeout)
.with_max_concurrent_streams(max_concurrent_streams),
),
},
handler,
),
..self
}
}
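/// Configure the Kademlia DHT behaviour with a custom [`kad::Config`].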
pub fn with_kademlia(self, config: kad::Config) -> Self {
let peer_id = self.keypair.public().to_peer_id();
let store = kad::store::MemoryStore::new(peer_id);
let kademlia = kad::Behaviour::with_config(peer_id, store, config);
CoreBuilder { kademlia, ..self }
}
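/// Configure gossipsub and register the filter function applied to incoming gossip
/// messages before they are surfaced to the application.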
pub fn with_gossipsub(
self,
config: GossipsubConfig,
filter_fn: fn(PeerId, MessageId, Option<PeerId>, String, Vec<String>) -> bool,
) -> Self {
let behaviour = match config {
GossipsubConfig::Default => self.gossipsub.0,
GossipsubConfig::Custom { config, auth } => gossipsub::Behaviour::new(auth, config)
.map_err(|_| SwarmNlError::GossipConfigError)
.unwrap(),
};
CoreBuilder {
gossipsub: (behaviour, filter_fn),
..self
}
}
pub fn with_transports(self, transport: TransportOpts) -> Self {
CoreBuilder { transport, ..self }
}
pub fn network_id(&self) -> String {
self.network_id.to_string()
}
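/// Build the node: construct the swarm, listen on the configured TCP and QUIC addresses,
/// dial the bootnodes, spawn the background network tasks and return a [`Core`] handle.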
pub async fn build(self) -> SwarmNlResult<Core> {
#[cfg(feature = "async-std-runtime")]
let mut swarm = {
let swarm_builder: SwarmBuilder<_, _> = match self.transport {
TransportOpts::TcpQuic { tcp_config } => match tcp_config {
TcpConfig::Default => {
libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
.with_async_std()
.with_tcp(
tcp::Config::default(),
(tls::Config::new, noise::Config::new),
yamux::Config::default,
)
.map_err(|_| {
SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
tcp_config: TcpConfig::Default,
})
})?
.with_quic()
.with_dns()
.await
.map_err(|_| SwarmNlError::DNSConfigError)?
},
TcpConfig::Custom {
ttl,
nodelay,
backlog,
} => {
let tcp_config = tcp::Config::default()
.ttl(ttl)
.nodelay(nodelay)
.listen_backlog(backlog);
libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
.with_async_std()
.with_tcp(
tcp_config,
(tls::Config::new, noise::Config::new),
yamux::Config::default,
)
.map_err(|_| {
SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
tcp_config: TcpConfig::Custom {
ttl,
nodelay,
backlog,
},
})
})?
.with_quic()
.with_dns()
.await
.map_err(|_| SwarmNlError::DNSConfigError)?
},
},
};
swarm_builder
.with_behaviour(|_| CoreBehaviour {
ping: self.ping.0,
kademlia: self.kademlia,
identify: self.identify,
request_response: self.request_response.0,
gossipsub: self.gossipsub.0,
})
.map_err(|_| SwarmNlError::ProtocolConfigError)?
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(Duration::from_secs(self.keep_alive_duration))
})
.build()
};
#[cfg(feature = "tokio-runtime")]
let mut swarm = {
let swarm_builder: SwarmBuilder<_, _> = match self.transport {
TransportOpts::TcpQuic { tcp_config } => match tcp_config {
TcpConfig::Default => {
libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
.with_tokio()
.with_tcp(
tcp::Config::default(),
(tls::Config::new, noise::Config::new),
yamux::Config::default,
)
.map_err(|_| {
SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
tcp_config: TcpConfig::Default,
})
})?
.with_quic()
},
TcpConfig::Custom {
ttl,
nodelay,
backlog,
} => {
let tcp_config = tcp::Config::default()
.ttl(ttl)
.nodelay(nodelay)
.listen_backlog(backlog);
libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
.with_tokio()
.with_tcp(
tcp_config,
(tls::Config::new, noise::Config::new),
yamux::Config::default,
)
.map_err(|_| {
SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
tcp_config: TcpConfig::Custom {
ttl,
nodelay,
backlog,
},
})
})?
.with_quic()
},
},
};
swarm_builder
.with_behaviour(|_| CoreBehaviour {
ping: self.ping.0,
kademlia: self.kademlia,
identify: self.identify,
request_response: self.request_response.0,
gossipsub: self.gossipsub.0,
})
.map_err(|_| SwarmNlError::ProtocolConfigError)?
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(Duration::from_secs(self.keep_alive_duration))
})
.build()
};
match self.transport {
TransportOpts::TcpQuic { tcp_config: _ } => {
let listen_addr_tcp = Multiaddr::empty()
.with(match self.ip_address {
IpAddr::V4(address) => Protocol::from(address),
IpAddr::V6(address) => Protocol::from(address),
})
.with(Protocol::Tcp(self.tcp_udp_port.0));
let listen_addr_quic = Multiaddr::empty()
.with(match self.ip_address {
IpAddr::V4(address) => Protocol::from(address),
IpAddr::V6(address) => Protocol::from(address),
})
.with(Protocol::Udp(self.tcp_udp_port.1))
.with(Protocol::QuicV1);
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
swarm.listen_on(listen_addr_tcp.clone()).map_err(|_| {
SwarmNlError::MultiaddressListenError(listen_addr_tcp.to_string())
})?;
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
swarm.listen_on(listen_addr_quic.clone()).map_err(|_| {
SwarmNlError::MultiaddressListenError(listen_addr_quic.to_string())
})?;
},
}
for peer_info in self.boot_nodes {
if let Some(peer_id) = string_to_peer_id(&peer_info.0) {
if let Ok(multiaddr) = peer_info.1.parse::<Multiaddr>() {
if !self.blacklist.list.iter().any(|&id| id == peer_id) {
swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, multiaddr.clone());
println!("Dailing {}", multiaddr);
swarm
.dial(multiaddr.clone().with(Protocol::P2p(peer_id)))
.map_err(|_| {
SwarmNlError::RemotePeerDialError(multiaddr.to_string())
})?;
}
}
}
}
let _ = swarm.behaviour_mut().kademlia.bootstrap();
swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server));
for peer_id in &self.blacklist.list {
swarm.behaviour_mut().gossipsub.blacklist_peer(peer_id);
}
let (application_sender, network_receiver) =
mpsc::channel::<StreamData>(STREAM_BUFFER_CAPACITY);
let (network_sender, application_receiver) =
mpsc::channel::<StreamData>(STREAM_BUFFER_CAPACITY);
let peer_id = self.keypair.public().to_peer_id();
let mut timeouts = HashMap::<PeerId, u16>::new();
timeouts.insert(peer_id.clone(), 0);
let mut outbound_errors = HashMap::<PeerId, u16>::new();
outbound_errors.insert(peer_id.clone(), 0);
let manager = PingManager {
timeouts,
outbound_errors,
};
let ping_info = PingInfo {
policy: self.ping.1,
manager,
};
let gossip_info = GossipsubInfo {
blacklist: self.blacklist,
};
let repl_info = ReplInfo {
state: Arc::new(Mutex::new(Default::default())),
};
let stream_id = StreamId::new();
let stream_request_buffer =
Arc::new(Mutex::new(StreamRequestBuffer::new(self.stream_size)));
let stream_response_buffer =
Arc::new(Mutex::new(StreamResponseBuffer::new(self.stream_size)));
let network_info = NetworkInfo {
id: self.network_id,
ping: ping_info,
gossipsub: gossip_info,
rpc_handler_fn: self.request_response.1,
gossip_filter_fn: self.gossipsub.1,
replication: repl_info,
sharding: self.sharding.clone(),
};
let network_core = Core {
keypair: self.keypair,
application_sender,
stream_request_buffer: stream_request_buffer.clone(),
stream_response_buffer: stream_response_buffer.clone(),
current_stream_id: Arc::new(Mutex::new(stream_id)),
event_queue: DataQueue::new(),
replica_buffer: Arc::new(ReplicaBufferQueue::new(self.replication_cfg.clone())),
network_info,
};
if !self.sharding.id.is_empty() {
let mut core = network_core.clone();
#[cfg(feature = "async-std-runtime")]
async_std::task::spawn(
async move { core.init_sharding(self.sharding.id.clone()).await },
);
#[cfg(feature = "tokio-runtime")]
tokio::task::spawn(async move { core.init_sharding(self.sharding.id.clone()).await });
}
#[cfg(feature = "async-std-runtime")]
async_std::task::spawn(Core::handle_async_operations(
swarm,
network_sender,
network_receiver,
network_core.clone(),
));
#[cfg(feature = "tokio-runtime")]
tokio::task::spawn(Core::handle_async_operations(
swarm,
network_sender,
network_receiver,
network_core.clone(),
));
#[cfg(feature = "async-std-runtime")]
async_std::task::spawn(Core::handle_network_response(
application_receiver,
network_core.clone(),
));
#[cfg(feature = "tokio-runtime")]
tokio::task::spawn(Core::handle_network_response(
application_receiver,
network_core.clone(),
));
#[cfg(feature = "async-std-runtime")]
async_std::task::sleep(Duration::from_secs(BOOT_WAIT_TIME)).await;
#[cfg(feature = "tokio-runtime")]
tokio::time::sleep(Duration::from_secs(BOOT_WAIT_TIME)).await;
Ok(network_core)
}
}
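/// The core interface to the network. It is cloneable and is used by the application to
/// send requests to the network layer and to consume network events.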
#[derive(Clone)]
pub struct Core {
keypair: Keypair,
application_sender: Sender<StreamData>,
stream_response_buffer: Arc<Mutex<StreamResponseBuffer>>,
stream_request_buffer: Arc<Mutex<StreamRequestBuffer>>,
current_stream_id: Arc<Mutex<StreamId>>,
event_queue: DataQueue<NetworkEvent>,
replica_buffer: Arc<ReplicaBufferQueue>,
network_info: NetworkInfo,
}
impl Core {
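// Flags and delimiters used to tag and parse gossip and RPC payloads exchanged between nodes.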
pub const GOSSIP_MESSAGE_SEPARATOR: &'static str = "~~##~~";
pub const REPL_GOSSIP_FLAG: &'static str = "REPL_GOSSIP_FLAG__@@";
pub const RPC_DATA_FORWARDING_FLAG: &'static str = "RPC_DATA_FORWARDING_FLAG__@@";
pub const STRONG_CONSISTENCY_FLAG: &'static str = "STRONG_CON__@@";
pub const EVENTUAL_CONSISTENCY_FLAG: &'static str = "EVENTUAL_CON_@@";
pub const RPC_SYNC_PULL_FLAG: &'static str = "RPC_SYNC_PULL_FLAG__@@";
pub const SHARD_RPC_SYNC_FLAG: &'static str = "SHARD_RPC_SYNC_FLAG__@@";
pub const SHARD_GOSSIP_JOIN_FLAG: &'static str = "SHARD_GOSSIP_JOIN_FLAG__@@";
pub const SHARD_GOSSIP_EXIT_FLAG: &'static str = "SHARD_GOSSIP_EXIT_FLAG__@@";
pub const SHARD_RPC_REQUEST_FLAG: &'static str = "SHARD_RPC_REQUEST_FLAG__@@";
pub const FIELD_DELIMITER: &'static str = "_@_";
pub const ENTRY_DELIMITER: &'static str = "@@@";
pub const DATA_DELIMITER: &'static str = "$$";
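/// Write the node's keypair (protobuf-encoded) to the given config file, creating the
/// file if it does not exist. Returns `false` for RSA keys or if writing fails.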
pub fn save_keypair_offline<T: AsRef<Path> + ?Sized>(&self, config_file_path: &T) -> bool {
if fs::metadata(config_file_path).is_err() {
fs::File::create(config_file_path).expect("could not create config file");
}
if KeyType::RSA != self.keypair.key_type() {
if let Ok(protobuf_keypair) = self.keypair.to_protobuf_encoding() {
return util::write_config(
"auth",
"protobuf_keypair",
&format!("{:?}", protobuf_keypair),
config_file_path,
) && util::write_config(
"auth",
"Crypto",
&format!("{}", self.keypair.key_type()),
config_file_path,
);
}
}
false
}
pub fn peer_id(&self) -> PeerId {
self.keypair.public().to_peer_id()
}
pub async fn events(&mut self) -> IntoIter<NetworkEvent> {
let events = self.event_queue.into_inner().await.into_iter();
self.event_queue.drain().await;
events
}
pub async fn next_event(&mut self) -> Option<NetworkEvent> {
self.event_queue.pop().await
}
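/// Return the peers that are members of the given replica network, derived from
/// gossipsub mesh information.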
pub async fn replica_peers(&mut self, replica_network: &str) -> Vec<PeerId> {
let mut peers = Vec::new();
let request = AppData::GossipsubGetInfo;
if let Ok(response) = self.query_network(request).await {
if let AppResponse::GossipsubGetInfo { mesh_peers, .. } = response {
for (peer_id, networks) in mesh_peers {
if networks.contains(&replica_network.to_string()) {
peers.push(peer_id);
}
}
}
}
peers
}
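/// Send a request to the network layer and return the [`StreamId`] used to retrieve the
/// response. Returns `None` for requests that expect no response, or when the request
/// buffer is full.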
pub async fn send_to_network(&mut self, app_request: AppData) -> Option<StreamId> {
let stream_id = StreamId::next(*self.current_stream_id.lock().await);
let request = StreamData::FromApplication(stream_id, app_request.clone());
match app_request {
AppData::KademliaDeleteRecord { .. } | AppData::KademliaStopProviding { .. } => {
let _ = self.application_sender.send(request).await;
return None;
},
_ => {
let mut stream_request_buffer = self.stream_request_buffer.lock().await;
if !stream_request_buffer.insert(stream_id) {
return None;
}
if let Ok(_) = self.application_sender.send(request).await {
*self.current_stream_id.lock().await = stream_id;
return Some(stream_id);
} else {
return None;
}
},
}
}
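/// Wait for the response to a previously issued request, identified by its [`StreamId`].
/// Fails with [`NetworkError::NetworkReadTimeout`] if the response does not arrive in time.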
pub async fn recv_from_network(&mut self, stream_id: StreamId) -> NetworkResult<AppResponse> {
#[cfg(feature = "async-std-runtime")]
{
let channel = self.clone();
let response_handler = async_std::task::spawn(async move {
let mut loop_count = 0;
loop {
let mut buffer_guard = channel.stream_response_buffer.lock().await;
if let Some(result) = buffer_guard.remove(&stream_id) {
return Ok(result);
}
if loop_count < 10 {
loop_count += 1;
} else {
return Err(NetworkError::NetworkReadTimeout);
}
async_std::task::sleep(Duration::from_secs(TASK_SLEEP_DURATION)).await;
}
});
match response_handler.await {
Ok(result) => result,
Err(_) => Err(NetworkError::NetworkReadTimeout),
}
}
#[cfg(feature = "tokio-runtime")]
{
let channel = self.clone();
let response_handler = tokio::task::spawn(async move {
let mut loop_count = 0;
loop {
let mut buffer_guard = channel.stream_response_buffer.lock().await;
if let Some(result) = buffer_guard.remove(&stream_id) {
return Ok(result);
}
if loop_count < 10 {
loop_count += 1;
} else {
return Err(NetworkError::NetworkReadTimeout);
}
tokio::time::sleep(Duration::from_secs(TASK_SLEEP_DURATION)).await;
}
});
match response_handler.await {
Ok(result) => result?,
Err(_) => Err(NetworkError::NetworkReadTimeout),
}
}
}
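/// Send a request to the network layer and wait for its response.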
pub async fn query_network(&mut self, request: AppData) -> NetworkResult<AppResponse> {
if let Some(stream_id) = self.send_to_network(request).await {
self.recv_from_network(stream_id).await
} else {
Err(NetworkError::StreamBufferOverflow)
}
}
async fn init_sharding(&mut self, network_id: String) {
let gossip_request = AppData::GossipsubJoinNetwork(network_id.clone());
let _ = self.query_network(gossip_request).await;
}
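/// Add a peer to (or remove it from) the local image of a shard's membership, dropping
/// the shard entry when it becomes empty.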
async fn update_shard_state(&mut self, peer: PeerId, shard_id: ShardId, join: bool) {
let mut shard_state = self.network_info.sharding.state.lock().await;
let shard_entry = shard_state
.entry(shard_id.clone())
.or_insert(Default::default());
if join {
shard_entry.insert(peer);
} else {
shard_entry.retain(|entry| entry != &peer);
if shard_entry.is_empty() {
shard_state.remove(&shard_id.to_string());
}
}
}
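/// Send the local shard-state image to a peer over RPC so it can merge it into its own.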
async fn publish_shard_state(&mut self, peer: PeerId) {
let shard_state = self.network_info.sharding.state.lock().await.clone();
let bytes = shard_image_to_bytes(shard_state).unwrap_or_default();
let message = vec![
Core::SHARD_RPC_SYNC_FLAG.as_bytes().to_vec(), // Shard sync flag
bytes,                                         // Serialized shard state
];
let rpc_request = AppData::SendRpc {
keys: message,
peer,
};
let _ = self.query_network(rpc_request).await;
}
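/// Handle replication data received over gossip: queue a `ReplicaDataIncoming` event,
/// advance the replica network's Lamport clock and push the data into the replica buffer.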
async fn handle_incoming_repl_data(&mut self, repl_network: String, repl_data: ReplBufferData) {
let replica_data = repl_data.clone();
self.event_queue
.push(NetworkEvent::ReplicaDataIncoming {
data: replica_data.data,
network: repl_network.clone(),
outgoing_timestamp: replica_data.outgoing_timestamp,
incoming_timestamp: replica_data.incoming_timestamp,
message_id: replica_data.message_id,
source: replica_data.sender,
})
.await;
if let Some(repl_network_data) = self
.network_info
.replication
.state
.lock()
.await
.get_mut(&repl_network)
{
(*repl_network_data).lamport_clock =
cmp::max(repl_network_data.lamport_clock, repl_data.lamport_clock)
.saturating_add(1);
self.replica_buffer
.push(self.clone(), repl_network, repl_data)
.await;
}
}
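/// Handle data forwarded from another shard node: queue an `IncomingForwardedData` event
/// and replicate the data within the shard.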
async fn handle_incoming_shard_data(
&mut self,
shard_id: String,
source: PeerId,
incoming_data: ByteVector,
) {
self.event_queue
.push(NetworkEvent::IncomingForwardedData {
data: byte_vec_to_string_vec(incoming_data.clone()),
source,
})
.await;
let _ = self.replicate(incoming_data, &shard_id).await;
}
pub async fn consume_repl_data(&mut self, replica_network: &str) -> Option<ReplBufferData> {
self.replica_buffer
.pop_front(self.clone(), replica_network)
.await
}
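/// Join a replica network: initialize its configuration and buffer, subscribe to its
/// gossip topic and, under eventual consistency, spawn the background synchronization task.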
pub async fn join_repl_network(&mut self, repl_network: String) -> NetworkResult<()> {
let mut cfg = self.network_info.replication.state.lock().await;
cfg.entry(repl_network.clone()).or_insert(ReplConfigData {
lamport_clock: 0,
last_clock: 0,
nodes: Default::default(),
});
self.replica_buffer.init(repl_network.clone()).await;
drop(cfg);
let gossip_request = AppData::GossipsubJoinNetwork(repl_network.clone());
let _ = self.query_network(gossip_request).await?;
if let ConsistencyModel::Eventual = self.replica_buffer.consistency_model() {
let core = self.clone();
let network = repl_network.clone();
#[cfg(feature = "tokio-runtime")]
tokio::task::spawn(async move {
let buffer = core.replica_buffer.clone();
buffer.sync_with_eventual_consistency(core, network).await;
});
#[cfg(feature = "async-std-runtime")]
async_std::task::spawn(async move {
let buffer = core.replica_buffer.clone();
buffer.sync_with_eventual_consistency(core, network).await;
});
}
Ok(())
}
pub async fn leave_repl_network(&mut self, repl_network: String) -> NetworkResult<AppResponse> {
let gossip_request = AppData::GossipsubExitNetwork(repl_network.clone());
self.query_network(gossip_request).await
}
pub async fn replicate_buffer(
&self,
repl_network: String,
replica_node: PeerId,
) -> Result<(), NetworkError> {
if self
.network_info
.replication
.state
.lock()
.await
.contains_key(&repl_network)
{
self.replica_buffer
.replicate_buffer(self.clone(), repl_network, replica_node)
.await
} else {
Err(NetworkError::MissingReplNetwork)
}
}
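/// Replicate data to peers in a replica network by publishing it on the network's gossip
/// topic, tagged with the replication flag, the current timestamp and the Lamport clock.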
pub async fn replicate(
&mut self,
mut replica_data: ByteVector,
replica_network: &str,
) -> NetworkResult<()> {
let replica_network_data = {
let mut state = self.network_info.replication.state.lock().await;
if let Some(data) = state.get_mut(replica_network) {
data.lamport_clock = data.lamport_clock.saturating_add(1);
data.clone()
} else {
return Err(NetworkError::MissingReplNetwork);
}
};
let mut message = vec![
Core::REPL_GOSSIP_FLAG.as_bytes().to_vec(),             // Replication gossip flag
get_unix_timestamp().to_string().into(),                // Outgoing timestamp
replica_network_data.lamport_clock.to_string().into(),  // Lamport clock
replica_network.to_owned().into(),                      // Replica network id
];
message.append(&mut replica_data);
let gossip_request = AppData::GossipsubBroadcastMessage {
topic: replica_network.to_owned(),
message,
};
self.query_network(gossip_request).await?;
Ok(())
}
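/// Background task that receives responses from the network layer and stores them in the
/// stream response buffer, keyed by [`StreamId`].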
async fn handle_network_response(mut receiver: Receiver<StreamData>, network_core: Core) {
loop {
select! {
response = receiver.select_next_some() => {
let mut buffer_guard = network_core.stream_response_buffer.lock().await;
match response {
StreamData::ToApplication(stream_id, response) => match response {
AppResponse::Error(error) => buffer_guard.insert(stream_id, Err(error)),
res @ AppResponse::Echo(..) => buffer_guard.insert(stream_id, Ok(res)),
res @ AppResponse::DailPeerSuccess(..) => buffer_guard.insert(stream_id, Ok(res)),
res @ AppResponse::KademliaStoreRecordSuccess => buffer_guard.insert(stream_id, Ok(res)),
res @ AppResponse::KademliaLookupSuccess(..) => buffer_guard.insert(stream_id, Ok(res)),
res @ AppResponse::KademliaGetProviders{..} => buffer_guard.insert(stream_id, Ok(res)),
res @ AppResponse::KademliaNoProvidersFound => buffer_guard.insert(stream_id, Ok(res)),
res @ AppResponse::KademliaGetRoutingTableInfo { .. } => buffer_guard.insert(stream_id, Ok(res)),
res @ AppResponse::SendRpc(..) => buffer_guard.insert(stream_id, Ok(res)),
res @ AppResponse::GetNetworkInfo{..} => buffer_guard.insert(stream_id, Ok(res)),
res @ AppResponse::GossipsubBroadcastSuccess => buffer_guard.insert(stream_id, Ok(res)),
res @ AppResponse::GossipsubJoinSuccess => buffer_guard.insert(stream_id, Ok(res)),
res @ AppResponse::GossipsubExitSuccess => buffer_guard.insert(stream_id, Ok(res)),
res @ AppResponse::GossipsubBlacklistSuccess => buffer_guard.insert(stream_id, Ok(res)),
res @ AppResponse::GossipsubGetInfo{..} => buffer_guard.insert(stream_id, Ok(res)),
},
_ => false
};
}
}
}
}
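/// Core network event loop: multiplexes application requests and libp2p swarm events,
/// drives the behaviours and pushes resulting [`NetworkEvent`]s to the application queue.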
async fn handle_async_operations(
mut swarm: Swarm<CoreBehaviour>,
mut network_sender: Sender<StreamData>,
mut receiver: Receiver<StreamData>,
mut network_core: Core,
) {
let data_queue_1 = DataQueue::new();
let data_queue_2 = DataQueue::new();
let data_queue_3 = DataQueue::new();
let data_queue_4 = DataQueue::new();
let mut network_info = network_core.network_info.clone();
loop {
select! {
stream_data = receiver.next() => {
match stream_data {
Some(incoming_data) => {
match incoming_data {
StreamData::FromApplication(stream_id, app_data) => {
match app_data {
AppData::Echo(message) => {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Echo(message))).await;
},
AppData::DailPeer(peer_id, multiaddr) => {
if let Ok(multiaddr) = multiaddr.parse::<Multiaddr>() {
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr.clone());
if let Ok(_) = swarm.dial(multiaddr.clone().with(Protocol::P2p(peer_id))) {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::DailPeerSuccess(multiaddr.to_string()))).await;
} else {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::DailPeerError))).await;
}
}
},
AppData::KademliaStoreRecord { key, value, expiration_time, explicit_peers } => {
let mut record = Record::new(key.clone(), value);
record.expires = expiration_time;
if let Ok(_) = swarm.behaviour_mut().kademlia.put_record(record.clone(), kad::Quorum::One) {
let _ = swarm.behaviour_mut().kademlia.start_providing(RecordKey::new(&key));
data_queue_1.push(stream_id).await;
if let Some(explicit_peers) = explicit_peers {
let peers = explicit_peers.iter().map(|peer_id_string| {
PeerId::from_bytes(&peer_id_string.from_base58().unwrap_or_default())
}).filter_map(Result::ok).collect::<Vec<_>>();
swarm.behaviour_mut().kademlia.put_record_to(record, peers.into_iter(), kad::Quorum::One);
}
} else {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadStoreRecordError(key)))).await;
}
},
AppData::KademliaLookupRecord { key } => {
let _ = swarm.behaviour_mut().kademlia.get_record(key.clone().into());
data_queue_2.push(stream_id).await;
},
AppData::KademliaGetProviders { key } => {
swarm.behaviour_mut().kademlia.get_providers(key.clone().into());
data_queue_3.push(stream_id).await;
}
AppData::KademliaStopProviding { key } => {
swarm.behaviour_mut().kademlia.stop_providing(&key.into());
}
AppData::KademliaDeleteRecord { key } => {
swarm.behaviour_mut().kademlia.remove_record(&key.into());
}
AppData::KademliaGetRoutingTableInfo => {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaGetRoutingTableInfo{protocol_id: network_info.id.to_string()})).await;
},
AppData::SendRpc { keys, peer } => {
let rpc = Rpc::ReqResponse { data: keys.clone() };
let _ = swarm
.behaviour_mut()
.request_response
.send_request(&peer, rpc);
data_queue_4.push(stream_id).await;
},
AppData::GetNetworkInfo => {
let connected_peers = swarm.connected_peers().map(|peer| peer.to_owned()).collect::<Vec<_>>();
let external_addresses = swarm.listeners().map(|multiaddr| multiaddr.to_string()).collect::<Vec<_>>();
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GetNetworkInfo { peer_id: swarm.local_peer_id().clone(), connected_peers, external_addresses })).await;
},
AppData::GossipsubBroadcastMessage { message, topic } => {
let topic_hash = TopicHash::from_raw(topic);
let message = message.join(Core::GOSSIP_MESSAGE_SEPARATOR.as_bytes());
let is_subscribed = swarm.behaviour().gossipsub.mesh_peers(&topic_hash).any(|peer| peer == swarm.local_peer_id());
if swarm
.behaviour_mut().gossipsub
.publish(topic_hash, message).is_ok() && !is_subscribed {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubBroadcastSuccess)).await;
} else {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::GossipsubBroadcastMessageError))).await;
}
},
AppData::GossipsubJoinNetwork(topic) => {
let topic = IdentTopic::new(topic);
if swarm.behaviour_mut().gossipsub.subscribe(&topic).is_ok() {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubJoinSuccess)).await;
} else {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::GossipsubJoinNetworkError))).await;
}
},
AppData::GossipsubGetInfo => {
let subscribed_topics = swarm.behaviour().gossipsub.topics().map(|topic| topic.clone().into_string()).collect::<Vec<_>>();
let mesh_peers = swarm.behaviour().gossipsub.all_peers().map(|(peer, topics)| {
(peer.to_owned(), topics.iter().map(|&t| t.clone().as_str().to_owned()).collect::<Vec<_>>())
}).collect::<Vec<_>>();
let blacklist = network_info.gossipsub.blacklist.into_inner();
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubGetInfo { topics: subscribed_topics, mesh_peers, blacklist })).await;
},
AppData::GossipsubExitNetwork(topic) => {
let topic = IdentTopic::new(topic);
if swarm.behaviour_mut().gossipsub.unsubscribe(&topic).is_ok() {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubExitSuccess)).await;
} else {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::GossipsubJoinNetworkError))).await;
}
}
AppData::GossipsubBlacklistPeer(peer) => {
swarm.behaviour_mut().gossipsub.blacklist_peer(&peer);
network_info.gossipsub.blacklist.list.insert(peer);
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubBlacklistSuccess)).await;
},
AppData::GossipsubFilterBlacklist(peer) => {
swarm.behaviour_mut().gossipsub.remove_blacklisted_peer(&peer);
network_info.gossipsub.blacklist.list.remove(&peer);
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubBlacklistSuccess)).await;
},
}
}
_ => {}
}
},
_ => {}
}
},
swarm_event = swarm.next() => {
match swarm_event {
Some(event) => {
match event {
SwarmEvent::NewListenAddr {
listener_id,
address,
} => {
network_core.event_queue.push(NetworkEvent::NewListenAddr{
local_peer_id: swarm.local_peer_id().to_owned(),
listener_id,
address
}).await;
}
SwarmEvent::Behaviour(event) => match event {
CoreEvent::Ping(ping::Event {
peer,
connection: _,
result,
}) => {
match result {
Ok(duration) => {
if let Some(err_count) =
network_info.ping.manager.outbound_errors.get(&peer)
{
let new_err_count = (err_count / 2) as u16;
network_info
.ping
.manager
.outbound_errors
.insert(peer, new_err_count);
}
if let Some(timeout_err_count) =
network_info.ping.manager.timeouts.get(&peer)
{
let new_err_count = (timeout_err_count / 2) as u16;
network_info
.ping
.manager
.timeouts
.insert(peer, new_err_count);
}
network_core.event_queue.push(NetworkEvent::OutboundPingSuccess{
peer_id: peer,
duration
}).await;
}
Err(err_type) => {
match network_info.ping.policy {
PingErrorPolicy::NoDisconnect => {
}
PingErrorPolicy::DisconnectAfterMaxErrors(max_errors) => {
let err_count = network_info
.ping
.manager
.outbound_errors
.entry(peer)
.or_insert(0);
if *err_count != max_errors {
// Keep counting errors until the configured maximum is reached
*err_count += 1;
} else {
// Maximum errors reached: disconnect and clear the peer's record
let _ = swarm.disconnect_peer_id(peer);
network_info
.ping
.manager
.outbound_errors
.remove(&peer);
}
}
PingErrorPolicy::DisconnectAfterMaxTimeouts(
max_timeout_errors,
) => {
if let Failure::Timeout = err_type {
let err_count = network_info
.ping
.manager
.timeouts
.entry(peer)
.or_insert(0);
if *err_count != max_timeout_errors {
// Keep counting timeouts until the configured maximum is reached
*err_count += 1;
} else {
// Maximum timeouts reached: disconnect and clear the peer's record
let _ = swarm.disconnect_peer_id(peer);
network_info
.ping
.manager
.timeouts
.remove(&peer);
}
}
}
}
network_core.event_queue.push(NetworkEvent::OutboundPingError{
peer_id: peer
}).await;
}
}
}
CoreEvent::Kademlia(event) => match event {
kad::Event::OutboundQueryProgressed { result, .. } => match result {
kad::QueryResult::GetProviders(Ok(success)) => {
match success {
kad::GetProvidersOk::FoundProviders { key, providers, .. } => {
let peer_id_strings = providers.iter().map(|peer_id| {
peer_id.to_base58()
}).collect::<Vec<_>>();
if let Some(stream_id) = data_queue_3.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaGetProviders{ key: key.to_vec(), providers: peer_id_strings })).await;
}
},
kad::GetProvidersOk::FinishedWithNoAdditionalRecord { .. } => {
if let Some(stream_id) = data_queue_3.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaNoProvidersFound)).await;
}
}
}
},
kad::QueryResult::GetProviders(Err(_)) => {
if let Some(stream_id) = data_queue_3.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaNoProvidersFound)).await;
}
},
kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(
kad::PeerRecord { record:kad::Record{ value, .. }, .. },
))) => {
if let Some(stream_id) = data_queue_2.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaLookupSuccess(value))).await;
}
}
kad::QueryResult::GetRecord(Ok(_)) => {
if let Some(stream_id) = data_queue_2.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadFetchRecordError(vec![])))).await;
}
},
kad::QueryResult::GetRecord(Err(e)) => {
let key = match e {
kad::GetRecordError::NotFound { key, .. } => key,
kad::GetRecordError::QuorumFailed { key, .. } => key,
kad::GetRecordError::Timeout { key } => key,
};
if let Some(stream_id) = data_queue_2.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadFetchRecordError(key.to_vec())))).await;
}
}
kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
if let Some(stream_id) = data_queue_1.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaStoreRecordSuccess)).await;
}
network_core.event_queue.push(NetworkEvent::KademliaPutRecordSuccess{
key: key.to_vec()
}).await;
}
kad::QueryResult::PutRecord(Err(e)) => {
let key = match e {
kad::PutRecordError::QuorumFailed { key, .. } => key,
kad::PutRecordError::Timeout { key, .. } => key,
};
if let Some(stream_id) = data_queue_1.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadStoreRecordError(key.to_vec())))).await;
}
network_core.event_queue.push(NetworkEvent::KademliaPutRecordError).await;
}
kad::QueryResult::StartProviding(Ok(kad::AddProviderOk {
key,
})) => {
network_core.event_queue.push(NetworkEvent::KademliaStartProvidingSuccess{
key: key.to_vec()
}).await;
}
kad::QueryResult::StartProviding(Err(_)) => {
network_core.event_queue.push(NetworkEvent::KademliaStartProvidingError).await;
}
_ => {}
}
kad::Event::RoutingUpdated { peer, .. } => {
network_core.event_queue.push(NetworkEvent::RoutingTableUpdated{
peer_id: peer
}).await;
}
_ => {}
},
CoreEvent::Identify(event) => match event {
identify::Event::Received { peer_id, info } => {
network_core.event_queue.push(NetworkEvent::IdentifyInfoReceived {
peer_id,
info: IdentifyInfo {
public_key: info.public_key,
listen_addrs: info.listen_addrs.clone(),
protocols: info.protocols,
observed_addr: info.observed_addr
}
}).await;
if info.protocol_version != network_info.id.as_ref() {
let _ = swarm.disconnect_peer_id(peer_id);
} else {
// Add the peer's first listen address to the routing table, if it reported any
if let Some(listen_addr) = info.listen_addrs.first() {
let _ = swarm.behaviour_mut().kademlia.add_address(&peer_id, listen_addr.clone());
}
}
}
_ => {}
},
CoreEvent::RequestResponse(event) => match event {
request_response::Event::Message { peer, message } => match message {
request_response::Message::Request { request_id: _, request, channel } => {
match request {
Rpc::ReqResponse { data } => {
let byte_str = String::from_utf8_lossy(&data[0]);
match byte_str.as_ref() {
Core::RPC_SYNC_PULL_FLAG => {
let repl_network = String::from_utf8(data[1].clone()).unwrap_or_default();
let requested_msgs = network_core.replica_buffer.pull_missing_data(repl_network, &data[2..]).await;
let _ = swarm.behaviour_mut().request_response.send_response(channel, Rpc::ReqResponse { data: requested_msgs });
}
Core::SHARD_RPC_SYNC_FLAG => {
let incoming_state = bytes_to_shard_image(data[1].clone());
let mut current_shard_state = network_core.network_info.sharding.state.lock().await;
merge_shard_states(&mut current_shard_state, incoming_state);
let _ = swarm.behaviour_mut().request_response.send_response(channel, Rpc::ReqResponse { data: Default::default() });
}
Core::RPC_DATA_FORWARDING_FLAG => {
let _ = swarm.behaviour_mut().request_response.send_response(channel, Rpc::ReqResponse { data: Default::default() });
let shard_id = String::from_utf8_lossy(&data[1]).to_string();
let mut core = network_core.clone();
let incoming_data: ByteVector = data[2..].into();
#[cfg(feature = "tokio-runtime")]
tokio::task::spawn(async move {
let _ = core.handle_incoming_shard_data(shard_id, peer, incoming_data).await;
});
#[cfg(feature = "async-std-runtime")]
async_std::task::spawn(async move {
let _ = core.handle_incoming_shard_data(shard_id, peer, incoming_data).await;
});
}
Core::SHARD_RPC_REQUEST_FLAG => {
let response_data = network_info.sharding.local_storage.lock().await.fetch_data(data[1..].into());
let _ = swarm.behaviour_mut().request_response.send_response(channel, Rpc::ReqResponse { data: response_data });
}
_ => {
let response_data = (network_info.rpc_handler_fn)(data);
let _ = swarm.behaviour_mut().request_response.send_response(channel, Rpc::ReqResponse { data: response_data });
}
}
}
}
},
request_response::Message::Response { response, .. } => {
if let Some(stream_id) = data_queue_4.pop().await {
match response {
Rpc::ReqResponse { data } => {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::SendRpc(data))).await;
},
}
}
},
},
request_response::Event::OutboundFailure { .. } => {
if let Some(stream_id) = data_queue_4.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::RpcDataFetchError))).await;
}
},
_ => {}
},
CoreEvent::Gossipsub(event) => match event {
gossipsub::Event::Message { propagation_source, message_id, message } => {
let data_string = String::from_utf8_lossy(&message.data).to_string();
let gossip_data = data_string.split(Core::GOSSIP_MESSAGE_SEPARATOR).map(|msg| msg.to_string()).collect::<Vec<_>>();
match gossip_data[0].as_str() {
Core::REPL_GOSSIP_FLAG => {
let queue_data = ReplBufferData {
data: gossip_data[4..].to_owned(),
lamport_clock: gossip_data[2].parse::<u64>().unwrap_or(0),
outgoing_timestamp: gossip_data[1].parse::<u64>().unwrap_or(0),
incoming_timestamp: get_unix_timestamp(),
message_id: message_id.to_string(),
sender: if let Some(peer) = message.source { peer.clone() } else { propagation_source.clone() },
confirmations: if network_core.replica_buffer.consistency_model() == ConsistencyModel::Eventual {
None
} else {
Some(1)
}
};
let mut core = network_core.clone();
let queue_data = queue_data.clone();
let data = gossip_data[3].clone().into();
#[cfg(feature = "tokio-runtime")]
tokio::task::spawn(async move {
let _ = core.handle_incoming_repl_data(data, queue_data).await;
});
#[cfg(feature = "async-std-runtime")]
async_std::task::spawn(async move {
let _ = core.handle_incoming_repl_data(data, queue_data).await;
});
},
Core::STRONG_CONSISTENCY_FLAG => {
let core = network_core.clone();
let data = gossip_data[2].clone().into();
let network = gossip_data[1].to_owned();
#[cfg(feature = "tokio-runtime")]
tokio::task::spawn(async move {
let _ = core.replica_buffer.handle_data_confirmation(core.clone(), network, data).await;
});
#[cfg(feature = "async-std-runtime")]
async_std::task::spawn(async move {
let _ = core.replica_buffer.handle_data_confirmation(core.clone(), network, data).await;
});
},
Core::EVENTUAL_CONSISTENCY_FLAG => {
let min_clock = gossip_data[3].parse::<u64>().unwrap_or_default();
let max_clock = gossip_data[4].parse::<u64>().unwrap_or_default();
let core = network_core.clone();
let repl_peer_id = gossip_data[1].clone();
let repl_network = gossip_data[2].clone();
let replica_data_state = gossip_data[5..].to_owned();
#[cfg(feature = "tokio-runtime")]
tokio::task::spawn(async move {
core.replica_buffer.sync_buffer_image(core.clone(), repl_peer_id, repl_network, (min_clock, max_clock), replica_data_state).await;
});
#[cfg(feature = "async-std-runtime")]
async_std::task::spawn(async move {
core.replica_buffer.sync_buffer_image(core.clone(), repl_peer_id, repl_network, (min_clock, max_clock), replica_data_state).await;
});
}
Core::SHARD_GOSSIP_JOIN_FLAG => {
if let Ok(peer_id) = gossip_data[1].parse::<PeerId>() {
let mut core = network_core.clone();
#[cfg(feature = "tokio-runtime")]
tokio::task::spawn(async move {
core.publish_shard_state(peer_id).await;
});
#[cfg(feature = "async-std-runtime")]
async_std::task::spawn(async move {
core.publish_shard_state(peer_id).await;
});
let _ = network_core.update_shard_state(peer_id, gossip_data[2].clone(), true).await;
}
}
Core::SHARD_GOSSIP_EXIT_FLAG => {
if let Ok(peer_id) = gossip_data[1].parse::<PeerId>() {
let _ = network_core.update_shard_state(peer_id, gossip_data[2].clone(), false).await;
}
}
_ => {
if (network_info.gossip_filter_fn)(propagation_source.clone(), message_id, message.source, message.topic.to_string(), gossip_data.clone()) {
network_core.event_queue.push(NetworkEvent::GossipsubIncomingMessageHandled { source: propagation_source, data: gossip_data }).await;
}
}
}
},
gossipsub::Event::Subscribed { peer_id, topic } => {
network_core.event_queue.push(NetworkEvent::GossipsubSubscribeMessageReceived { peer_id, topic: topic.to_string() }).await;
},
gossipsub::Event::Unsubscribed { peer_id, topic } => {
network_core.event_queue.push(NetworkEvent::GossipsubUnsubscribeMessageReceived { peer_id, topic: topic.to_string() }).await;
},
_ => {},
}
},
SwarmEvent::ConnectionEstablished {
peer_id,
connection_id,
endpoint,
num_established,
concurrent_dial_errors: _,
established_in,
} => {
if let ConnectedPoint::Listener { send_back_addr, .. } = endpoint.clone() {
let _ = swarm.behaviour_mut().kademlia.add_address(&peer_id, send_back_addr);
}
network_core.event_queue.push(NetworkEvent::ConnectionEstablished {
peer_id,
connection_id,
endpoint,
num_established,
established_in,
}).await;
}
SwarmEvent::ConnectionClosed {
peer_id,
connection_id,
endpoint,
num_established,
cause: _,
} => {
network_core.event_queue.push(NetworkEvent::ConnectionClosed {
peer_id,
connection_id,
endpoint,
num_established
}).await;
}
SwarmEvent::ExpiredListenAddr {
listener_id,
address,
} => {
network_core.event_queue.push(NetworkEvent::ExpiredListenAddr {
listener_id,
address
}).await;
}
SwarmEvent::ListenerClosed {
listener_id,
addresses,
reason: _,
} => {
network_core.event_queue.push(NetworkEvent::ListenerClosed {
listener_id,
addresses
}).await;
}
SwarmEvent::ListenerError {
listener_id,
error: _,
} => {
network_core.event_queue.push(NetworkEvent::ListenerError {
listener_id,
}).await;
}
SwarmEvent::Dialing {
peer_id,
connection_id,
} => {
network_core.event_queue.push(NetworkEvent::Dialing { peer_id, connection_id }).await;
}
SwarmEvent::NewExternalAddrCandidate { address } => {
network_core.event_queue.push(NetworkEvent::NewExternalAddrCandidate { address }).await;
}
SwarmEvent::ExternalAddrConfirmed { address } => {
network_core.event_queue.push(NetworkEvent::ExternalAddrConfirmed { address }).await;
}
SwarmEvent::ExternalAddrExpired { address } => {
network_core.event_queue.push(NetworkEvent::ExternalAddrExpired { address }).await;
}
SwarmEvent::IncomingConnection {
connection_id,
local_addr,
send_back_addr,
} => {
network_core.event_queue.push(NetworkEvent::IncomingConnection { connection_id, local_addr, send_back_addr }).await;
}
SwarmEvent::IncomingConnectionError {
connection_id,
local_addr,
send_back_addr,
error: _,
} => {
network_core.event_queue.push(NetworkEvent::IncomingConnectionError { connection_id, local_addr, send_back_addr }).await;
}
SwarmEvent::OutgoingConnectionError {
connection_id,
peer_id,
error: _,
} => {
network_core.event_queue.push(NetworkEvent::OutgoingConnectionError { connection_id, peer_id }).await;
}
_ => {},
}
},
_ => {}
}
}
}
}
}
}