swarm_nl/core/
sharding.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
// Copyright 2024 Algorealm, Inc.
// Apache 2.0 License

//! Module that contains important data structures to manage sharding operations on the
//! network.
use std::fmt::Debug;

use super::*;
use async_trait::async_trait;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};

/// Trait that interfaces with the storage layer of a node in a shard. It is important for handling
/// forwarded data requests. This is a mechanism to trap into the application storage layer to read
/// sharded data.
///
/// The `Debug` supertrait is what allows [`ShardingInfo`] (which stores an
/// `Arc<Mutex<dyn ShardStorage>>`) to derive `Debug`. `Send + Sync` presumably exist so the
/// storage handle can be shared across async tasks — confirm against the runtime in use.
pub trait ShardStorage: Send + Sync + Debug {
	/// Read local shard data for the given request and return the response that is sent back
	/// to the requesting node.
	///
	/// NOTE(review): the layout of `key` (the incoming request payload) is defined by the code
	/// that invokes this hook, which is not visible in this module — confirm before relying on it.
	fn fetch_data(&mut self, key: ByteVector) -> ByteVector;
}

/// Important data for the operation of the sharding protocol.
///
/// Cloning this struct is cheap for the `Arc` fields: clones share the same storage handle and
/// the same shard-membership state, while `id` is copied.
#[derive(Debug, Clone)]
pub struct ShardingInfo {
	/// The id of the entire sharded network.
	///
	/// The [`Sharding`] trait uses it as the gossip topic that join/exit events are broadcast on,
	/// and `Sharding::join_network` refuses to run while it is empty.
	pub id: String,
	/// Shard local storage: the application hook used to read sharded data when answering
	/// forwarded data requests (see [`ShardStorage`]).
	pub local_storage: Arc<Mutex<dyn ShardStorage>>,
	/// The shards and the various nodes they contain: each shard ID mapped to the set of peers
	/// this node believes belong to it.
	///
	/// This node adds and removes itself in `Sharding::join_network` / `Sharding::exit_network`.
	/// NOTE(review): presumably also updated from gossiped join/exit events handled elsewhere —
	/// confirm.
	pub state: Arc<Mutex<HashMap<ShardId, HashSet<PeerId>>>>,
}

/// Default shard storage used to respond to forwarded data requests.
///
/// It holds no data of its own: every request it receives is returned unchanged.
#[derive(Debug)]
pub(super) struct DefaultShardStorage;

impl ShardStorage for DefaultShardStorage {
	/// Answer a forwarded data request by echoing the request payload straight back to the
	/// requesting node.
	fn fetch_data(&mut self, request: ByteVector) -> ByteVector {
		request
	}
}

/// Trait that specifies sharding logic and behaviour of shards.
///
/// Implementors only have to provide [`Sharding::locate_shard`]. The remaining methods have
/// default implementations that operate on the sharding state held in `Core` and talk to the
/// network through `Core::query_network`.
#[async_trait]
pub trait Sharding
where
	Self::Key: Send + Sync,
	Self::ShardId: ToString + Send + Sync,
{
	/// The type of the shard key e.g hash, range etc.
	type Key: ?Sized;
	/// The identifier pointing to a specific shard.
	type ShardId;

	/// Map a key to a shard.
	///
	/// Returns `None` when no shard is responsible for `key`.
	fn locate_shard(&self, key: &Self::Key) -> Option<Self::ShardId>;

	/// Return a snapshot of the state of the shard network: each shard ID mapped to the peers
	/// known to belong to it.
	async fn network_state(core: Core) -> HashMap<String, HashSet<PeerId>> {
		core.network_info.sharding.state.lock().await.clone()
	}

	/// Join a shard network.
	///
	/// Subscribes to the network-wide sharding gossip topic, records this node as a member of
	/// `shard_id` in the local state, joins the shard's replica network and finally broadcasts
	/// the join event to every node on the sharding topic.
	///
	/// # Errors
	///
	/// - `NetworkError::MissingShardingNetworkIdError` if the sharding network ID is empty.
	/// - Any error returned by `Core::query_network` while subscribing to the gossip topic or
	///   broadcasting the join event.
	async fn join_network(&self, mut core: Core, shard_id: &Self::ShardId) -> NetworkResult<()> {
		// Ensure the network sharding ID is set; it doubles as the gossip topic.
		let network_sharding_id = core.network_info.sharding.id.clone();
		if network_sharding_id.is_empty() {
			return Err(NetworkError::MissingShardingNetworkIdError);
		}
		let shard_id = shard_id.to_string();

		// Join the generic shard (gossip) network.
		let gossip_request = AppData::GossipsubJoinNetwork(network_sharding_id.clone());
		core.query_network(gossip_request).await?;

		// Update the local shard state. The guard is scoped so the lock is released before
		// `core` is borrowed mutably again below.
		{
			let mut shard_state = core.network_info.sharding.state.lock().await;
			shard_state
				.entry(shard_id.clone())
				.or_default()
				.insert(core.peer_id());
		}

		// Join the shard network (as a replica network). The result is ignored, so this step is
		// best-effort.
		// NOTE(review): on failure the local state still lists us as a member — confirm this is
		// acceptable.
		let _ = core.join_repl_network(shard_id.clone()).await;

		// Inform the entire network about our decision.
		let message = vec![
			Core::SHARD_GOSSIP_JOIN_FLAG.as_bytes().to_vec(), // Flag for join event.
			core.peer_id().to_string().into_bytes(),          // Our peer ID.
			shard_id.into_bytes(),                            // Shard we're joining.
		];

		let gossip_request = AppData::GossipsubBroadcastMessage {
			topic: network_sharding_id,
			message,
		};

		// Gossip the join event to all nodes.
		core.query_network(gossip_request).await?;

		Ok(())
	}

	/// Exit a shard network.
	///
	/// Removes this node from `shard_id` in the local state (dissolving the shard entry if it
	/// becomes empty), broadcasts the exit event on the sharding gossip topic, and unsubscribes
	/// from that topic once this node no longer belongs to any shard.
	///
	/// NOTE(review): `join_network` also joins the shard's replica network, but this method does
	/// not leave it — confirm whether `Core` exposes a matching leave operation to call here.
	///
	/// # Errors
	///
	/// Any error returned by `Core::query_network` while broadcasting the exit event or leaving
	/// the gossip topic.
	async fn exit_network(&self, mut core: Core, shard_id: &Self::ShardId) -> NetworkResult<()> {
		let shard_id = shard_id.to_string();
		let peer_id = core.peer_id();

		// First, we remove ourself from the network state. If we were the last node in the
		// shard, the shard is dissolved. The guard is scoped so the lock is released before
		// the network is queried.
		{
			let mut shard_state = core.network_info.sharding.state.lock().await;
			if let Some(peers) = shard_state.get_mut(&shard_id) {
				peers.remove(&peer_id);
				if peers.is_empty() {
					shard_state.remove(&shard_id);
				}
			}
		}

		// Then, we make a broadcast.
		let message = vec![
			Core::SHARD_GOSSIP_EXIT_FLAG.to_string().into_bytes(), // Flag for exit event.
			peer_id.to_base58().into_bytes(),                      // Our peer ID.
			shard_id.into_bytes(),                                 // Shard we're leaving.
		];

		let topic = core.network_info.sharding.id.clone();
		let gossip_request = AppData::GossipsubBroadcastMessage {
			topic: topic.clone(),
			message,
		};

		core.query_network(gossip_request).await?;

		// Check if we're still in any shard. The temporary guard is dropped at the end of this
		// statement.
		let still_in_a_shard = core
			.network_info
			.sharding
			.state
			.lock()
			.await
			.values()
			.any(|peers| peers.contains(&peer_id));

		if !still_in_a_shard {
			// Leave (unsubscribe from) the underlying sharding (gossip) network.
			let gossip_request = AppData::GossipsubExitNetwork(topic);
			core.query_network(gossip_request).await?;
		}

		Ok(())
	}

	/// Send data to peers in the appropriate logical shard.
	///
	/// If this node is a member of the shard, the data is replicated to the fellow nodes in the
	/// shard and returned as `Ok(Some(data))` so the caller can store it locally. Otherwise the
	/// data is forwarded to one member of the shard (tried in random order), which is then
	/// expected to replicate it to the rest of the shard, and `Ok(None)` is returned.
	///
	/// # Errors
	///
	/// - `NetworkError::ShardNotFound` if `locate_shard` maps `key` to no shard.
	/// - `NetworkError::MissingShardNodesError` if the shard has no known nodes.
	/// - `NetworkError::DataForwardingError` if forwarding failed for every node in the shard.
	async fn shard(
		&self,
		mut core: Core,
		key: &Self::Key,
		data: ByteVector,
	) -> NetworkResult<Option<ByteVector>> {
		// Locate the shard that would store the key.
		let shard_id = self
			.locate_shard(key)
			.ok_or(NetworkError::ShardNotFound)?
			.to_string();

		// Retrieve the nodes in the logical shard. The temporary guard is dropped at the end of
		// this statement.
		let nodes = core
			.network_info
			.sharding
			.state
			.lock()
			.await
			.get(&shard_id)
			.cloned()
			.ok_or(NetworkError::MissingShardNodesError)?;

		// If the current node is part of the shard, replicate the data to its fellow members and
		// hand it back to the caller. The replication result is ignored (best-effort).
		if nodes.contains(&core.peer_id()) {
			let _ = core.replicate(data.clone(), &shard_id).await;
			return Ok(Some(data));
		}

		// Prepare the message for data forwarding: flag, target shard, then the payload.
		let mut message = vec![
			Core::RPC_DATA_FORWARDING_FLAG.as_bytes().to_vec(),
			shard_id.into_bytes(),
		];
		message.extend(data);

		// Attempt to forward the data to the shard's peers in random order. Success on the first
		// successful response; the receiving node then replicates it to other nodes in the shard.
		for peer in shuffle_peers(nodes) {
			let rpc_request = AppData::SendRpc {
				keys: message.clone(),
				peer,
			};

			if core.query_network(rpc_request).await.is_ok() {
				return Ok(None);
			}
		}

		// All peers failed.
		Err(NetworkError::DataForwardingError)
	}

	/// Fetch data from the shard network.
	///
	/// Returns `Ok(None)` if this node is a member of the shard holding the data, meaning the
	/// node should read it locally. Otherwise the request is sent to the shard's nodes in random
	/// order, and the first `AppResponse::SendRpc` payload received is returned as
	/// `Ok(Some(response))`.
	///
	/// # Errors
	///
	/// - `NetworkError::ShardingFailureError` if `locate_shard` maps `key` to no shard.
	/// - `NetworkError::ShardingFetchError` if the shard has no known nodes, or if no node
	///   returned an `AppResponse::SendRpc` payload.
	async fn fetch(
		&self,
		mut core: Core,
		key: &Self::Key,
		data: ByteVector,
	) -> NetworkResult<Option<ByteVector>> {
		// Locate the shard that would store the key.
		let shard_id = self
			.locate_shard(key)
			.ok_or(NetworkError::ShardingFailureError)?
			.to_string();

		// Retrieve the nodes in the logical shard. The temporary guard is dropped at the end of
		// this statement.
		let nodes = core
			.network_info
			.sharding
			.state
			.lock()
			.await
			.get(&shard_id)
			.cloned()
			.ok_or(NetworkError::ShardingFetchError)?;

		// Members of the shard read the data locally.
		if nodes.contains(&core.peer_id()) {
			return Ok(None);
		}

		// Prepare an RPC to ask for the data from nodes in the shard: flag, then the request.
		let mut message = vec![Core::SHARD_RPC_REQUEST_FLAG.as_bytes().to_vec()];
		message.extend(data);

		// Query the peers in random order and return the first RPC response payload.
		for peer in shuffle_peers(nodes) {
			let rpc_request = AppData::SendRpc {
				keys: message.clone(),
				peer,
			};

			if let Ok(AppResponse::SendRpc(response)) = core.query_network(rpc_request).await {
				return Ok(Some(response));
			}
		}

		// Fetch failed.
		Err(NetworkError::ShardingFetchError)
	}
}

/// Collect the peers of a shard into a `Vec` in random order, so the order in which they are
/// queried is randomized.
fn shuffle_peers(peers: HashSet<PeerId>) -> Vec<PeerId> {
	let mut peers: Vec<PeerId> = peers.into_iter().collect();
	peers.shuffle(&mut StdRng::from_entropy());
	peers
}