swarm_nl/core/
sharding.rs1use std::fmt::Debug;
7
8use super::*;
9use async_trait::async_trait;
10use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
11
12pub trait ShardStorage: Send + Sync + Debug {
16 fn fetch_data(&mut self, key: ByteVector) -> ByteVector;
17}
18
19#[derive(Debug, Clone)]
21pub struct ShardingInfo {
22 pub id: String,
24 pub local_storage: Arc<Mutex<dyn ShardStorage>>,
26 pub state: Arc<Mutex<HashMap<ShardId, HashSet<PeerId>>>>,
28}
29
30#[derive(Debug)]
32pub(super) struct DefaultShardStorage;
33
34impl ShardStorage for DefaultShardStorage {
35 fn fetch_data(&mut self, key: ByteVector) -> ByteVector {
37 key
39 }
40}
41
42#[async_trait]
44pub trait Sharding
45where
46 Self::Key: Send + Sync,
47 Self::ShardId: ToString + Send + Sync,
48{
49 type Key: ?Sized;
51 type ShardId;
53
54 fn locate_shard(&self, key: &Self::Key) -> Option<Self::ShardId>;
56
57 async fn network_state(core: Core) -> HashMap<String, HashSet<PeerId>> {
59 core.network_info.sharding.state.lock().await.clone()
60 }
61
62 async fn join_network(&self, mut core: Core, shard_id: &Self::ShardId) -> NetworkResult<()> {
64 let network_shard_id: Vec<u8> = match &core.network_info.sharding.id {
66 id if !id.is_empty() => id.clone().into(),
67 _ => return Err(NetworkError::MissingShardingNetworkIdError),
68 };
69 let network_sharding_id = String::from_utf8_lossy(&network_shard_id).to_string();
70
71 let gossip_request = AppData::GossipsubJoinNetwork(network_sharding_id.clone());
73 let _ = core.query_network(gossip_request).await?;
74
75 let mut shard_state = core.network_info.sharding.state.lock().await;
77 shard_state
78 .entry(shard_id.to_string())
79 .or_insert_with(Default::default)
80 .insert(core.peer_id());
81
82 drop(shard_state);
84
85 let _ = core.join_repl_network(shard_id.to_string()).await;
87
88 let message = vec![
90 Core::SHARD_GOSSIP_JOIN_FLAG.as_bytes().to_vec(), core.peer_id().to_string().into_bytes(), shard_id.to_string().into_bytes(), ];
94
95 let gossip_request = AppData::GossipsubBroadcastMessage {
96 topic: network_sharding_id,
97 message,
98 };
99
100 core.query_network(gossip_request).await?;
102
103 Ok(())
104 }
105
106 async fn exit_network(&self, mut core: Core, shard_id: &Self::ShardId) -> NetworkResult<()> {
108 let mut shard_state = core.network_info.sharding.state.lock().await;
110 let shard_entry = shard_state
111 .entry(shard_id.to_string())
112 .or_insert(Default::default());
113
114 shard_entry.retain(|entry| entry != &core.peer_id());
115
116 if shard_entry.is_empty() {
118 shard_state.remove(&shard_id.to_string());
119 }
120
121 drop(shard_state);
123
124 let message = vec![
126 Core::SHARD_GOSSIP_EXIT_FLAG.to_string().into(), core.peer_id().to_base58().into(), shard_id.to_string().into(), ];
130
131 let gossip_request = AppData::GossipsubBroadcastMessage {
133 topic: core.network_info.sharding.id.clone(),
134 message,
135 };
136
137 let _ = core.query_network(gossip_request).await?;
138
139 let shard_state = core.network_info.sharding.state.lock().await;
141 if !shard_state
142 .iter()
143 .any(|(_, peers)| peers.contains(&core.peer_id()))
144 {
145 drop(shard_state);
147
148 let gossip_request =
150 AppData::GossipsubJoinNetwork(core.network_info.sharding.id.clone());
151 core.query_network(gossip_request).await?;
152 }
153
154 Ok(())
155 }
156
157 async fn shard(
160 &self,
161 mut core: Core,
162 key: &Self::Key,
163 data: ByteVector,
164 ) -> NetworkResult<Option<ByteVector>> {
165 let shard_id = match self.locate_shard(key) {
167 Some(shard_id) => shard_id,
168 None => return Err(NetworkError::ShardNotFound),
169 };
170
171 let nodes = {
173 let shard_state = core.network_info.sharding.state.lock().await;
174 shard_state.get(&shard_id.to_string()).cloned()
175 };
176
177 let nodes = match nodes {
179 Some(nodes) => nodes,
180 None => return Err(NetworkError::MissingShardNodesError),
181 };
182
183 if nodes.contains(&core.peer_id()) {
185 let _ = core.replicate(data.clone(), &shard_id.to_string()).await;
187 return Ok(Some(data)); }
189
190 let mut message = vec![
192 Core::RPC_DATA_FORWARDING_FLAG.as_bytes().to_vec(), shard_id.to_string().into_bytes(),
195 ];
196 message.extend(data); let mut rng = StdRng::from_entropy();
200 let mut nodes = nodes.iter().cloned().collect::<Vec<_>>();
201
202 nodes.shuffle(&mut rng);
203
204 for peer in nodes {
206 let rpc_request = AppData::SendRpc {
207 keys: message.clone(),
208 peer: peer.clone(),
209 };
210
211 if core.query_network(rpc_request).await.is_ok() {
214 return Ok(None); }
216 }
217
218 Err(NetworkError::DataForwardingError)
220 }
221
222 async fn fetch(
225 &self,
226 mut core: Core,
227 key: &Self::Key,
228 mut data: ByteVector,
229 ) -> NetworkResult<Option<ByteVector>> {
230 let shard_id = match self.locate_shard(key) {
232 Some(shard_id) => shard_id,
233 None => return Err(NetworkError::ShardingFailureError),
234 };
235
236 let nodes = {
238 let shard_state = core.network_info.sharding.state.lock().await;
239 shard_state.get(&shard_id.to_string()).cloned()
240 };
241
242 let nodes = match nodes {
244 Some(nodes) => nodes,
245 None => return Err(NetworkError::ShardingFetchError),
246 };
247
248 if nodes.contains(&core.peer_id()) {
250 return Ok(None);
252 }
253
254 let mut rng = StdRng::from_entropy();
256 let mut nodes = nodes.iter().cloned().collect::<Vec<_>>();
257
258 nodes.shuffle(&mut rng);
259
260 let mut message = vec![
262 Core::SHARD_RPC_REQUEST_FLAG.as_bytes().to_vec(), ];
265 message.append(&mut data);
266
267 for peer in nodes {
269 let rpc_request = AppData::SendRpc {
270 keys: message.clone(),
271 peer: peer.clone(),
272 };
273
274 if let Ok(response) = core.query_network(rpc_request).await {
276 if let AppResponse::SendRpc(data) = response {
277 return Ok(Some(data));
278 }
279 }
280 }
281
282 Err(NetworkError::ShardingFetchError)
284 }
285}