public Subscriber call()

in mantis-common/src/main/java/io/reactivx/mantis/operators/BufferOnBackPressureOperator.java [100:199]


    /**
     * Creates the subscriber that sits between the upstream source and {@code child}.
     *
     * <p>The returned subscriber requests {@code Long.MAX_VALUE} from upstream when it starts.
     * Each incoming item is handed straight to {@code child} when the child has outstanding
     * demand and {@code queue} is empty; otherwise the item is offered to {@code queue}, and a
     * rejected offer is counted in {@code dropped}. Child demand is tracked in a per-subscription
     * counter that is fed by the {@link Producer} installed on {@code child}.
     *
     * @param child the downstream subscriber that receives items and terminal events
     * @return the subscriber to attach to the upstream source
     */
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        subscribe.increment();
        final AtomicLong requested = new AtomicLong();
        final AtomicInteger completionEmitted = new AtomicInteger();
        // Flipped 0 -> 1 by whichever of onCompleted/onError arrives first; later ones are ignored.
        final AtomicInteger terminated = new AtomicInteger();

        final AtomicInteger bufferedCount = new AtomicInteger();
        final AtomicBoolean onCompleteReceived = new AtomicBoolean();

        final AtomicInteger wip = new AtomicInteger();
        // Undo the subscribe.increment() above once the child unsubscribes.
        child.add(Subscriptions.create(new Action0() {
            @Override
            public void call() {
                subscribe.decrement();
            }
        }));

        child.setProducer(new Producer() {

            @Override
            public void request(long n) {
                // Saturating add: a plain getAndAdd would wrap to a negative value when the
                // child asks for Long.MAX_VALUE while demand is already outstanding, and a
                // negative counter disables the "requested > 0" fast path in emitItem.
                while (true) {
                    long current = requested.get();
                    long updated = current + n;
                    if (n > 0 && current > 0 && updated < 0) {
                        updated = Long.MAX_VALUE;
                    }
                    if (requested.compareAndSet(current, updated)) {
                        break;
                    }
                }
                requestedGauge.increment(n);
                pollQueue(child,
                        requested,

                        bufferedCount,
                        onCompleteReceived,
                        completionEmitted,
                        wip);
            }

        });

        Subscriber<T> parent = new Subscriber<T>() {
            @Override
            public void onStart() {
                // Pull from upstream without bound.
                request(Long.MAX_VALUE);
            }

            @Override
            public void onCompleted() {
                if (terminated.compareAndSet(0, 1)) {
                    complete.increment();
                    // Record completion, then hand off to pollQueue along with the flag.
                    // NOTE(review): presumably pollQueue emits onCompleted to the child once
                    // the buffer is drained - confirm against pollQueue.
                    onCompleteReceived.set(true);
                    pollQueue(child,
                            requested,

                            bufferedCount,
                            onCompleteReceived,
                            completionEmitted,
                            wip);
                }
            }

            @Override
            public void onError(Throwable e) {
                if (terminated.compareAndSet(0, 1)) {
                    // The error is forwarded right away; anything still buffered is discarded.
                    child.onError(e);
                    error.increment();
                    queue.clear();
                }
            }

            @Override
            public void onNext(T t) {
                emitItem(NotificationLite.next(t));
            }

            /**
             * Delivers {@code item} directly to the child when possible, otherwise buffers it.
             */
            private void emitItem(Object item) {
                // short circuit buffering
                if (requested.get() > 0 && queue.isEmpty()) {
                    NotificationLite.accept((Observer) child, item);
                    requested.decrementAndGet();
                    requestedGauge.decrement();
                    next.increment();
                } else {
                    boolean success = queue.offer(item);
                    if (success) {
                        bufferedCount.incrementAndGet();
                        bufferedGauge.increment();
                        drainIfPossible(child, requested, bufferedCount, onCompleteReceived, completionEmitted);

                    } else {
                        // The queue rejected the item: count it as dropped and move on.
                        dropped.increment();
                    }
                }
            }


        };
        // if child unsubscribes it should unsubscribe the parent, but not the other way around
        child.add(parent);
        return parent;
    }