diff --git a/quiche/src/stream/recv_buf.rs b/quiche/src/stream/recv_buf.rs index 6ff2ab04986..c4171c3e1ae 100644 --- a/quiche/src/stream/recv_buf.rs +++ b/quiche/src/stream/recv_buf.rs @@ -122,7 +122,18 @@ impl RecvBuf { } // No need to store empty buffer that doesn't carry the fin flag. + // + // However, we must still advance the max received offset. A + // zero-length non-FIN STREAM frame at offset N implies the peer has + // data at that offset. + // + // NOTE: connection level flow control accounting also expects this. if !buf.fin() && buf.is_empty() { + self.len = cmp::max(self.len, buf.max_off()); + if self.drain { + // we are not storing any data, off == len + self.off = self.len; + } return Ok(()); } @@ -510,16 +521,31 @@ mod tests { assert_emit_discard(&mut recv, emit, 32, 5, false, None); - // Don't store non-fin empty buffer. + // Empty non-FIN frame advances the high-water mark but stores no data. let buf = RangeBuf::from(b"", 10, false); assert!(recv.write(buf).is_ok()); - assert_eq!(recv.len, 5); + assert_eq!(recv.len, 10); assert_eq!(recv.off, 5); assert_eq!(recv.data.len(), 0); - // Check flow control for empty buffer. + // Check flow control for empty non-FIN buffer past the limit. let buf = RangeBuf::from(b"", 16, false); assert_eq!(recv.write(buf), Err(Error::FlowControl)); + } + + #[rstest] + fn empty_fin_stream_frame(#[values(true, false)] emit: bool) { + let mut recv = + RecvBuf::new(15, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW); + assert_eq!(recv.len, 0); + + let buf = RangeBuf::from(b"hello", 0, false); + assert!(recv.write(buf).is_ok()); + assert_eq!(recv.len, 5); + assert_eq!(recv.off, 0); + assert_eq!(recv.data.len(), 1); + + assert_emit_discard(&mut recv, emit, 32, 5, false, None); // Store fin empty buffer. let buf = RangeBuf::from(b"", 5, true); @@ -645,6 +671,52 @@ mod tests { assert_emit_discard_done(&mut recv, emit); } + /// Tests that an empty non-FIN frame written to a draining `RecvBuf` + /// advances `off` along with `len`, so that a subsequent `reset()` with + /// the same final size returns zero deltas (no double-counting). + #[rstest] + fn drain_empty_non_fin_frame_then_reset(#[values(true, false)] emit: bool) { + let mut recv = + RecvBuf::new(u64::MAX, DEFAULT_STREAM_WINDOW, DEFAULT_STREAM_WINDOW); + + // Write and consume 5 bytes so that the stream has some history. + let buf = RangeBuf::from(b"hello", 0, false); + assert!(recv.write(buf).is_ok()); + assert_eq!(recv.len, 5); + assert_eq!(recv.off, 0); + + assert_emit_discard(&mut recv, emit, 32, 5, false, Some(b"hello")); + assert_eq!(recv.off, 5); + + // Shut down — enters drain mode. All data was already consumed, so + // no buffered bytes need to be released. + assert_eq!(recv.shutdown(), Ok(0)); + assert!(recv.is_draining()); + assert_eq!(recv.len, 5); + assert_eq!(recv.off, 5); + assert_eq!(recv.data.len(), 0); + + // Receive an empty non-FIN STREAM frame at offset 10. In drain mode + // the fix must advance `off` to match `len`. + let buf = RangeBuf::from(b"", 10, false); + assert!(recv.write(buf).is_ok()); + assert_eq!(recv.len, 10); + assert_eq!(recv.off, 10); + assert_eq!(recv.data.len(), 0); + + // A RESET_STREAM with final_size == 10 should produce zero deltas + // because off already equals final_size — nothing to reclaim. + assert_eq!( + recv.reset(42, 10), + Ok(RecvBufResetReturn { + max_data_delta: 0, + consumed_flowcontrol: 0, + }) + ); + assert_eq!(recv.len, 10); + assert_eq!(recv.off, 10); + } + #[rstest] fn split_read(#[values(true, false)] emit: bool) { let mut recv = diff --git a/quiche/src/tests.rs b/quiche/src/tests.rs index f2546c95b9a..f4474a7638e 100644 --- a/quiche/src/tests.rs +++ b/quiche/src/tests.rs @@ -1729,6 +1729,132 @@ fn flow_control_drain( assert_eq!(pipe.server.flow_control.consumed(), 25); } +#[rstest] +/// Tests that empty non-FIN STREAM frames advance connection-level flow +/// control *exactly once*, even when subsequent frames cover the same offset +/// range. +fn flow_control_empty_stream_frame( + #[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str, +) { + let mut config = test_utils::Pipe::default_config(cc_algorithm_name).unwrap(); + // Large limits so flow control on individual streams / connection don't + // interfere with the specific accounting we want to observe. + config.set_initial_max_data(1000); + config.set_initial_max_stream_data_bidi_local(1000); + config.set_initial_max_stream_data_bidi_remote(1000); + config.set_initial_max_streams_bidi(3); + config.set_initial_max_streams_uni(3); + config.verify_peer(false); + + let mut pipe = test_utils::Pipe::with_config(&mut config).unwrap(); + assert_eq!(pipe.handshake(), Ok(())); + + let send_frame_helper = + |pipe: &mut test_utils::Pipe, data: RangeBuf| -> Result<()> { + let mut buf = [0; 65536]; + let frames = [frame::Frame::Stream { stream_id: 4, data }]; + let written = test_utils::encode_pkt( + &mut pipe.client, + Type::Short, + &frames, + &mut buf, + )?; + assert_eq!(pipe.server_recv(&mut buf[..written]), Ok(written)); + Ok(()) + }; + + // 1. First real data: off=0, len=10. + send_frame_helper(&mut pipe, RangeBuf::from(&[1; 10], 0, false)).unwrap(); + assert_eq!(pipe.server.rx_data, 10); + + // 2. Empty non-FIN frame at off=15. High-water mark advances to 15 even + // though no bytes are stored. + send_frame_helper(&mut pipe, RangeBuf::from(&[0; 0], 15, false)).unwrap(); + assert_eq!(pipe.server.rx_data, 15); + + // 3. Real data at off=15, len=5 with FIN. The [15..20] range is new, so + // rx_data advances to 20. The [15..15] range already counted, so it must + // NOT be double-counted. + send_frame_helper(&mut pipe, RangeBuf::from(&[2; 5], 15, true)).unwrap(); + assert_eq!(pipe.server.rx_data, 20); + + // 4. Fill the gap: off=10, len=5. All bytes in [10..15] were already counted + // (via the empty frame at off=15), so rx_data stays at 20. + send_frame_helper(&mut pipe, RangeBuf::from(&[3; 5], 10, false)).unwrap(); + assert_eq!(pipe.server.rx_data, 20); + + // The stream is readable. + let mut r = pipe.server.readable(); + assert_eq!(r.next(), Some(4)); + assert_eq!(r.next(), None); + + // Read all 20 bytes and verify content and fin flag. + // Expected content (in order): + // off 0..10 -> [1; 10] (frame 1) + // off 10..15 -> [3; 5] (frame 4, gap fill) + // off 15..20 -> [2; 5] (frame 3) + let mut out = [0u8; 64]; + let (n, fin) = pipe.server.stream_recv(4, &mut out).unwrap(); + assert_eq!(n, 20); + assert!(fin); + assert_eq!(&out[..10], &[1u8; 10]); + assert_eq!(&out[10..15], &[3u8; 5]); + assert_eq!(&out[15..20], &[2u8; 5]); + + // stream_finished() reflects the recv-side is_fin() state. + assert!(pipe.server.stream_finished(4)); +} + +#[rstest] +/// Tests that empty non-FIN STREAM frames on a draining stream are not counted +/// again when the same final size is later received in a RESET_STREAM frame. +fn flow_control_drain_empty_stream_frame_reset( + #[values("cubic", "bbr2_gcongestion")] cc_algorithm_name: &str, +) { + let mut buf = [0; 65536]; + + let mut pipe = test_utils::Pipe::new(cc_algorithm_name).unwrap(); + assert_eq!(pipe.handshake(), Ok(())); + + assert_eq!(pipe.client.stream_send(4, b"aaaaa", false), Ok(5)); + assert_eq!(pipe.advance(), Ok(())); + + assert_eq!(pipe.server.rx_data, 5); + assert_eq!(pipe.server.flow_control.consumed(), 0); + + assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Read, 42), Ok(())); + + assert_eq!(pipe.server.rx_data, 5); + assert_eq!(pipe.server.flow_control.consumed(), 5); + + let frames = [frame::Frame::Stream { + stream_id: 4, + data: RangeBuf::from(&[], 10, false), + }]; + + let written = + test_utils::encode_pkt(&mut pipe.client, Type::Short, &frames, &mut buf) + .unwrap(); + assert_eq!(pipe.server_recv(&mut buf[..written]), Ok(written)); + + assert_eq!(pipe.server.rx_data, 10); + assert_eq!(pipe.server.flow_control.consumed(), 10); + + let frames = [frame::Frame::ResetStream { + stream_id: 4, + error_code: 42, + final_size: 10, + }]; + + let written = + test_utils::encode_pkt(&mut pipe.client, Type::Short, &frames, &mut buf) + .unwrap(); + assert_eq!(pipe.server_recv(&mut buf[..written]), Ok(written)); + + assert_eq!(pipe.server.rx_data, 10); + assert_eq!(pipe.server.flow_control.consumed(), 10); +} + #[rstest] /// Tests that flow control is properly updated when a stream receives a RESET fn flow_control_reset_stream(