swarm_nl/core/
sharding.rsuse std::fmt::Debug;
use super::*;
use async_trait::async_trait;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
pub trait ShardStorage: Send + Sync + Debug {
fn fetch_data(&mut self, key: ByteVector) -> ByteVector;
}
#[derive(Debug, Clone)]
pub struct ShardingInfo {
pub id: String,
pub local_storage: Arc<Mutex<dyn ShardStorage>>,
pub state: Arc<Mutex<HashMap<ShardId, HashSet<PeerId>>>>,
}
#[derive(Debug)]
pub(super) struct DefaultShardStorage;
impl ShardStorage for DefaultShardStorage {
fn fetch_data(&mut self, key: ByteVector) -> ByteVector {
key
}
}
#[async_trait]
pub trait Sharding
where
Self::Key: Send + Sync,
Self::ShardId: ToString + Send + Sync,
{
type Key: ?Sized;
type ShardId;
fn locate_shard(&self, key: &Self::Key) -> Option<Self::ShardId>;
async fn network_state(core: Core) -> HashMap<String, HashSet<PeerId>> {
core.network_info.sharding.state.lock().await.clone()
}
async fn join_network(&self, mut core: Core, shard_id: &Self::ShardId) -> NetworkResult<()> {
let network_shard_id: Vec<u8> = match &core.network_info.sharding.id {
id if !id.is_empty() => id.clone().into(),
_ => return Err(NetworkError::MissingShardingNetworkIdError),
};
let network_sharding_id = String::from_utf8_lossy(&network_shard_id).to_string();
let gossip_request = AppData::GossipsubJoinNetwork(network_sharding_id.clone());
let _ = core.query_network(gossip_request).await?;
let mut shard_state = core.network_info.sharding.state.lock().await;
shard_state
.entry(shard_id.to_string())
.or_insert_with(Default::default)
.insert(core.peer_id());
drop(shard_state);
let _ = core.join_repl_network(shard_id.to_string()).await;
let message = vec![
Core::SHARD_GOSSIP_JOIN_FLAG.as_bytes().to_vec(), core.peer_id().to_string().into_bytes(), shard_id.to_string().into_bytes(), ];
let gossip_request = AppData::GossipsubBroadcastMessage {
topic: network_sharding_id,
message,
};
core.query_network(gossip_request).await?;
Ok(())
}
async fn exit_network(&self, mut core: Core, shard_id: &Self::ShardId) -> NetworkResult<()> {
let mut shard_state = core.network_info.sharding.state.lock().await;
let shard_entry = shard_state
.entry(shard_id.to_string())
.or_insert(Default::default());
shard_entry.retain(|entry| entry != &core.peer_id());
if shard_entry.is_empty() {
shard_state.remove(&shard_id.to_string());
}
drop(shard_state);
let message = vec![
Core::SHARD_GOSSIP_EXIT_FLAG.to_string().into(), core.peer_id().to_base58().into(), shard_id.to_string().into(), ];
let gossip_request = AppData::GossipsubBroadcastMessage {
topic: core.network_info.sharding.id.clone(),
message,
};
let _ = core.query_network(gossip_request).await?;
let shard_state = core.network_info.sharding.state.lock().await;
if !shard_state
.iter()
.any(|(_, peers)| peers.contains(&core.peer_id()))
{
drop(shard_state);
let gossip_request =
AppData::GossipsubJoinNetwork(core.network_info.sharding.id.clone());
core.query_network(gossip_request).await?;
}
Ok(())
}
async fn shard(
&self,
mut core: Core,
key: &Self::Key,
data: ByteVector,
) -> NetworkResult<Option<ByteVector>> {
let shard_id = match self.locate_shard(key) {
Some(shard_id) => shard_id,
None => return Err(NetworkError::ShardNotFound),
};
let nodes = {
let shard_state = core.network_info.sharding.state.lock().await;
shard_state.get(&shard_id.to_string()).cloned()
};
let nodes = match nodes {
Some(nodes) => nodes,
None => return Err(NetworkError::MissingShardNodesError),
};
if nodes.contains(&core.peer_id()) {
let _ = core.replicate(data.clone(), &shard_id.to_string()).await;
return Ok(Some(data)); }
let mut message = vec![
Core::RPC_DATA_FORWARDING_FLAG.as_bytes().to_vec(), shard_id.to_string().into_bytes(),
];
message.extend(data); let mut rng = StdRng::from_entropy();
let mut nodes = nodes.iter().cloned().collect::<Vec<_>>();
nodes.shuffle(&mut rng);
for peer in nodes {
let rpc_request = AppData::SendRpc {
keys: message.clone(),
peer: peer.clone(),
};
if core.query_network(rpc_request).await.is_ok() {
return Ok(None); }
}
Err(NetworkError::DataForwardingError)
}
async fn fetch(
&self,
mut core: Core,
key: &Self::Key,
mut data: ByteVector,
) -> NetworkResult<Option<ByteVector>> {
let shard_id = match self.locate_shard(key) {
Some(shard_id) => shard_id,
None => return Err(NetworkError::ShardingFailureError),
};
let nodes = {
let shard_state = core.network_info.sharding.state.lock().await;
shard_state.get(&shard_id.to_string()).cloned()
};
let nodes = match nodes {
Some(nodes) => nodes,
None => return Err(NetworkError::ShardingFetchError),
};
if nodes.contains(&core.peer_id()) {
return Ok(None);
}
let mut rng = StdRng::from_entropy();
let mut nodes = nodes.iter().cloned().collect::<Vec<_>>();
nodes.shuffle(&mut rng);
let mut message = vec![
Core::SHARD_RPC_REQUEST_FLAG.as_bytes().to_vec(), ];
message.append(&mut data);
for peer in nodes {
let rpc_request = AppData::SendRpc {
keys: message.clone(),
peer: peer.clone(),
};
if let Ok(response) = core.query_network(rpc_request).await {
if let AppResponse::SendRpc(data) = response {
return Ok(Some(data));
}
}
}
Err(NetworkError::ShardingFetchError)
}
}