in benchmarks/src/jmh/java/benchmarks/flow/scrabble/optimizations/FlowableSplit.java [187:321]
/**
 * Moves string segments from the queued arrays to {@code downstream}, emitting at most
 * as many items as are currently recorded in {@code requested}.
 *
 * <p>Entry is guarded by the counter behind {@code getAndIncrement()} /
 * {@code addAndGet()}: a call that observes a non-zero previous value returns
 * immediately, and the call that is already looping re-runs the outer loop until the
 * counter drops back to zero.
 *
 * <p>Emission rules visible in this method:
 * <ul>
 *   <li>The last element of every polled array is never emitted here; once the read
 *       position reaches it the array is dropped and the next one is polled
 *       (NOTE(review): presumably the producer carries that trailing segment over into
 *       the next chunk - confirm against the enqueueing code).</li>
 *   <li>Empty strings are only counted; the counted empties are emitted as {@code ""}
 *       right before the next non-empty segment, so empties that are never followed by
 *       a non-empty segment are not emitted.</li>
 *   <li>After every {@code limit} arrays taken from the queue, {@code limit} more are
 *       requested from {@code upstream}.</li>
 * </ul>
 *
 * <p>Before the counter is decremented, the working cursors (pending empty count,
 * consumed-array count and read position) are written back to their fields so that a
 * later call resumes where this one stopped.
 */
void drain() {
    if (getAndIncrement() != 0) {
        return;
    }

    SimplePlainQueue<String[]> q = queue;

    int missed = 1;
    // Work on local copies of the cursors; they are written back at the end of each pass.
    int consumed = produced;
    String[] array = current;
    int idx = index;
    int emptyCount = empty;

    Subscriber<? super String> a = downstream;

    for (;;) {
        long r = requested.get();
        long e = 0;

        while (e != r) {
            if (cancelled) {
                current = null;
                q.clear();
                return;
            }

            // Read 'done' before polling: if it was set and the queue is then found
            // empty, no further arrays can be pending for this check.
            boolean d = done;

            if (array == null) {
                array = q.poll();
                if (array != null) {
                    current = array;
                    // Replenish upstream in batches of 'limit' arrays.
                    if (++consumed == limit) {
                        consumed = 0;
                        upstream.request(limit);
                    }
                }
            }

            boolean isEmpty = array == null;

            if (d && isEmpty) {
                current = null;
                Throwable ex = error;
                if (ex != null) {
                    a.onError(ex);
                } else {
                    a.onComplete();
                }
                return;
            }

            if (isEmpty) {
                break;
            }

            // The final element of an array is not emitted from here; drop the array
            // and move on to the next one.
            if (array.length == idx + 1) {
                array = null;
                current = null;
                idx = 0;
                continue;
            }

            String v = array[idx];

            if (v.isEmpty()) {
                // Defer empty segments until a non-empty one follows.
                emptyCount++;
                idx++;
            } else {
                // Flush deferred empties first, as far as demand allows.
                while (emptyCount != 0 && e != r) {
                    if (cancelled) {
                        current = null;
                        q.clear();
                        return;
                    }
                    a.onNext("");
                    e++;
                    emptyCount--;
                }

                // Only advance past 'v' if all deferred empties went out and demand remains;
                // otherwise 'v' is retried on the next pass.
                if (e != r && emptyCount == 0) {
                    a.onNext(v);
                    e++;
                    idx++;
                }
            }
        }

        // Demand is exhausted (or was zero): cancellation and termination must still
        // be detected, because a terminal signal does not need outstanding demand.
        if (e == r) {
            if (cancelled) {
                current = null;
                q.clear();
                return;
            }

            boolean d = done;

            if (array == null) {
                array = q.poll();
                if (array != null) {
                    current = array;
                    if (++consumed == limit) {
                        consumed = 0;
                        upstream.request(limit);
                    }
                }
            }

            boolean isEmpty = array == null;

            if (d && isEmpty) {
                current = null;
                Throwable ex = error;
                if (ex != null) {
                    a.onError(ex);
                } else {
                    a.onComplete();
                }
                return;
            }
        }

        if (e != 0L) {
            BackpressureHelper.produced(requested, e);
        }

        // Persist the cursors before releasing the drain counter. The read position
        // has to be stored together with the others: 'current' may still hold a
        // partially emitted array, and the next call reloads 'index' to continue in it.
        empty = emptyCount;
        produced = consumed;
        index = idx;

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}