Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 75 additions & 3 deletions quiche/src/stream/recv_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,18 @@ impl RecvBuf {
}

// No need to store empty buffer that doesn't carry the fin flag.
//
// However, we must still advance the max received offset. A
Comment thread
gregor-cf marked this conversation as resolved.
// zero-length non-FIN STREAM frame at offset N implies the peer has
// data at that offset.
//
// NOTE: connection level flow control accounting also expects this.
Comment thread
gregor-cf marked this conversation as resolved.
if !buf.fin() && buf.is_empty() {
self.len = cmp::max(self.len, buf.max_off());
Comment thread
gregor-cf marked this conversation as resolved.
if self.drain {
// we are not storing any data, off == len
self.off = self.len;
}
return Ok(());
Comment thread
gregor-cf marked this conversation as resolved.
}

Expand Down Expand Up @@ -510,16 +521,31 @@ mod tests {

assert_emit_discard(&mut recv, emit, 32, 5, false, None);

// Don't store non-fin empty buffer.
// Empty non-FIN frame advances the high-water mark but stores no data.
let buf = RangeBuf::from(b"", 10, false);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.len, 10);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 0);

// Check flow control for empty buffer.
// Check flow control for empty non-FIN buffer past the limit.
let buf = RangeBuf::from(b"", 16, false);
assert_eq!(recv.write(buf), Err(Error::FlowControl));
}

#[rstest]
fn empty_fin_stream_frame(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(15, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);
assert_eq!(recv.len, 0);

let buf = RangeBuf::from(b"hello", 0, false);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 0);
assert_eq!(recv.data.len(), 1);

assert_emit_discard(&mut recv, emit, 32, 5, false, None);

// Store fin empty buffer.
let buf = RangeBuf::from(b"", 5, true);
Expand Down Expand Up @@ -645,6 +671,52 @@ mod tests {
assert_emit_discard_done(&mut recv, emit);
}

/// Tests that an empty non-FIN frame written to a draining `RecvBuf`
/// advances `off` along with `len`, so that a subsequent `reset()` with
/// the same final size returns zero deltas (no double-counting).
#[rstest]
fn drain_empty_non_fin_frame_then_reset(#[values(true, false)] emit: bool) {
let mut recv =
RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW);

// Write and consume 5 bytes so that the stream has some history.
let buf = RangeBuf::from(b"hello", 0, false);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 0);

assert_emit_discard(&mut recv, emit, 32, 5, false, Some(b"hello"));
assert_eq!(recv.off, 5);

// Shut down — enters drain mode. All data was already consumed, so
// no buffered bytes need to be released.
assert_eq!(recv.shutdown(), Ok(0));
assert!(recv.is_draining());
assert_eq!(recv.len, 5);
assert_eq!(recv.off, 5);
assert_eq!(recv.data.len(), 0);

// Receive an empty non-FIN STREAM frame at offset 10. In drain mode
// the fix must advance `off` to match `len`.
let buf = RangeBuf::from(b"", 10, false);
assert!(recv.write(buf).is_ok());
assert_eq!(recv.len, 10);
assert_eq!(recv.off, 10);
assert_eq!(recv.data.len(), 0);

// A RESET_STREAM with final_size == 10 should produce zero deltas
// because off already equals final_size — nothing to reclaim.
assert_eq!(
recv.reset(42, 10),
Ok(RecvBufResetReturn {
max_data_delta: 0,
consumed_flowcontrol: 0,
})
);
assert_eq!(recv.len, 10);
assert_eq!(recv.off, 10);
}

#[rstest]
fn split_read(#[values(true, false)] emit: bool) {
let mut recv =
Expand Down
126 changes: 126 additions & 0 deletions quiche/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1729,6 +1729,132 @@ fn flow_control_drain(
assert_eq!(pipe.server.flow_control.consumed(), 25);
}

#[rstest]
/// Tests that empty non-FIN STREAM frames advance connection-level flow
/// control *exactly once*, even when subsequent frames cover the same offset
/// range.
fn flow_control_empty_stream_frame(
#[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
) {
let mut config = test_utils::Pipe::default_config(cc_algorithm_name).unwrap();
// Large limits so flow control on individual streams / connection don't
// interfere with the specific accounting we want to observe.
config.set_initial_max_data(1000);
config.set_initial_max_stream_data_bidi_local(1000);
config.set_initial_max_stream_data_bidi_remote(1000);
config.set_initial_max_streams_bidi(3);
config.set_initial_max_streams_uni(3);
config.verify_peer(false);

let mut pipe = test_utils::Pipe::with_config(&mut config).unwrap();
assert_eq!(pipe.handshake(), Ok(()));

let send_frame_helper =
|pipe: &mut test_utils::Pipe, data: RangeBuf| -> Result<()> {
let mut buf = [0; 65536];
let frames = [frame::Frame::Stream { stream_id: 4, data }];
let written = test_utils::encode_pkt(
&mut pipe.client,
Type::Short,
&frames,
&mut buf,
)?;
assert_eq!(pipe.server_recv(&mut buf[..written]), Ok(written));
Ok(())
};

// 1. First real data: off=0, len=10.
send_frame_helper(&mut pipe, RangeBuf::from(&[1; 10], 0, false)).unwrap();
assert_eq!(pipe.server.rx_data, 10);

// 2. Empty non-FIN frame at off=15. High-water mark advances to 15 even
// though no bytes are stored.
send_frame_helper(&mut pipe, RangeBuf::from(&[0; 0], 15, false)).unwrap();
assert_eq!(pipe.server.rx_data, 15);

// 3. Real data at off=15, len=5 with FIN. The [15..20] range is new, so
// rx_data advances to 20. The [15..15] range already counted, so it must
// NOT be double-counted.
send_frame_helper(&mut pipe, RangeBuf::from(&[2; 5], 15, true)).unwrap();
assert_eq!(pipe.server.rx_data, 20);

// 4. Fill the gap: off=10, len=5. All bytes in [10..15] were already counted
// (via the empty frame at off=15), so rx_data stays at 20.
send_frame_helper(&mut pipe, RangeBuf::from(&[3; 5], 10, false)).unwrap();
assert_eq!(pipe.server.rx_data, 20);

// The stream is readable.
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);

// Read all 20 bytes and verify content and fin flag.
// Expected content (in order):
// off 0..10 -> [1; 10] (frame 1)
// off 10..15 -> [3; 5] (frame 4, gap fill)
// off 15..20 -> [2; 5] (frame 3)
let mut out = [0u8; 64];
let (n, fin) = pipe.server.stream_recv(4, &mut out).unwrap();
assert_eq!(n, 20);
assert!(fin);
assert_eq!(&out[..10], &[1u8; 10]);
assert_eq!(&out[10..15], &[3u8; 5]);
assert_eq!(&out[15..20], &[2u8; 5]);

// stream_finished() reflects the recv-side is_fin() state.
assert!(pipe.server.stream_finished(4));
}

#[rstest]
/// Tests that empty non-FIN STREAM frames on a draining stream are not counted
/// again when the same final size is later received in a RESET_STREAM frame.
fn flow_control_drain_empty_stream_frame_reset(
#[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str,
) {
let mut buf = [0; 65536];

let mut pipe = test_utils::Pipe::new(cc_algorithm_name).unwrap();
assert_eq!(pipe.handshake(), Ok(()));

assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5));
assert_eq!(pipe.advance(), Ok(()));

assert_eq!(pipe.server.rx_data, 5);
assert_eq!(pipe.server.flow_control.consumed(), 0);

assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Read, 42), Ok(()));

assert_eq!(pipe.server.rx_data, 5);
assert_eq!(pipe.server.flow_control.consumed(), 5);

let frames = [frame::Frame::Stream {
stream_id: 4,
data: RangeBuf::from(&[], 10, false),
}];

let written =
test_utils::encode_pkt(&mut pipe.client, Type::Short, &frames, &mut buf)
.unwrap();
assert_eq!(pipe.server_recv(&mut buf[..written]), Ok(written));

assert_eq!(pipe.server.rx_data, 10);
assert_eq!(pipe.server.flow_control.consumed(), 10);

let frames = [frame::Frame::ResetStream {
stream_id: 4,
error_code: 42,
final_size: 10,
}];

let written =
test_utils::encode_pkt(&mut pipe.client, Type::Short, &frames, &mut buf)
.unwrap();
assert_eq!(pipe.server_recv(&mut buf[..written]), Ok(written));

assert_eq!(pipe.server.rx_data, 10);
assert_eq!(pipe.server.flow_control.consumed(), 10);
}

#[rstest]
/// Tests that flow control is properly updated when a stream receives a RESET
fn flow_control_reset_stream(
Expand Down
Loading