Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pingora-core/src/protocols/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,18 @@ impl Session {
}
}

/// Wait for the client to abort this stream without reading any body data.
///
/// For HTTP/2 this resolves when the client resets the stream (RST_STREAM) or the
/// stream errors. Other protocols have no out-of-band abort signal (detecting a
/// close would require consuming reads), so this future is pending forever for them.
pub async fn watch_h2_stream_reset(&mut self) -> Result<h2::Reason> {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer us return the future here Option<Idle<'_>>, we then avoid the future and select for downstream HTTP/1.1 clients in proxy_h2. We also avoid the case where a caller mistakenly invokes this for something other than H2 outside of select and stays pending.

@molocule molocule Jun 23, 2026

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah good point, I just fixed this

match self {
Self::H2(s) => s.idle().await,
Self::H1(_) | Self::Subrequest(_) | Self::Custom(_) => std::future::pending().await,
}
}

pub fn as_http1(&self) -> Option<&SessionV1> {
match self {
Self::H1(s) => Some(s),
Expand Down
72 changes: 59 additions & 13 deletions pingora-proxy/src/proxy_h2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,13 @@ where
Ok((downstream_can_reuse, _upstream)) => (downstream_can_reuse, None),
Err(e) => {
// On application level upstream read timeouts, send RST_STREAM CANCEL,
// we know we have not received END_STREAM at this point since we read timed out
// we know we have not received END_STREAM at this point since we read timed out.
// Also cancel the upstream stream when downstream goes away/resets so the
// upstream peer can release the stream promptly.
// TODO: implement for write timeouts?
if e.esource == ErrorSource::Upstream && matches!(e.etype, ReadTimedout) {
if (e.esource == ErrorSource::Upstream && matches!(e.etype, ReadTimedout))
|| e.esource == ErrorSource::Downstream
{
client_body.send_reset(h2::Reason::CANCEL);
// Mark the underlying H2 connection for shutdown so it's not used
// for new streams in case it is hung.
Expand Down Expand Up @@ -470,6 +474,28 @@ where
Ok(request_done) => {
downstream_state.maybe_finished(request_done);
},
Err(e) if e.esource == ErrorSource::Downstream => {
// Downstream reset/errored while the upstream write was blocked
// (e.g. on upstream flow control). Same policy as the read error
// handling above: ignore the downstream error if the upstream
// response is being admitted to cache, otherwise fail so the
// downstream stream handles are dropped promptly.
let wait_for_cache_fill = (!serve_from_cache.is_on() && support_cache_partial_read)
|| serve_from_cache.is_miss();
if !wait_for_cache_fill {
return Err(e);
}
// ignore downstream error so that upstream can continue to write cache
downstream_state.to_errored();
warn!(

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use the same pattern of checking suppress_proxy_warn_log before emitting a warn here.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

"Downstream Error ignored during caching: {}, {}",
e,
self.inner.request_summary(session, ctx)
);
// This will not be treated as a final error, but we should signal to
// downstream session anyway.
session.downstream_session.on_proxy_failure(e);
},
Err(e) => {
// mark request done, attempt to drain receive
warn!("Upstream h2 body send error: {e}");
Expand Down Expand Up @@ -897,17 +923,37 @@ where
return Ok(false);
}

if let Some(data) = data {
debug!("Write {} bytes body to h2 upstream", data.len());
write_body(client_body, data, end_of_body, write_timeout)
.await
.map_err(|e| e.into_up())?;
} else {
debug!("Read downstream body done");
/* send a standalone END_STREAM flag */
write_body(client_body, Bytes::new(), true, write_timeout)
.await
.map_err(|e| e.into_up())?;
let (data, end) = match data {
Some(data) => {
debug!("Write {} bytes body to h2 upstream", data.len());
(data, end_of_body)
}
None => {
debug!("Read downstream body done");
/* send a standalone END_STREAM flag */
(Bytes::new(), true)
}
};

/* Race the upstream write against a downstream stream reset. A write blocked
* on upstream flow control would otherwise keep the downstream stream handles
* referenced while a downstream RST_STREAM goes unobserved, pinning the
* downstream connection window credit until the write completes. */
tokio::select! {
biased;
res = write_body(client_body, data, end, write_timeout) => {
res.map_err(|e| e.into_up())?;
}
reset = session.downstream_session.watch_h2_stream_reset() => {
return match reset {
Ok(reason) => Error::e_explain(
H2Error,
format!("downstream reset stream (reason: {reason}) while writing body to upstream"),
),
Err(e) => Err(e),
}
.map_err(|e| e.into_down());
}
}

Ok(end_of_body)
Expand Down
274 changes: 271 additions & 3 deletions pingora-proxy/tests/test_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ use hyperlocal::{UnixClientExt, Uri};
use reqwest::{header, StatusCode};
#[cfg(feature = "patched_http1")]
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[cfg(feature = "patched_http1")]
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::{TcpListener, TcpStream};

use utils::server_utils::{
init, reset_suppress_proxy_warn_log_calls, suppress_proxy_warn_log_calls,
Expand Down Expand Up @@ -1132,3 +1130,273 @@ async fn test_103_die() {
let res = reqwest::get("http://127.0.0.1:6147/103-die").await.unwrap();
assert_eq!(res.status(), StatusCode::BAD_GATEWAY);
}

// A downstream RST_STREAM must be observed even while the proxy is blocked writing the
// request body to the upstream (parked on h2 flow control). Otherwise the stream is held
// open as a zombie: its handles stay referenced and the downstream connection-window
// credit is never released. On catching the RST the proxy should also cancel the
// upstream stream promptly.
#[tokio::test]
async fn test_h2_downstream_rst_while_upstream_write_blocked() {
use std::future::poll_fn;
use std::time::Duration;

init();

// An h2c upstream that accepts one request but never reads its body and never
// sends window updates, so the proxy's upstream write blocks on flow control.
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let upstream_port = listener.local_addr().unwrap().port();
let (reset_tx, reset_rx) = tokio::sync::oneshot::channel::<h2::Reason>();

tokio::spawn(async move {
let (io, _) = listener.accept().await.unwrap();
let mut conn = h2::server::Builder::new()
// tiny stream window so the proxy's write parks quickly
.initial_window_size(1024)
.handshake::<_, Bytes>(io)
.await
.unwrap();
let (req, mut send_response) = conn.accept().await.unwrap().unwrap();
// hold the body reader without reading it: no window updates are granted
let _body = req.into_body();

// keep driving the connection in the background
tokio::spawn(async move {
while let Some(res) = conn.accept().await {
if res.is_err() {
break;
}
}
});

// wait for the proxy to reset our stream
let reason = poll_fn(|cx| send_response.poll_reset(cx)).await.unwrap();
let _ = reset_tx.send(reason);
});

// h2c downstream client to the proxy
let tcp = TcpStream::connect("127.0.0.1:6146").await.unwrap();
let (mut client, conn) = client::handshake(tcp).await.unwrap();
tokio::spawn(async move {
// ignore errors: the proxy may tear the connection down after the RST
let _ = conn.await;
});

let req = Request::builder()
.method("POST")
.uri("http://127.0.0.1:6146/")
.header("x-h2", "true")
.header("x-port", upstream_port.to_string())
.body(())
.unwrap();

let (_response, mut req_body) = client.send_request(req, false).unwrap();

// Push body until the proxy stops granting capacity, meaning it is no longer
// reading the downstream body because its upstream write is parked on flow control.
let mut sent = 0usize;
while sent < 512 * 1024 {
req_body.reserve_capacity(16 * 1024);
let granted = match tokio::time::timeout(
Duration::from_millis(500),
poll_fn(|cx| req_body.poll_capacity(cx)),
)
.await
{
Ok(Some(Ok(n))) => n,
Ok(other) => panic!("downstream send capacity error: {other:?}"),
// no new capacity for a while: the proxy is parked on the upstream write
Err(_) => break,
};
req_body
.send_data(Bytes::from(vec![0u8; granted]), false)
.unwrap();
sent += granted;
}
// we must have at least filled the upstream stream window for the write to park
assert!(sent >= 1024, "only sent {sent} bytes");

// reset the stream while the proxy is blocked writing upstream
req_body.send_reset(h2::Reason::CANCEL);

// the proxy should catch the RST promptly (not hang on the blocked write)
// and cancel the upstream stream
let reason = tokio::time::timeout(Duration::from_secs(5), reset_rx)
.await
.expect("proxy did not cancel the upstream stream after the downstream RST")
.expect("upstream watcher task died before observing a reset");
assert_eq!(reason, h2::Reason::CANCEL);
}

// Same as above, but with caching enabled and a cacheable upstream response mid-admission:
// the downstream RST caught during the blocked upstream write must be ignored (like
// downstream read errors are during caching) so the cache fill can complete.
#[tokio::test]
async fn test_h2_downstream_rst_during_cache_fill() {
use std::future::poll_fn;
use std::time::Duration;

init();

let uri = "http://127.0.0.1:6154/test_h2_downstream_rst_during_cache_fill";

// An h2c upstream that never reads the request body (so the proxy's upstream write
// parks on flow control) but streams a cacheable response: first chunk right away,
// final chunk + EOS only after the test signals that downstream sent its RST.
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let upstream_port = listener.local_addr().unwrap().port();
let (finish_tx, finish_rx) = tokio::sync::oneshot::channel::<()>();
let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>();

tokio::spawn(async move {
let (io, _) = listener.accept().await.unwrap();
let mut conn = h2::server::Builder::new()
// tiny stream window so the proxy's request body write parks quickly
.initial_window_size(1024)
.handshake::<_, Bytes>(io)
.await
.unwrap();
let (req, mut send_response) = conn.accept().await.unwrap().unwrap();
// hold the body reader without reading it: no window updates are granted
let body = req.into_body();

// keep driving the connection in the background
tokio::spawn(async move {
while let Some(res) = conn.accept().await {
if res.is_err() {
break;
}
}
});

let resp = http::Response::builder()
.status(200)
.header("cache-control", "max-age=30")
.body(())
.unwrap();
let mut resp_body = send_response.send_response(resp, false).unwrap();
resp_body
.send_data(Bytes::from_static(b"hello "), false)
.unwrap();

// hold the end of the response so the cache fill is still in progress
// when the downstream resets
finish_rx.await.unwrap();
resp_body
.send_data(Bytes::from_static(b"world!"), true)
.unwrap();

// Keep the stream handles alive until the test is done. Dropping them here
// would make h2 send an implicit RST_STREAM(NO_ERROR) (response complete
// without consuming the request body), which the proxy's upstream read can
// observe before the clean EOS and abort the cache admission.
let _ = done_rx.await;
drop(body);
drop(resp_body);
});

// h2c downstream client to the caching proxy service
let tcp = TcpStream::connect("127.0.0.1:6154").await.unwrap();
let (mut client, conn) = client::handshake(tcp).await.unwrap();
tokio::spawn(async move {
let _ = conn.await;
});

let req = Request::builder()
.method("POST")
.uri(uri)
.header("x-h2", "true")
.header("x-port", upstream_port.to_string())
.body(())
.unwrap();
let (response, mut req_body) = client.send_request(req, false).unwrap();

// a small first body chunk, fits the upstream window
req_body.reserve_capacity(16);
let granted = poll_fn(|cx| req_body.poll_capacity(cx))
.await
.unwrap()
.unwrap();
assert!(granted >= 16);
req_body
.send_data(Bytes::from_static(b"upload.........."), false)
.unwrap();

// wait for the response header and first chunk: the miss admission
// (cache fill) is now in progress
let (head, mut resp_body) = response.await.unwrap().into_parts();
assert_eq!(head.status, 200);
assert_eq!(head.headers.get("x-cache-status").unwrap(), "miss");
let chunk = resp_body.data().await.unwrap().unwrap();
assert_eq!(&chunk[..], b"hello ");
let _ = resp_body.flow_control().release_capacity(chunk.len());

// flood the request body until the proxy stops granting capacity, i.e. it is
// parked writing to the upstream whose window is exhausted
let mut sent = 0usize;
while sent < 512 * 1024 {
req_body.reserve_capacity(16 * 1024);
let granted = match tokio::time::timeout(
Duration::from_millis(500),
poll_fn(|cx| req_body.poll_capacity(cx)),
)
.await
{
Ok(Some(Ok(n))) => n,
Ok(other) => panic!("downstream send capacity error: {other:?}"),
Err(_) => break,
};
req_body
.send_data(Bytes::from(vec![0u8; granted]), false)
.unwrap();
sent += granted;
}
assert!(sent >= 1024, "only sent {sent} bytes");

// reset the stream while the proxy is blocked writing upstream;
// since a cache fill is in progress, the proxy should swallow this error
// and keep admitting the upstream response
req_body.send_reset(h2::Reason::CANCEL);

// give the proxy a moment to observe the RST, then let the upstream finish
// the response
tokio::time::sleep(Duration::from_millis(200)).await;
finish_tx.send(()).unwrap();

// let the fill complete
tokio::time::sleep(Duration::from_secs(1)).await;

// the object must now be fully in cache: a new request is a hit with the
// complete body and does not need the (single use) upstream
let tcp = TcpStream::connect("127.0.0.1:6154").await.unwrap();
let (mut client, conn) = client::handshake(tcp).await.unwrap();
tokio::spawn(async move {
let _ = conn.await;
});
let req = Request::builder()
.method("GET")
.uri(uri)
.header("x-port", upstream_port.to_string())
.body(())
.unwrap();
let (response, _) = client.send_request(req, true).unwrap();
let (head, mut resp_body) = tokio::time::timeout(Duration::from_secs(5), response)
.await
.expect("no response for the second request")
.unwrap()
.into_parts();
assert_eq!(head.status, 200);
assert_eq!(head.headers.get("x-cache-status").unwrap(), "hit");

let mut body = Vec::new();
while let Some(chunk) = resp_body.data().await {
let chunk = chunk.unwrap();
let _ = resp_body.flow_control().release_capacity(chunk.len());
body.extend_from_slice(&chunk);
}
assert_eq!(body, b"hello world!");

// release the upstream's stream handles
let _ = done_tx.send(());
}
Loading