in mantis-common/src/main/java/io/reactivx/mantis/operators/BufferOnBackPressureOperator.java [100:199]
public Subscriber<? super T> call(final Subscriber<? super T> child) {
subscribe.increment();
final AtomicLong requested = new AtomicLong();
final AtomicInteger completionEmitted = new AtomicInteger();
final AtomicInteger terminated = new AtomicInteger();
final AtomicInteger bufferedCount = new AtomicInteger();
final AtomicBoolean onCompleteReceived = new AtomicBoolean();
final AtomicInteger wip = new AtomicInteger();
child.add(Subscriptions.create(new Action0() {
@Override
public void call() {
subscribe.decrement();
}
}));
child.setProducer(new Producer() {
@Override
public void request(long n) {
requested.getAndAdd(n);
requestedGauge.increment(n);
// System.out.println("request: " + requested.get());
pollQueue(child,
requested,
bufferedCount,
onCompleteReceived,
completionEmitted,
wip);
}
});
Subscriber<T> parent = new Subscriber<T>() {
@Override
public void onStart() {
request(Long.MAX_VALUE);
}
@Override
public void onCompleted() {
if (terminated.compareAndSet(0, 1)) {
complete.increment();
onCompleteReceived.set(true);
pollQueue(child,
requested,
bufferedCount,
onCompleteReceived,
completionEmitted,
wip);
}
}
@Override
public void onError(Throwable e) {
if (terminated.compareAndSet(0, 1)) {
child.onError(e);
error.increment();
queue.clear();
}
}
@Override
public void onNext(T t) {
emitItem(NotificationLite.next(t));
}
private void emitItem(Object item) {
// short circuit buffering
if (requested.get() > 0 && queue.isEmpty()) {
NotificationLite.accept((Observer) child, item);
requested.decrementAndGet();
requestedGauge.decrement();
next.increment();
// System.out.println("next count: " + next.value());
} else {
boolean success = queue.offer(item);
if (success) {
bufferedCount.incrementAndGet();
bufferedGauge.increment();
// System.out.println("buffered count: " + bufferedGauge.value());
drainIfPossible(child, requested, bufferedCount, onCompleteReceived, completionEmitted);
} else {
dropped.increment();
// System.out.println("dropped count: " + dropped.value());
// dropped
}
}
}
};
// if child unsubscribes it should unsubscribe the parent, but not the other way around
child.add(parent);
return parent;
}