in modules/core/src/main/java/org/apache/ignite/internal/util/subscription/OrderedMergePublisher.java [235:372]
/**
 * Drain loop that advances the merge state machine and pushes values to the downstream subscriber.
 *
 * <p>Entry is guarded by {@code guardCntr}: a caller that observes a non-zero counter only increments it and returns,
 * the caller that observes zero becomes the draining thread and keeps looping until the counter drops back to zero
 * or a terminal state is handled.
 *
 * <p>Handled states:
 * <ul>
 *     <li>{@code INITIAL} - polls the first element of every inner subscriber; once none is awaited, the indexes of
 *     sources that produced a value are put into {@code valuesQueue} and the state is switched to {@code RUNNING}
 *     (or to {@code COMPLETING} if the queue is empty).</li>
 *     <li>{@code RUNNING} - emits the value referenced by the head of {@code valuesQueue} while the number of emitted
 *     items differs from the requested amount, requesting one more item from the source of each emitted value.</li>
 *     <li>{@code COMPLETING} - switches to {@code STOP}, calls {@code finish(downstream)} unless the subscription
 *     was cancelled, and clears the buffered values and inner queues.</li>
 *     <li>{@code STOP} - returns immediately.</li>
 * </ul>
 */
private void drain() {
// Only one thread can pass below.
if (guardCntr.getAndIncrement() != 0) {
return;
}
// Frequently accessed fields.
// Note: 'emitted' is a local copy, it is written back to the field before every attempt to release the guard.
Subscriber<? super T> downstream = this.downstream;
OrderedMergeSubscriber<T>[] subscribers = this.subscribers;
Object[] values = this.values;
long emitted = this.emitted;
// Retry loop.
for (; ; ) {
switch ((State) STATE.getAcquire(this)) {
case INITIAL: {
// Number of sources whose first value has not been obtained yet.
int waiting = this.waiting;
// Moves non-initialized sources to the beginning of the array for faster array scans
// in the case of long initialization.
// Slot 0 always holds the source being probed. When it gets initialized, it is swapped with the slot
// at index 'waiting' (after the decrement), so initialized sources accumulate at the tail of the array
// and the range [0, waiting) contains only sources that are still awaited.
for (int i = 0; i < waiting; ) {
boolean innerDone = subscribers[0].done; // Read before polling to preserve correct program order.
Object obj = subscribers[0].queue.poll();
int done = (obj == null && innerDone) ? 1 : 0; // Flag has no effect if poll was successful.
// A source is initialized if it either produced a value or is done.
int initialized = obj != null || innerDone ? 1 : 0;
// Sources that are done and have nothing buffered are marked with the DONE sentinel.
values[0] = done > 0 ? DONE : obj;
waiting -= initialized;
int move = initialized * waiting; // No effect if value wasn't initialized.
// Keep both arrays aligned: the same index addresses a source and its current value.
swap(values, 0, move);
swap(subscribers, 0, move);
i = (initialized == 0) ? waiting : i; // Exit if any value was not found.
}
// Remember the progress, the scan is resumed from here on the next drain pass.
this.waiting = waiting;
if (waiting == 0) {
// Got first rows from all subscribers.
// Add all non-completed sources to the priority queue.
for (int i = 0; i < values.length; i++) {
if (values[i] != DONE) {
valuesQueue.enqueue(i);
}
}
// Then either start merge process or proceed with finishing if there is nothing to do.
State state = valuesQueue.isEmpty() ? State.COMPLETING : State.RUNNING;
// The CAS result is not checked: the state is re-read on the next iteration of the retry loop anyway.
STATE.compareAndSet(this, State.INITIAL, state);
continue;
}
break;
}
case RUNNING: {
// The demand is read once per pass and is not re-read inside the emit loop.
long requested = (long) REQUESTED.getAcquire(this);
// Emit loop.
while (!valuesQueue.isEmpty()) {
int minIndex = valuesQueue.first();
// An empty slot at the head means its previous value was already emitted and must be refilled.
if (values[minIndex] == null) {
// As in the INITIAL state, the 'done' flag is read before polling the queue.
boolean done = subscribers[minIndex].done;
T val = subscribers[minIndex].queue.poll();
if (val != null) {
values[minIndex] = val;
valuesQueue.changed(); // Force the queue to move the new value to its place.
minIndex = valuesQueue.first();
} else if (done) {
// No more values to emit for the current source, remove it from queue.
valuesQueue.dequeue();
continue;
} else {
// Nothing to do, value wasn't received yet.
break;
}
}
// Downstream demand is exhausted, stop emitting.
if (emitted == requested) {
break;
}
downstream.onNext((T) values[minIndex]);
emitted++;
// Free the slot and ask the same source for a replacement of the emitted value.
values[minIndex] = null;
subscribers[minIndex].request(1);
}
// All sources were removed from the queue, proceed with finishing.
if (valuesQueue.isEmpty()) {
STATE.compareAndSet(this, State.RUNNING, State.COMPLETING);
continue;
}
break;
}
case COMPLETING: {
STATE.set(this, State.STOP);
// If subscription was cancelled, there is no need to notify downstream.
if (!(boolean) CANCELLED.getAcquire(this)) {
assert valuesQueue.isEmpty();
finish(downstream);
}
// Cleanup.
Arrays.fill(values, null);
for (OrderedMergeSubscriber<T> inner : subscribers) {
inner.queue.clear();
}
// No need to release guard.
// The counter is left non-zero here, so subsequent drain() calls return at the entry check.
return;
}
case STOP: {
// Terminal state. No need to release guard.
return;
}
default:
throw new IllegalStateException("Should never get here.");
}
// Publish the local counter before the guard may be released.
this.emitted = emitted;
// Retry if any other thread has incremented the counter.
if (guardCntr.decrementAndGet() == 0) {
break;
}
// Collapse all drain requests accumulated so far into a single additional pass.
guardCntr.set(1);
}
}