#![doc = include_str!("../../doc/core/NetworkBuilder.md")]
#![doc = include_str!("../../doc/core/ApplicationInteraction.md")]
use std::{
collections::{HashMap, HashSet},
fs,
net::IpAddr,
num::NonZeroU32,
sync::Arc,
time::Duration,
};
use base58::FromBase58;
use futures::{
channel::mpsc::{self, Receiver, Sender},
select, SinkExt, StreamExt,
};
use libp2p::{
gossipsub::{self, IdentTopic, TopicHash},
identify::{self, Info},
kad::{self, store::MemoryStore, Mode, Record, RecordKey},
multiaddr::Protocol,
noise,
ping::{self, Failure},
request_response::{self, cbor::Behaviour, ProtocolSupport},
swarm::{ConnectionError, NetworkBehaviour, SwarmEvent},
tcp, tls, yamux, Multiaddr, StreamProtocol, Swarm, SwarmBuilder,
};
use self::{
gossipsub_cfg::{Blacklist, GossipsubInfo},
ping_config::*,
};
use super::*;
use crate::{setup::BootstrapConfig, util::string_to_peer_id};
#[cfg(feature = "async-std-runtime")]
use async_std::sync::Mutex;
#[cfg(feature = "tokio-runtime")]
use tokio::sync::Mutex;
mod prelude;
pub use prelude::*;
mod tests;
/// The combined libp2p network behaviour driven by the swarm.
///
/// Every sub-behaviour emits its events as a [`CoreEvent`] (see the `to_swarm` attribute).
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "CoreEvent")]
struct CoreBehaviour {
/// CBOR-encoded request/response protocol exchanging `Rpc` messages in both directions.
request_response: request_response::cbor::Behaviour<Rpc, Rpc>,
/// Kademlia DHT backed by an in-memory record store.
kademlia: kad::Behaviour<MemoryStore>,
/// Ping protocol behaviour.
ping: ping::Behaviour,
/// Identify protocol behaviour.
identify: identify::Behaviour,
/// Gossipsub publish/subscribe behaviour.
gossipsub: gossipsub::Behaviour,
}
/// Union of the events produced by the sub-behaviours of [`CoreBehaviour`].
///
/// Each variant simply wraps the event type of the corresponding protocol.
#[derive(Debug)]
enum CoreEvent {
/// Event emitted by the ping behaviour.
Ping(ping::Event),
/// Event emitted by the request/response behaviour.
RequestResponse(request_response::Event<Rpc, Rpc>),
/// Event emitted by the Kademlia behaviour.
Kademlia(kad::Event),
/// Event emitted by the identify behaviour.
Identify(identify::Event),
/// Event emitted by the gossipsub behaviour.
Gossipsub(gossipsub::Event),
}
// Lifts a ping event into the combined behaviour event type.
impl From<ping::Event> for CoreEvent {
    fn from(ping_event: ping::Event) -> Self {
        Self::Ping(ping_event)
    }
}
// Lifts a Kademlia event into the combined behaviour event type.
impl From<kad::Event> for CoreEvent {
    fn from(kad_event: kad::Event) -> Self {
        Self::Kademlia(kad_event)
    }
}
// Lifts an identify event into the combined behaviour event type.
impl From<identify::Event> for CoreEvent {
    fn from(identify_event: identify::Event) -> Self {
        Self::Identify(identify_event)
    }
}
// Lifts a request/response event into the combined behaviour event type.
impl From<request_response::Event<Rpc, Rpc>> for CoreEvent {
    fn from(rpc_event: request_response::Event<Rpc, Rpc>) -> Self {
        Self::RequestResponse(rpc_event)
    }
}
// Lifts a gossipsub event into the combined behaviour event type.
impl From<gossipsub::Event> for CoreEvent {
    fn from(gossip_event: gossipsub::Event) -> Self {
        Self::Gossipsub(gossip_event)
    }
}
/// Builder that collects the node configuration and protocol behaviours before the
/// network core is constructed with `build`.
pub struct CoreBuilder<T: EventHandler + Clone + Send + Sync + 'static> {
/// Protocol identifier of the network this node belongs to.
network_id: StreamProtocol,
/// Keypair identifying the local node.
keypair: Keypair,
/// Ports to listen on: `.0` is used for TCP and `.1` for UDP (QUIC).
tcp_udp_port: (Port, Port),
/// Bootstrap nodes, keyed by peer id string with a multiaddress string as value.
boot_nodes: HashMap<PeerIdString, MultiaddrString>,
/// Peers that are never dialed at boot and are blacklisted in gossipsub.
blacklist: Blacklist,
/// Application handler notified of network events.
handler: T,
/// Size passed to the stream request and response buffers.
stream_size: usize,
/// IP address the node listens on.
ip_address: IpAddr,
/// Idle connection timeout, in seconds.
keep_alive_duration: Seconds,
/// Transport configuration.
transport: TransportOpts,
/// Ping behaviour together with the policy applied to outbound ping errors.
ping: (ping::Behaviour, PingErrorPolicy),
/// Kademlia behaviour.
kademlia: kad::Behaviour<kad::store::MemoryStore>,
/// Identify behaviour.
identify: identify::Behaviour,
/// Request/response (RPC) behaviour.
request_response: Behaviour<Rpc, Rpc>,
/// Gossipsub behaviour.
gossipsub: gossipsub::Behaviour,
}
impl<T: EventHandler + Clone + Send + Sync + 'static> CoreBuilder<T> {
/// Creates a builder populated with default settings taken from `config`.
///
/// The default network id, TCP/QUIC transport, ping policy and default protocol
/// behaviours are used; each can be overridden with the other builder methods.
///
/// # Panics
///
/// Panics if the default gossipsub behaviour cannot be constructed.
pub fn with_config(config: BootstrapConfig, handler: T) -> Self {
    let network_id = DEFAULT_NETWORK_ID;
    let default_transport = TransportOpts::TcpQuic {
        tcp_config: TcpConfig::Default,
    };
    let peer_id = config.keypair().public().to_peer_id();

    // Kademlia: the config carrying the network protocol name must actually be handed
    // to the behaviour, otherwise the library default protocol name would be used.
    let mut kad_cfg = kad::Config::default();
    kad_cfg.set_protocol_names(vec![StreamProtocol::new(network_id)]);
    let store = kad::store::MemoryStore::new(peer_id);
    let kademlia = kad::Behaviour::with_config(peer_id, store, kad_cfg);

    // Identify: advertise the network id as the protocol version.
    let identify_cfg = identify::Config::new(network_id.to_owned(), config.keypair().public())
        .with_push_listen_addr_updates(true);
    let identify = identify::Behaviour::new(identify_cfg);

    // Request/response (RPC).
    let request_response = Behaviour::new(
        [(StreamProtocol::new(network_id), ProtocolSupport::Full)],
        request_response::Config::default(),
    );

    // Gossipsub with signed messages.
    let gossip_cfg = gossipsub::Config::default();
    let gossipsub = gossipsub::Behaviour::new(
        gossipsub::MessageAuthenticity::Signed(config.keypair()),
        gossip_cfg,
    )
    .map_err(|_| SwarmNlError::GossipConfigError)
    .unwrap();

    CoreBuilder {
        network_id: StreamProtocol::new(network_id),
        keypair: config.keypair(),
        tcp_udp_port: config.ports(),
        boot_nodes: config.bootnodes(),
        blacklist: config.blacklist(),
        handler,
        stream_size: usize::MAX,
        ip_address: IpAddr::V4(DEFAULT_IP_ADDRESS),
        keep_alive_duration: DEFAULT_KEEP_ALIVE_DURATION,
        transport: default_transport,
        ping: (
            Default::default(),
            PingErrorPolicy::DisconnectAfterMaxTimeouts(20),
        ),
        kademlia,
        identify,
        request_response,
        gossipsub,
    }
}
pub fn with_network_id(self, protocol: String) -> Self {
if protocol.len() > MIN_NETWORK_ID_LENGTH.into() && protocol.starts_with("/") {
CoreBuilder {
network_id: StreamProtocol::try_from_owned(protocol.clone())
.map_err(|_| SwarmNlError::NetworkIdParseError(protocol))
.unwrap(),
..self
}
} else {
panic!("Could not parse provided network id");
}
}
pub fn listen_on(self, ip_address: IpAddr) -> Self {
CoreBuilder { ip_address, ..self }
}
/// Sets how long (in seconds) an idle connection is kept alive.
pub fn with_idle_connection_timeout(self, keep_alive_duration: Seconds) -> Self {
    let mut builder = self;
    builder.keep_alive_duration = keep_alive_duration;
    builder
}
pub fn with_stream_size(self, size: usize) -> Self {
CoreBuilder {
stream_size: size,
..self
}
}
pub fn with_ping(self, config: PingConfig) -> Self {
CoreBuilder {
ping: (
ping::Behaviour::new(
ping::Config::new()
.with_interval(config.interval)
.with_timeout(config.timeout),
),
config.err_policy,
),
..self
}
}
pub fn with_rpc<F>(self, config: RpcConfig) -> Self {
CoreBuilder {
request_response: Behaviour::new(
[(self.network_id.clone(), ProtocolSupport::Full)],
request_response::Config::default()
.with_request_timeout(config.timeout)
.with_max_concurrent_streams(config.max_concurrent_streams),
),
..self
}
}
pub fn with_kademlia(self, config: kad::Config) -> Self {
let peer_id = self.keypair.public().to_peer_id();
let store = kad::store::MemoryStore::new(peer_id);
let kademlia = kad::Behaviour::with_config(peer_id, store, config);
CoreBuilder { kademlia, ..self }
}
pub fn with_gossipsub(
self,
config: gossipsub::Config,
auth: gossipsub::MessageAuthenticity,
) -> Self {
let gossipsub = gossipsub::Behaviour::new(auth, config)
.map_err(|_| SwarmNlError::GossipConfigError)
.unwrap();
CoreBuilder { gossipsub, ..self }
}
/// Sets the transport configuration used when building the swarm.
pub fn with_transports(self, transport: TransportOpts) -> Self {
    let mut builder = self;
    builder.transport = transport;
    builder
}
pub fn configure_network_events(self, handler: T) -> Self {
CoreBuilder { handler, ..self }
}
/// Returns the configured network id as an owned string.
pub fn network_id(&self) -> String {
    format!("{}", self.network_id)
}
/// Builds the swarm from the collected configuration, starts listening, dials the
/// boot nodes, spawns the background network tasks and returns the [`Core`] handle.
///
/// # Errors
///
/// Returns a `SwarmNlError` when the transport, DNS (async-std only) or behaviour
/// configuration fails, when a listen address cannot be bound, or when dialing a
/// boot node fails.
pub async fn build(self) -> SwarmNlResult<Core<T>> {
// Swarm construction for the async-std runtime: TCP (TLS/noise + yamux), QUIC and DNS.
#[cfg(feature = "async-std-runtime")]
let mut swarm = {
let swarm_builder: SwarmBuilder<_, _> = match self.transport {
TransportOpts::TcpQuic { tcp_config } => match tcp_config {
TcpConfig::Default => {
libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
.with_async_std()
.with_tcp(
tcp::Config::default(),
(tls::Config::new, noise::Config::new),
yamux::Config::default,
)
.map_err(|_| {
SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
tcp_config: TcpConfig::Default,
})
})?
.with_quic()
.with_dns()
.await
.map_err(|_| SwarmNlError::DNSConfigError)?
},
TcpConfig::Custom {
ttl,
nodelay,
backlog,
} => {
// Apply the user supplied TCP options.
let tcp_config = tcp::Config::default()
.ttl(ttl)
.nodelay(nodelay)
.listen_backlog(backlog);
libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
.with_async_std()
.with_tcp(
tcp_config,
(tls::Config::new, noise::Config::new),
yamux::Config::default,
)
.map_err(|_| {
SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
tcp_config: TcpConfig::Custom {
ttl,
nodelay,
backlog,
},
})
})?
.with_quic()
.with_dns()
.await
.map_err(|_| SwarmNlError::DNSConfigError)?
},
},
};
// Attach the protocol behaviours and the idle connection timeout.
swarm_builder
.with_behaviour(|_| CoreBehaviour {
ping: self.ping.0,
kademlia: self.kademlia,
identify: self.identify,
request_response: self.request_response,
gossipsub: self.gossipsub,
})
.map_err(|_| SwarmNlError::ProtocolConfigError)?
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(Duration::from_secs(self.keep_alive_duration))
})
.build()
};
// Swarm construction for the tokio runtime: TCP (TLS/noise + yamux) and QUIC.
// NOTE(review): unlike the async-std branch, no DNS transport is configured here —
// confirm this is intentional.
#[cfg(feature = "tokio-runtime")]
let mut swarm = {
let swarm_builder: SwarmBuilder<_, _> = match self.transport {
TransportOpts::TcpQuic { tcp_config } => match tcp_config {
TcpConfig::Default => {
libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
.with_tokio()
.with_tcp(
tcp::Config::default(),
(tls::Config::new, noise::Config::new),
yamux::Config::default,
)
.map_err(|_| {
SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
tcp_config: TcpConfig::Default,
})
})?
.with_quic()
},
TcpConfig::Custom {
ttl,
nodelay,
backlog,
} => {
// Apply the user supplied TCP options.
let tcp_config = tcp::Config::default()
.ttl(ttl)
.nodelay(nodelay)
.listen_backlog(backlog);
libp2p::SwarmBuilder::with_existing_identity(self.keypair.clone())
.with_tokio()
.with_tcp(
tcp_config,
(tls::Config::new, noise::Config::new),
yamux::Config::default,
)
.map_err(|_| {
SwarmNlError::TransportConfigError(TransportOpts::TcpQuic {
tcp_config: TcpConfig::Custom {
ttl,
nodelay,
backlog,
},
})
})?
.with_quic()
},
},
};
// Attach the protocol behaviours and the idle connection timeout.
swarm_builder
.with_behaviour(|_| CoreBehaviour {
ping: self.ping.0,
kademlia: self.kademlia,
identify: self.identify,
request_response: self.request_response,
gossipsub: self.gossipsub,
})
.map_err(|_| SwarmNlError::ProtocolConfigError)?
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(Duration::from_secs(self.keep_alive_duration))
})
.build()
};
// Listen on the configured IP address over TCP (first port) and QUIC/UDP (second port).
match self.transport {
TransportOpts::TcpQuic { tcp_config: _ } => {
let listen_addr_tcp = Multiaddr::empty()
.with(match self.ip_address {
IpAddr::V4(address) => Protocol::from(address),
IpAddr::V6(address) => Protocol::from(address),
})
.with(Protocol::Tcp(self.tcp_udp_port.0));
let listen_addr_quic = Multiaddr::empty()
.with(match self.ip_address {
IpAddr::V4(address) => Protocol::from(address),
IpAddr::V6(address) => Protocol::from(address),
})
.with(Protocol::Udp(self.tcp_udp_port.1))
.with(Protocol::QuicV1);
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
swarm.listen_on(listen_addr_tcp.clone()).map_err(|_| {
SwarmNlError::MultiaddressListenError(listen_addr_tcp.to_string())
})?;
#[cfg(any(feature = "tokio-runtime", feature = "async-std-runtime"))]
swarm.listen_on(listen_addr_quic.clone()).map_err(|_| {
SwarmNlError::MultiaddressListenError(listen_addr_quic.to_string())
})?;
},
}
// Register and dial every boot node whose peer id and multiaddress parse and that is
// not blacklisted. Entries that fail to parse are skipped silently; a failed dial
// aborts the build.
for peer_info in self.boot_nodes {
if let Some(peer_id) = string_to_peer_id(&peer_info.0) {
if let Ok(multiaddr) = peer_info.1.parse::<Multiaddr>() {
if !self.blacklist.list.iter().any(|&id| id == peer_id) {
swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, multiaddr.clone());
println!("Dailing {}", multiaddr);
swarm
.dial(multiaddr.clone().with(Protocol::P2p(peer_id)))
.map_err(|_| {
SwarmNlError::RemotePeerDialError(multiaddr.to_string())
})?;
}
}
}
}
// Start the Kademlia bootstrap (its result is ignored) and run in server mode.
let _ = swarm.behaviour_mut().kademlia.bootstrap();
swarm.behaviour_mut().kademlia.set_mode(Some(Mode::Server));
// Blacklist the configured peers in gossipsub.
for peer_id in &self.blacklist.list {
swarm.behaviour_mut().gossipsub.blacklist_peer(peer_id);
}
// Channel pair: application -> network task, and network task -> application.
let (application_sender, network_receiver) =
mpsc::channel::<StreamData>(STREAM_BUFFER_CAPACITY);
let (network_sender, application_receiver) =
mpsc::channel::<StreamData>(STREAM_BUFFER_CAPACITY);
// Ping bookkeeping, seeded with a zero count for the local peer id.
let peer_id = self.keypair.public().to_peer_id();
let mut timeouts = HashMap::<PeerId, u16>::new();
timeouts.insert(peer_id.clone(), 0);
let mut outbound_errors = HashMap::<PeerId, u16>::new();
outbound_errors.insert(peer_id.clone(), 0);
let manager = PingManager {
timeouts,
outbound_errors,
};
let ping_info = PingInfo {
policy: self.ping.1,
manager,
};
let gossip_info = GossipsubInfo {
blacklist: self.blacklist,
};
// Shared stream id counter and request/response buffers.
let stream_id = StreamId::new();
let stream_request_buffer =
Arc::new(Mutex::new(StreamRequestBuffer::new(self.stream_size)));
let stream_response_buffer =
Arc::new(Mutex::new(StreamResponseBuffer::new(self.stream_size)));
let network_info = NetworkInfo {
id: self.network_id,
ping: ping_info,
gossipsub: gossip_info,
};
let network_core = Core {
keypair: self.keypair,
application_sender,
stream_request_buffer: stream_request_buffer.clone(),
stream_response_buffer: stream_response_buffer.clone(),
current_stream_id: Arc::new(Mutex::new(stream_id)),
state: Arc::new(Mutex::new(self.handler)),
};
// Spawn the task that drives the swarm and serves application requests.
#[cfg(feature = "async-std-runtime")]
async_std::task::spawn(Core::handle_async_operations(
swarm,
network_info,
network_sender,
network_receiver,
network_core.clone(),
));
#[cfg(feature = "tokio-runtime")]
tokio::task::spawn(Core::handle_async_operations(
swarm,
network_info,
network_sender,
network_receiver,
network_core.clone(),
));
// Spawn the task that moves network responses into the response buffer.
#[cfg(feature = "async-std-runtime")]
async_std::task::spawn(Core::handle_network_response(
application_receiver,
network_core.clone(),
));
#[cfg(feature = "tokio-runtime")]
tokio::task::spawn(Core::handle_network_response(
application_receiver,
network_core.clone(),
));
// Wait `BOOT_WAIT_TIME` seconds before handing the core back to the caller.
#[cfg(feature = "async-std-runtime")]
async_std::task::sleep(Duration::from_secs(BOOT_WAIT_TIME)).await;
#[cfg(feature = "tokio-runtime")]
tokio::time::sleep(Duration::from_secs(BOOT_WAIT_TIME)).await;
Ok(network_core)
}
}
/// Cloneable handle through which the application talks to the background network tasks.
#[derive(Clone)]
pub struct Core<T: EventHandler + Clone + Send + Sync + 'static> {
/// Keypair identifying the local node.
keypair: Keypair,
/// Sender used to pass application requests to the network task.
application_sender: Sender<StreamData>,
/// Responses from the network, looked up and removed by stream id.
stream_response_buffer: Arc<Mutex<StreamResponseBuffer>>,
/// Stream ids of requests that have been submitted and are being tracked.
stream_request_buffer: Arc<Mutex<StreamRequestBuffer>>,
/// The most recently issued stream id.
current_stream_id: Arc<Mutex<StreamId>>,
/// The application's event handler, shared with the network task.
pub state: Arc<Mutex<T>>,
}
impl<T: EventHandler + Clone + Send + Sync + 'static> Core<T> {
/// Writes the node's keypair (protobuf encoded) and key type to the config file at
/// `config_file_path`, creating the file first if it does not exist.
///
/// Returns `true` only when both values were written. Returns `false` when the file
/// cannot be created, when the keypair is an RSA keypair, when the keypair cannot be
/// protobuf encoded, or when a write fails.
pub fn save_keypair_offline(&self, config_file_path: &str) -> bool {
    // Create the file when it is missing. A failure is reported through the return
    // value instead of panicking, since this function already signals failure via `bool`.
    if fs::metadata(config_file_path).is_err() && fs::File::create(config_file_path).is_err() {
        return false;
    }

    // RSA keypairs are not saved.
    if KeyType::RSA == self.keypair.key_type() {
        return false;
    }

    match self.keypair.to_protobuf_encoding() {
        Ok(protobuf_keypair) => {
            util::write_config(
                "auth",
                "protobuf_keypair",
                &format!("{:?}", protobuf_keypair),
                config_file_path,
            ) && util::write_config(
                "auth",
                "Crypto",
                &format!("{}", self.keypair.key_type()),
                config_file_path,
            )
        },
        Err(_) => false,
    }
}
/// Returns the peer id derived from the node's public key.
pub fn peer_id(&self) -> PeerId {
    let public_key = self.keypair.public();
    public_key.to_peer_id()
}
/// Submits `app_request` to the network task.
///
/// Returns the stream id under which the response can later be fetched, or `None` when
/// the request is not tracked (`KademliaDeleteRecord`, `KademliaStopProviding`), when the
/// request buffer rejects the new id, or when the request cannot be sent.
pub async fn send_to_network(&mut self, app_request: AppData) -> Option<StreamId> {
    let stream_id = StreamId::next(*self.current_stream_id.lock().await);

    // Fire-and-forget requests produce no response and are therefore not tracked.
    let untracked = matches!(
        app_request,
        AppData::KademliaDeleteRecord { .. } | AppData::KademliaStopProviding { .. }
    );
    let request = StreamData::FromApplication(stream_id, app_request);

    if untracked {
        let _ = self.application_sender.send(request).await;
        return None;
    }

    // The request buffer stays locked until this function returns.
    let mut pending_requests = self.stream_request_buffer.lock().await;
    if !pending_requests.insert(stream_id) {
        return None;
    }

    match self.application_sender.send(request).await {
        Ok(_) => {
            // Remember the id so the next request gets a fresh one.
            *self.current_stream_id.lock().await = stream_id;
            Some(stream_id)
        },
        Err(_) => None,
    }
}
/// Waits for the response associated with `stream_id`.
///
/// The response buffer is polled in a spawned task; between polls the task sleeps for
/// `TASK_SLEEP_DURATION` seconds.
///
/// # Errors
///
/// Returns `NetworkError::NetworkReadTimeout` when no response has arrived after the
/// retries are exhausted, the error stored for the stream if the network reported one,
/// and (tokio only) `NetworkError::InternalTaskError` when the polling task fails.
pub async fn recv_from_network(&mut self, stream_id: StreamId) -> NetworkResult {
    #[cfg(feature = "async-std-runtime")]
    {
        let channel = self.clone();
        let response_handler = async_std::task::spawn(async move {
            let mut loop_count = 0;
            loop {
                // Hold the lock only for the lookup so the task filling the buffer is
                // not blocked while this task sleeps.
                let pending = channel
                    .stream_response_buffer
                    .lock()
                    .await
                    .remove(&stream_id);
                if let Some(result) = pending {
                    return Ok(result);
                }
                if loop_count < 5 {
                    loop_count += 1;
                } else {
                    return Err(NetworkError::NetworkReadTimeout);
                }
                async_std::task::sleep(Duration::from_secs(TASK_SLEEP_DURATION)).await;
            }
        });
        // An async-std join handle yields the task's output directly, so an `Err` here
        // is the task's own error (the read timeout) and must be passed on unchanged.
        match response_handler.await {
            Ok(result) => result,
            Err(error) => Err(error),
        }
    }
    #[cfg(feature = "tokio-runtime")]
    {
        let channel = self.clone();
        let response_handler = tokio::task::spawn(async move {
            let mut loop_count = 0;
            loop {
                // Hold the lock only for the lookup so the task filling the buffer is
                // not blocked while this task sleeps.
                let pending = channel
                    .stream_response_buffer
                    .lock()
                    .await
                    .remove(&stream_id);
                if let Some(result) = pending {
                    return Ok(result);
                }
                if loop_count < 5 {
                    loop_count += 1;
                } else {
                    return Err(NetworkError::NetworkReadTimeout);
                }
                tokio::time::sleep(Duration::from_secs(TASK_SLEEP_DURATION)).await;
            }
        });
        // The outer `Result` is tokio's join result; the inner one is the task's output.
        match response_handler.await {
            Ok(result) => result?,
            Err(_) => Err(NetworkError::InternalTaskError),
        }
    }
}
/// Sends `request` to the network and waits for its response.
///
/// Returns `NetworkError::StreamBufferOverflow` when the request yields no stream id.
pub async fn query_network(&mut self, request: AppData) -> NetworkResult {
    match self.send_to_network(request).await {
        Some(stream_id) => self.recv_from_network(stream_id).await,
        None => Err(NetworkError::StreamBufferOverflow),
    }
}
/// Background task that stores every response coming from the network task in the
/// shared response buffer, keyed by stream id.
///
/// Error responses are stored as `Err`, every other response as `Ok`.
async fn handle_network_response(mut receiver: Receiver<StreamData>, network_core: Core<T>) {
    loop {
        select! {
            incoming = receiver.select_next_some() => {
                let mut responses = network_core.stream_response_buffer.lock().await;
                // Only data addressed to the application is buffered.
                if let StreamData::ToApplication(stream_id, payload) = incoming {
                    let outcome = match payload {
                        AppResponse::Error(error) => Err(error),
                        success @ (AppResponse::Echo(..)
                        | AppResponse::DailPeerSuccess(..)
                        | AppResponse::KademliaStoreRecordSuccess
                        | AppResponse::KademliaLookupSuccess(..)
                        | AppResponse::KademliaGetProviders { .. }
                        | AppResponse::KademliaNoProvidersFound
                        | AppResponse::KademliaGetRoutingTableInfo { .. }
                        | AppResponse::FetchData(..)
                        | AppResponse::GetNetworkInfo { .. }
                        | AppResponse::GossipsubBroadcastSuccess
                        | AppResponse::GossipsubJoinSuccess
                        | AppResponse::GossipsubExitSuccess
                        | AppResponse::GossipsubBlacklistSuccess
                        | AppResponse::GossipsubGetInfo { .. }) => Ok(success),
                    };
                    let _ = responses.insert(stream_id, outcome);
                }
            }
        }
    }
}
async fn handle_async_operations(
mut swarm: Swarm<CoreBehaviour>,
mut network_info: NetworkInfo,
mut network_sender: Sender<StreamData>,
mut receiver: Receiver<StreamData>,
network_core: Core<T>,
) {
let mut exec_queue_1 = ExecQueue::new();
let mut exec_queue_2 = ExecQueue::new();
let mut exec_queue_3 = ExecQueue::new();
let mut exec_queue_4 = ExecQueue::new();
loop {
select! {
stream_data = receiver.next() => {
match stream_data {
Some(incoming_data) => {
match incoming_data {
StreamData::FromApplication(stream_id, app_data) => {
let stream_id = stream_id;
match app_data {
AppData::Echo(message) => {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Echo(message))).await;
},
AppData::DailPeer(peer_id, multiaddr) => {
if let Ok(multiaddr) = multiaddr.parse::<Multiaddr>() {
swarm.behaviour_mut().kademlia.add_address(&peer_id, multiaddr.clone());
if let Ok(_) = swarm.dial(multiaddr.clone().with(Protocol::P2p(peer_id))) {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::DailPeerSuccess(multiaddr.to_string()))).await;
} else {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::DailPeerError))).await;
}
}
},
AppData::KademliaStoreRecord { key, value, expiration_time, explicit_peers } => {
let mut record = Record::new(key.clone(), value);
record.expires = expiration_time;
if let Ok(_) = swarm.behaviour_mut().kademlia.put_record(record.clone(), kad::Quorum::One) {
let _ = swarm.behaviour_mut().kademlia.start_providing(RecordKey::new(&key));
exec_queue_1.push(stream_id).await;
if let Some(explicit_peers) = explicit_peers {
let peers = explicit_peers.iter().map(|peer_id_string| {
PeerId::from_bytes(&peer_id_string.from_base58().unwrap_or_default())
}).filter_map(Result::ok).collect::<Vec<_>>();
swarm.behaviour_mut().kademlia.put_record_to(record, peers.into_iter(), kad::Quorum::One);
}
} else {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadStoreRecordError(key)))).await;
}
},
AppData::KademliaLookupRecord { key } => {
let _ = swarm.behaviour_mut().kademlia.get_record(key.clone().into());
exec_queue_2.push(stream_id).await;
},
AppData::KademliaGetProviders { key } => {
swarm.behaviour_mut().kademlia.get_providers(key.clone().into());
exec_queue_3.push(stream_id).await;
}
AppData::KademliaStopProviding { key } => {
swarm.behaviour_mut().kademlia.stop_providing(&key.into());
}
AppData::KademliaDeleteRecord { key } => {
swarm.behaviour_mut().kademlia.remove_record(&key.into());
}
AppData::KademliaGetRoutingTableInfo => {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaGetRoutingTableInfo{protocol_id: network_info.id.to_string()})).await;
},
AppData::FetchData { keys, peer } => {
let rpc = Rpc::ReqResponse { data: keys.clone() };
let _ = swarm
.behaviour_mut()
.request_response
.send_request(&peer, rpc);
exec_queue_4.push(stream_id).await;
},
AppData::GetNetworkInfo => {
let connected_peers = swarm.connected_peers().map(|peer| peer.to_owned()).collect::<Vec<_>>();
let external_addresses = swarm.listeners().map(|multiaddr| multiaddr.to_string()).collect::<Vec<_>>();
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GetNetworkInfo { peer_id: swarm.local_peer_id().clone(), connected_peers, external_addresses })).await;
},
AppData::GossipsubBroadcastMessage { message, topic } => {
let topic_hash = TopicHash::from_raw(topic);
let message = message.join(GOSSIP_MESSAGE_SEPARATOR);
let is_subscribed = swarm.behaviour().gossipsub.mesh_peers(&topic_hash).any(|peer| peer == swarm.local_peer_id());
if swarm
.behaviour_mut().gossipsub
.publish(topic_hash, message.as_bytes()).is_ok() && !is_subscribed {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubBroadcastSuccess)).await;
} else {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::GossipsubBroadcastMessageError))).await;
}
},
AppData::GossipsubJoinNetwork(topic) => {
let topic = IdentTopic::new(topic);
if swarm.behaviour_mut().gossipsub.subscribe(&topic).is_ok() {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubJoinSuccess)).await;
} else {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::GossipsubJoinNetworkError))).await;
}
},
AppData::GossipsubGetInfo => {
let subscribed_topics = swarm.behaviour().gossipsub.topics().map(|topic| topic.clone().into_string()).collect::<Vec<_>>();
let mesh_peers = swarm.behaviour().gossipsub.all_peers().map(|(peer, topics)| {
(peer.to_owned(), topics.iter().map(|&t| t.clone().as_str().to_owned()).collect::<Vec<_>>())
}).collect::<Vec<_>>();
let blacklist = network_info.gossipsub.blacklist.into_inner();
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubGetInfo { topics: subscribed_topics, mesh_peers, blacklist })).await;
},
AppData::GossipsubExitNetwork(topic) => {
let topic = IdentTopic::new(topic);
if swarm.behaviour_mut().gossipsub.unsubscribe(&topic).is_ok() {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubExitSuccess)).await;
} else {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::GossipsubJoinNetworkError))).await;
}
}
AppData::GossipsubBlacklistPeer(peer) => {
swarm.behaviour_mut().gossipsub.blacklist_peer(&peer);
network_info.gossipsub.blacklist.list.insert(peer);
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubBlacklistSuccess)).await;
},
AppData::GossipsubFilterBlacklist(peer) => {
swarm.behaviour_mut().gossipsub.remove_blacklisted_peer(&peer);
network_info.gossipsub.blacklist.list.remove(&peer);
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::GossipsubBlacklistSuccess)).await;
},
}
}
_ => {}
}
},
_ => {}
}
},
swarm_event = swarm.next() => {
match swarm_event {
Some(event) => {
match event {
SwarmEvent::NewListenAddr {
listener_id,
address,
} => {
network_core.state.lock().await.new_listen_addr(swarm.local_peer_id().to_owned(), listener_id, address);
}
SwarmEvent::Behaviour(event) => match event {
CoreEvent::Ping(ping::Event {
peer,
connection: _,
result,
}) => {
match result {
Ok(duration) => {
if let Some(err_count) =
network_info.ping.manager.outbound_errors.get(&peer)
{
let new_err_count = (err_count / 2) as u16;
network_info
.ping
.manager
.outbound_errors
.insert(peer, new_err_count);
}
if let Some(timeout_err_count) =
network_info.ping.manager.timeouts.get(&peer)
{
let new_err_count = (timeout_err_count / 2) as u16;
network_info
.ping
.manager
.timeouts
.insert(peer, new_err_count);
}
network_core.state.lock().await.outbound_ping_success(peer, duration);
}
Err(err_type) => {
match network_info.ping.policy {
PingErrorPolicy::NoDisconnect => {
}
PingErrorPolicy::DisconnectAfterMaxErrors(max_errors) => {
let err_count = network_info
.ping
.manager
.outbound_errors
.entry(peer)
.or_insert(0);
if *err_count != max_errors {
let _ = swarm.disconnect_peer_id(peer);
network_info
.ping
.manager
.outbound_errors
.remove(&peer);
} else {
*err_count += 1;
}
}
PingErrorPolicy::DisconnectAfterMaxTimeouts(
max_timeout_errors,
) => {
if let Failure::Timeout = err_type {
let err_count = network_info
.ping
.manager
.timeouts
.entry(peer)
.or_insert(0);
if *err_count != max_timeout_errors {
let _ = swarm.disconnect_peer_id(peer);
network_info
.ping
.manager
.timeouts
.remove(&peer);
} else {
*err_count += 1;
}
}
}
}
network_core.state.lock().await.outbound_ping_error(peer, err_type);
}
}
}
CoreEvent::Kademlia(event) => match event {
kad::Event::OutboundQueryProgressed { result, .. } => match result {
kad::QueryResult::GetProviders(Ok(success)) => {
match success {
kad::GetProvidersOk::FoundProviders { key, providers, .. } => {
let peer_id_strings = providers.iter().map(|peer_id| {
peer_id.to_base58()
}).collect::<Vec<_>>();
if let Some(stream_id) = exec_queue_3.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaGetProviders{ key: key.to_vec(), providers: peer_id_strings })).await;
}
},
kad::GetProvidersOk::FinishedWithNoAdditionalRecord { .. } => {
if let Some(stream_id) = exec_queue_3.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaNoProvidersFound)).await;
}
}
}
},
kad::QueryResult::GetProviders(Err(_)) => {
if let Some(stream_id) = exec_queue_3.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaNoProvidersFound)).await;
}
},
kad::QueryResult::GetRecord(Ok(kad::GetRecordOk::FoundRecord(
kad::PeerRecord { record:kad::Record{ value, .. }, .. },
))) => {
if let Some(stream_id) = exec_queue_2.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaLookupSuccess(value))).await;
}
}
kad::QueryResult::GetRecord(Ok(_)) => {
if let Some(stream_id) = exec_queue_2.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadFetchRecordError(vec![])))).await;
}
},
kad::QueryResult::GetRecord(Err(e)) => {
let key = match e {
kad::GetRecordError::NotFound { key, .. } => key,
kad::GetRecordError::QuorumFailed { key, .. } => key,
kad::GetRecordError::Timeout { key } => key,
};
if let Some(stream_id) = exec_queue_2.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadFetchRecordError(key.to_vec())))).await;
}
}
kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
if let Some(stream_id) = exec_queue_1.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::KademliaStoreRecordSuccess)).await;
}
network_core.state.lock().await.kademlia_put_record_success(key.to_vec());
}
kad::QueryResult::PutRecord(Err(e)) => {
let key = match e {
kad::PutRecordError::QuorumFailed { key, .. } => key,
kad::PutRecordError::Timeout { key, .. } => key,
};
if let Some(stream_id) = exec_queue_1.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::KadStoreRecordError(key.to_vec())))).await;
}
network_core.state.lock().await.kademlia_put_record_error();
}
kad::QueryResult::StartProviding(Ok(kad::AddProviderOk {
key,
})) => {
network_core.state.lock().await.kademlia_start_providing_success(key.to_vec());
}
kad::QueryResult::StartProviding(Err(_)) => {
network_core.state.lock().await.kademlia_start_providing_error();
}
_ => {}
}
kad::Event::RoutingUpdated { peer, .. } => {
network_core.state.lock().await.routing_table_updated(peer);
}
_ => {}
},
CoreEvent::Identify(event) => match event {
identify::Event::Received { peer_id, info } => {
network_core.state.lock().await.identify_info_recieved(peer_id, info.clone());
if info.protocol_version != network_info.id.as_ref() {
let _ = swarm.disconnect_peer_id(peer_id);
} else {
let _ = swarm.behaviour_mut().kademlia.add_address(&peer_id, info.listen_addrs[0].clone());
}
}
_ => {}
},
CoreEvent::RequestResponse(event) => match event {
request_response::Event::Message { peer: _, message } => match message {
request_response::Message::Request { request_id: _, request, channel } => {
match request {
Rpc::ReqResponse { data } => {
let response_data = network_core.state.lock().await.rpc_incoming_message_handled(data);
let response_rpc = Rpc::ReqResponse { data: response_data };
let _ = swarm.behaviour_mut().request_response.send_response(channel, response_rpc);
}
}
},
request_response::Message::Response { response, .. } => {
if let Some(stream_id) = exec_queue_4.pop().await {
match response {
Rpc::ReqResponse { data } => {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::FetchData(data))).await;
},
}
}
},
},
request_response::Event::OutboundFailure { .. } => {
if let Some(stream_id) = exec_queue_4.pop().await {
let _ = network_sender.send(StreamData::ToApplication(stream_id, AppResponse::Error(NetworkError::RpcDataFetchError))).await;
}
},
_ => {}
},
CoreEvent::Gossipsub(event) => match event {
gossipsub::Event::Message { propagation_source, message_id, message } => {
let data_string = String::from_utf8(message.data).unwrap_or_default();
let gossip_data = data_string.split(GOSSIP_MESSAGE_SEPARATOR).map(|msg| msg.to_string()).collect::<Vec<_>>();
if network_core.state.lock().await.gossipsub_incoming_message_filtered(propagation_source.clone(), message_id, message.source, message.topic.to_string(), gossip_data.clone()) {
network_core.state.lock().await.gossipsub_incoming_message_handled(propagation_source, gossip_data);
}
},
gossipsub::Event::Subscribed { peer_id, topic } => {
network_core.state.lock().await.gossipsub_subscribe_message_recieved(peer_id, topic.to_string());
},
gossipsub::Event::Unsubscribed { peer_id, topic } => {
network_core.state.lock().await.gossipsub_unsubscribe_message_recieved(peer_id, topic.to_string());
},
_ => {},
}
},
SwarmEvent::ConnectionEstablished {
peer_id,
connection_id,
endpoint,
num_established,
concurrent_dial_errors: _,
established_in,
} => {
if let ConnectedPoint::Listener { send_back_addr, .. } = endpoint.clone() {
let _ = swarm.behaviour_mut().kademlia.add_address(&peer_id, send_back_addr);
}
network_core.state.lock().await.connection_established(
peer_id,
connection_id,
&endpoint,
num_established,
established_in,
);
}
SwarmEvent::ConnectionClosed {
peer_id,
connection_id,
endpoint,
num_established,
cause,
} => {
network_core.state.lock().await.connection_closed(
peer_id,
connection_id,
&endpoint,
num_established,
cause,
);
}
SwarmEvent::ExpiredListenAddr {
listener_id,
address,
} => {
network_core.state.lock().await.expired_listen_addr(listener_id, address);
}
SwarmEvent::ListenerClosed {
listener_id,
addresses,
reason: _,
} => {
network_core.state.lock().await.listener_closed(listener_id, addresses);
}
SwarmEvent::ListenerError {
listener_id,
error: _,
} => {
network_core.state.lock().await.listener_error(listener_id);
}
SwarmEvent::Dialing {
peer_id,
connection_id,
} => {
network_core.state.lock().await.dialing(peer_id, connection_id);
}
SwarmEvent::NewExternalAddrCandidate { address } => {
network_core.state.lock().await.new_external_addr_candidate(address);
}
SwarmEvent::ExternalAddrConfirmed { address } => {
network_core.state.lock().await.external_addr_confirmed(address);
}
SwarmEvent::ExternalAddrExpired { address } => {
network_core.state.lock().await.external_addr_expired(address);
}
SwarmEvent::IncomingConnection {
connection_id,
local_addr,
send_back_addr,
} => {
network_core.state.lock().await.incoming_connection(connection_id, local_addr, send_back_addr);
}
SwarmEvent::IncomingConnectionError {
connection_id,
local_addr,
send_back_addr,
error: _,
} => {
network_core.state.lock().await.incoming_connection_error(
connection_id,
local_addr,
send_back_addr,
);
}
SwarmEvent::OutgoingConnectionError {
connection_id,
peer_id,
error: _,
} => {
network_core.state.lock().await.outgoing_connection_error(connection_id, peer_id);
}
_ => {},
}
},
_ => {}
}
}
}
}
}
}