fn test_buffered()

in shed/futures_01_ext/src/lib.rs [1114:1166]


    fn test_buffered() {
        type TestStream = BoxStream<(BoxFuture<(), ()>, u64), ()>;

        fn create_stream() -> (Arc<AtomicUsize>, TestStream) {
            let s: TestStream = stream::iter_ok(vec![
                (future::ok(()).boxify(), 100),
                (future::ok(()).boxify(), 2),
            ])
            .boxify();

            let counter = Arc::new(AtomicUsize::new(0));

            (
                counter.clone(),
                s.inspect({
                    move |_val| {
                        counter.fetch_add(1, Ordering::SeqCst);
                    }
                })
                .boxify(),
            )
        }

        let runtime = tokio::runtime::Runtime::new().unwrap();

        let (counter, s) = create_stream();
        let params = BufferedParams {
            weight_limit: 10,
            buffer_size: 10,
        };
        let s = s.buffered_weight_limited(params);
        if let Ok((Some(()), s)) = runtime.block_on(s.into_future().compat()) {
            assert_eq!(counter.load(Ordering::SeqCst), 1);
            assert_eq!(runtime.block_on(s.collect().compat()).unwrap().len(), 1);
            assert_eq!(counter.load(Ordering::SeqCst), 2);
        } else {
            panic!("failed to block on a stream");
        }

        let (counter, s) = create_stream();
        let params = BufferedParams {
            weight_limit: 200,
            buffer_size: 10,
        };
        let s = s.buffered_weight_limited(params);
        if let Ok((Some(()), s)) = runtime.block_on(s.into_future().compat()) {
            assert_eq!(counter.load(Ordering::SeqCst), 2);
            assert_eq!(runtime.block_on(s.collect().compat()).unwrap().len(), 1);
            assert_eq!(counter.load(Ordering::SeqCst), 2);
        } else {
            panic!("failed to block on a stream");
        }
    }