Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pingora-core/src/protocols/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::custom::server::Session as SessionCustom;
use super::error_resp;
use super::subrequest::server::HttpSession as SessionSubrequest;
use super::v1::server::HttpSession as SessionV1;
use super::v2::server::HttpSession as SessionV2;
use super::v2::server::{HttpSession as SessionV2, Idle};
use super::HttpTask;
use crate::custom_session;
use crate::protocols::{Digest, SocketAddr, Stream};
Expand Down Expand Up @@ -682,6 +682,19 @@ impl Session {
}
}

/// Return a future that waits for the client to abort this H2 stream without
/// reading any body data.
///
/// For HTTP/2 this resolves when the client resets the stream (RST_STREAM) or the
/// stream errors. Other protocols have no out-of-band abort signal, so this
/// returns `None` for them.
pub fn watch_h2_stream_reset(&mut self) -> Option<Idle<'_>> {
match self {
Self::H2(s) => Some(s.idle()),
_ => None,
}
}

pub fn as_http1(&self) -> Option<&SessionV1> {
match self {
Self::H1(s) => Some(s),
Expand Down
89 changes: 76 additions & 13 deletions pingora-proxy/src/proxy_h2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,21 @@ where
match ret {
Ok((downstream_can_reuse, _upstream)) => (downstream_can_reuse, None),
Err(e) => {
let upstream_read_timeout =
e.esource == ErrorSource::Upstream && matches!(e.etype, ReadTimedout);
let downstream_error = e.esource == ErrorSource::Downstream;
// On application level upstream read timeouts, send RST_STREAM CANCEL,
// we know we have not received END_STREAM at this point since we read timed out
// we know we have not received END_STREAM at this point since we read timed out.
// Also cancel the upstream stream when downstream goes away/resets so the
// upstream peer can release the stream promptly.
// TODO: implement for write timeouts?
if e.esource == ErrorSource::Upstream && matches!(e.etype, ReadTimedout) {
if upstream_read_timeout || downstream_error {
client_body.send_reset(h2::Reason::CANCEL);
// Mark the underlying H2 connection for shutdown so it's not used
// for new streams in case it is hung.
client_session.conn.mark_shutdown();
if upstream_read_timeout {
// Mark the underlying H2 connection for shutdown so it's not used
// for new streams in case it is hung.
client_session.conn.mark_shutdown();
}
}
(false, Some(e))
}
Expand Down Expand Up @@ -470,6 +477,35 @@ where
Ok(request_done) => {
downstream_state.maybe_finished(request_done);
},
Err(e) if e.esource == ErrorSource::Downstream => {
// Downstream reset/errored while the upstream write was blocked
// (e.g. on upstream flow control). Same policy as the read error
// handling above: ignore the downstream error if the upstream
// response is being admitted to cache, otherwise fail so the
// downstream stream handles are dropped promptly.
let wait_for_cache_fill = (!serve_from_cache.is_on() && support_cache_partial_read)
|| serve_from_cache.is_miss();
if !wait_for_cache_fill {
return Err(e);
}
// ignore downstream error so that upstream can continue to write cache
downstream_state.to_errored();
if !self.inner.suppress_proxy_warn_log(
session,
ctx,
&e,
ProxyWarnLogContext::DownstreamCache,
) {
warn!(
"Downstream Error ignored during caching: {}, {}",
e,
self.inner.request_summary(session, ctx)
);
}
// This will not be treated as a final error, but we should signal to
// downstream session anyway.
session.downstream_session.on_proxy_failure(e);
},
Err(e) => {
// mark request done, attempt to drain receive
warn!("Upstream h2 body send error: {e}");
Expand Down Expand Up @@ -897,15 +933,42 @@ where
return Ok(false);
}

if let Some(data) = data {
debug!("Write {} bytes body to h2 upstream", data.len());
write_body(client_body, data, end_of_body, write_timeout)
.await
.map_err(|e| e.into_up())?;
let (data, end) = match data {
Some(data) => {
debug!("Write {} bytes body to h2 upstream", data.len());
(data, end_of_body)
}
None => {
debug!("Read downstream body done");
/* send a standalone END_STREAM flag */
(Bytes::new(), true)
}
};

/* For H2 downstreams, race the upstream write against a downstream stream
* reset. A write blocked on upstream flow control would otherwise keep the
* downstream stream handles referenced while a downstream RST_STREAM goes
* unobserved, pinning the downstream connection window credit until the
* write completes. */
if let Some(reset) = session.downstream_session.watch_h2_stream_reset() {
tokio::select! {
biased;
res = write_body(client_body, data, end, write_timeout) => {
res.map_err(|e| e.into_up())?;
}
reset = reset => {
return match reset {
Ok(reason) => Error::e_explain(
H2Error,
format!("downstream reset stream (reason: {reason}) while writing body to upstream"),
),
Err(e) => Err(e),
}
.map_err(|e| e.into_down());
}
}
} else {
debug!("Read downstream body done");
/* send a standalone END_STREAM flag */
write_body(client_body, Bytes::new(), true, write_timeout)
write_body(client_body, data, end, write_timeout)
.await
.map_err(|e| e.into_up())?;
}
Expand Down
Loading
Loading