swarm_nl/core/
sharding.rs1use std::fmt::Debug;
7
8use super::*;
9use async_trait::async_trait;
10use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
11
12pub trait ShardStorage: Send + Sync + Debug {
16	fn fetch_data(&mut self, key: ByteVector) -> ByteVector;
17}
18
19#[derive(Debug, Clone)]
21pub struct ShardingInfo {
22	pub id: String,
24	pub local_storage: Arc<Mutex<dyn ShardStorage>>,
26	pub state: Arc<Mutex<HashMap<ShardId, HashSet<PeerId>>>>,
28}
29
30#[derive(Debug)]
32pub(super) struct DefaultShardStorage;
33
34impl ShardStorage for DefaultShardStorage {
35	fn fetch_data(&mut self, key: ByteVector) -> ByteVector {
37		key
39	}
40}
41
42#[async_trait]
44pub trait Sharding
45where
46	Self::Key: Send + Sync,
47	Self::ShardId: ToString + Send + Sync,
48{
49	type Key: ?Sized;
51	type ShardId;
53
54	fn locate_shard(&self, key: &Self::Key) -> Option<Self::ShardId>;
56
57	async fn network_state(core: Core) -> HashMap<String, HashSet<PeerId>> {
59		core.network_info.sharding.state.lock().await.clone()
60	}
61
62	async fn join_network(&self, mut core: Core, shard_id: &Self::ShardId) -> NetworkResult<()> {
64		let network_shard_id: Vec<u8> = match &core.network_info.sharding.id {
66			id if !id.is_empty() => id.clone().into(),
67			_ => return Err(NetworkError::MissingShardingNetworkIdError),
68		};
69		let network_sharding_id = String::from_utf8_lossy(&network_shard_id).to_string();
70
71		let gossip_request = AppData::GossipsubJoinNetwork(network_sharding_id.clone());
73		let _ = core.query_network(gossip_request).await?;
74
75		let mut shard_state = core.network_info.sharding.state.lock().await;
77		shard_state
78			.entry(shard_id.to_string())
79			.or_insert_with(Default::default)
80			.insert(core.peer_id());
81
82		drop(shard_state);
84
85		let _ = core.join_repl_network(shard_id.to_string()).await;
87
88		let message = vec![
90			Core::SHARD_GOSSIP_JOIN_FLAG.as_bytes().to_vec(), core.peer_id().to_string().into_bytes(),          shard_id.to_string().into_bytes(),                ];
94
95		let gossip_request = AppData::GossipsubBroadcastMessage {
96			topic: network_sharding_id,
97			message,
98		};
99
100		core.query_network(gossip_request).await?;
102
103		Ok(())
104	}
105
106	async fn exit_network(&self, mut core: Core, shard_id: &Self::ShardId) -> NetworkResult<()> {
108		let mut shard_state = core.network_info.sharding.state.lock().await;
110		let shard_entry = shard_state
111			.entry(shard_id.to_string())
112			.or_insert(Default::default());
113
114		shard_entry.retain(|entry| entry != &core.peer_id());
115
116		if shard_entry.is_empty() {
118			shard_state.remove(&shard_id.to_string());
119		}
120
121		drop(shard_state);
123
124		let message = vec![
126			Core::SHARD_GOSSIP_EXIT_FLAG.to_string().into(), core.peer_id().to_base58().into(),               shard_id.to_string().into(),                     ];
130
131		let gossip_request = AppData::GossipsubBroadcastMessage {
133			topic: core.network_info.sharding.id.clone(),
134			message,
135		};
136
137		let _ = core.query_network(gossip_request).await?;
138
139		let shard_state = core.network_info.sharding.state.lock().await;
141		if !shard_state
142			.iter()
143			.any(|(_, peers)| peers.contains(&core.peer_id()))
144		{
145			drop(shard_state);
147
148			let gossip_request =
150				AppData::GossipsubJoinNetwork(core.network_info.sharding.id.clone());
151			core.query_network(gossip_request).await?;
152		}
153
154		Ok(())
155	}
156
157	async fn shard(
160		&self,
161		mut core: Core,
162		key: &Self::Key,
163		data: ByteVector,
164	) -> NetworkResult<Option<ByteVector>> {
165		let shard_id = match self.locate_shard(key) {
167			Some(shard_id) => shard_id,
168			None => return Err(NetworkError::ShardNotFound),
169		};
170
171		let nodes = {
173			let shard_state = core.network_info.sharding.state.lock().await;
174			shard_state.get(&shard_id.to_string()).cloned()
175		};
176
177		let nodes = match nodes {
179			Some(nodes) => nodes,
180			None => return Err(NetworkError::MissingShardNodesError),
181		};
182
183		if nodes.contains(&core.peer_id()) {
185			let _ = core.replicate(data.clone(), &shard_id.to_string()).await;
187			return Ok(Some(data)); }
189
190		let mut message = vec![
192			Core::RPC_DATA_FORWARDING_FLAG.as_bytes().to_vec(), shard_id.to_string().into_bytes(),
195		];
196		message.extend(data); let mut rng = StdRng::from_entropy();
200		let mut nodes = nodes.iter().cloned().collect::<Vec<_>>();
201
202		nodes.shuffle(&mut rng);
203
204		for peer in nodes {
206			let rpc_request = AppData::SendRpc {
207				keys: message.clone(),
208				peer: peer.clone(),
209			};
210
211			if core.query_network(rpc_request).await.is_ok() {
214				return Ok(None); }
216		}
217
218		Err(NetworkError::DataForwardingError)
220	}
221
222	async fn fetch(
225		&self,
226		mut core: Core,
227		key: &Self::Key,
228		mut data: ByteVector,
229	) -> NetworkResult<Option<ByteVector>> {
230		let shard_id = match self.locate_shard(key) {
232			Some(shard_id) => shard_id,
233			None => return Err(NetworkError::ShardingFailureError),
234		};
235
236		let nodes = {
238			let shard_state = core.network_info.sharding.state.lock().await;
239			shard_state.get(&shard_id.to_string()).cloned()
240		};
241
242		let nodes = match nodes {
244			Some(nodes) => nodes,
245			None => return Err(NetworkError::ShardingFetchError),
246		};
247
248		if nodes.contains(&core.peer_id()) {
250			return Ok(None);
252		}
253
254		let mut rng = StdRng::from_entropy();
256		let mut nodes = nodes.iter().cloned().collect::<Vec<_>>();
257
258		nodes.shuffle(&mut rng);
259
260		let mut message = vec![
262			Core::SHARD_RPC_REQUEST_FLAG.as_bytes().to_vec(), ];
265		message.append(&mut data);
266
267		for peer in nodes {
269			let rpc_request = AppData::SendRpc {
270				keys: message.clone(),
271				peer: peer.clone(),
272			};
273
274			if let Ok(response) = core.query_network(rpc_request).await {
276				if let AppResponse::SendRpc(data) = response {
277					return Ok(Some(data));
278				}
279			}
280		}
281
282		Err(NetworkError::ShardingFetchError)
284	}
285}