in shed/futures_01_ext/src/lib.rs [1114:1166]
/// Exercises `buffered_weight_limited` on a two-item stream, once with a
/// weight limit of 10 and once with a weight limit of 200, checking how many
/// source items have been pulled at each step.
fn test_buffered() {
    type TestStream = BoxStream<(BoxFuture<(), ()>, u64), ()>;

    /// Builds a stream of two already-resolved futures paired with the values
    /// 100 and 2 (presumably their weights), and returns it together with a
    /// counter that is bumped every time the stream yields an item.
    fn create_stream() -> (Arc<AtomicUsize>, TestStream) {
        let pulled = Arc::new(AtomicUsize::new(0));
        let pulled_in_stream = Arc::clone(&pulled);

        let source: TestStream = stream::iter_ok(vec![
            (future::ok(()).boxify(), 100),
            (future::ok(()).boxify(), 2),
        ])
        .boxify();
        let observed: TestStream = source
            .inspect(move |_item| {
                pulled_in_stream.fetch_add(1, Ordering::SeqCst);
            })
            .boxify();

        (pulled, observed)
    }

    let runtime = tokio::runtime::Runtime::new().unwrap();

    // Scenario 1: weight limit of 10. The test expects only one source item
    // to have been pulled by the time the first output is produced.
    let (pulled, source) = create_stream();
    let limited = source.buffered_weight_limited(BufferedParams {
        weight_limit: 10,
        buffer_size: 10,
    });
    match runtime.block_on(limited.into_future().compat()) {
        Ok((Some(()), rest)) => {
            assert_eq!(pulled.load(Ordering::SeqCst), 1);
            // Draining the remainder yields the second item and pulls it from the source.
            let remaining = runtime.block_on(rest.collect().compat()).unwrap();
            assert_eq!(remaining.len(), 1);
            assert_eq!(pulled.load(Ordering::SeqCst), 2);
        }
        _ => panic!("failed to block on a stream"),
    }

    // Scenario 2: weight limit of 200. The test expects both source items to
    // have been pulled already when the first output is produced.
    let (pulled, source) = create_stream();
    let limited = source.buffered_weight_limited(BufferedParams {
        weight_limit: 200,
        buffer_size: 10,
    });
    match runtime.block_on(limited.into_future().compat()) {
        Ok((Some(()), rest)) => {
            assert_eq!(pulled.load(Ordering::SeqCst), 2);
            // One output is still pending, but no further source items are pulled.
            let remaining = runtime.block_on(rest.collect().compat()).unwrap();
            assert_eq!(remaining.len(), 1);
            assert_eq!(pulled.load(Ordering::SeqCst), 2);
        }
        _ => panic!("failed to block on a stream"),
    }
}