Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions tower/src/util/call_all/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,33 @@ where
.expect("Service already taken")
}

pub(crate) fn unordered(mut self) -> super::CallAllUnordered<Svc, S> {
assert!(self.queue.is_empty() && !self.eof);
/// Transition this `CallAll` instance into a `CallAllUnordered` stream.
///
/// This conversion preserves the internal stream, backpressure flags (`eof`),
/// and any pulled but un-submitted request currently sitting in `curr_req`.
pub(crate) fn unordered(self) -> super::CallAllUnordered<Svc, S> {
// Ensure we don't discard any active, in-flight response futures.
assert!(self.queue.is_empty());

let CallAll {
service,
stream,
queue: _,
eof,
curr_req,
} = self;

// Reassemble the internal state machine while transitioning
// to an unordered concurrency driver.
let inner = CallAll {
service,
stream,
queue: futures_util::stream::FuturesUnordered::new(),
eof,
curr_req,
};

super::CallAllUnordered::new(self.service.take().unwrap(), self.stream)
super::CallAllUnordered::from_inner(inner)
}
}

Expand Down
12 changes: 10 additions & 2 deletions tower/src/util/call_all/unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,22 @@ where
S: Stream,
{
/// Create new [`CallAllUnordered`] combinator.
///
/// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
pub fn new(service: Svc, stream: S) -> CallAllUnordered<Svc, S> {
CallAllUnordered {
inner: common::CallAll::new(service, stream, FuturesUnordered::new()),
}
}

/// Create a new [`CallAllUnordered`] from an existing, partially-evaluated [`CallAll`] instance.
///
/// This constructor allows type-safe state migrations across combinators
/// without spilling buffered requests from `curr_req`.
pub(crate) fn from_inner(
inner: common::CallAll<Svc, S, FuturesUnordered<Svc::Future>>,
) -> Self {
CallAllUnordered { inner }
}

/// Extract the wrapped [`Service`].
///
/// # Panics
Expand Down
33 changes: 32 additions & 1 deletion tower/tests/util/call_all.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::support;
use futures_core::Stream;
use futures_util::pin_mut;
use futures_util::{pin_mut, StreamExt};
use std::fmt;
use std::future::{ready, Future, Ready};
use std::task::{Context, Poll};
Expand Down Expand Up @@ -249,3 +249,34 @@ async fn stream_does_not_block_service() {
let res = assert_ready!(task.enter(|cx, _| call.as_mut().poll(cx)));
assert_eq!(res.unwrap(), "res");
}

#[tokio::test]
async fn call_all_unordered_preserves_curr_req_on_conversion() {
let _t = support::trace_init();

let (mock, mut handle) = mock::pair::<_, &'static str>();
let mut task = task::spawn(());

let items = vec!["request-1"];
let stream = futures_util::stream::iter(items);

// Keep ca as an owned, unpinned value so we can move it later.
let mut ca = mock.call_all(stream);

// Drive the poll via Pin::new to safely access poll_next on the mutable borrow.
assert_pending!(task.enter(|cx, _| std::pin::Pin::new(&mut ca).poll_next(cx)));

// Catch the request on the mock handle side.
let (_, ready_tx) = handle.next_request().await.unwrap();

// Move the unpinned, owned ca safely into the conversion method.
let unordered_stream = ca.unordered();
pin_mut!(unordered_stream);

// Satisfy backpressure requirements and reply.
ready_tx.send_response("response-1");

// Pull the final response from the migrated unordered stream.
let response = assert_ready!(task.enter(|cx, _| unordered_stream.as_mut().poll_next(cx)));
assert_eq!(response.transpose().unwrap(), Some("response-1"));
}