diff --git a/pingora-core/src/protocols/http/server.rs b/pingora-core/src/protocols/http/server.rs index 7dd96e51..a4519494 100644 --- a/pingora-core/src/protocols/http/server.rs +++ b/pingora-core/src/protocols/http/server.rs @@ -18,7 +18,7 @@ use super::custom::server::Session as SessionCustom; use super::error_resp; use super::subrequest::server::HttpSession as SessionSubrequest; use super::v1::server::HttpSession as SessionV1; -use super::v2::server::HttpSession as SessionV2; +use super::v2::server::{HttpSession as SessionV2, Idle}; use super::HttpTask; use crate::custom_session; use crate::protocols::{Digest, SocketAddr, Stream}; @@ -682,6 +682,19 @@ impl Session { } } + /// Return a future that waits for the client to abort this H2 stream without + /// reading any body data. + /// + /// For HTTP/2 this resolves when the client resets the stream (RST_STREAM) or the + /// stream errors. Other protocols have no out-of-band abort signal, so this + /// returns `None` for them. + pub fn watch_h2_stream_reset(&mut self) -> Option> { + match self { + Self::H2(s) => Some(s.idle()), + _ => None, + } + } + pub fn as_http1(&self) -> Option<&SessionV1> { match self { Self::H1(s) => Some(s), diff --git a/pingora-proxy/src/proxy_h2.rs b/pingora-proxy/src/proxy_h2.rs index a5c58a7b..b5ff8373 100644 --- a/pingora-proxy/src/proxy_h2.rs +++ b/pingora-proxy/src/proxy_h2.rs @@ -213,14 +213,21 @@ where match ret { Ok((downstream_can_reuse, _upstream)) => (downstream_can_reuse, None), Err(e) => { + let upstream_read_timeout = + e.esource == ErrorSource::Upstream && matches!(e.etype, ReadTimedout); + let downstream_error = e.esource == ErrorSource::Downstream; // On application level upstream read timeouts, send RST_STREAM CANCEL, - // we know we have not received END_STREAM at this point since we read timed out + // we know we have not received END_STREAM at this point since we read timed out. + // Also cancel the upstream stream when downstream goes away/resets so the + // upstream peer can release the stream promptly. // TODO: implement for write timeouts? - if e.esource == ErrorSource::Upstream && matches!(e.etype, ReadTimedout) { + if upstream_read_timeout || downstream_error { client_body.send_reset(h2::Reason::CANCEL); - // Mark the underlying H2 connection for shutdown so it's not used - // for new streams in case it is hung. - client_session.conn.mark_shutdown(); + if upstream_read_timeout { + // Mark the underlying H2 connection for shutdown so it's not used + // for new streams in case it is hung. + client_session.conn.mark_shutdown(); + } } (false, Some(e)) } @@ -470,6 +477,35 @@ where Ok(request_done) => { downstream_state.maybe_finished(request_done); }, + Err(e) if e.esource == ErrorSource::Downstream => { + // Downstream reset/errored while the upstream write was blocked + // (e.g. on upstream flow control). Same policy as the read error + // handling above: ignore the downstream error if the upstream + // response is being admitted to cache, otherwise fail so the + // downstream stream handles are dropped promptly. + let wait_for_cache_fill = (!serve_from_cache.is_on() && support_cache_partial_read) + || serve_from_cache.is_miss(); + if !wait_for_cache_fill { + return Err(e); + } + // ignore downstream error so that upstream can continue to write cache + downstream_state.to_errored(); + if !self.inner.suppress_proxy_warn_log( + session, + ctx, + &e, + ProxyWarnLogContext::DownstreamCache, + ) { + warn!( + "Downstream Error ignored during caching: {}, {}", + e, + self.inner.request_summary(session, ctx) + ); + } + // This will not be treated as a final error, but we should signal to + // downstream session anyway. + session.downstream_session.on_proxy_failure(e); + }, Err(e) => { // mark request done, attempt to drain receive warn!("Upstream h2 body send error: {e}"); @@ -897,15 +933,42 @@ where return Ok(false); } - if let Some(data) = data { - debug!("Write {} bytes body to h2 upstream", data.len()); - write_body(client_body, data, end_of_body, write_timeout) - .await - .map_err(|e| e.into_up())?; + let (data, end) = match data { + Some(data) => { + debug!("Write {} bytes body to h2 upstream", data.len()); + (data, end_of_body) + } + None => { + debug!("Read downstream body done"); + /* send a standalone END_STREAM flag */ + (Bytes::new(), true) + } + }; + + /* For H2 downstreams, race the upstream write against a downstream stream + * reset. A write blocked on upstream flow control would otherwise keep the + * downstream stream handles referenced while a downstream RST_STREAM goes + * unobserved, pinning the downstream connection window credit until the + * write completes. */ + if let Some(reset) = session.downstream_session.watch_h2_stream_reset() { + tokio::select! { + biased; + res = write_body(client_body, data, end, write_timeout) => { + res.map_err(|e| e.into_up())?; + } + reset = reset => { + return match reset { + Ok(reason) => Error::e_explain( + H2Error, + format!("downstream reset stream (reason: {reason}) while writing body to upstream"), + ), + Err(e) => Err(e), + } + .map_err(|e| e.into_down()); + } + } } else { - debug!("Read downstream body done"); - /* send a standalone END_STREAM flag */ - write_body(client_body, Bytes::new(), true, write_timeout) + write_body(client_body, data, end, write_timeout) .await .map_err(|e| e.into_up())?; } diff --git a/pingora-proxy/tests/test_basic.rs b/pingora-proxy/tests/test_basic.rs index 1226e747..85a3bfc1 100644 --- a/pingora-proxy/tests/test_basic.rs +++ b/pingora-proxy/tests/test_basic.rs @@ -24,9 +24,7 @@ use hyperlocal::{UnixClientExt, Uri}; use reqwest::{header, StatusCode}; #[cfg(feature = "patched_http1")] use tokio::io::{AsyncReadExt, AsyncWriteExt}; -#[cfg(feature = "patched_http1")] -use tokio::net::TcpListener; -use tokio::net::TcpStream; +use tokio::net::{TcpListener, TcpStream}; use utils::server_utils::{ init, reset_suppress_proxy_warn_log_calls, suppress_proxy_warn_log_calls, @@ -1132,3 +1130,273 @@ async fn test_103_die() { let res = reqwest::get("http://127.0.0.1:6147/103-die").await.unwrap(); assert_eq!(res.status(), StatusCode::BAD_GATEWAY); } + +// A downstream RST_STREAM must be observed even while the proxy is blocked writing the +// request body to the upstream (parked on h2 flow control). Otherwise the stream is held +// open as a zombie: its handles stay referenced and the downstream connection-window +// credit is never released. On catching the RST the proxy should also cancel the +// upstream stream promptly. +#[tokio::test] +async fn test_h2_downstream_rst_while_upstream_write_blocked() { + use std::future::poll_fn; + use std::time::Duration; + + init(); + + // An h2c upstream that accepts one request but never reads its body and never + // sends window updates, so the proxy's upstream write blocks on flow control. + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let upstream_port = listener.local_addr().unwrap().port(); + let (reset_tx, reset_rx) = tokio::sync::oneshot::channel::(); + + tokio::spawn(async move { + let (io, _) = listener.accept().await.unwrap(); + let mut conn = h2::server::Builder::new() + // tiny stream window so the proxy's write parks quickly + .initial_window_size(1024) + .handshake::<_, Bytes>(io) + .await + .unwrap(); + let (req, mut send_response) = conn.accept().await.unwrap().unwrap(); + // hold the body reader without reading it: no window updates are granted + let _body = req.into_body(); + + // keep driving the connection in the background + tokio::spawn(async move { + while let Some(res) = conn.accept().await { + if res.is_err() { + break; + } + } + }); + + // wait for the proxy to reset our stream + let reason = poll_fn(|cx| send_response.poll_reset(cx)).await.unwrap(); + let _ = reset_tx.send(reason); + }); + + // h2c downstream client to the proxy + let tcp = TcpStream::connect("127.0.0.1:6146").await.unwrap(); + let (mut client, conn) = client::handshake(tcp).await.unwrap(); + tokio::spawn(async move { + // ignore errors: the proxy may tear the connection down after the RST + let _ = conn.await; + }); + + let req = Request::builder() + .method("POST") + .uri("http://127.0.0.1:6146/") + .header("x-h2", "true") + .header("x-port", upstream_port.to_string()) + .body(()) + .unwrap(); + + let (_response, mut req_body) = client.send_request(req, false).unwrap(); + + // Push body until the proxy stops granting capacity, meaning it is no longer + // reading the downstream body because its upstream write is parked on flow control. + let mut sent = 0usize; + while sent < 512 * 1024 { + req_body.reserve_capacity(16 * 1024); + let granted = match tokio::time::timeout( + Duration::from_millis(500), + poll_fn(|cx| req_body.poll_capacity(cx)), + ) + .await + { + Ok(Some(Ok(n))) => n, + Ok(other) => panic!("downstream send capacity error: {other:?}"), + // no new capacity for a while: the proxy is parked on the upstream write + Err(_) => break, + }; + req_body + .send_data(Bytes::from(vec![0u8; granted]), false) + .unwrap(); + sent += granted; + } + // we must have at least filled the upstream stream window for the write to park + assert!(sent >= 1024, "only sent {sent} bytes"); + + // reset the stream while the proxy is blocked writing upstream + req_body.send_reset(h2::Reason::CANCEL); + + // the proxy should catch the RST promptly (not hang on the blocked write) + // and cancel the upstream stream + let reason = tokio::time::timeout(Duration::from_secs(5), reset_rx) + .await + .expect("proxy did not cancel the upstream stream after the downstream RST") + .expect("upstream watcher task died before observing a reset"); + assert_eq!(reason, h2::Reason::CANCEL); +} + +// Same as above, but with caching enabled and a cacheable upstream response mid-admission: +// the downstream RST caught during the blocked upstream write must be ignored (like +// downstream read errors are during caching) so the cache fill can complete. +#[tokio::test] +async fn test_h2_downstream_rst_during_cache_fill() { + use std::future::poll_fn; + use std::time::Duration; + + init(); + + let uri = "http://127.0.0.1:6154/test_h2_downstream_rst_during_cache_fill"; + + // An h2c upstream that never reads the request body (so the proxy's upstream write + // parks on flow control) but streams a cacheable response: first chunk right away, + // final chunk + EOS only after the test signals that downstream sent its RST. + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let upstream_port = listener.local_addr().unwrap().port(); + let (finish_tx, finish_rx) = tokio::sync::oneshot::channel::<()>(); + let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>(); + + tokio::spawn(async move { + let (io, _) = listener.accept().await.unwrap(); + let mut conn = h2::server::Builder::new() + // tiny stream window so the proxy's request body write parks quickly + .initial_window_size(1024) + .handshake::<_, Bytes>(io) + .await + .unwrap(); + let (req, mut send_response) = conn.accept().await.unwrap().unwrap(); + // hold the body reader without reading it: no window updates are granted + let body = req.into_body(); + + // keep driving the connection in the background + tokio::spawn(async move { + while let Some(res) = conn.accept().await { + if res.is_err() { + break; + } + } + }); + + let resp = http::Response::builder() + .status(200) + .header("cache-control", "max-age=30") + .body(()) + .unwrap(); + let mut resp_body = send_response.send_response(resp, false).unwrap(); + resp_body + .send_data(Bytes::from_static(b"hello "), false) + .unwrap(); + + // hold the end of the response so the cache fill is still in progress + // when the downstream resets + finish_rx.await.unwrap(); + resp_body + .send_data(Bytes::from_static(b"world!"), true) + .unwrap(); + + // Keep the stream handles alive until the test is done. Dropping them here + // would make h2 send an implicit RST_STREAM(NO_ERROR) (response complete + // without consuming the request body), which the proxy's upstream read can + // observe before the clean EOS and abort the cache admission. + let _ = done_rx.await; + drop(body); + drop(resp_body); + }); + + // h2c downstream client to the caching proxy service + let tcp = TcpStream::connect("127.0.0.1:6154").await.unwrap(); + let (mut client, conn) = client::handshake(tcp).await.unwrap(); + tokio::spawn(async move { + let _ = conn.await; + }); + + let req = Request::builder() + .method("POST") + .uri(uri) + .header("x-h2", "true") + .header("x-port", upstream_port.to_string()) + .body(()) + .unwrap(); + let (response, mut req_body) = client.send_request(req, false).unwrap(); + + // a small first body chunk, fits the upstream window + req_body.reserve_capacity(16); + let granted = poll_fn(|cx| req_body.poll_capacity(cx)) + .await + .unwrap() + .unwrap(); + assert!(granted >= 16); + req_body + .send_data(Bytes::from_static(b"upload.........."), false) + .unwrap(); + + // wait for the response header and first chunk: the miss admission + // (cache fill) is now in progress + let (head, mut resp_body) = response.await.unwrap().into_parts(); + assert_eq!(head.status, 200); + assert_eq!(head.headers.get("x-cache-status").unwrap(), "miss"); + let chunk = resp_body.data().await.unwrap().unwrap(); + assert_eq!(&chunk[..], b"hello "); + let _ = resp_body.flow_control().release_capacity(chunk.len()); + + // flood the request body until the proxy stops granting capacity, i.e. it is + // parked writing to the upstream whose window is exhausted + let mut sent = 0usize; + while sent < 512 * 1024 { + req_body.reserve_capacity(16 * 1024); + let granted = match tokio::time::timeout( + Duration::from_millis(500), + poll_fn(|cx| req_body.poll_capacity(cx)), + ) + .await + { + Ok(Some(Ok(n))) => n, + Ok(other) => panic!("downstream send capacity error: {other:?}"), + Err(_) => break, + }; + req_body + .send_data(Bytes::from(vec![0u8; granted]), false) + .unwrap(); + sent += granted; + } + assert!(sent >= 1024, "only sent {sent} bytes"); + + // reset the stream while the proxy is blocked writing upstream; + // since a cache fill is in progress, the proxy should swallow this error + // and keep admitting the upstream response + req_body.send_reset(h2::Reason::CANCEL); + + // give the proxy a moment to observe the RST, then let the upstream finish + // the response + tokio::time::sleep(Duration::from_millis(200)).await; + finish_tx.send(()).unwrap(); + + // let the fill complete + tokio::time::sleep(Duration::from_secs(1)).await; + + // the object must now be fully in cache: a new request is a hit with the + // complete body and does not need the (single use) upstream + let tcp = TcpStream::connect("127.0.0.1:6154").await.unwrap(); + let (mut client, conn) = client::handshake(tcp).await.unwrap(); + tokio::spawn(async move { + let _ = conn.await; + }); + let req = Request::builder() + .method("GET") + .uri(uri) + .header("x-port", upstream_port.to_string()) + .body(()) + .unwrap(); + let (response, _) = client.send_request(req, true).unwrap(); + let (head, mut resp_body) = tokio::time::timeout(Duration::from_secs(5), response) + .await + .expect("no response for the second request") + .unwrap() + .into_parts(); + assert_eq!(head.status, 200); + assert_eq!(head.headers.get("x-cache-status").unwrap(), "hit"); + + let mut body = Vec::new(); + while let Some(chunk) = resp_body.data().await { + let chunk = chunk.unwrap(); + let _ = resp_body.flow_control().release_capacity(chunk.len()); + body.extend_from_slice(&chunk); + } + assert_eq!(body, b"hello world!"); + + // release the upstream's stream handles + let _ = done_tx.send(()); +}