feat: implement support for volatile events with new VolatileSocket w…#744
feat: implement support for volatile events with new VolatileSocket w…#744LiamCarPer wants to merge 10 commits into
Conversation
|
Please check this comment regarding the implementation: you are missing a way to bypass the mpsc channel in engineio. Volatile packets will still be buffered, it is not what we want. I might be wrong but, before Claude-ing something please try to understand the issue details before implementing something. Feel free to ask more details here or on the issue. |
|
Thanks for the Review. You are right, my current approach still goes through the mpcs channel so volatile packets can buffer. Does that sound good? Want to make sure i am in the right track before spending more time on it |
|
Is this what you were looking for? |
|
Yes! One thing that we are missing with this solution is that volatile packets are out of order with classic packets. It might be ok though. We can simply document this specific case. |
|
Perfect, let me look into it |
|
I checked js docs and they dont specify ordering constraints for volatile vs regular event. |
…rapper and operator flags
…tile message loop in transport
90575ac to
8861111
Compare
…lling payload test
|
Hey man, sorry for being this late ran out of claude usage, jajjajaj just joking, was busy with work. I removed VolatileSocket, used ConfOperators with volatile flag, removed tokio macros feature, documented return values, moved volatile docs to external file. Test results, 79/79 unit tests, 106/109 doc tests, 0 clippy warnings, clean. You had a concern about polling encoder, there are no .await points between the volatile snapshot and the encoder call, so the concern doesnt apply to the specific code, Check and tell me is everything as you want it! |
… polling in encoders to capture packets arriving during encoding
There was a problem hiding this comment.
Hey, it is good, however we are missing integration tests for engineioxide and socketioxide and unit tests for the encoder functions in engineioxide.
We need to test every combination possible of volatile vs non-volatile packets to ensure payloads are still correct.
|
I went ahead and added a bunch of comprehensive test coverage. For the encoder unit tests in encoder.rs, I added 10 new tests checking the core logic for all three versions (v4, v3 string, and v3 binary). That covers volatile-only payloads, mixed normal and volatile ordering both ways, making sure the latest volatile data overwrites the old stuff, and testing what happens if volatile data drops in right while a buffer is draining. Then for engineioxide integration tests in tests/volatile.rs, I added 3 tests to check the transport layer. It confirms volatile messages make it all the way through the full polling transport, double-checks the ordering when mixing volatile and normal messages, and makes sure those overwrite semantics hold up at the engine level too. Lastly for socketioxide integration tests in tests/volatile.rs, i added 4 tests for the user-facing API and broadcasting. It ensures socket.volatile().emit() returns Ok(()), makes sure broadcasting with the volatile flag on doesn't panic, and checks that io.volatile() plays nice with the rest of our tools. I have left two edge cases uncovered, Parked encoder + volatile arrives and Max payload boundary with volatile. What do you want me to do with it? |
Totodore
left a comment
There was a problem hiding this comment.
Thanks for the tests, yes it would be nice to add parked encoder and max payload checks (especially because these are edge cases that may easily results in bugs).
…r tests for volatile packets
|
I handled the WS flush consolidation in crates/engineioxide/src/transport/ws.rs. I removed that double-flush pattern where it was flushing once after the main channel drain and then again after volatile. Now there is just a single unconditional tx.flush().await.ok() at the end of each loop iteration, so it processes both the main and volatile packets before doing a single flush. I also added 3 new parked encoder tests. These cover scenarios where volatile arrives while the encoder is blocked on recv_packet() and confirms that volatile isn't captured in that specific payload, only on the next poll. I explicitly tested this across v4, v3 string, and v3 binary encoders. I added 3 new max payload volatile tests. These make sure that if volatile pushes data.len() over the limit, it correctly leaves the normal packets in the channel for the next poll. Like the others, this is fully tested for v4, v3 string, and v3 binary. |
|
I cant make it work on the whiteboard example, no events is being emitted. Try on your side to emit the drawing event with volatile and check if you get something. Same if I force polling transport, it doesn't work. Once it works with the whiteboard example check also with remote adapters (redis/postgres to find the issue). Once you find the bug in the whiteboard example, please add regression tests. Tip: |
…g transport polling stalls
|
Found it, volatile packets arriving while the encoder is parked on recv_packet() were not captured. The encoder checks volatile in the initial check and in the drain loop, but after parking, volatile arriving during that .await is missed, it only gets captured on the next poll. I have added new regression tests "volatile_broadcast_arrives_via_polling_transport". Thank you for the tip, it was useful |
Motivation
Socket.io's Node.js implementation supports a
volatileflag on emits that drops events when the underlying connection is not ready. This is highly useful for high-frequency, non-critical data like game position updates or telemetry, as it prevents buffer buildup on unstable connections.This feature was requested in #602 and is documented in the [Socket.io volatile events spec](https://www.google.com/search?q=https://socket.io/docs/v4/emitting-events/%23volatile-events). Currently,
socketioxidehas no equivalent, forcing users to either accept buffer pressure or implement their own messaging layer.Solution
Added a
Volatilevariant toBroadcastFlags(0x04) that flows through the existing adapter broadcast pipeline. On the local adapter, whenvolatileis set, errors fromsend_manyare silently discarded rather than propagated, perfectly matching fire-and-forget semantics.User-Facing API
Three entry points are provided to mirror the Node.js patterns:
Socket::volatile()Returns aVolatileSocket<'a, A>wrapper with a directemit()that silently drops events when the socket is disconnected, the internal buffer is full, or encoding fails. The wrapper also exposes chain methods (to,within,except,local,broadcast,timeout) that delegate toBroadcastOperatorswith the volatile flag pre-set.BroadcastOperators::volatile()andConfOperators::volatile()**Sets the flag on the operator chain, enabling room-based broadcasting patterns like:SocketIo::volatile()A convenience alias on the default namespace for global emits:Adapter Propagation
The flag propagates directly through the
BroadcastOptionsstruct. This ensures remote adapters (Redis, Postgres, MongoDB) receive it and can handle volatile semantics on their own nodes out of the box, without requiring any adapter-specific changes.