shed/futures_01_ext/src/lib.rs (937 lines of code) (raw):

/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * This source code is licensed under both the MIT license found in the * LICENSE-MIT file in the root directory of this source tree and the Apache * License, Version 2.0 found in the LICENSE-APACHE file in the root directory * of this source tree. */ #![deny(warnings, missing_docs, clippy::all, rustdoc::broken_intra_doc_links)] #![feature(never_type)] //! Crate extending functionality of [`futures`] crate use bytes_old::Bytes; use futures::sync::{mpsc, oneshot}; use futures::{future, stream, try_ready, Async, AsyncSink, Future, Poll, Sink, Stream}; use std::{fmt::Debug, io as std_io}; use tokio_io::{ codec::{Decoder, Encoder}, AsyncWrite, }; mod bytes_stream; pub mod decode; pub mod encode; mod futures_ordered; pub mod io; mod select_all; mod split_err; mod stream_wrappers; mod streamfork; pub use crate::bytes_stream::{BytesStream, BytesStreamFuture}; pub use crate::futures_ordered::{futures_ordered, FuturesOrdered}; pub use crate::select_all::{select_all, SelectAll}; pub use crate::split_err::split_err; pub use crate::stream_wrappers::{CollectNoConsume, CollectTo}; // Re-exports. Those are used by the macros in this crate in order to reference a stable version of // what "futures" means. pub use futures as futures_reexport; /// Map `Item` and `Error` to `()` /// /// Adapt an existing `Future` to return unit `Item` and `Error`, while still /// waiting for the underlying `Future` to complete. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Discard<F>(F); impl<F> Discard<F> { /// Create instance wrapping `f` pub fn new(f: F) -> Self { Discard(f) } } impl<F> Future for Discard<F> where F: Future, { type Item = (); type Error = (); fn poll(&mut self) -> Poll<Self::Item, Self::Error> { match self.0.poll() { Err(_) => Err(()), Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(_)) => Ok(Async::Ready(())), } } } /// Send an item over an mpsc channel, discarding both the sender and receiver-closed errors. This /// should be used when the receiver being closed makes sending values moot, since no one is /// interested in the results any more. /// /// `E` is an arbitrary error type useful for getting types to match up, but it will never be /// produced by the returned future. #[inline] pub fn send_discard<T, E>( sender: mpsc::Sender<T>, value: T, ) -> impl Future<Item = (), Error = E> + Send where T: Send, E: Send, { sender.send(value).then(|_| Ok(())) } /// Replacement for BoxFuture, deprecated in upstream futures-rs. pub type BoxFuture<T, E> = Box<dyn Future<Item = T, Error = E> + Send>; /// Replacement for BoxFutureNonSend, deprecated in upstream futures-rs. pub type BoxFutureNonSend<T, E> = Box<dyn Future<Item = T, Error = E>>; /// Replacement for BoxStream, deprecated in upstream futures-rs. pub type BoxStream<T, E> = Box<dyn Stream<Item = T, Error = E> + Send>; /// Replacement for BoxStreamNonSend, deprecated in upstream futures-rs. pub type BoxStreamNonSend<T, E> = Box<dyn Stream<Item = T, Error = E>>; /// Do something with an error if the future failed. /// /// This is created by the `FutureExt::inspect_err` method. #[derive(Debug)] #[must_use = "futures do nothing unless polled"] pub struct InspectErr<A, F> where A: Future, { future: A, f: Option<F>, } impl<A, F> Future for InspectErr<A, F> where A: Future, F: FnOnce(&A::Error), { type Item = A::Item; type Error = A::Error; fn poll(&mut self) -> Poll<A::Item, A::Error> { match self.future.poll() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(e)) => Ok(Async::Ready(e)), Err(e) => { self.f.take().map_or_else( // Act like a fused future || Ok(Async::NotReady), |func| { func(&e); Err(e) }, ) } } } } /// Inspect the Result returned by a future /// /// This is created by the `FutureExt::inspect_result` method. #[derive(Debug)] #[must_use = "futures do nothing unless polled"] pub struct InspectResult<A, F> where A: Future, { future: A, f: Option<F>, } impl<A, F> Future for InspectResult<A, F> where A: Future, F: FnOnce(Result<&A::Item, &A::Error>), { type Item = A::Item; type Error = A::Error; fn poll(&mut self) -> Poll<A::Item, A::Error> { match self.future.poll() { Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(i)) => self.f.take().map_or_else( // Act like a fused future || Ok(Async::NotReady), |func| { func(Ok(&i)); Ok(Async::Ready(i)) }, ), Err(e) => self.f.take().map_or_else( // Act like a fused future || Ok(Async::NotReady), |func| { func(Err(&e)); Err(e) }, ), } } } /// A trait implemented by default for all Futures which extends the standard /// functionality. pub trait FutureExt: Future + Sized { /// Map a `Future` to have `Item=()` and `Error=()`. This is /// useful when a future is being used to drive a computation /// but the actual results aren't interesting (such as when used /// with `Handle::spawn()`). fn discard(self) -> Discard<Self> { Discard(self) } /// Create a `Send`able boxed version of this `Future`. #[inline] fn boxify(self) -> BoxFuture<Self::Item, Self::Error> where Self: 'static + Send, { // TODO: (rain1) T21801845 rename to 'boxed' once gone from upstream. Box::new(self) } /// Create a non-`Send`able boxed version of this `Future`. #[inline] fn boxify_nonsend(self) -> BoxFutureNonSend<Self::Item, Self::Error> where Self: 'static, { Box::new(self) } /// Shorthand for returning [`future::Either::A`] fn left_future<B>(self) -> future::Either<Self, B> { future::Either::A(self) } /// Shorthand for returning [`future::Either::B`] fn right_future<A>(self) -> future::Either<A, Self> { future::Either::B(self) } /// Similar to [`future::Future::inspect`], but runs the function on error fn inspect_err<F>(self, f: F) -> InspectErr<Self, F> where F: FnOnce(&Self::Error), Self: Sized, { InspectErr { future: self, f: Some(f), } } /// Similar to [`future::Future::inspect`], but runs the function on both /// output or error of the Future treating it as a regular [`Result`] fn inspect_result<F>(self, f: F) -> InspectResult<Self, F> where F: FnOnce(Result<&Self::Item, &Self::Error>), Self: Sized, { InspectResult { future: self, f: Some(f), } } } impl<T> FutureExt for T where T: Future {} /// Params for [StreamExt::buffered_weight_limited] and [WeightLimitedBufferedStream] pub struct BufferedParams { /// Limit for the sum of weights in the [WeightLimitedBufferedStream] stream pub weight_limit: u64, /// Limit for size of buffer in the [WeightLimitedBufferedStream] stream pub buffer_size: usize, } /// A trait implemented by default for all Streams which extends the standard /// functionality. pub trait StreamExt: Stream { /// Fork elements in a stream out to two sinks, depending on a predicate /// /// If the predicate returns false, send the item to `out1`, otherwise to /// `out2`. `streamfork()` acts in a similar manner to `forward()` in that it /// keeps operating until the input stream ends, and then returns everything /// in the resulting Future. /// /// The predicate returns a `Result` so that it can fail (if there's a malformed /// input that can't be assigned to either output). fn streamfork<Out1, Out2, F, E>( self, out1: Out1, out2: Out2, pred: F, ) -> streamfork::Forker<Self, Out1, Out2, F, E> where Self: Sized, Out1: Sink<SinkItem = Self::Item>, Out2: Sink<SinkItem = Self::Item, SinkError = Out1::SinkError>, F: FnMut(&Self::Item) -> Result<bool, E>, E: From<Self::Error> + From<Out1::SinkError> + From<Out2::SinkError>, { streamfork::streamfork(self, out1, out2, pred) } /// Returns a future that yields a `(Vec<<Self>::Item>, Self)`, where the /// vector is a collections of all elements yielded by the Stream. fn collect_no_consume(self) -> CollectNoConsume<Self> where Self: Sized, { stream_wrappers::collect_no_consume::new(self) } /// A shorthand for [encode::encode] fn encode<Enc>(self, encoder: Enc) -> encode::LayeredEncoder<Self, Enc> where Self: Sized, Enc: Encoder<Item = Self::Item>, { encode::encode(self, encoder) } /// Similar to [std::iter::Iterator::enumerate], returns a Stream that yields /// `(usize, Self::Item)` where the first element of tuple is the iteration /// count. fn enumerate(self) -> Enumerate<Self> where Self: Sized, { Enumerate::new(self) } /// Creates a stream wrapper and a future. The future will resolve into the wrapped stream when /// the stream wrapper returns None. It uses ConservativeReceiver to ensure that deadlocks are /// easily caught when one tries to poll on the receiver before consuming the stream. fn return_remainder(self) -> (ReturnRemainder<Self>, ConservativeReceiver<Self>) where Self: Sized, { ReturnRemainder::new(self) } /// Whether this stream is empty. /// /// This will consume one element from the stream if returned. fn is_empty<'a>(self) -> Box<dyn Future<Item = bool, Error = Self::Error> + Send + 'a> where Self: 'a + Send + Sized, { Box::new( self.into_future() .map(|(first, _rest)| first.is_none()) .map_err(|(err, _rest)| err), ) } /// Whether this stream is not empty (has at least one element). /// /// This will consume one element from the stream if returned. fn not_empty<'a>(self) -> Box<dyn Future<Item = bool, Error = Self::Error> + Send + 'a> where Self: 'a + Send + Sized, { Box::new( self.into_future() .map(|(first, _rest)| first.is_some()) .map_err(|(err, _rest)| err), ) } /// Create a `Send`able boxed version of this `Stream`. #[inline] fn boxify(self) -> BoxStream<Self::Item, Self::Error> where Self: 'static + Send + Sized, { // TODO: (rain1) T21801845 rename to 'boxed' once gone from upstream. Box::new(self) } /// Create a non-`Send`able boxed version of this `Stream`. #[inline] fn boxify_nonsend(self) -> BoxStreamNonSend<Self::Item, Self::Error> where Self: 'static + Sized, { Box::new(self) } /// Shorthand for returning [`StreamEither::A`] fn left_stream<B>(self) -> StreamEither<Self, B> where Self: Sized, { StreamEither::A(self) } /// Shorthand for returning [`StreamEither::B`] fn right_stream<A>(self) -> StreamEither<A, Self> where Self: Sized, { StreamEither::B(self) } /// Similar to [Stream::chunks], but returns earlier if [futures::Async::NotReady] /// was returned. fn batch(self, limit: usize) -> BatchStream<Self> where Self: Sized, { BatchStream::new(self, limit) } /// Like [Stream::buffered] call, but can also limit number of futures in a buffer by "weight". fn buffered_weight_limited<I, E, Fut>( self, params: BufferedParams, ) -> WeightLimitedBufferedStream<Self, I, E> where Self: Sized + Send + 'static, Self: Stream<Item = (Fut, u64), Error = E>, Fut: Future<Item = I, Error = E>, { WeightLimitedBufferedStream::new(params, self) } /// Returns a Future that yields a collection `C` containing all `Self::Item` /// yielded by the stream fn collect_to<C: Default + Extend<Self::Item>>(self) -> CollectTo<Self, C> where Self: Sized, { CollectTo::new(self) } } impl<T> StreamExt for T where T: Stream {} /// Like [stream::Buffered], but can also limit number of futures in a buffer by "weight". pub struct WeightLimitedBufferedStream<S, I, E> { queue: stream::FuturesOrdered<BoxFuture<(I, u64), E>>, current_weight: u64, weight_limit: u64, max_buffer_size: usize, stream: stream::Fuse<S>, } impl<S, I, E> WeightLimitedBufferedStream<S, I, E> where S: Stream, { /// Create a new instance that will be configured using the `params` provided pub fn new(params: BufferedParams, stream: S) -> Self { Self { queue: stream::FuturesOrdered::new(), current_weight: 0, weight_limit: params.weight_limit, max_buffer_size: params.buffer_size, stream: stream.fuse(), } } } impl<S, Fut, I: 'static, E: 'static> Stream for WeightLimitedBufferedStream<S, I, E> where S: Stream<Item = (Fut, u64), Error = E>, Fut: Future<Item = I, Error = E> + Send + 'static, { type Item = I; type Error = E; fn poll(&mut self) -> Poll<Option<Self::Item>, E> { // First up, try to spawn off as many futures as possible by filling up // our slab of futures. while self.queue.len() < self.max_buffer_size && self.current_weight < self.weight_limit { let future = match self.stream.poll()? { Async::Ready(Some((s, weight))) => { self.current_weight += weight; s.map(move |val| (val, weight)).boxify() } Async::Ready(None) | Async::NotReady => break, }; self.queue.push(future); } // Try polling a new future if let Some((val, weight)) = try_ready!(self.queue.poll()) { self.current_weight -= weight; return Ok(Async::Ready(Some(val))); } // If we've gotten this far, then there are no events for us to process // and nothing was ready, so figure out if we're not done yet or if // we've reached the end. if self.stream.is_done() { Ok(Async::Ready(None)) } else { Ok(Async::NotReady) } } } /// Trait that provides a function for making a decoding layer on top of Stream of Bytes pub trait StreamLayeredExt: Stream<Item = Bytes> { /// Returnes a Stream that will yield decoded chunks of Bytes as they come /// using provided [Decoder] fn decode<Dec>(self, decoder: Dec) -> decode::LayeredDecode<Self, Dec> where Self: Sized, Dec: Decoder; } impl<T> StreamLayeredExt for T where T: Stream<Item = Bytes>, { fn decode<Dec>(self, decoder: Dec) -> decode::LayeredDecode<Self, Dec> where Self: Sized, Dec: Decoder, { decode::decode(self, decoder) } } /// Like [std::iter::Enumerate], but for Stream pub struct Enumerate<In> { inner: In, count: usize, } impl<In> Enumerate<In> { fn new(inner: In) -> Self { Enumerate { inner, count: 0 } } } impl<In: Stream> Stream for Enumerate<In> { type Item = (usize, In::Item); type Error = In::Error; fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { match self.inner.poll() { Err(err) => Err(err), Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(None)) => Ok(Async::Ready(None)), Ok(Async::Ready(Some(v))) => { let c = self.count; self.count += 1; Ok(Async::Ready(Some((c, v)))) } } } } /// Like [future::Either], but for Stream pub enum StreamEither<A, B> { /// First branch of the type A(A), /// Second branch of the type B(B), } impl<A, B> Stream for StreamEither<A, B> where A: Stream, B: Stream<Item = A::Item, Error = A::Error>, { type Item = A::Item; type Error = A::Error; fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { match self { StreamEither::A(a) => a.poll(), StreamEither::B(b) => b.poll(), } } } /// This is a wrapper around oneshot::Receiver that will return error when the receiver was polled /// and the result was not ready. This is a very strict way of preventing deadlocks in code when /// receiver is polled before the sender has send the result pub struct ConservativeReceiver<T>(oneshot::Receiver<T>); /// Error that can be returned by [ConservativeReceiver] #[derive(Clone, Copy, PartialEq, Eq, Debug)] pub enum ConservativeReceiverError { /// The underlying [oneshot::Receiver] returned [oneshot::Canceled] Canceled, /// The underlying [oneshot::Receiver] returned [Async::NotReady], which means it was polled /// before the [oneshot::Sender] send some data ReceiveBeforeSend, } impl ::std::error::Error for ConservativeReceiverError { fn description(&self) -> &str { match self { ConservativeReceiverError::Canceled => "oneshot canceled", ConservativeReceiverError::ReceiveBeforeSend => "recv called on channel before send", } } } impl ::std::fmt::Display for ConservativeReceiverError { fn fmt(&self, fmt: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { match self { ConservativeReceiverError::Canceled => write!(fmt, "oneshot canceled"), ConservativeReceiverError::ReceiveBeforeSend => { write!(fmt, "recv called on channel before send") } } } } impl ::std::convert::From<oneshot::Canceled> for ConservativeReceiverError { fn from(_: oneshot::Canceled) -> ConservativeReceiverError { ConservativeReceiverError::Canceled } } impl<T> ConservativeReceiver<T> { /// Return an instance of [ConservativeReceiver] wrapping the [oneshot::Receiver] pub fn new(recv: oneshot::Receiver<T>) -> Self { ConservativeReceiver(recv) } } impl<T> Future for ConservativeReceiver<T> { type Item = T; type Error = ConservativeReceiverError; fn poll(&mut self) -> Poll<Self::Item, Self::Error> { match self.0.poll()? { Async::Ready(item) => Ok(Async::Ready(item)), Async::NotReady => Err(ConservativeReceiverError::ReceiveBeforeSend), } } } /// A stream wrapper returned by [StreamExt::return_remainder] pub struct ReturnRemainder<In> { inner: Option<In>, send: Option<oneshot::Sender<In>>, } impl<In> ReturnRemainder<In> { fn new(inner: In) -> (Self, ConservativeReceiver<In>) { let (send, recv) = oneshot::channel(); ( Self { inner: Some(inner), send: Some(send), }, ConservativeReceiver::new(recv), ) } } impl<In: Stream> Stream for ReturnRemainder<In> { type Item = In::Item; type Error = In::Error; fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { let maybe_item = match self.inner { Some(ref mut inner) => try_ready!(inner.poll()), None => return Ok(Async::Ready(None)), }; if maybe_item.is_none() { let inner = self .inner .take() .expect("inner was just polled, should be some"); let send = self.send.take().expect("send is None iff inner is None"); // The Receiver will handle errors let _ = send.send(inner); } Ok(Async::Ready(maybe_item)) } } /// A convenience macro for working with `io::Result<T>` from the `Read` and /// `Write` traits. /// /// This macro takes `io::Result<T>` as input, and returns `Poll<T, io::Error>` /// as the output. If the input type is of the `Err` variant, then /// `Poll::NotReady` is returned if it indicates `WouldBlock` or otherwise `Err` /// is returned. #[macro_export] #[rustfmt::skip] macro_rules! handle_nb { ($e:expr) => { match $e { Ok(t) => Ok(::futures::Async::Ready(t)), Err(ref e) if e.kind() == ::std::io::ErrorKind::WouldBlock => { Ok(::futures::Async::NotReady) } Err(e) => Err(e), } }; } /// Macro that can be used like `?` operator, but in the context where the expected return type is /// BoxFuture. The result of it is either Ok part of Result or immediate returning the Err part /// converted into BoxFuture. #[macro_export] #[rustfmt::skip] macro_rules! try_boxfuture { ($e:expr) => { match $e { Ok(t) => t, Err(e) => return $crate::FutureExt::boxify($crate::futures_reexport::future::err(e.into())), } }; } /// Macro that can be used like `?` operator, but in the context where the expected return type is /// BoxStream. The result of it is either Ok part of Result or immediate returning the Err part /// converted into BoxStream. #[macro_export] #[rustfmt::skip] macro_rules! try_boxstream { ($e:expr) => { match $e { Ok(t) => t, Err(e) => return $crate::StreamExt::boxify($crate::futures_reexport::stream::once(Err(e.into()))), } }; } /// Macro that can be used like ensure! macro from failure crate, but in the context where the /// expected return type is BoxFuture. Exits a function early with an Error if the condition is not /// satisfied. #[macro_export] #[rustfmt::skip] macro_rules! ensure_boxfuture { ($cond:expr, $e:expr) => { if !($cond) { return $crate::FutureExt::boxify(::futures::future::err($e.into())); } }; } /// Macro that can be used like ensure! macro from failure crate, but in the context where the /// expected return type is BoxStream. Exits a function early with an Error if the condition is not /// satisfied. #[macro_export] #[rustfmt::skip] macro_rules! ensure_boxstream { ($cond:expr, $e:expr) => { if !($cond) { return $crate::StreamExt::boxify(::futures::stream::once(Err($e.into()))); } }; } /// Macro that can be used like `?` operator, but in the context where the expected return type is /// a left future. The result of it is either Ok part of Result or immediate returning the Err //part / converted into a a left future. #[macro_export] #[rustfmt::skip] macro_rules! try_left_future { ($e:expr) => { match $e { Ok(t) => t, Err(e) => return $crate::futures_reexport::future::err(e.into()).left_future(), } }; } /// Simple adapter from `Sink` interface to `AsyncWrite` interface. /// It can be useful to convert from the interface that supports only AsyncWrite, and get /// Stream as a result. pub struct SinkToAsyncWrite<S> { sink: S, } impl<S> SinkToAsyncWrite<S> { /// Return an instance of [SinkToAsyncWrite] wrapping a Sink pub fn new(sink: S) -> Self { SinkToAsyncWrite { sink } } } fn create_std_error<E: Debug>(err: E) -> std_io::Error { std_io::Error::new(std_io::ErrorKind::Other, format!("{:?}", err)) } impl<E, S> std_io::Write for SinkToAsyncWrite<S> where S: Sink<SinkItem = Bytes, SinkError = E>, E: Debug, { fn write(&mut self, buf: &[u8]) -> ::std::io::Result<usize> { let bytes = Bytes::from(buf); match self.sink.start_send(bytes) { Ok(AsyncSink::Ready) => Ok(buf.len()), Ok(AsyncSink::NotReady(_)) => Err(std_io::Error::new( std_io::ErrorKind::WouldBlock, "channel is busy", )), Err(err) => Err(create_std_error(err)), } } fn flush(&mut self) -> std_io::Result<()> { match self.sink.poll_complete() { Ok(Async::Ready(())) => Ok(()), Ok(Async::NotReady) => Err(std_io::Error::new( std_io::ErrorKind::WouldBlock, "channel is busy", )), Err(err) => Err(create_std_error(err)), } } } impl<E, S> AsyncWrite for SinkToAsyncWrite<S> where S: Sink<SinkItem = Bytes, SinkError = E>, E: Debug, { fn shutdown(&mut self) -> Poll<(), std_io::Error> { match self.sink.close() { Ok(res) => Ok(res), Err(err) => Err(create_std_error(err)), } } } /// It's a combinator that converts `Stream<A>` into `Stream<Vec<A>>`. /// So interface is similar to `.chunks()` method, but there's an important difference: /// BatchStream won't wait until the whole batch fills up i.e. as soon as underlying stream /// return NotReady, then new batch is returned from BatchStream pub struct BatchStream<S> where S: Stream, { inner: stream::Fuse<S>, err: Option<S::Error>, limit: usize, } impl<S: Stream> BatchStream<S> { /// Return an instance of [BatchStream] wrapping a Stream with the provided limit set pub fn new(s: S, limit: usize) -> Self { Self { inner: s.fuse(), err: None, limit, } } } impl<S: Stream> Stream for BatchStream<S> { type Item = Vec<S::Item>; type Error = S::Error; fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { let mut batch = vec![]; if let Some(err) = self.err.take() { return Err(err); } while batch.len() < self.limit { match self.inner.poll() { Ok(Async::Ready(Some(v))) => batch.push(v), Ok(Async::NotReady) | Ok(Async::Ready(None)) => break, Err(err) => { self.err = Some(err); break; } } } if batch.is_empty() { if let Some(err) = self.err.take() { return Err(err); } if self.inner.is_done() { Ok(Async::Ready(None)) } else { Ok(Async::NotReady) } } else { Ok(Async::Ready(Some(batch))) } } } #[cfg(test)] mod test { use super::*; use anyhow::Result; use assert_matches::assert_matches; use futures::stream; use futures::sync::mpsc; use futures::IntoFuture; use futures::Stream; use futures03::compat::Future01CompatExt; use cloned::cloned; use futures::future::{err, ok}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use tokio::runtime::Runtime; #[derive(Debug)] struct MyErr; impl<T> From<mpsc::SendError<T>> for MyErr { fn from(_: mpsc::SendError<T>) -> Self { MyErr } } #[test] fn discard() { use futures::sync::mpsc; let runtime = Runtime::new().unwrap(); let (tx, rx) = mpsc::channel(1); let xfer = stream::iter_ok::<_, MyErr>(vec![123]).forward(tx); runtime.spawn(xfer.discard().compat()); match runtime.block_on(rx.collect().compat()) { Ok(v) => assert_eq!(v, vec![123]), bad => panic!("bad {:?}", bad), } } #[test] fn inspect_err() { let count = Arc::new(AtomicUsize::new(0)); cloned!(count as count_cloned); let runtime = Runtime::new().unwrap(); let work = err::<i32, i32>(42).inspect_err(move |e| { assert_eq!(42, *e); count_cloned.fetch_add(1, Ordering::SeqCst); }); if runtime.block_on(work.compat()).is_ok() { panic!("future is supposed to fail"); } assert_eq!(1, count.load(Ordering::SeqCst)); } #[test] fn inspect_ok() { let count = Arc::new(AtomicUsize::new(0)); cloned!(count as count_cloned); let runtime = Runtime::new().unwrap(); let work = ok::<i32, i32>(42).inspect_err(move |_| { count_cloned.fetch_add(1, Ordering::SeqCst); }); if runtime.block_on(work.compat()).is_err() { panic!("future is supposed to succeed"); } assert_eq!(0, count.load(Ordering::SeqCst)); } #[test] fn inspect_result() { let count = Arc::new(AtomicUsize::new(0)); cloned!(count as count_cloned); let runtime = Runtime::new().unwrap(); let work = err::<i32, i32>(42).inspect_result(move |res| { if let Err(e) = res { assert_eq!(42, *e); count_cloned.fetch_add(1, Ordering::SeqCst); } else { count_cloned.fetch_add(2, Ordering::SeqCst); } }); if runtime.block_on(work.compat()).is_ok() { panic!("future is supposed to fail"); } assert_eq!(1, count.load(Ordering::SeqCst)); } #[test] fn enumerate() { let s = stream::iter_ok::<_, ()>(vec!["hello", "there", "world"]); let es = Enumerate::new(s); let v = es.collect().wait(); assert_eq!(v, Ok(vec![(0, "hello"), (1, "there"), (2, "world")])); } #[test] fn empty() { let mut s = stream::empty::<(), ()>(); // Ensure that the stream doesn't have to be consumed. assert!(s.by_ref().is_empty().wait().unwrap()); assert!(!s.not_empty().wait().unwrap()); let mut s = stream::once::<_, ()>(Ok("foo")); assert!(!s.by_ref().is_empty().wait().unwrap()); // The above is_empty would consume the first element, so the stream has to be // reinitialized. let s = stream::once::<_, ()>(Ok("foo")); assert!(s.not_empty().wait().unwrap()); } #[test] fn return_remainder() { use futures::future::poll_fn; let s = stream::iter_ok::<_, ()>(vec!["hello", "there", "world"]).fuse(); let (mut s, mut remainder) = s.return_remainder(); let runtime = Runtime::new().unwrap(); let res: Result<(), ()> = runtime.block_on( poll_fn(move || { assert_matches!( remainder.poll(), Err(ConservativeReceiverError::ReceiveBeforeSend) ); assert_eq!(s.poll(), Ok(Async::Ready(Some("hello")))); assert_matches!( remainder.poll(), Err(ConservativeReceiverError::ReceiveBeforeSend) ); assert_eq!(s.poll(), Ok(Async::Ready(Some("there")))); assert_matches!( remainder.poll(), Err(ConservativeReceiverError::ReceiveBeforeSend) ); assert_eq!(s.poll(), Ok(Async::Ready(Some("world")))); assert_matches!( remainder.poll(), Err(ConservativeReceiverError::ReceiveBeforeSend) ); assert_eq!(s.poll(), Ok(Async::Ready(None))); match remainder.poll() { Ok(Async::Ready(s)) => assert!(s.is_done()), bad => panic!("unexpected result: {:?}", bad), } Ok(Async::Ready(())) }) .compat(), ); assert_matches!(res, Ok(())); } fn assert_flush<E, S>(sink: &mut SinkToAsyncWrite<S>) where S: Sink<SinkItem = Bytes, SinkError = E>, E: Debug, { use std::io::Write; loop { let flush_res = sink.flush(); if flush_res.is_ok() { break; } if let Err(ref e) = flush_res { assert_eq!(e.kind(), std_io::ErrorKind::WouldBlock); } } } fn assert_shutdown<E, S>(sink: &mut SinkToAsyncWrite<S>) where S: Sink<SinkItem = Bytes, SinkError = E>, E: Debug, { loop { let shutdown_res = sink.shutdown(); if shutdown_res.is_ok() { break; } if let Err(ref e) = shutdown_res { assert_eq!(e.kind(), std_io::ErrorKind::WouldBlock); } } } #[test] fn sink_to_async_write() { use futures::sync::mpsc; use std::io::Write; let rt = tokio::runtime::Runtime::new().unwrap(); let (tx, rx) = mpsc::channel::<Bytes>(1); let messages_num = 10; rt.spawn( Ok::<_, ()>(()) .into_future() .map(move |()| { let mut async_write = SinkToAsyncWrite::new(tx); for i in 0..messages_num { loop { let res = async_write.write(format!("{}", i).as_bytes()); if let Err(ref e) = res { assert_eq!(e.kind(), std_io::ErrorKind::WouldBlock); assert_flush(&mut async_write); } else { break; } } } assert_flush(&mut async_write); assert_shutdown(&mut async_write); }) .compat(), ); let res = rt.block_on(rx.collect().compat()).unwrap(); assert_eq!(res.len(), messages_num); } #[test] fn test_buffered() { type TestStream = BoxStream<(BoxFuture<(), ()>, u64), ()>; fn create_stream() -> (Arc<AtomicUsize>, TestStream) { let s: TestStream = stream::iter_ok(vec![ (future::ok(()).boxify(), 100), (future::ok(()).boxify(), 2), ]) .boxify(); let counter = Arc::new(AtomicUsize::new(0)); ( counter.clone(), s.inspect({ move |_val| { counter.fetch_add(1, Ordering::SeqCst); } }) .boxify(), ) } let runtime = tokio::runtime::Runtime::new().unwrap(); let (counter, s) = create_stream(); let params = BufferedParams { weight_limit: 10, buffer_size: 10, }; let s = s.buffered_weight_limited(params); if let Ok((Some(()), s)) = runtime.block_on(s.into_future().compat()) { assert_eq!(counter.load(Ordering::SeqCst), 1); assert_eq!(runtime.block_on(s.collect().compat()).unwrap().len(), 1); assert_eq!(counter.load(Ordering::SeqCst), 2); } else { panic!("failed to block on a stream"); } let (counter, s) = create_stream(); let params = BufferedParams { weight_limit: 200, buffer_size: 10, }; let s = s.buffered_weight_limited(params); if let Ok((Some(()), s)) = runtime.block_on(s.into_future().compat()) { assert_eq!(counter.load(Ordering::SeqCst), 2); assert_eq!(runtime.block_on(s.collect().compat()).unwrap().len(), 1); assert_eq!(counter.load(Ordering::SeqCst), 2); } else { panic!("failed to block on a stream"); } } use std::collections::HashSet; fn assert_same_elements<I, T>(src: Vec<I>, iter: T) where I: Copy + Debug + Ord, T: IntoIterator<Item = I>, { let mut dst_sorted: Vec<I> = iter.into_iter().collect(); dst_sorted.sort(); let mut src_sorted = src; src_sorted.sort(); assert_eq!(src_sorted, dst_sorted); } #[test] fn collect_into_vec() { let items = vec![1, 2, 3]; let future = futures::stream::iter_ok::<_, ()>(items.clone()).collect_to::<Vec<i32>>(); let runtime = Runtime::new().unwrap(); match runtime.block_on(future.compat()) { Ok(collections) => assert_same_elements(items, collections), Err(()) => panic!("future is supposed to succeed"), } } #[test] fn collect_into_set() { let items = vec![1, 2, 3]; let future = futures::stream::iter_ok::<_, ()>(items.clone()).collect_to::<HashSet<i32>>(); let runtime = Runtime::new().unwrap(); match runtime.block_on(future.compat()) { Ok(collections) => assert_same_elements(items, collections), Err(()) => panic!("future is supposed to succeed"), } } }