swarm_nl/core/prelude.rs
1// Copyright 2024 Algorealm, Inc.
2// Apache 2.0 License
3
4//! The module that contains important data structures and logic for the functioning of SwarmNL.
5
6use self::ping_config::PingInfo;
7use libp2p::gossipsub::MessageId;
8use libp2p_identity::PublicKey;
9use serde::{Deserialize, Serialize};
10use std::fmt::Debug;
11use std::hash::Hash;
12use std::{collections::VecDeque, time::Instant};
13use thiserror::Error;
14
15use super::*;
16
17/// The duration (in seconds) to wait for response from the network layer before timing
18/// out.
19pub const NETWORK_READ_TIMEOUT: Seconds = 30;
20
21/// The time it takes for the task to sleep before it can recheck if an output has been placed in
22/// the response buffer.
23pub const TASK_SLEEP_DURATION: Seconds = 3;
24
25/// The height of the internal queue. This represents the maximum number of elements that a queue
26/// can accommodate without losing its oldest elements.
27const MAX_QUEUE_ELEMENTS: usize = 300;
28
29/// Type that represents the response of the network layer to the application layer's event handler.
30pub type AppResponseResult = Result<AppResponse, NetworkError>;
31
32/// Type that represents the data exchanged during RPC operations.
33pub type RpcData = ByteVector;
34
35/// Type that represents a vector of vector of bytes.
36pub type ByteVector = Vec<Vec<u8>>;
37
38/// Type that represents the id of a shard.
39pub type ShardId = String;
40
41/// Type that represents the result for network operations.
42pub type NetworkResult<T> = Result<T, NetworkError>;
43
44/// Type that represents a vector of string.
45pub type StringVector = Vec<String>;
46
47/// Type that represents a nonce.
48pub type Nonce = u64;
49
50/// Time to wait (in seconds) for the node (network layer) to boot.
51pub(super) const BOOT_WAIT_TIME: Seconds = 1;
52
53/// The buffer capacity of an mpsc stream.
54pub(super) const STREAM_BUFFER_CAPACITY: usize = 100;
55
56/// Data exchanged over a stream between the application and network layer.
57#[derive(Debug, Clone)]
58pub(super) enum StreamData {
59 /// Application data sent over the stream.
60 FromApplication(StreamId, AppData),
61 /// Network response data sent over the stream to the application layer.
62 ToApplication(StreamId, AppResponse),
63}
64
65/// Request sent from the application layer to the networking layer.
66#[derive(Debug, Clone)]
67pub enum AppData {
68 /// A simple echo message.
69 Echo(String),
70 /// Dail peer.
71 DailPeer(PeerId, MultiaddrString),
72 /// Store a value associated with a given key in the Kademlia DHT.
73 KademliaStoreRecord {
74 key: Vec<u8>,
75 value: Vec<u8>,
76 // expiration time for local records
77 expiration_time: Option<Instant>,
78 // store on explicit peers
79 explicit_peers: Option<Vec<PeerIdString>>,
80 },
81 /// Perform a lookup of a value associated with a given key in the Kademlia DHT.
82 KademliaLookupRecord { key: Vec<u8> },
83 /// Perform a lookup of peers that store a record.
84 KademliaGetProviders { key: Vec<u8> },
85 /// Stop providing a record on the network.
86 KademliaStopProviding { key: Vec<u8> },
87 /// Remove record from local store.
88 KademliaDeleteRecord { key: Vec<u8> },
89 /// Return important information about the local routing table.
90 KademliaGetRoutingTableInfo,
91 /// Fetch data(s) quickly from a peer over the network.
92 SendRpc { keys: RpcData, peer: PeerId },
93 /// Get network information about the node.
94 GetNetworkInfo,
95 /// Send message to gossip peers in a mesh network.
96 GossipsubBroadcastMessage {
97 /// Topic to send messages to
98 topic: String,
99 message: ByteVector,
100 },
101 /// Join a mesh network.
102 GossipsubJoinNetwork(String),
103 /// Get gossip information about node.
104 GossipsubGetInfo,
105 /// Leave a network we are a part of.
106 GossipsubExitNetwork(String),
107 /// Blacklist a peer explicitly.
108 GossipsubBlacklistPeer(PeerId),
109 /// Remove a peer from the blacklist.
110 GossipsubFilterBlacklist(PeerId),
111}
112
113/// Response to requests sent from the application to the network layer.
114#[derive(Debug, Clone, PartialEq)]
115pub enum AppResponse {
116 /// The value written to the network.
117 Echo(String),
118 /// The peer we dailed.
119 DailPeerSuccess(String),
120 /// Store record success.
121 KademliaStoreRecordSuccess,
122 /// DHT lookup result.
123 KademliaLookupSuccess(Vec<u8>),
124 /// Nodes storing a particular record in the DHT.
125 KademliaGetProviders {
126 key: Vec<u8>,
127 providers: Vec<PeerIdString>,
128 },
129 /// No providers found.
130 KademliaNoProvidersFound,
131 /// Routing table information.
132 KademliaGetRoutingTableInfo { protocol_id: String },
133 /// Result of RPC operation.
134 SendRpc(RpcData),
135 /// A network error occured while executing the request.
136 Error(NetworkError),
137 /// Important information about the node.
138 GetNetworkInfo {
139 peer_id: PeerId,
140 connected_peers: Vec<PeerId>,
141 external_addresses: Vec<MultiaddrString>,
142 },
143 /// Successfully broadcast to the network.
144 GossipsubBroadcastSuccess,
145 /// Successfully joined a mesh network.
146 GossipsubJoinSuccess,
147 /// Successfully exited a mesh network.
148 GossipsubExitSuccess,
149 /// Gossipsub information about node.
150 GossipsubGetInfo {
151 /// Topics that the node is currently subscribed to
152 topics: StringVector,
153 /// Peers we know about and their corresponding topics
154 mesh_peers: Vec<(PeerId, StringVector)>,
155 /// Peers we have blacklisted
156 blacklist: HashSet<PeerId>,
157 },
158 /// A peer was successfully blacklisted.
159 GossipsubBlacklistSuccess,
160}
161
162/// Network error type containing errors encountered during network operations.
163#[derive(Error, Debug, Clone, PartialEq)]
164pub enum NetworkError {
165 #[error("timeout occured waiting for data from network layer")]
166 NetworkReadTimeout,
167 #[error("internal request stream buffer is full")]
168 StreamBufferOverflow,
169 #[error("failed to store record in DHT")]
170 KadStoreRecordError(Vec<u8>),
171 #[error("failed to fetch data from peer")]
172 RpcDataFetchError,
173 #[error("failed to fetch record from the DHT")]
174 KadFetchRecordError(Vec<u8>),
175 #[error("task carrying app response panicked")]
176 InternalTaskError,
177 #[error("failed to dail peer")]
178 DailPeerError,
179 #[error("failed to broadcast message to peers in the topic")]
180 GossipsubBroadcastMessageError,
181 #[error("failed to join a mesh network")]
182 GossipsubJoinNetworkError,
183 #[error("failed to exit a mesh network")]
184 GossipsubExitNetworkError,
185 #[error("internal stream failed to transport data")]
186 InternalStreamError,
187 #[error("replica network not found")]
188 MissingReplNetwork,
189 #[error("network id for sharding has not been configured. See `CoreBuilder::with_shard()`")]
190 MissingShardingNetworkIdError,
191 #[error("threshold for data forwarding not met")]
192 DataForwardingError,
193 #[error("failed to shard data")]
194 ShardingFailureError,
195 #[error("failed to fetch sharded data")]
196 ShardingFetchError,
197 #[error("shard not found for input key")]
198 ShardNotFound,
199 #[error("no nodes found in logical shard")]
200 MissingShardNodesError,
201}
202
/// A simple identifier used to track requests sent from the application layer to the network
/// layer.
///
/// Ids are sequential, not random: the first id is `0` (see [`StreamId::new`]) and every later
/// id is derived from its predecessor with [`StreamId::next`].
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub struct StreamId(u32);

impl StreamId {
    /// Create the initial stream id, which is always `0`.
    ///
    /// Every call returns the same value, so this must only be called once per id sequence;
    /// obtain all further ids through [`StreamId::next`].
    pub fn new() -> Self {
        StreamId(0)
    }

    /// Return the id that follows `current_id`.
    ///
    /// The underlying counter wraps around to `0` after `u32::MAX` instead of overflowing.
    pub fn next(current_id: StreamId) -> Self {
        StreamId(current_id.0.wrapping_add(1))
    }
}

impl Default for StreamId {
    /// Equivalent to [`StreamId::new`].
    fn default() -> Self {
        Self::new()
    }
}
219
220/// Type that keeps track of the requests from the application layer.
221/// This type has a maximum buffer size and will drop subsequent requests when full.
222/// It is unlikely to be ever full as the default is usize::MAX except otherwise specified during
223/// configuration. It is always good practice to read responses from the internal stream buffer
224/// using `query_network()` or explicitly using `recv_from_network`.
225#[derive(Clone, Debug)]
226pub(super) struct StreamRequestBuffer {
227 /// Max requests we can keep track of.
228 size: usize,
229 buffer: HashSet<StreamId>,
230}
231
232impl StreamRequestBuffer {
233 /// Create a new request buffer.
234 pub fn new(buffer_size: usize) -> Self {
235 Self {
236 size: buffer_size,
237 buffer: HashSet::new(),
238 }
239 }
240
241 /// Push [`StreamId`]s into buffer.
242 /// Returns `false` if the buffer is full and request cannot be stored.
243 pub fn insert(&mut self, id: StreamId) -> bool {
244 if self.buffer.len() < self.size {
245 self.buffer.insert(id);
246 return true;
247 }
248 false
249 }
250}
251
252/// Type that keeps track of the response to the requests from the application layer.
253pub(super) struct StreamResponseBuffer {
254 /// Max responses we can keep track of.
255 size: usize,
256 buffer: HashMap<StreamId, AppResponseResult>,
257}
258
259impl StreamResponseBuffer {
260 /// Create a new request buffer.
261 pub fn new(buffer_size: usize) -> Self {
262 Self {
263 size: buffer_size,
264 buffer: HashMap::new(),
265 }
266 }
267
268 /// Push a [`StreamId`] into buffer.
269 /// Returns `false` if the buffer is full and request cannot be stored.
270 pub fn insert(&mut self, id: StreamId, response: AppResponseResult) -> bool {
271 if self.buffer.len() < self.size {
272 self.buffer.insert(id, response);
273 return true;
274 }
275 false
276 }
277
278 /// Remove a [`StreamId`] from the buffer.
279 pub fn remove(&mut self, id: &StreamId) -> Option<AppResponseResult> {
280 self.buffer.remove(&id)
281 }
282}
283
284/// Type representing the RPC data structure sent between nodes in the network.
285#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
286pub(super) enum Rpc {
287 /// Using request-response.
288 ReqResponse { data: RpcData },
289}
290
/// The configuration for the RPC protocol.
///
/// Implements `Debug` and `Clone` so a configuration can be logged and reused.
#[derive(Debug, Clone)]
pub enum RpcConfig {
    /// Use the default configuration.
    Default,
    /// Use a custom configuration.
    Custom {
        /// Timeout for inbound and outbound requests.
        timeout: Duration,
        /// Maximum number of concurrent inbound + outbound streams.
        max_concurrent_streams: usize,
    },
}
301
302/// Enum that represents the events generated in the network layer.
303#[derive(Debug, Clone, Eq, PartialEq, Hash)]
304pub enum NetworkEvent {
305 /// Event that informs the application that we have started listening on a new multiaddr.
306 ///
307 /// # Fields
308 ///
309 /// - `local_peer_id`: The `PeerId` of the local peer.
310 /// - `listener_id`: The ID of the listener.
311 /// - `address`: The new `Multiaddr` where the local peer is listening.
312 NewListenAddr {
313 local_peer_id: PeerId,
314 listener_id: ListenerId,
315 address: Multiaddr,
316 },
317 /// Event that informs the application that a new peer (with its location details) has just
318 /// been added to the routing table.
319 ///
320 /// # Fields
321 ///
322 /// - `peer_id`: The `PeerId` of the new peer added to the routing table.
323 RoutingTableUpdated { peer_id: PeerId },
324 /// Event that informs the application about a newly established connection to a peer.
325 ///
326 /// # Fields
327 ///
328 /// - `peer_id`: The `PeerId` of the connected peer.
329 /// - `connection_id`: The ID of the connection.
330 /// - `endpoint`: The `ConnectedPoint` information about the connection's endpoint.
331 /// - `num_established`: The number of established connections with this peer.
332 /// - `established_in`: The duration it took to establish the connection.
333 ConnectionEstablished {
334 peer_id: PeerId,
335 connection_id: ConnectionId,
336 endpoint: ConnectedPoint,
337 num_established: NonZeroU32,
338 established_in: Duration,
339 },
340 /// Event that informs the application about a closed connection to a peer.
341 ///
342 /// # Fields
343 ///
344 /// - `peer_id`: The `PeerId` of the peer.
345 /// - `connection_id`: The ID of the connection.
346 /// - `endpoint`: The `ConnectedPoint` information about the connection's endpoint.
347 /// - `num_established`: The number of remaining established connections with this peer.
348 ConnectionClosed {
349 peer_id: PeerId,
350 connection_id: ConnectionId,
351 endpoint: ConnectedPoint,
352 num_established: u32,
353 },
354 /// Event that announces an expired listen address.
355 ///
356 /// # Fields
357 ///
358 /// - `listener_id`: The ID of the listener.
359 /// - `address`: The expired `Multiaddr`.
360 ExpiredListenAddr {
361 listener_id: ListenerId,
362 address: Multiaddr,
363 },
364 /// Event that announces a closed listener.
365 ///
366 /// # Fields
367 ///
368 /// - `listener_id`: The ID of the listener.
369 /// - `addresses`: The list of `Multiaddr` where the listener was listening.
370 ListenerClosed {
371 listener_id: ListenerId,
372 addresses: Vec<Multiaddr>,
373 },
374 /// Event that announces a listener error.
375 ///
376 /// # Fields
377 ///
378 /// - `listener_id`: The ID of the listener that encountered the error.
379 ListenerError { listener_id: ListenerId },
380 /// Event that announces a dialing attempt.
381 ///
382 /// # Fields
383 ///
384 /// - `peer_id`: The `PeerId` of the peer being dialed, if known.
385 /// - `connection_id`: The ID of the connection attempt.
386 Dialing {
387 peer_id: Option<PeerId>,
388 connection_id: ConnectionId,
389 },
390 /// Event that announces a new external address candidate.
391 ///
392 /// # Fields
393 ///
394 /// - `address`: The new external address candidate.
395 NewExternalAddrCandidate { address: Multiaddr },
396 /// Event that announces a confirmed external address.
397 ///
398 /// # Fields
399 ///
400 /// - `address`: The confirmed external address.
401 ExternalAddrConfirmed { address: Multiaddr },
402 /// Event that announces an expired external address.
403 ///
404 /// # Fields
405 ///
406 /// - `address`: The expired external address.
407 ExternalAddrExpired { address: Multiaddr },
408 /// Event that announces a new connection arriving on a listener and in the process of
409 /// protocol negotiation.
410 ///
411 /// # Fields
412 ///
413 /// - `connection_id`: The ID of the incoming connection.
414 /// - `local_addr`: The local `Multiaddr` where the connection is received.
415 /// - `send_back_addr`: The remote `Multiaddr` of the peer initiating the connection.
416 IncomingConnection {
417 connection_id: ConnectionId,
418 local_addr: Multiaddr,
419 send_back_addr: Multiaddr,
420 },
421 /// Event that announces an error happening on an inbound connection during its initial
422 /// handshake.
423 ///
424 /// # Fields
425 ///
426 /// - `connection_id`: The ID of the incoming connection.
427 /// - `local_addr`: The local `Multiaddr` where the connection was received.
428 /// - `send_back_addr`: The remote `Multiaddr` of the peer initiating the connection.
429 IncomingConnectionError {
430 connection_id: ConnectionId,
431 local_addr: Multiaddr,
432 send_back_addr: Multiaddr,
433 },
434 /// Event that announces an error happening on an outbound connection during its initial
435 /// handshake.
436 ///
437 /// # Fields
438 ///
439 /// - `connection_id`: The ID of the outbound connection.
440 /// - `peer_id`: The `PeerId` of the peer being connected to, if known.
441 OutgoingConnectionError {
442 connection_id: ConnectionId,
443 peer_id: Option<PeerId>,
444 },
445 /// Event that announces the arrival of a pong message from a peer.
446 ///
447 /// # Fields
448 ///
449 /// - `peer_id`: The `PeerId` of the peer that sent the pong message.
450 /// - `duration`: The duration it took for the round trip.
451 OutboundPingSuccess { peer_id: PeerId, duration: Duration },
452 /// Event that announces a `Ping` error.
453 ///
454 /// # Fields
455 ///
456 /// - `peer_id`: The `PeerId` of the peer that encountered the ping error.
457 OutboundPingError { peer_id: PeerId },
458 /// Event that announces the arrival of a `PeerInfo` via the `Identify` protocol.
459 ///
460 /// # Fields
461 ///
462 /// - `peer_id`: The `PeerId` of the peer that sent the identify info.
463 /// - `info`: The `IdentifyInfo` received from the peer.
464 IdentifyInfoReceived { peer_id: PeerId, info: IdentifyInfo },
465 /// Event that announces the successful write of a record to the DHT.
466 ///
467 /// # Fields
468 ///
469 /// - `key`: The key of the record that was successfully written.
470 KademliaPutRecordSuccess { key: Vec<u8> },
471 /// Event that announces the failure of a node to save a record.
472 KademliaPutRecordError,
473 /// Event that announces a node as a provider of a record in the DHT.
474 ///
475 /// # Fields
476 ///
477 /// - `key`: The key of the record being provided.
478 KademliaStartProvidingSuccess { key: Vec<u8> },
479 /// Event that announces the failure of a node to become a provider of a record in the DHT.
480 KademliaStartProvidingError,
481 /// Event that announces the arrival of an RPC message.
482 ///
483 /// # Fields
484 ///
485 /// - `data`: The `RpcData` of the received message.
486 RpcIncomingMessageHandled { data: RpcData },
487 /// Event that announces that a peer has just left a network.
488 ///
489 /// # Fields
490 ///
491 /// - `peer_id`: The `PeerId` of the peer that left.
492 /// - `topic`: The topic the peer unsubscribed from.
493 GossipsubUnsubscribeMessageReceived { peer_id: PeerId, topic: String },
494 /// Event that announces that a peer has just joined a network.
495 ///
496 /// # Fields
497 ///
498 /// - `peer_id`: The `PeerId` of the peer that joined.
499 /// - `topic`: The topic the peer subscribed to.
500 GossipsubSubscribeMessageReceived { peer_id: PeerId, topic: String },
501 /// Event that announces the arrival of a replicated data content
502 ///
503 /// # Fields
504 ///
505 /// - `data`: The data contained in the gossip message.
506 /// - `outgoing_timestamp`: The time the message left the source
507 /// - `outgoing_timestamp`: The time the message was recieved
508 /// - `message_id`: The unique id of the message
509 /// - `source`: The `PeerId` of the source peer.
510 ReplicaDataIncoming {
511 /// Data
512 data: StringVector,
513 /// The replica network that owns the data
514 network: String,
515 /// Timestamp at which the message left the sending node
516 outgoing_timestamp: Seconds,
517 /// Timestamp at which the message arrived
518 incoming_timestamp: Seconds,
519 /// Message ID to prevent deduplication. It is usually a hash of the incoming message
520 message_id: String,
521 /// Sender PeerId
522 source: PeerId,
523 },
524 /// Event that announces the arrival of a forwarded sharded data
525 ///
526 /// # Fields
527 ///
528 /// - `data`: The data contained in the gossip message.
529 IncomingForwardedData {
530 /// Data
531 data: StringVector,
532 /// Sender's PeerId
533 source: PeerId,
534 },
535 /// Event that announces the arrival of a gossip message.
536 ///
537 /// # Fields
538 ///
539 /// - `source`: The `PeerId` of the source peer.
540 /// - `data`: The data contained in the gossip message.
541 GossipsubIncomingMessageHandled { source: PeerId, data: StringVector },
542 // /// Event that announces the beginning of the filtering and authentication of the incoming
543 // /// gossip message.
544 // ///
545 // /// # Fields
546 // ///
547 // /// - `propagation_source`: The `PeerId` of the peer from whom the message was received.
548 // /// - `message_id`: The ID of the incoming message.
549 // /// - `source`: The `PeerId` of the original sender, if known.
550 // /// - `topic`: The topic of the message.
551 // /// - `data`: The data contained in the message.
552 // GossipsubIncomingMessageFiltered {
553 // propagation_source: PeerId,
554 // message_id: MessageId,
555 // source: Option<PeerId>,
556 // topic: String,
557 // data: StringVector,
558 // },
559}
560
561/// The struct that contains incoming information about a peer returned by the `Identify` protocol.
562#[derive(Debug, Clone, Eq, PartialEq, Hash)]
563pub struct IdentifyInfo {
564 /// The public key of the remote peer.
565 pub public_key: PublicKey,
566 /// The address the remote peer is listening on.
567 pub listen_addrs: Vec<Multiaddr>,
568 /// The protocols supported by the remote peer.
569 pub protocols: Vec<StreamProtocol>,
570 /// The address we are listened on, observed by the remote peer.
571 pub observed_addr: Multiaddr,
572}
573
574/// Important information to obtain from the [`CoreBuilder`], to properly handle network
575/// operations.
576#[derive(Clone)]
577pub(super) struct NetworkInfo {
578 /// The name/id of the network.
579 pub id: StreamProtocol,
580 /// Important information to manage `Ping` operations.
581 pub ping: PingInfo,
582 /// Important information to manage `Gossipsub` operations.
583 pub gossipsub: gossipsub_cfg::GossipsubInfo,
584 /// The function that handles incoming RPC data request and produces a response.
585 pub rpc_handler_fn: fn(RpcData) -> RpcData,
586 /// The function to filter incoming gossip messages.
587 pub gossip_filter_fn: fn(PeerId, MessageId, Option<PeerId>, String, StringVector) -> bool,
588 /// Important information to manage `Replication` operations.
589 pub replication: replication::ReplInfo,
590 /// Important information to manage `sharding` operations.
591 pub sharding: sharding::ShardingInfo,
592}
593
594/// Module that contains important data structures to manage `Ping` operations on the network.
595pub mod ping_config {
596 use libp2p_identity::PeerId;
597 use std::{collections::HashMap, time::Duration};
598
599 /// Policies to handle a `Ping` error.
600 /// All connections to peers are closed during a disconnect operation.
601 #[derive(Debug, Clone)]
602 pub enum PingErrorPolicy {
603 /// Do not disconnect under any circumstances.
604 NoDisconnect,
605 /// Disconnect after a number of outbound errors.
606 DisconnectAfterMaxErrors(u16),
607 /// Disconnect after a certain number of concurrent timeouts.
608 DisconnectAfterMaxTimeouts(u16),
609 }
610
611 /// Struct that stores critical information for the execution of the [`PingErrorPolicy`].
612 #[derive(Debug, Clone)]
613 pub struct PingManager {
614 /// The number of timeout errors encountered from a peer.
615 pub timeouts: HashMap<PeerId, u16>,
616 /// The number of outbound errors encountered from a peer.
617 pub outbound_errors: HashMap<PeerId, u16>,
618 }
619
620 /// The configuration for the `Ping` protocol.
621 #[derive(Debug, Clone)]
622 pub struct PingConfig {
623 /// The interval between successive pings.
624 /// Default is 15 seconds.
625 pub interval: Duration,
626 /// The duration before which the request is considered failure.
627 /// Default is 20 seconds.
628 pub timeout: Duration,
629 /// Error policy.
630 pub err_policy: PingErrorPolicy,
631 }
632
633 /// Critical information to manage `Ping` operations.
634 #[derive(Debug, Clone)]
635 pub struct PingInfo {
636 pub policy: PingErrorPolicy,
637 pub manager: PingManager,
638 }
639}
640
641/// Module containing important state relating to the [`Gossipsub`](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/README.md) protocol.
642pub mod gossipsub_cfg {
643 use super::*;
644
645 /// The struct containing the list of blacklisted peers.
646 #[derive(Clone, Debug, Default)]
647 pub struct Blacklist {
648 // Blacklist
649 pub list: HashSet<PeerId>,
650 }
651
652 /// `Gossipsub` configuration.
653 pub enum GossipsubConfig {
654 /// A default configuration.
655 Default,
656 /// A custom configuration.
657 ///
658 /// # Fields
659 ///
660 /// - `config`: The custom configuration for gossipsub.
661 /// - `auth`: The signature authenticity check.
662 Custom {
663 config: gossipsub::Config,
664 auth: gossipsub::MessageAuthenticity,
665 },
666 }
667
668 impl Blacklist {
669 /// Return the inner list we're keeping track of.
670 pub fn into_inner(&self) -> HashSet<PeerId> {
671 self.list.clone()
672 }
673 }
674
675 /// Important information to manage `Gossipsub` operations.
676 #[derive(Clone)]
677 pub struct GossipsubInfo {
678 pub blacklist: Blacklist,
679 }
680}
681
682/// Queue that stores and removes data in a FIFO manner.
683#[derive(Clone)]
684pub(super) struct DataQueue<T: Debug + Clone + Eq + PartialEq + Hash> {
685 buffer: Arc<Mutex<VecDeque<T>>>,
686}
687
688impl<T> DataQueue<T>
689where
690 T: Debug + Clone + Eq + PartialEq + Hash,
691{
692 /// The initial buffer capacity, to optimize for speed and defer allocation
693 const INITIAL_BUFFER_CAPACITY: usize = 300;
694
695 /// Create new queue.
696 pub fn new() -> Self {
697 Self {
698 buffer: Arc::new(Mutex::new(VecDeque::with_capacity(
699 DataQueue::<T>::INITIAL_BUFFER_CAPACITY,
700 ))),
701 }
702 }
703
704 /// Remove an item from the top of the queue.
705 pub async fn pop(&self) -> Option<T> {
706 self.buffer.lock().await.pop_front()
707 }
708
709 /// Append an item to the queue.
710 pub async fn push(&self, item: T) {
711 let mut buffer = self.buffer.lock().await;
712 if buffer.len() >= MAX_QUEUE_ELEMENTS {
713 buffer.pop_front();
714 }
715 buffer.push_back(item);
716 }
717
718 /// Return the inner data structure of the queue.
719 pub async fn into_inner(&self) -> VecDeque<T> {
720 self.buffer.lock().await.clone()
721 }
722
723 /// Drain the contents of the queue.
724 pub async fn drain(&mut self) {
725 self.buffer.lock().await.drain(..);
726 }
727}