A library to build custom networking layers for decentralized and distributed applications
SwarmNL is a library designed for P2P networking in distributed systems. It’s lightweight, scalable, and easy to configure, making it perfect for decentralized applications. Powered by libp2p, SwarmNL simplifies networking so developers can focus on building.
Visit the deployed Rust docs here.
SwarmNL makes building a peer-to-peer decentralized and distributed networking stack for your application a breeze. With SwarmNL, you can effortlessly configure nodes, tailor network conditions, and fine-tune behaviors specific to your project’s needs, allowing you to dive into networking without any hassle.
Say goodbye to the complexities of networking and hello to simplicity. With SwarmNL, all the hard work is done for you, leaving you to focus on simple configurations and your application logic.
Have a look at these step-by-step examples that demonstrate the use of SwarmNL in various contexts:
Visit the examples folder here to gain a fuller understanding of the ways to use the library, including how to integrate SwarmNL with IPFS and HTTP servers.
Have a look at this document for a technical overview of SwarmNL and its design choices.
SwarmNL provides a simple interface to configure a node and specify parameters to dictate its behaviour. This includes:
#![cfg_attr(not(doctest))]
//! Using the default node setup configuration
// Default config
let config = BootstrapConfig::default();
// Build node or network core
let node = CoreBuilder::with_config(config)
.build()
.await
.unwrap();
//! Using a custom node setup configuration
// Custom configuration
// a. Using config from an `.ini` file
let config = BootstrapConfig::from_file("bootstrap_config.ini");
// b. Using config methods
let mut bootnode = HashMap::new(); // Bootnodes
let ports = (1509, 2710); // TCP, UDP ports
bootnode.insert(
PeerId::random(),
"/ip4/x.x.x.x/tcp/1509".to_string()
);
let config = BootstrapConfig::new()
.with_bootnodes(bootnode)
.with_tcp(ports.0)
.with_udp(ports.1);
// Build node or network core
let node = CoreBuilder::with_config(config)
.build()
.await
.unwrap();
Please look at a template .ini file here for configuring a node in the network.
During network operations, various events are generated. These events help us track the activities in the network layer. When generated, they are stored in an internal buffer until they are explicitly polled and consumed, or until the queue is full. It is important to consume critical events promptly to prevent loss if the buffer becomes full.
#![cfg_attr(not(doctest))]
//! Consuming the events by retrieving them as an iterator
// Default config
let config = BootstrapConfig::default();
// Build node or network core
let node = CoreBuilder::with_config(config)
.build()
.await
.unwrap();
// Read all currently buffered network events
let events = node.events().await;
let _ = events
.map(|e| {
match e {
NetworkEvent::NewListenAddr {
local_peer_id,
listener_id: _,
address,
} => {
// Announce interfaces we're listening on
println!("Peer id: {}", local_peer_id);
println!("We're listening on the {}", address);
},
NetworkEvent::ConnectionEstablished {
peer_id,
connection_id: _,
endpoint: _,
num_established: _,
established_in: _,
} => {
println!("Connection established with peer: {:?}", peer_id);
},
_ => {},
}
})
.collect::<Vec<_>>();
//! Consume the immediate next events in the internal event buffer
// Read events generated at setup
while let Some(event) = node.next_event().await {
match event {
NetworkEvent::NewListenAddr {
local_peer_id,
listener_id: _,
address,
} => {
// announce interfaces we're listening on
println!("Peer id: {}", local_peer_id);
println!("We're listening on the {}", address);
},
NetworkEvent::ConnectionEstablished {
peer_id,
connection_id: _,
endpoint: _,
num_established: _,
established_in: _,
} => {
println!("Connection established with peer: {:?}", peer_id);
},
_ => {},
}
}
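The buffering behaviour described above can be illustrated without the library. The following is a minimal sketch, not SwarmNL's implementation: `Event` and `EventBuffer` are hypothetical names, and the policy of discarding the oldest event when the buffer is full is an assumption made only for illustration.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a network event (not SwarmNL's `NetworkEvent`).
#[derive(Debug, Clone, PartialEq)]
enum Event {
    NewListenAddr(String),
    ConnectionEstablished(String),
}

/// A fixed-capacity FIFO buffer. When it is full, the oldest event is
/// discarded (assumed policy, for illustration only).
struct EventBuffer {
    capacity: usize,
    queue: VecDeque<Event>,
    dropped: usize,
}

impl EventBuffer {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self {
            capacity,
            queue: VecDeque::with_capacity(capacity),
            dropped: 0,
        }
    }

    /// Store an event, evicting the oldest one if the buffer is full.
    fn push(&mut self, event: Event) {
        if self.queue.len() == self.capacity {
            self.queue.pop_front();
            self.dropped += 1;
        }
        self.queue.push_back(event);
    }

    /// Consume the next buffered event, if any.
    fn next_event(&mut self) -> Option<Event> {
        self.queue.pop_front()
    }
}

fn main() {
    let mut buffer = EventBuffer::new(2);
    buffer.push(Event::NewListenAddr("/ip4/127.0.0.1/tcp/1509".into()));
    buffer.push(Event::ConnectionEstablished("peer-a".into()));
    // The buffer is full, so this push evicts the oldest event.
    buffer.push(Event::ConnectionEstablished("peer-b".into()));

    while let Some(event) = buffer.next_event() {
        println!("{:?}", event);
    }
    println!("events lost: {}", buffer.dropped);
}
```

In this sketch the listen-address event is lost because nothing consumed it before the third push, which is why critical events should be polled promptly.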
For communication, SwarmNL leverages the powerful capabilities of libp2p. These include:
#![cfg_attr(not(doctest))]
//! Communicate with remote nodes using the simple and familiar async-await paradigm.
// Build node or network core
let node = CoreBuilder::with_config(config, state)
.build()
.await
.unwrap();
// Communication interfaces
// a. Kademlia DHT e.g
// Prepare a Kademlia `store_record` request to send to the network layer
let (key, value, expiration_time, explicit_peers) = (
KADEMLIA_TEST_KEY.as_bytes().to_vec(),
KADEMLIA_TEST_VALUE.as_bytes().to_vec(),
None,
None,
);
let kad_request = AppData::KademliaStoreRecord {
key: key.clone(),
value,
expiration_time,
explicit_peers,
};
// Send request
if let Ok(result) = node.query_network(kad_request).await {
assert_eq!(AppResponse::KademliaStoreRecordSuccess, result);
}
// b. RPC (request-response) e.g
// Prepare an RPC fetch request
let fetch_key = vec!["SomeFetchKey".as_bytes().to_vec()];
let fetch_request = AppData::SendRpc {
keys: fetch_key.clone(),
peer: node4_peer_id,
};
// Get a stream id to track the request
let stream_id = node.send_to_network(fetch_request).await.unwrap();
// Poll for the result
if let Ok(result) = node.recv_from_network(stream_id).await {
// Here, the request data was simply echoed by the remote peer
assert_eq!(AppResponse::SendRpc(fetch_key), result);
}
// c. Gossiping e.g
// Prepare gossip request
let gossip_request = AppData::GossipsubBroadcastMessage {
topic: GOSSIP_NETWORK.to_string(),
message: vec!["Daniel".to_string(), "Deborah".to_string()],
};
if let Ok(result) = node.query_network(gossip_request).await {
assert_eq!(AppResponse::GossipsubBroadcastSuccess, result);
}
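The send-then-poll pattern used for the RPC request above (obtain a stream id, then ask for the result under that id) can be sketched with the standard library alone. This is an illustrative model, not SwarmNL's internals: `StreamId`, `ResponseBuffer`, `send`, `deliver` and `try_recv` are all hypothetical names.

```rust
use std::collections::HashMap;

/// Hypothetical identifier handed back when a request is queued.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct StreamId(u32);

/// Tracks pending requests and the responses that arrive for them.
#[derive(Default)]
struct ResponseBuffer {
    next_id: u32,
    responses: HashMap<StreamId, Option<Vec<u8>>>,
}

impl ResponseBuffer {
    /// Register a request and return the id used to look up its response.
    fn send(&mut self) -> StreamId {
        let id = StreamId(self.next_id);
        self.next_id = self.next_id.wrapping_add(1);
        self.responses.insert(id, None);
        id
    }

    /// Called when the (simulated) network layer delivers a response.
    fn deliver(&mut self, id: StreamId, payload: Vec<u8>) {
        if let Some(slot) = self.responses.get_mut(&id) {
            *slot = Some(payload);
        }
    }

    /// Take the response if it is ready. Returns `None` while the request
    /// is still pending or when the id is unknown.
    fn try_recv(&mut self, id: StreamId) -> Option<Vec<u8>> {
        let ready = matches!(self.responses.get(&id), Some(Some(_)));
        if ready {
            self.responses.remove(&id).flatten()
        } else {
            None
        }
    }
}

fn main() {
    let mut buffer = ResponseBuffer::default();
    let id = buffer.send();
    // Nothing has arrived yet, so polling yields nothing.
    assert!(buffer.try_recv(id).is_none());

    // The remote peer echoes the request data back.
    buffer.deliver(id, b"SomeFetchKey".to_vec());
    if let Some(bytes) = buffer.try_recv(id) {
        println!("response: {}", String::from_utf8_lossy(&bytes));
    }
}
```

Keying responses by id lets several requests be in flight at once while each caller retrieves only its own result.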
SwarmNL makes fault tolerance through redundancy simple and easy to integrate into your application. With replication built into SwarmNL, you can achieve robust and scalable systems effortlessly.
Here’s how you can set up and use SwarmNL’s replication capabilities:
#![cfg_attr(not(doctest))]
//! Configure the node for replication with a strong consistency model
// Define the replica network ID
const REPL_NETWORK_ID: &str = "replica_xx";
// Configure replication settings
let repl_config = ReplNetworkConfig::Custom {
queue_length: 150,
expiry_time: Some(10),
sync_wait_time: 5,
consistency_model: ConsistencyModel::Strong(ConsensusModel::All),
data_aging_period: 2,
};
// Build the node with replication enabled
let node = builder.with_replication(repl_config).build().await.unwrap();
// Join a replica network
node.join_repl_network(REPL_NETWORK_ID.into()).await;
// Replicate data across the network
node.replicate(payload, REPL_NETWORK_ID).await;
SwarmNL exposes network events to your application, allowing you to process incoming replica data effectively.
//! Listen for replication events
loop {
// Check for incoming data events
if let Some(event) = node.next_event().await {
if let NetworkEvent::ReplicaDataIncoming { source, .. } = event {
println!("Received incoming replica data from {}", source.to_base58());
}
}
// Try to consume data from the replication buffer
if let Some(repl_data) = node.consume_repl_data(REPL_NETWORK_ID).await {
println!(
"Data received from replica: {} ({} confirmations)",
repl_data.data[0],
repl_data.confirmations.unwrap()
);
}
}
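To make the idea of confirmations under a strong consistency model more concrete, here is a minimal standalone sketch. It assumes that a replicated item becomes consumable only once a required number of peers have confirmed it; `ReplItem`, `StrongBuffer` and the numeric message id are hypothetical and do not reflect SwarmNL's actual types or protocol.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical replicated item (not SwarmNL's own replica data type).
#[derive(Debug, Clone, PartialEq)]
struct ReplItem {
    data: Vec<String>,
    confirmations: u32,
}

/// Holds items back until enough peers have confirmed them.
struct StrongBuffer {
    required: u32,
    pending: HashMap<u64, ReplItem>,
    ready: VecDeque<ReplItem>,
}

impl StrongBuffer {
    fn new(required: u32) -> Self {
        Self {
            required,
            pending: HashMap::new(),
            ready: VecDeque::new(),
        }
    }

    /// Record newly received data under a (hypothetical) message id.
    fn receive(&mut self, id: u64, data: Vec<String>) {
        self.pending.insert(id, ReplItem { data, confirmations: 0 });
        self.promote_if_ready(id);
    }

    /// Record one peer confirmation for a pending item.
    fn confirm(&mut self, id: u64) {
        if let Some(item) = self.pending.get_mut(&id) {
            item.confirmations += 1;
        }
        self.promote_if_ready(id);
    }

    /// Move the item to the consumable queue once it has enough confirmations.
    fn promote_if_ready(&mut self, id: u64) {
        let required = self.required;
        let ready = self
            .pending
            .get(&id)
            .map_or(false, |item| item.confirmations >= required);
        if ready {
            if let Some(item) = self.pending.remove(&id) {
                self.ready.push_back(item);
            }
        }
    }

    /// Consume the next fully confirmed item, if any.
    fn consume(&mut self) -> Option<ReplItem> {
        self.ready.pop_front()
    }
}

fn main() {
    let mut buffer = StrongBuffer::new(2);
    buffer.receive(1, vec!["hello".to_string()]);
    buffer.confirm(1);
    // Only one of two confirmations so far: nothing to consume.
    assert!(buffer.consume().is_none());
    buffer.confirm(1);
    if let Some(item) = buffer.consume() {
        println!("{} ({} confirmations)", item.data[0], item.confirmations);
    }
}
```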
Sharding is a capability in distributed systems that enables networks to scale efficiently. SwarmNL provides a generic sharding functionality, allowing applications to easily partition their network and configure it for sharding.
trap into the application layer when handling data requests. This ensures practicality and usability in real-world scenarios.
Here’s how you can set up and use SwarmNL’s sharding capabilities:
#![cfg_attr(not(doctest))]
//! Configure a node for sharding operations
/// The constant id of the sharded network. Should be kept as a secret.
pub const NETWORK_SHARDING_ID: &'static str = "sharding_xx";
/// The shard local storage which is a directory in the local filesystem.
#[derive(Debug)]
struct LocalStorage;
impl LocalStorage {
/// Reads a file's content from the working directory.
fn read_file(&self, key: &str) -> Option<ByteVector> {
let mut file = fs::File::open(key).ok()?;
let mut content = Vec::new();
file.read_to_end(&mut content).ok()?;
// Wrap the content in an outer Vec
Some(vec![content])
}
}
// Implement the `ShardStorage` trait for our local storage
impl ShardStorage for LocalStorage {
fn fetch_data(&mut self, key: ByteVector) -> ByteVector {
// Process each key in the ByteVector
for sub_key in key.iter() {
let key_str = String::from_utf8_lossy(sub_key);
// Attempt to read the file corresponding to the key
if let Some(data) = self.read_file(&format!("storage/{}", key_str.as_ref())) {
return data;
}
}
// If no match is found, return an empty ByteVector
Default::default()
}
}
/// Hash-based sharding implementation.
pub struct HashSharding;
impl HashSharding {
/// Compute a simple hash for the key.
fn hash_key(&self, key: &str) -> u64 {
// Convert the key to bytes
let key_bytes = key.as_bytes();
// Generate a hash from the first byte
if let Some(&first_byte) = key_bytes.get(0) {
key_bytes.iter().fold(first_byte as u64, |acc, &byte| {
acc.wrapping_add(byte as u64)
})
} else {
0
}
}
}
/// Implement the `Sharding` trait.
impl Sharding for HashSharding {
type Key = str;
type ShardId = String;
/// Locate the shard corresponding to the given key.
fn locate_shard(&self, key: &Self::Key) -> Option<Self::ShardId> {
// Calculate and return hash
Some(self.hash_key(key).to_string())
}
}
// Local shard storage
let local_storage = Arc::new(Mutex::new(LocalStorage));
// Configure the node for replication. An eventual consistency model is used here.
let repl_config = ReplNetworkConfig::Custom {
queue_length: 150,
expiry_time: Some(10),
sync_wait_time: 5,
consistency_model: ConsistencyModel::Eventual,
data_aging_period: 2,
};
let node = builder
.with_replication(repl_config)
.with_sharding(NETWORK_SHARDING_ID.into(), local_storage)
.build()
.await
.unwrap();
#![cfg_attr(not(doctest))]
//! Select a sharding algorithm and assign nodes to their respective shards
// Initialize the hash-based sharding policy
let shard_executor = HashSharding;
// Get shard IDs using the configured location algorithm.
let shard_id_1 = shard_executor.locate_shard("earth").unwrap();
let shard_id_2 = shard_executor.locate_shard("mars").unwrap();
// Nodes join their respective shards
// Node 2 and Node 3 will join the same shard, enabling replication to maintain
// a consistent shard network state across nodes.
match name {
"Node 1" => {
if shard_executor
.join_network(node.clone(), &shard_id_1)
.await
.is_ok()
{
println!("Successfully joined shard: {}", shard_id_1);
}
},
"Node 2" => {
if shard_executor
.join_network(node.clone(), &shard_id_2)
.await
.is_ok()
{
println!("Successfully joined shard: {}", shard_id_2);
}
},
"Node 3" => {
if shard_executor
.join_network(node.clone(), &shard_id_2)
.await
.is_ok()
{
println!("Successfully joined shard: {}", shard_id_2);
}
},
_ => {}
}
let shard_key = "mars".to_string();
// Store data across the network in the shard pointed to by the key
if shard_executor
.shard(
node.clone(),
&shard_key,
payload,
)
.await
.is_ok()
{
println!("Sharding request succeeded for key: {}", shard_key);
}
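The key-to-shard mapping can be tried out on its own. The sketch below reuses the additive hash from `HashSharding::hash_key` as a free function; the `shard_for` helper, which folds the hash onto a fixed number of shards, is a hypothetical addition and not part of the example above.

```rust
/// Same additive hash as `HashSharding::hash_key`: start from the first
/// byte, then add every byte of the key (wrapping on overflow).
fn hash_key(key: &str) -> u64 {
    let bytes = key.as_bytes();
    match bytes.first() {
        Some(&first) => bytes
            .iter()
            .fold(first as u64, |acc, &byte| acc.wrapping_add(byte as u64)),
        None => 0,
    }
}

/// Hypothetical helper: map a key onto one of `shard_count` shards.
/// Returns `None` when there are no shards to choose from.
fn shard_for(key: &str, shard_count: u64) -> Option<u64> {
    if shard_count == 0 {
        None
    } else {
        Some(hash_key(key) % shard_count)
    }
}

fn main() {
    for key in ["earth", "mars"] {
        println!(
            "{} -> hash {} -> shard {:?}",
            key,
            hash_key(key),
            shard_for(key, 4)
        );
    }
    // prints:
    // earth -> hash 633 -> shard Some(1)
    // mars -> hash 544 -> shard Some(0)
}
```

Because the mapping is deterministic, every node computes the same shard for a given key. A hash this simple distributes keys unevenly, so a real deployment would likely substitute a stronger function.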
A node can receive data either through forwarding from a node in another shard or via replication from a peer node in the same shard. Below is an example demonstrating how to listen for and handle both types of events.
//! Listen for and consume data from a sharded network.
loop {
// Check for incoming data events
if let Some(event) = node.next_event().await {
// Handle incoming data events
match event {
NetworkEvent::IncomingForwardedData { data, source } => {
println!(
"Received forwarded data: {:?} from peer: {}",
data,
source.to_base58()
);
// Split the contents of the incoming data
let data_vec = data[0].split(" ").collect::<Vec<_>>();
// Extract file name and content
if let [file_name, content] = &data_vec[..] {
let _ = append_to_file(file_name, content).await;
}
},
NetworkEvent::ReplicaDataIncoming {
data,
network,
source,
..
} => {
println!(
"Received replica data: {:?} from shard peer: {}",
data,
source.to_base58()
);
if let Some(repl_data) = node.consume_repl_data(&network).await {
// Split the contents of the incoming data
let data = repl_data.data[0].split(" ").collect::<Vec<_>>();
// Extract file name and content
if let [file_name, content] = &data[..] {
let _ = append_to_file(file_name, content).await;
}
} else {
println!("Error: No message in replica buffer");
}
},
_ => {}
}
}
}
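The handlers above split the payload on every space and only proceed when exactly two parts result, so content that itself contains spaces is skipped. A minimal alternative sketch, assuming the payload has the form "<file_name> <content>", splits at the first space only. `parse_payload` is a hypothetical helper, not part of SwarmNL.

```rust
/// Split a payload of the form "<file_name> <content>" at the first space.
/// Returns `None` if there is no space or the file name is empty.
fn parse_payload(payload: &str) -> Option<(&str, &str)> {
    let (file_name, content) = payload.split_once(' ')?;
    if file_name.is_empty() {
        None
    } else {
        Some((file_name, content))
    }
}

fn main() {
    // Content may contain further spaces; only the first one separates.
    match parse_payload("notes.txt hello from mars") {
        Some((file_name, content)) => println!("{} <- {:?}", file_name, content),
        None => println!("malformed payload"),
    }
}
```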
SwarmNL integrates the networking and storage layers to deliver a seamless sharding experience. This approach enables nodes to interact directly with the application layer and local environment, providing a robust and flexible solution for scalable distributed systems.
In future iterations, we will be working on:
KeyPair is holding.
From<&str> for KeyType to read a key type from a bootstrap config file.
Multiaddr type.
PeerId type.