diff --git a/tower/src/load/completion.rs b/tower/src/load/completion.rs index 50c441164..c5e0c2af5 100644 --- a/tower/src/load/completion.rs +++ b/tower/src/load/completion.rs @@ -76,9 +76,17 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); - let rsp = ready!(this.future.poll(cx))?; - let h = this.handle.take().expect("handle"); - Poll::Ready(Ok(this.completion.track_completion(h, rsp))) + + match ready!(this.future.poll(cx)) { + Ok(rsp) => { + let h = this.handle.take().expect("handle"); + Poll::Ready(Ok(this.completion.track_completion(h, rsp))) + } + Err(err) => { + drop(this.handle.take().expect("handle")); + Poll::Ready(Err(err)) + } + } } } diff --git a/tower/src/load/peak_ewma.rs b/tower/src/load/peak_ewma.rs index b70d4b6b4..76972a0ac 100644 --- a/tower/src/load/peak_ewma.rs +++ b/tower/src/load/peak_ewma.rs @@ -392,6 +392,56 @@ mod tests { assert!(svc.load() < Cost(100_000.0)); } + // Regression test for #858: a request that resolves to `Err` must stop + // contributing to load as soon as the response future resolves, not only + // when the future object is dropped. + #[tokio::test] + async fn completes_on_error() { + time::pause(); + + struct ErrSvc; + impl Service<()> for ErrSvc { + type Response = (); + type Error = (); + type Future = future::Ready>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, (): ()) -> Self::Future { + future::ready(Err(())) + } + } + + let mut svc = PeakEwma::new( + ErrSvc, + Duration::from_millis(10), + NANOS_PER_MILLI * 1_000.0, + CompleteOnResponse, + ); + + let baseline = svc.load(); + let mut fut = task::spawn(svc.call(())); + let pending = svc.load(); + assert!(pending > baseline, "an in-flight request should raise load"); + + // Poll the failed response to completion. The future object is kept + // alive afterwards. + assert!(matches!(fut.poll(), Poll::Ready(Err(())))); + + // With the fix, the failed response releases its load handle at + // completion, so it no longer counts as pending even though `fut` has + // not been dropped. + let after = svc.load(); + assert!( + after < pending, + "a failed request must stop counting as pending once its response resolves" + ); + + drop(fut); + } + #[test] fn nanos() { assert_eq!(super::nanos(Duration::new(0, 0)), 0.0); diff --git a/tower/src/load/pending_requests.rs b/tower/src/load/pending_requests.rs index d11f62cb8..720dd656d 100644 --- a/tower/src/load/pending_requests.rs +++ b/tower/src/load/pending_requests.rs @@ -216,4 +216,39 @@ mod tests { drop(i0); assert_eq!(svc.load(), Count(0)); } + + // Regression test for #858: a request that resolves to `Err` must stop + // counting as pending as soon as its response future resolves, not only + // once the future object is dropped. + #[test] + fn failed_response_completes_immediately() { + use tokio_test::task; + + struct ErrSvc; + impl Service<()> for ErrSvc { + type Response = (); + type Error = (); + type Future = future::Ready>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, (): ()) -> Self::Future { + future::ready(Err(())) + } + } + + let mut svc = PendingRequests::new(ErrSvc, CompleteOnResponse); + assert_eq!(svc.load(), Count(0)); + + let mut fut = task::spawn(svc.call(())); + assert_eq!(svc.load(), Count(1)); + + assert!(matches!(fut.poll(), Poll::Ready(Err(())))); + + // The request has completed (with an error), so it must no longer be + // counted as pending -- even though `fut` has not been dropped yet. + assert_eq!(svc.load(), Count(0)); + } }