From 16de79c8d448221da4a073081cba596310871015 Mon Sep 17 00:00:00 2001 From: totodore Date: Sun, 21 Jun 2026 19:00:24 +0200 Subject: [PATCH 1/2] refactor(engineio): improve websocket tx feed from internal chan --- crates/engineioxide/src/transport/ws.rs | 109 ++++++++++++++---------- 1 file changed, 64 insertions(+), 45 deletions(-) diff --git a/crates/engineioxide/src/transport/ws.rs b/crates/engineioxide/src/transport/ws.rs index 565a9940..19df61e4 100644 --- a/crates/engineioxide/src/transport/ws.rs +++ b/crates/engineioxide/src/transport/ws.rs @@ -4,7 +4,7 @@ //! Other functions are used internally to handle the websocket connection through tasks and channels //! and to handle upgrade from polling to ws -use std::sync::Arc; +use std::{ops::ControlFlow, sync::Arc}; use futures_util::{ SinkExt, StreamExt, TryStreamExt, @@ -15,7 +15,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio_tungstenite::{ WebSocketStream, tungstenite::{ - Message, + Message, Utf8Bytes, handshake::derive_accept_key, protocol::{Role, WebSocketConfig}, }, @@ -250,58 +250,77 @@ async fn forward_to_socket( { let mut internal_rx = socket.internal_rx.try_lock().unwrap(); - // map a packet to a websocket message - // It is declared as a macro rather than a closure to avoid ownership issues - macro_rules! map_fn { - ($item:ident) => { - let res = match $item { - Packet::Binary(bin) | Packet::BinaryV3(bin) => { - if socket.protocol == ProtocolVersion::V3 { - // v3 protocol requires packet type as the first byte. - // This requires a new buffer. This is OK as it is only for the V3 protocol. - let mut buff = Vec::with_capacity(bin.len() + 1); - buff.push(0x04); - buff.extend(bin); - tx.feed(Message::Binary(buff.into())).await - } else { - tx.feed(Message::Binary(bin)).await - } - } - Packet::Close => { - tx.send(Message::Close(None)).await.ok(); - internal_rx.close(); + loop { + match internal_rx.recv().await { + Some(packets) => { + for packet in packets { + if feed_tx(&mut tx, packet, &socket).await == ControlFlow::Break(()) { break; - }, - // A Noop Packet maybe sent by the server to upgrade from a polling connection - // In the case that the packet was not poll in time it will remain in the buffer and therefore - // it should be discarded here - Packet::Noop => Ok(()), - _ => { - let packet: String = $item.try_into().unwrap(); - tx.feed(Message::Text(packet.into())).await } - }; - if let Err(_e) = res { - #[cfg(feature = "tracing")] - tracing::debug!("[sid={}] error sending packet: {}", socket.id, _e); } - }; - } - - while let Some(items) = internal_rx.recv().await { - for item in items { - map_fn!(item); - } - // For every available packet we continue to send until the channel is drained - while let Ok(items) = internal_rx.try_recv() { - for item in items { - map_fn!(item); + // For every available packet we continue to send until the channel is drained + while let Ok(packets) = internal_rx.try_recv() { + for packet in packets { + if feed_tx(&mut tx, packet, &socket).await == ControlFlow::Break(()) { + break; + } + } + } + tx.flush().await.ok(); } + None => break, } tx.flush().await.ok(); } } + +/// Helper that will feed the sink with the current packet. +/// +/// Return [`ControlFlow::Break`] if we return a [`Packet::close`] and +/// that we should stop everything. +async fn feed_tx( + tx: &mut SplitSink, Message>, + packet: Packet, + socket: &Socket, +) -> ControlFlow<()> +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + D: Default + Send + Sync + 'static, +{ + let res = match packet { + Packet::Binary(bin) | Packet::BinaryV3(bin) => { + if socket.protocol == ProtocolVersion::V3 { + // v3 protocol requires packet type as the first byte. + // This requires a new buffer. This is OK as it is only for the V3 protocol. + let mut buff = Vec::with_capacity(bin.len() + 1); + buff.push(0x04); + buff.extend(bin); + tx.feed(Message::Binary(buff.into())).await + } else { + tx.feed(Message::Binary(bin.clone())).await + } + } + Packet::Close => { + tx.send(Message::Close(None)).await.ok(); + return ControlFlow::Break(()); + } + // A Noop Packet maybe sent by the server to upgrade from a polling connection + // In the case that the packet was not poll in time it will remain in the buffer and therefore + // it should be discarded here + Packet::Noop => Ok(()), + _ => { + tx.feed(Message::Text(Utf8Bytes::from(String::from(packet)))) + .await + } + }; + if let Err(_e) = res { + #[cfg(feature = "tracing")] + tracing::debug!(sid = %socket.id, "failed to send packet to websocket: {}", _e); + } + + ControlFlow::Continue(()) +} /// Send a Engine.IO [`OpenPacket`] to initiate a websocket connection async fn init_handshake( sid: Sid, From f5159129e0935b64be262e0b7c6f5641e8142150 Mon Sep 17 00:00:00 2001 From: totodore Date: Sun, 21 Jun 2026 19:10:37 +0200 Subject: [PATCH 2/2] refactor(engineio): improve websocket tx feed from internal chan --- crates/engineioxide/src/transport/ws.rs | 57 ++++++++++++++++--------- 1 file changed, 37 insertions(+), 20 deletions(-) diff --git a/crates/engineioxide/src/transport/ws.rs b/crates/engineioxide/src/transport/ws.rs index 19df61e4..e57e27cb 100644 --- a/crates/engineioxide/src/transport/ws.rs +++ b/crates/engineioxide/src/transport/ws.rs @@ -33,6 +33,7 @@ use crate::{ packet::{OpenPacket, Packet}, service::ProtocolVersion, service::TransportType, + socket::PacketBuf, }; /// Create a response for websocket upgrade @@ -251,33 +252,49 @@ async fn forward_to_socket( let mut internal_rx = socket.internal_rx.try_lock().unwrap(); loop { - match internal_rx.recv().await { - Some(packets) => { - for packet in packets { - if feed_tx(&mut tx, packet, &socket).await == ControlFlow::Break(()) { - break; - } - } - // For every available packet we continue to send until the channel is drained - while let Ok(packets) = internal_rx.try_recv() { - for packet in packets { - if feed_tx(&mut tx, packet, &socket).await == ControlFlow::Break(()) { - break; - } - } - } - tx.flush().await.ok(); + let Some(packets) = internal_rx.recv().await else { + break; + }; + + let mut should_close = feed_all(&mut tx, packets, &socket).await.is_break(); + // For every available packet we continue to send until the channel is drained + while !should_close { + match internal_rx.try_recv() { + Ok(packets) => should_close = feed_all(&mut tx, packets, &socket).await.is_break(), + Err(_) => break, } - None => break, } - tx.flush().await.ok(); + + // A `Packet::Close` was sent: close the channel so that pending senders + // are notified and stop forwarding. + if should_close { + internal_rx.close(); + break; + } + } +} + +/// Feeds a batch of packets to the sink, stopping early and returning +/// [`ControlFlow::Break`] as soon as a [`Packet::Close`] is encountered. +async fn feed_all( + tx: &mut SplitSink, Message>, + packets: PacketBuf, + socket: &Socket, +) -> ControlFlow<()> +where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + D: Default + Send + Sync + 'static, +{ + for packet in packets { + feed_tx(tx, packet, socket).await?; } + ControlFlow::Continue(()) } /// Helper that will feed the sink with the current packet. /// -/// Return [`ControlFlow::Break`] if we return a [`Packet::close`] and +/// Return [`ControlFlow::Break`] if we return a [`Packet::Close`] and /// that we should stop everything. async fn feed_tx( tx: &mut SplitSink, Message>, @@ -298,7 +315,7 @@ where buff.extend(bin); tx.feed(Message::Binary(buff.into())).await } else { - tx.feed(Message::Binary(bin.clone())).await + tx.feed(Message::Binary(bin)).await } } Packet::Close => {