swarm_nl/core/
prelude.rs

1// Copyright 2024 Algorealm, Inc.
2// Apache 2.0 License
3
4//! The module that contains important data structures and logic for the functioning of SwarmNL.
5
6use self::ping_config::PingInfo;
7use libp2p::gossipsub::MessageId;
8use libp2p_identity::PublicKey;
9use serde::{Deserialize, Serialize};
10use std::fmt::Debug;
11use std::hash::Hash;
12use std::{collections::VecDeque, time::Instant};
13use thiserror::Error;
14
15use super::*;
16
17/// The duration (in seconds) to wait for response from the network layer before timing
18/// out.
19pub const NETWORK_READ_TIMEOUT: Seconds = 30;
20
21/// The time it takes for the task to sleep before it can recheck if an output has been placed in
22/// the response buffer.
23pub const TASK_SLEEP_DURATION: Seconds = 3;
24
25/// The height of the internal queue. This represents the maximum number of elements that a queue
26/// can accommodate without losing its oldest elements.
27const MAX_QUEUE_ELEMENTS: usize = 300;
28
29/// Type that represents the response of the network layer to the application layer's event handler.
30pub type AppResponseResult = Result<AppResponse, NetworkError>;
31
32/// Type that represents the data exchanged during RPC operations.
33pub type RpcData = ByteVector;
34
35/// Type that represents a vector of vector of bytes.
36pub type ByteVector = Vec<Vec<u8>>;
37
38/// Type that represents the id of a shard.
39pub type ShardId = String;
40
41/// Type that represents the result for network operations.
42pub type NetworkResult<T> = Result<T, NetworkError>;
43
44/// Type that represents a vector of string.
45pub type StringVector = Vec<String>;
46
47/// Type that represents a nonce.
48pub type Nonce = u64;
49
50/// Time to wait (in seconds) for the node (network layer) to boot.
51pub(super) const BOOT_WAIT_TIME: Seconds = 1;
52
53/// The buffer capacity of an mpsc stream.
54pub(super) const STREAM_BUFFER_CAPACITY: usize = 100;
55
56/// Data exchanged over a stream between the application and network layer.
57#[derive(Debug, Clone)]
58pub(super) enum StreamData {
59	/// Application data sent over the stream.
60	FromApplication(StreamId, AppData),
61	/// Network response data sent over the stream to the application layer.
62	ToApplication(StreamId, AppResponse),
63}
64
65/// Request sent from the application layer to the networking layer.
66#[derive(Debug, Clone)]
67pub enum AppData {
68	/// A simple echo message.
69	Echo(String),
70	/// Dail peer.
71	DailPeer(PeerId, MultiaddrString),
72	/// Store a value associated with a given key in the Kademlia DHT.
73	KademliaStoreRecord {
74		key: Vec<u8>,
75		value: Vec<u8>,
76		// expiration time for local records
77		expiration_time: Option<Instant>,
78		// store on explicit peers
79		explicit_peers: Option<Vec<PeerIdString>>,
80	},
81	/// Perform a lookup of a value associated with a given key in the Kademlia DHT.
82	KademliaLookupRecord { key: Vec<u8> },
83	/// Perform a lookup of peers that store a record.
84	KademliaGetProviders { key: Vec<u8> },
85	/// Stop providing a record on the network.
86	KademliaStopProviding { key: Vec<u8> },
87	/// Remove record from local store.
88	KademliaDeleteRecord { key: Vec<u8> },
89	/// Return important information about the local routing table.
90	KademliaGetRoutingTableInfo,
91	/// Fetch data(s) quickly from a peer over the network.
92	SendRpc { keys: RpcData, peer: PeerId },
93	/// Get network information about the node.
94	GetNetworkInfo,
95	/// Send message to gossip peers in a mesh network.
96	GossipsubBroadcastMessage {
97		/// Topic to send messages to
98		topic: String,
99		message: ByteVector,
100	},
101	/// Join a mesh network.
102	GossipsubJoinNetwork(String),
103	/// Get gossip information about node.
104	GossipsubGetInfo,
105	/// Leave a network we are a part of.
106	GossipsubExitNetwork(String),
107	/// Blacklist a peer explicitly.
108	GossipsubBlacklistPeer(PeerId),
109	/// Remove a peer from the blacklist.
110	GossipsubFilterBlacklist(PeerId),
111}
112
113/// Response to requests sent from the application to the network layer.
114#[derive(Debug, Clone, PartialEq)]
115pub enum AppResponse {
116	/// The value written to the network.
117	Echo(String),
118	/// The peer we dailed.
119	DailPeerSuccess(String),
120	/// Store record success.
121	KademliaStoreRecordSuccess,
122	/// DHT lookup result.
123	KademliaLookupSuccess(Vec<u8>),
124	/// Nodes storing a particular record in the DHT.
125	KademliaGetProviders {
126		key: Vec<u8>,
127		providers: Vec<PeerIdString>,
128	},
129	/// No providers found.
130	KademliaNoProvidersFound,
131	/// Routing table information.
132	KademliaGetRoutingTableInfo { protocol_id: String },
133	/// Result of RPC operation.
134	SendRpc(RpcData),
135	/// A network error occured while executing the request.
136	Error(NetworkError),
137	/// Important information about the node.
138	GetNetworkInfo {
139		peer_id: PeerId,
140		connected_peers: Vec<PeerId>,
141		external_addresses: Vec<MultiaddrString>,
142	},
143	/// Successfully broadcast to the network.
144	GossipsubBroadcastSuccess,
145	/// Successfully joined a mesh network.
146	GossipsubJoinSuccess,
147	/// Successfully exited a mesh network.
148	GossipsubExitSuccess,
149	/// Gossipsub information about node.
150	GossipsubGetInfo {
151		/// Topics that the node is currently subscribed to
152		topics: StringVector,
153		/// Peers we know about and their corresponding topics
154		mesh_peers: Vec<(PeerId, StringVector)>,
155		/// Peers we have blacklisted
156		blacklist: HashSet<PeerId>,
157	},
158	/// A peer was successfully blacklisted.
159	GossipsubBlacklistSuccess,
160}
161
162/// Network error type containing errors encountered during network operations.
163#[derive(Error, Debug, Clone, PartialEq)]
164pub enum NetworkError {
165	#[error("timeout occured waiting for data from network layer")]
166	NetworkReadTimeout,
167	#[error("internal request stream buffer is full")]
168	StreamBufferOverflow,
169	#[error("failed to store record in DHT")]
170	KadStoreRecordError(Vec<u8>),
171	#[error("failed to fetch data from peer")]
172	RpcDataFetchError,
173	#[error("failed to fetch record from the DHT")]
174	KadFetchRecordError(Vec<u8>),
175	#[error("task carrying app response panicked")]
176	InternalTaskError,
177	#[error("failed to dail peer")]
178	DailPeerError,
179	#[error("failed to broadcast message to peers in the topic")]
180	GossipsubBroadcastMessageError,
181	#[error("failed to join a mesh network")]
182	GossipsubJoinNetworkError,
183	#[error("failed to exit a mesh network")]
184	GossipsubExitNetworkError,
185	#[error("internal stream failed to transport data")]
186	InternalStreamError,
187	#[error("replica network not found")]
188	MissingReplNetwork,
189	#[error("network id for sharding has not been configured. See `CoreBuilder::with_shard()`")]
190	MissingShardingNetworkIdError,
191	#[error("threshold for data forwarding not met")]
192	DataForwardingError,
193	#[error("failed to shard data")]
194	ShardingFailureError,
195	#[error("failed to fetch sharded data")]
196	ShardingFetchError,
197	#[error("shard not found for input key")]
198	ShardNotFound,
199	#[error("no nodes found in logical shard")]
200	MissingShardNodesError,
201}
202
203/// A simple struct used to track requests sent from the application layer to the network layer.
204#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
205pub struct StreamId(u32);
206
207impl StreamId {
208	/// Generate a new random stream id.
209	/// Must only be called once.
210	pub fn new() -> Self {
211		StreamId(0)
212	}
213
214	/// Generate a new random stream id, using the current as reference.
215	pub fn next(current_id: StreamId) -> Self {
216		StreamId(current_id.0.wrapping_add(1))
217	}
218}
219
220/// Type that keeps track of the requests from the application layer.
221/// This type has a maximum buffer size and will drop subsequent requests when full.
222/// It is unlikely to be ever full as the default is usize::MAX except otherwise specified during
223/// configuration. It is always good practice to read responses from the internal stream buffer
224/// using `query_network()` or explicitly using `recv_from_network`.
225#[derive(Clone, Debug)]
226pub(super) struct StreamRequestBuffer {
227	/// Max requests we can keep track of.
228	size: usize,
229	buffer: HashSet<StreamId>,
230}
231
232impl StreamRequestBuffer {
233	/// Create a new request buffer.
234	pub fn new(buffer_size: usize) -> Self {
235		Self {
236			size: buffer_size,
237			buffer: HashSet::new(),
238		}
239	}
240
241	/// Push [`StreamId`]s into buffer.
242	/// Returns `false` if the buffer is full and request cannot be stored.
243	pub fn insert(&mut self, id: StreamId) -> bool {
244		if self.buffer.len() < self.size {
245			self.buffer.insert(id);
246			return true;
247		}
248		false
249	}
250}
251
252/// Type that keeps track of the response to the requests from the application layer.
253pub(super) struct StreamResponseBuffer {
254	/// Max responses we can keep track of.
255	size: usize,
256	buffer: HashMap<StreamId, AppResponseResult>,
257}
258
259impl StreamResponseBuffer {
260	/// Create a new request buffer.
261	pub fn new(buffer_size: usize) -> Self {
262		Self {
263			size: buffer_size,
264			buffer: HashMap::new(),
265		}
266	}
267
268	/// Push a [`StreamId`] into buffer.
269	/// Returns `false` if the buffer is full and request cannot be stored.
270	pub fn insert(&mut self, id: StreamId, response: AppResponseResult) -> bool {
271		if self.buffer.len() < self.size {
272			self.buffer.insert(id, response);
273			return true;
274		}
275		false
276	}
277
278	/// Remove a [`StreamId`] from the buffer.
279	pub fn remove(&mut self, id: &StreamId) -> Option<AppResponseResult> {
280		self.buffer.remove(&id)
281	}
282}
283
284/// Type representing the RPC data structure sent between nodes in the network.
285#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
286pub(super) enum Rpc {
287	/// Using request-response.
288	ReqResponse { data: RpcData },
289}
290
291/// The configuration for the RPC protocol.
292pub enum RpcConfig {
293	Default,
294	Custom {
295		/// Timeout for inbound and outbound requests.
296		timeout: Duration,
297		/// Maximum number of concurrent inbound + outbound streams.
298		max_concurrent_streams: usize,
299	},
300}
301
302/// Enum that represents the events generated in the network layer.
303#[derive(Debug, Clone, Eq, PartialEq, Hash)]
304pub enum NetworkEvent {
305	/// Event that informs the application that we have started listening on a new multiaddr.
306	///
307	/// # Fields
308	///
309	/// - `local_peer_id`: The `PeerId` of the local peer.
310	/// - `listener_id`: The ID of the listener.
311	/// - `address`: The new `Multiaddr` where the local peer is listening.
312	NewListenAddr {
313		local_peer_id: PeerId,
314		listener_id: ListenerId,
315		address: Multiaddr,
316	},
317	/// Event that informs the application that a new peer (with its location details) has just
318	/// been added to the routing table.
319	///
320	/// # Fields
321	///
322	/// - `peer_id`: The `PeerId` of the new peer added to the routing table.
323	RoutingTableUpdated { peer_id: PeerId },
324	/// Event that informs the application about a newly established connection to a peer.
325	///
326	/// # Fields
327	///
328	/// - `peer_id`: The `PeerId` of the connected peer.
329	/// - `connection_id`: The ID of the connection.
330	/// - `endpoint`: The `ConnectedPoint` information about the connection's endpoint.
331	/// - `num_established`: The number of established connections with this peer.
332	/// - `established_in`: The duration it took to establish the connection.
333	ConnectionEstablished {
334		peer_id: PeerId,
335		connection_id: ConnectionId,
336		endpoint: ConnectedPoint,
337		num_established: NonZeroU32,
338		established_in: Duration,
339	},
340	/// Event that informs the application about a closed connection to a peer.
341	///
342	/// # Fields
343	///
344	/// - `peer_id`: The `PeerId` of the peer.
345	/// - `connection_id`: The ID of the connection.
346	/// - `endpoint`: The `ConnectedPoint` information about the connection's endpoint.
347	/// - `num_established`: The number of remaining established connections with this peer.
348	ConnectionClosed {
349		peer_id: PeerId,
350		connection_id: ConnectionId,
351		endpoint: ConnectedPoint,
352		num_established: u32,
353	},
354	/// Event that announces an expired listen address.
355	///
356	/// # Fields
357	///
358	/// - `listener_id`: The ID of the listener.
359	/// - `address`: The expired `Multiaddr`.
360	ExpiredListenAddr {
361		listener_id: ListenerId,
362		address: Multiaddr,
363	},
364	/// Event that announces a closed listener.
365	///
366	/// # Fields
367	///
368	/// - `listener_id`: The ID of the listener.
369	/// - `addresses`: The list of `Multiaddr` where the listener was listening.
370	ListenerClosed {
371		listener_id: ListenerId,
372		addresses: Vec<Multiaddr>,
373	},
374	/// Event that announces a listener error.
375	///
376	/// # Fields
377	///
378	/// - `listener_id`: The ID of the listener that encountered the error.
379	ListenerError { listener_id: ListenerId },
380	/// Event that announces a dialing attempt.
381	///
382	/// # Fields
383	///
384	/// - `peer_id`: The `PeerId` of the peer being dialed, if known.
385	/// - `connection_id`: The ID of the connection attempt.
386	Dialing {
387		peer_id: Option<PeerId>,
388		connection_id: ConnectionId,
389	},
390	/// Event that announces a new external address candidate.
391	///
392	/// # Fields
393	///
394	/// - `address`: The new external address candidate.
395	NewExternalAddrCandidate { address: Multiaddr },
396	/// Event that announces a confirmed external address.
397	///
398	/// # Fields
399	///
400	/// - `address`: The confirmed external address.
401	ExternalAddrConfirmed { address: Multiaddr },
402	/// Event that announces an expired external address.
403	///
404	/// # Fields
405	///
406	/// - `address`: The expired external address.
407	ExternalAddrExpired { address: Multiaddr },
408	/// Event that announces a new connection arriving on a listener and in the process of
409	/// protocol negotiation.
410	///
411	/// # Fields
412	///
413	/// - `connection_id`: The ID of the incoming connection.
414	/// - `local_addr`: The local `Multiaddr` where the connection is received.
415	/// - `send_back_addr`: The remote `Multiaddr` of the peer initiating the connection.
416	IncomingConnection {
417		connection_id: ConnectionId,
418		local_addr: Multiaddr,
419		send_back_addr: Multiaddr,
420	},
421	/// Event that announces an error happening on an inbound connection during its initial
422	/// handshake.
423	///
424	/// # Fields
425	///
426	/// - `connection_id`: The ID of the incoming connection.
427	/// - `local_addr`: The local `Multiaddr` where the connection was received.
428	/// - `send_back_addr`: The remote `Multiaddr` of the peer initiating the connection.
429	IncomingConnectionError {
430		connection_id: ConnectionId,
431		local_addr: Multiaddr,
432		send_back_addr: Multiaddr,
433	},
434	/// Event that announces an error happening on an outbound connection during its initial
435	/// handshake.
436	///
437	/// # Fields
438	///
439	/// - `connection_id`: The ID of the outbound connection.
440	/// - `peer_id`: The `PeerId` of the peer being connected to, if known.
441	OutgoingConnectionError {
442		connection_id: ConnectionId,
443		peer_id: Option<PeerId>,
444	},
445	/// Event that announces the arrival of a pong message from a peer.
446	///
447	/// # Fields
448	///
449	/// - `peer_id`: The `PeerId` of the peer that sent the pong message.
450	/// - `duration`: The duration it took for the round trip.
451	OutboundPingSuccess { peer_id: PeerId, duration: Duration },
452	/// Event that announces a `Ping` error.
453	///
454	/// # Fields
455	///
456	/// - `peer_id`: The `PeerId` of the peer that encountered the ping error.
457	OutboundPingError { peer_id: PeerId },
458	/// Event that announces the arrival of a `PeerInfo` via the `Identify` protocol.
459	///
460	/// # Fields
461	///
462	/// - `peer_id`: The `PeerId` of the peer that sent the identify info.
463	/// - `info`: The `IdentifyInfo` received from the peer.
464	IdentifyInfoReceived { peer_id: PeerId, info: IdentifyInfo },
465	/// Event that announces the successful write of a record to the DHT.
466	///
467	/// # Fields
468	///
469	/// - `key`: The key of the record that was successfully written.
470	KademliaPutRecordSuccess { key: Vec<u8> },
471	/// Event that announces the failure of a node to save a record.
472	KademliaPutRecordError,
473	/// Event that announces a node as a provider of a record in the DHT.
474	///
475	/// # Fields
476	///
477	/// - `key`: The key of the record being provided.
478	KademliaStartProvidingSuccess { key: Vec<u8> },
479	/// Event that announces the failure of a node to become a provider of a record in the DHT.
480	KademliaStartProvidingError,
481	/// Event that announces the arrival of an RPC message.
482	///
483	/// # Fields
484	///
485	/// - `data`: The `RpcData` of the received message.
486	RpcIncomingMessageHandled { data: RpcData },
487	/// Event that announces that a peer has just left a network.
488	///
489	/// # Fields
490	///
491	/// - `peer_id`: The `PeerId` of the peer that left.
492	/// - `topic`: The topic the peer unsubscribed from.
493	GossipsubUnsubscribeMessageReceived { peer_id: PeerId, topic: String },
494	/// Event that announces that a peer has just joined a network.
495	///
496	/// # Fields
497	///
498	/// - `peer_id`: The `PeerId` of the peer that joined.
499	/// - `topic`: The topic the peer subscribed to.
500	GossipsubSubscribeMessageReceived { peer_id: PeerId, topic: String },
501	/// Event that announces the arrival of a replicated data content
502	///
503	/// # Fields
504	///
505	/// - `data`: The data contained in the gossip message.
506	/// - `outgoing_timestamp`: The time the message left the source
507	/// - `outgoing_timestamp`: The time the message was recieved
508	/// - `message_id`: The unique id of the message
509	/// - `source`: The `PeerId` of the source peer.
510	ReplicaDataIncoming {
511		/// Data
512		data: StringVector,
513		/// The replica network that owns the data
514		network: String,
515		/// Timestamp at which the message left the sending node
516		outgoing_timestamp: Seconds,
517		/// Timestamp at which the message arrived
518		incoming_timestamp: Seconds,
519		/// Message ID to prevent deduplication. It is usually a hash of the incoming message
520		message_id: String,
521		/// Sender PeerId
522		source: PeerId,
523	},
524	/// Event that announces the arrival of a forwarded sharded data
525	///
526	/// # Fields
527	///
528	/// - `data`: The data contained in the gossip message.
529	IncomingForwardedData {
530		/// Data
531		data: StringVector,
532		/// Sender's PeerId
533		source: PeerId,
534	},
535	/// Event that announces the arrival of a gossip message.
536	///
537	/// # Fields
538	///
539	/// - `source`: The `PeerId` of the source peer.
540	/// - `data`: The data contained in the gossip message.
541	GossipsubIncomingMessageHandled { source: PeerId, data: StringVector },
542	// /// Event that announces the beginning of the filtering and authentication of the incoming
543	// /// gossip message.
544	// ///
545	// /// # Fields
546	// ///
547	// /// - `propagation_source`: The `PeerId` of the peer from whom the message was received.
548	// /// - `message_id`: The ID of the incoming message.
549	// /// - `source`: The `PeerId` of the original sender, if known.
550	// /// - `topic`: The topic of the message.
551	// /// - `data`: The data contained in the message.
552	// GossipsubIncomingMessageFiltered {
553	//     propagation_source: PeerId,
554	//     message_id: MessageId,
555	//     source: Option<PeerId>,
556	//     topic: String,
557	//     data: StringVector,
558	// },
559}
560
561/// The struct that contains incoming information about a peer returned by the `Identify` protocol.
562#[derive(Debug, Clone, Eq, PartialEq, Hash)]
563pub struct IdentifyInfo {
564	/// The public key of the remote peer.
565	pub public_key: PublicKey,
566	/// The address the remote peer is listening on.
567	pub listen_addrs: Vec<Multiaddr>,
568	/// The protocols supported by the remote peer.
569	pub protocols: Vec<StreamProtocol>,
570	/// The address we are listened on, observed by the remote peer.
571	pub observed_addr: Multiaddr,
572}
573
574/// Important information to obtain from the [`CoreBuilder`], to properly handle network
575/// operations.
576#[derive(Clone)]
577pub(super) struct NetworkInfo {
578	/// The name/id of the network.
579	pub id: StreamProtocol,
580	/// Important information to manage `Ping` operations.
581	pub ping: PingInfo,
582	/// Important information to manage `Gossipsub` operations.
583	pub gossipsub: gossipsub_cfg::GossipsubInfo,
584	/// The function that handles incoming RPC data request and produces a response.
585	pub rpc_handler_fn: fn(RpcData) -> RpcData,
586	/// The function to filter incoming gossip messages.
587	pub gossip_filter_fn: fn(PeerId, MessageId, Option<PeerId>, String, StringVector) -> bool,
588	/// Important information to manage `Replication` operations.
589	pub replication: replication::ReplInfo,
590	/// Important information to manage `sharding` operations.
591	pub sharding: sharding::ShardingInfo,
592}
593
594/// Module that contains important data structures to manage `Ping` operations on the network.
595pub mod ping_config {
596	use libp2p_identity::PeerId;
597	use std::{collections::HashMap, time::Duration};
598
599	/// Policies to handle a `Ping` error.
600	/// All connections to peers are closed during a disconnect operation.
601	#[derive(Debug, Clone)]
602	pub enum PingErrorPolicy {
603		/// Do not disconnect under any circumstances.
604		NoDisconnect,
605		/// Disconnect after a number of outbound errors.
606		DisconnectAfterMaxErrors(u16),
607		/// Disconnect after a certain number of concurrent timeouts.
608		DisconnectAfterMaxTimeouts(u16),
609	}
610
611	/// Struct that stores critical information for the execution of the [`PingErrorPolicy`].
612	#[derive(Debug, Clone)]
613	pub struct PingManager {
614		/// The number of timeout errors encountered from a peer.
615		pub timeouts: HashMap<PeerId, u16>,
616		/// The number of outbound errors encountered from a peer.
617		pub outbound_errors: HashMap<PeerId, u16>,
618	}
619
620	/// The configuration for the `Ping` protocol.
621	#[derive(Debug, Clone)]
622	pub struct PingConfig {
623		/// The interval between successive pings.
624		/// Default is 15 seconds.
625		pub interval: Duration,
626		/// The duration before which the request is considered failure.
627		/// Default is 20 seconds.
628		pub timeout: Duration,
629		/// Error policy.
630		pub err_policy: PingErrorPolicy,
631	}
632
633	/// Critical information to manage `Ping` operations.
634	#[derive(Debug, Clone)]
635	pub struct PingInfo {
636		pub policy: PingErrorPolicy,
637		pub manager: PingManager,
638	}
639}
640
641/// Module containing important state relating to the [`Gossipsub`](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/README.md) protocol.
642pub mod gossipsub_cfg {
643	use super::*;
644
645	/// The struct containing the list of blacklisted peers.
646	#[derive(Clone, Debug, Default)]
647	pub struct Blacklist {
648		// Blacklist
649		pub list: HashSet<PeerId>,
650	}
651
652	/// `Gossipsub` configuration.
653	pub enum GossipsubConfig {
654		/// A default configuration.
655		Default,
656		/// A custom configuration.
657		///
658		/// # Fields
659		///
660		/// - `config`: The custom configuration for gossipsub.
661		/// - `auth`: The signature authenticity check.
662		Custom {
663			config: gossipsub::Config,
664			auth: gossipsub::MessageAuthenticity,
665		},
666	}
667
668	impl Blacklist {
669		/// Return the inner list we're keeping track of.
670		pub fn into_inner(&self) -> HashSet<PeerId> {
671			self.list.clone()
672		}
673	}
674
675	/// Important information to manage `Gossipsub` operations.
676	#[derive(Clone)]
677	pub struct GossipsubInfo {
678		pub blacklist: Blacklist,
679	}
680}
681
682/// Queue that stores and removes data in a FIFO manner.
683#[derive(Clone)]
684pub(super) struct DataQueue<T: Debug + Clone + Eq + PartialEq + Hash> {
685	buffer: Arc<Mutex<VecDeque<T>>>,
686}
687
688impl<T> DataQueue<T>
689where
690	T: Debug + Clone + Eq + PartialEq + Hash,
691{
692	/// The initial buffer capacity, to optimize for speed and defer allocation
693	const INITIAL_BUFFER_CAPACITY: usize = 300;
694
695	/// Create new queue.
696	pub fn new() -> Self {
697		Self {
698			buffer: Arc::new(Mutex::new(VecDeque::with_capacity(
699				DataQueue::<T>::INITIAL_BUFFER_CAPACITY,
700			))),
701		}
702	}
703
704	/// Remove an item from the top of the queue.
705	pub async fn pop(&self) -> Option<T> {
706		self.buffer.lock().await.pop_front()
707	}
708
709	/// Append an item to the queue.
710	pub async fn push(&self, item: T) {
711		let mut buffer = self.buffer.lock().await;
712		if buffer.len() >= MAX_QUEUE_ELEMENTS {
713			buffer.pop_front();
714		}
715		buffer.push_back(item);
716	}
717
718	/// Return the inner data structure of the queue.
719	pub async fn into_inner(&self) -> VecDeque<T> {
720		self.buffer.lock().await.clone()
721	}
722
723	/// Drain the contents of the queue.
724	pub async fn drain(&mut self) {
725		self.buffer.lock().await.drain(..);
726	}
727}