swarm_nl/core/
sharding.rs

1// Copyright 2024 Algorealm, Inc.
2// Apache 2.0 License
3
4//! Module that contains important data structures to manage sharding operations on the
5//! network.
6use std::fmt::Debug;
7
8use super::*;
9use async_trait::async_trait;
10use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
11
12/// Trait that interfaces with the storage layer of a node in a shard. It is important for handling
13/// forwarded data requests. This is a mechanism to trap into the application storage layer to read
14/// sharded data.
15pub trait ShardStorage: Send + Sync + Debug {
16	fn fetch_data(&mut self, key: ByteVector) -> ByteVector;
17}
18
19/// Important data for the operation of the sharding protocol.
20#[derive(Debug, Clone)]
21pub struct ShardingInfo {
22	/// The id of the entire sharded network.
23	pub id: String,
24	/// Shard local storage.
25	pub local_storage: Arc<Mutex<dyn ShardStorage>>,
26	/// The shards and the various nodes they contain.
27	pub state: Arc<Mutex<HashMap<ShardId, HashSet<PeerId>>>>,
28}
29
30/// Default shard storage to respond to forwarded data requests.
31#[derive(Debug)]
32pub(super) struct DefaultShardStorage;
33
34impl ShardStorage for DefaultShardStorage {
35	/// Important function to implement on shard storage interface to read local shard data and return a response to the requesting node.
36	fn fetch_data(&mut self, key: ByteVector) -> ByteVector {
37		// Simply echo incoming data request
38		key
39	}
40}
41
42/// Trait that specifies sharding logic and behaviour of shards.
43#[async_trait]
44pub trait Sharding
45where
46	Self::Key: Send + Sync,
47	Self::ShardId: ToString + Send + Sync,
48{
49	/// The type of the shard key e.g hash, range etc.
50	type Key: ?Sized;
51	/// The identifier pointing to a specific shard.
52	type ShardId;
53
54	/// Map a key to a shard.
55	fn locate_shard(&self, key: &Self::Key) -> Option<Self::ShardId>;
56
57	/// Return the state of the shard network.
58	async fn network_state(core: Core) -> HashMap<String, HashSet<PeerId>> {
59		core.network_info.sharding.state.lock().await.clone()
60	}
61
62	/// Join a shard network.
63	async fn join_network(&self, mut core: Core, shard_id: &Self::ShardId) -> NetworkResult<()> {
64		// Ensure the network sharding ID is set.
65		let network_shard_id: Vec<u8> = match &core.network_info.sharding.id {
66			id if !id.is_empty() => id.clone().into(),
67			_ => return Err(NetworkError::MissingShardingNetworkIdError),
68		};
69		let network_sharding_id = String::from_utf8_lossy(&network_shard_id).to_string();
70
71		// Join the generic shard (gossip) network
72		let gossip_request = AppData::GossipsubJoinNetwork(network_sharding_id.clone());
73		let _ = core.query_network(gossip_request).await?;
74
75		// Update the local shard state
76		let mut shard_state = core.network_info.sharding.state.lock().await;
77		shard_state
78			.entry(shard_id.to_string())
79			.or_insert_with(Default::default)
80			.insert(core.peer_id());
81
82		// Free `Core`
83		drop(shard_state);
84
85		// Join the shard network (as a replica network)
86		let _ = core.join_repl_network(shard_id.to_string()).await;
87
88		// Inform the entire network about our decision
89		let message = vec![
90			Core::SHARD_GOSSIP_JOIN_FLAG.as_bytes().to_vec(), // Flag for join event.
91			core.peer_id().to_string().into_bytes(),          // Our peer ID.
92			shard_id.to_string().into_bytes(),                // Shard we're joining
93		];
94
95		let gossip_request = AppData::GossipsubBroadcastMessage {
96			topic: network_sharding_id,
97			message,
98		};
99
100		// Gossip the join event to all nodes.
101		core.query_network(gossip_request).await?;
102
103		Ok(())
104	}
105
106	/// Exit a shard network.
107	async fn exit_network(&self, mut core: Core, shard_id: &Self::ShardId) -> NetworkResult<()> {
108		// First, we remove ourself from the network state
109		let mut shard_state = core.network_info.sharding.state.lock().await;
110		let shard_entry = shard_state
111			.entry(shard_id.to_string())
112			.or_insert(Default::default());
113
114		shard_entry.retain(|entry| entry != &core.peer_id());
115
116		// If the last node has exited the shard, dissolve it
117		if shard_entry.is_empty() {
118			shard_state.remove(&shard_id.to_string());
119		}
120
121		// Release `core`
122		drop(shard_state);
123
124		// Then, we make a broadcast
125		let message = vec![
126			Core::SHARD_GOSSIP_EXIT_FLAG.to_string().into(), // Appropriate flag
127			core.peer_id().to_base58().into(),               // Our peerId
128			shard_id.to_string().into(),                     // Network we're leaving
129		];
130
131		// Prepare a gossip request
132		let gossip_request = AppData::GossipsubBroadcastMessage {
133			topic: core.network_info.sharding.id.clone(),
134			message,
135		};
136
137		let _ = core.query_network(gossip_request).await?;
138
139		// Check if we're in any shard
140		let shard_state = core.network_info.sharding.state.lock().await;
141		if !shard_state
142			.iter()
143			.any(|(_, peers)| peers.contains(&core.peer_id()))
144		{
145			// Release `core`
146			drop(shard_state);
147
148			// Leave the underlying sharding (gossip) network
149			let gossip_request =
150				AppData::GossipsubJoinNetwork(core.network_info.sharding.id.clone());
151			core.query_network(gossip_request).await?;
152		}
153
154		Ok(())
155	}
156
157	/// Send data to peers in the appropriate logical shard. It returns the data if the node is a
158	/// member of the shard after replicating it to fellow nodes in the same shard.
159	async fn shard(
160		&self,
161		mut core: Core,
162		key: &Self::Key,
163		data: ByteVector,
164	) -> NetworkResult<Option<ByteVector>> {
165		// Locate the shard that would store the key.
166		let shard_id = match self.locate_shard(key) {
167			Some(shard_id) => shard_id,
168			None => return Err(NetworkError::ShardNotFound),
169		};
170
171		// Retrieve the nodes in the logical shard.
172		let nodes = {
173			let shard_state = core.network_info.sharding.state.lock().await;
174			shard_state.get(&shard_id.to_string()).cloned()
175		};
176
177		// If no nodes exist for the shard, return an error.
178		let nodes = match nodes {
179			Some(nodes) => nodes,
180			None => return Err(NetworkError::MissingShardNodesError),
181		};
182
183		// Check if the current node is part of the shard.
184		if nodes.contains(&core.peer_id()) {
185			// Replicate the data to nodes in the shard.
186			let _ = core.replicate(data.clone(), &shard_id.to_string()).await;
187			return Ok(Some(data)); // Return the data to the caller.
188		}
189
190		// Prepare the message for data forwarding.
191		let mut message = vec![
192			Core::RPC_DATA_FORWARDING_FLAG.as_bytes().to_vec(), /* Flag to indicate data
193			                                                     * forwarding. */
194			shard_id.to_string().into_bytes(),
195		];
196		message.extend(data); // Append the data payload.
197
198		// Shuffle nodes so their order of query is randomized
199		let mut rng = StdRng::from_entropy();
200		let mut nodes = nodes.iter().cloned().collect::<Vec<_>>();
201
202		nodes.shuffle(&mut rng);
203
204		// Attempt to forward the data to peers.
205		for peer in nodes {
206			let rpc_request = AppData::SendRpc {
207				keys: message.clone(),
208				peer: peer.clone(),
209			};
210
211			// Query the network and return success on the first successful response.
212			// The recieving node will then replicate it to other nodes in the shard.
213			if core.query_network(rpc_request).await.is_ok() {
214				return Ok(None); // Forwarding succeeded.
215			}
216		}
217
218		// If all peers fail, return an error.
219		Err(NetworkError::DataForwardingError)
220	}
221
222	/// Fetch data from the shard network. It returns `None` if the node is a memnber of the shard
223	/// with the data, meaning the node should read it locally.
224	async fn fetch(
225		&self,
226		mut core: Core,
227		key: &Self::Key,
228		mut data: ByteVector,
229	) -> NetworkResult<Option<ByteVector>> {
230		// Locate the shard that would store the key.
231		let shard_id = match self.locate_shard(key) {
232			Some(shard_id) => shard_id,
233			None => return Err(NetworkError::ShardingFailureError),
234		};
235
236		// Retrieve the nodes in the logical shard.
237		let nodes = {
238			let shard_state = core.network_info.sharding.state.lock().await;
239			shard_state.get(&shard_id.to_string()).cloned()
240		};
241
242		// If no nodes exist for the shard, return an error.
243		let nodes = match nodes {
244			Some(nodes) => nodes,
245			None => return Err(NetworkError::ShardingFetchError),
246		};
247
248		// Check if the current node is part of the shard.
249		if nodes.contains(&core.peer_id()) {
250			// Return `None`
251			return Ok(None);
252		}
253
254		// Shuffle the peers.
255		let mut rng = StdRng::from_entropy();
256		let mut nodes = nodes.iter().cloned().collect::<Vec<_>>();
257
258		nodes.shuffle(&mut rng);
259
260		// Prepare an RPC to ask for the data from nodes in the shard.
261		let mut message = vec![
262			Core::SHARD_RPC_REQUEST_FLAG.as_bytes().to_vec(), /* Flag to indicate shard data
263			                                                   * request */
264		];
265		message.append(&mut data);
266
267		// Attempt to forward the data to peers.
268		for peer in nodes {
269			let rpc_request = AppData::SendRpc {
270				keys: message.clone(),
271				peer: peer.clone(),
272			};
273
274			// Query the network and return the response on the first successful response.
275			if let Ok(response) = core.query_network(rpc_request).await {
276				if let AppResponse::SendRpc(data) = response {
277					return Ok(Some(data));
278				}
279			}
280		}
281
282		// Fetch Failed
283		Err(NetworkError::ShardingFetchError)
284	}
285}