Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions crates/engineioxide/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ use smallvec::{SmallVec, smallvec};
use tokio::sync::{
Mutex,
mpsc::{self, Receiver, error::TrySendError},
watch,
};

pub use engineioxide_core::Sid;
Expand Down Expand Up @@ -212,6 +213,13 @@ where
/// Channel to send [PacketBuf] to the internal connection
internal_tx: mpsc::Sender<PacketBuf>,

/// Channel to send volatile [PacketBuf]s that bypass the internal buffer.
/// Uses a [`watch`](tokio::sync::watch) channel so only the latest volatile
/// message is retained; subsequent volatile sends overwrite the previous one.
volatile_tx: watch::Sender<Option<PacketBuf>>,
/// Receiver for the volatile channel, read by the transport with priority.
pub(crate) volatile_rx: watch::Receiver<Option<PacketBuf>>,

/// Internal channel to receive Pong [`Packets`](Packet) (v4 protocol) or Ping (v3 protocol) in the heartbeat job
/// which is running in a separate task
heartbeat_rx: Mutex<Receiver<()>>,
Expand Down Expand Up @@ -249,6 +257,7 @@ where
) -> Self {
let (internal_tx, internal_rx) = mpsc::channel(config.max_buffer_size);
let (heartbeat_tx, heartbeat_rx) = mpsc::channel(1);
let (volatile_tx, volatile_rx) = watch::channel(None);

Self {
id: Sid::new(),
Expand All @@ -259,6 +268,9 @@ where
internal_rx: Mutex::new(PeekableReceiver::new(internal_rx)),
internal_tx,

volatile_rx,
volatile_tx,

heartbeat_rx: Mutex::new(heartbeat_rx),
heartbeat_tx,
cancellation_token: CancellationToken::new(),
Expand Down Expand Up @@ -488,6 +500,55 @@ where
TrySendError::Closed(p) => TrySendError::Closed(p.into_binary()),
})
}

/// Try to send a volatile message bypassing the internal buffer channel.
/// Volatile messages may be dropped if the transport is not ready to
/// receive them.
///
/// Because volatile messages bypass the main mpsc buffer queue, they may
/// arrive out of order relative to regular messages.
///
/// Returns `true` if the message was queued for sending, `false` if it
/// was dropped (channel full or transport shutting down).
#[inline]
pub fn emit_volatile(&self, msg: impl Into<Str>) -> bool {
Comment thread
Totodore marked this conversation as resolved.
self.send_volatile(smallvec![Packet::Message(msg.into())])
}

/// Try to send a volatile binary message bypassing the internal buffer channel.
/// Volatile messages may be dropped if the transport is not ready to
/// receive them.
///
/// Returns `true` if the message was queued for sending, `false` if it
/// was dropped.
#[inline]
pub fn emit_binary_volatile<B: Into<Bytes>>(&self, data: B) -> bool {
if self.protocol == ProtocolVersion::V3 {
self.send_volatile(smallvec![Packet::BinaryV3(data.into())])
} else {
self.send_volatile(smallvec![Packet::Binary(data.into())])
}
}

/// Try to send a volatile message with multiple adjacent binary payloads.
/// The message and all binary payloads are sent atomically as a single
/// volatile write.
///
/// Returns `true` if the message was queued for sending, `false` if it
/// was dropped.
#[inline]
pub fn emit_many_volatile(&self, msg: Str, data: VecDeque<Bytes>) -> bool {
let mut packets = SmallVec::with_capacity(1 + data.len());
packets.push(Packet::Message(msg));
for bin in data {
packets.push(Packet::Binary(bin));
}
self.send_volatile(packets)
}

pub(crate) fn send_volatile(&self, packets: PacketBuf) -> bool {
self.volatile_tx.send(Some(packets)).is_ok()
}
}

impl<D: Default + Send + Sync + 'static> std::fmt::Debug for Socket<D> {
Expand Down Expand Up @@ -548,6 +609,7 @@ where
) -> (Arc<Socket<D>>, tokio::sync::mpsc::Receiver<Packet>) {
let (internal_tx, internal_rx) = mpsc::channel(buffer_size);
let (heartbeat_tx, heartbeat_rx) = mpsc::channel(1);
let (volatile_tx, volatile_rx) = watch::channel(None);

let sock = Self {
id: sid,
Expand All @@ -558,6 +620,9 @@ where
internal_rx: Mutex::new(PeekableReceiver::new(internal_rx)),
internal_tx,

volatile_rx,
volatile_tx,

heartbeat_rx: Mutex::new(heartbeat_rx),
heartbeat_tx,
cancellation_token: CancellationToken::new(),
Expand Down
15 changes: 12 additions & 3 deletions crates/engineioxide/src/transport/polling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,20 @@ where

let max_payload = engine.config.max_payload;

let mut volatile_rx = socket.volatile_rx.clone();

#[cfg(feature = "v3")]
let Payload { data, has_binary } =
payload::encoder(rx, protocol, socket.supports_binary, max_payload).await?;
let Payload { data, has_binary } = payload::encoder(
rx,
protocol,
socket.supports_binary,
max_payload,
&mut volatile_rx,
)
.await?;
#[cfg(not(feature = "v3"))]
let Payload { data, has_binary } = payload::encoder(rx, protocol, max_payload).await?;
let Payload { data, has_binary } =
payload::encoder(rx, protocol, max_payload, &mut volatile_rx).await?;

#[cfg(feature = "tracing")]
tracing::debug!("[sid={sid}] sending data: {:?}", data);
Expand Down
Loading