Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions tower/src/load/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,17 @@ where

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let rsp = ready!(this.future.poll(cx))?;
let h = this.handle.take().expect("handle");
Poll::Ready(Ok(this.completion.track_completion(h, rsp)))

match ready!(this.future.poll(cx)) {
Ok(rsp) => {
let h = this.handle.take().expect("handle");
Poll::Ready(Ok(this.completion.track_completion(h, rsp)))
}
Err(err) => {
drop(this.handle.take().expect("handle"));
Poll::Ready(Err(err))
}
}
}
}

Expand Down
50 changes: 50 additions & 0 deletions tower/src/load/peak_ewma.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,56 @@ mod tests {
assert!(svc.load() < Cost(100_000.0));
}

// Regression test for #858: a request that resolves to `Err` must stop
// contributing to load as soon as the response future resolves, not only
// when the future object is dropped.
#[tokio::test]
async fn completes_on_error() {
time::pause();

struct ErrSvc;
impl Service<()> for ErrSvc {
type Response = ();
type Error = ();
type Future = future::Ready<Result<(), ()>>;

fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), ()>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, (): ()) -> Self::Future {
future::ready(Err(()))
}
}

let mut svc = PeakEwma::new(
ErrSvc,
Duration::from_millis(10),
NANOS_PER_MILLI * 1_000.0,
CompleteOnResponse,
);

let baseline = svc.load();
let mut fut = task::spawn(svc.call(()));
let pending = svc.load();
assert!(pending > baseline, "an in-flight request should raise load");

// Poll the failed response to completion. The future object is kept
// alive afterwards.
assert!(matches!(fut.poll(), Poll::Ready(Err(()))));

// With the fix, the failed response releases its load handle at
// completion, so it no longer counts as pending even though `fut` has
// not been dropped.
let after = svc.load();
assert!(
after < pending,
"a failed request must stop counting as pending once its response resolves"
);

drop(fut);
}

#[test]
fn nanos() {
assert_eq!(super::nanos(Duration::new(0, 0)), 0.0);
Expand Down
35 changes: 35 additions & 0 deletions tower/src/load/pending_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,39 @@ mod tests {
drop(i0);
assert_eq!(svc.load(), Count(0));
}

// Regression test for #858: a request that resolves to `Err` must stop
// counting as pending as soon as its response future resolves, not only
// once the future object is dropped.
#[test]
fn failed_response_completes_immediately() {
use tokio_test::task;

struct ErrSvc;
impl Service<()> for ErrSvc {
type Response = ();
type Error = ();
type Future = future::Ready<Result<(), ()>>;

fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), ()>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, (): ()) -> Self::Future {
future::ready(Err(()))
}
}

let mut svc = PendingRequests::new(ErrSvc, CompleteOnResponse);
assert_eq!(svc.load(), Count(0));

let mut fut = task::spawn(svc.call(()));
assert_eq!(svc.load(), Count(1));

assert!(matches!(fut.poll(), Poll::Ready(Err(()))));

// The request has completed (with an error), so it must no longer be
// counted as pending -- even though `fut` has not been dropped yet.
assert_eq!(svc.load(), Count(0));
}
}