private void drain()

in modules/core/src/main/java/org/apache/ignite/internal/util/subscription/OrderedMergePublisher.java [235:372]


        /**
         * Drains the values buffered by the inner subscribers and pushes them to the downstream subscriber.
         *
         * <p>Entry is serialized through {@code guardCntr}: only the caller that moves the counter away from zero
         * executes the loop, every other caller just leaves its increment behind and returns. The executing thread
         * repeats the loop for as long as such increments are pending.
         *
         * <p>Each pass dispatches on the current state:
         * <ul>
         *     <li>{@code INITIAL} - tries to obtain the first value (or the completion marker) from every source;</li>
         *     <li>{@code RUNNING} - emits values taken from the head of {@code valuesQueue} while there is demand;</li>
         *     <li>{@code COMPLETING} - switches to {@code STOP}, notifies downstream unless cancelled and releases
         *     buffered data;</li>
         *     <li>{@code STOP} - does nothing.</li>
         * </ul>
         */
        private void drain() {
            // Only one thread can pass below. All other callers only register their request by incrementing the counter.
            if (guardCntr.getAndIncrement() != 0) {
                return;
            }

            // Frequently accessed fields.
            Subscriber<? super T> downstream = this.downstream;
            OrderedMergeSubscriber<T>[] subscribers = this.subscribers;
            Object[] values = this.values;

            // Local copy of the emitted counter, it is written back to the field at the end of every pass.
            long emitted = this.emitted;

            // Retry loop.
            for (; ; ) {
                switch ((State) STATE.getAcquire(this)) {
                    case INITIAL: {
                        // Number of sources that have not yet provided their first value (or completed).
                        int waiting = this.waiting;

                        // Moves non-initialized sources to the beginning of the array for faster array scans
                        // in the case of long initialization.
                        // The scan always inspects slot 0: an initialized source is swapped out to the slot with index
                        // 'waiting' (after decrement), so the next candidate lands in slot 0. The loop variable 'i' is used
                        // only as an exit flag. NOTE(review): assumes swap(arr, a, b) exchanges elements 'a' and 'b'
                        // and is a no-op when both indexes are equal - the helper is not visible here.
                        for (int i = 0; i < waiting; ) {
                            // NOTE(review): the flag is presumably read first so that a value enqueued right before
                            // the source was marked as done cannot be mistaken for completion - confirm with the producer side.
                            boolean innerDone = subscribers[0].done; // Read before polling to preserve correct program order.
                            Object obj = subscribers[0].queue.poll();

                            int done = (obj == null && innerDone) ? 1 : 0; // Flag has no effect if poll was successful.
                            int initialized = obj != null || innerDone ? 1 : 0;

                            // Either the first value of the source, the completion marker, or null if nothing has arrived yet.
                            values[0] = done > 0 ? DONE : obj;

                            waiting -= initialized;

                            // Target slot is 'waiting' for an initialized source and 0 (swap with itself) otherwise.
                            int move = initialized * waiting; // No effect if value wasn't initialized.
                            swap(values, 0, move);
                            swap(subscribers, 0, move);

                            i = (initialized == 0) ? waiting : i; // Exit if any value was not found.
                        }

                        this.waiting = waiting;

                        if (waiting == 0) {
                            // Got first rows from all subscribers.
                            // Add all non-completed sources to the priority queue.
                            for (int i = 0; i < values.length; i++) {
                                if (values[i] != DONE) {
                                    valuesQueue.enqueue(i);
                                }
                            }

                            // Then either start merge process or proceed with finishing if there is nothing to do.
                            // The result of the CAS is ignored: if the state has been changed concurrently,
                            // the next pass simply handles whatever state is current.
                            State state = valuesQueue.isEmpty() ? State.COMPLETING : State.RUNNING;
                            STATE.compareAndSet(this, State.INITIAL, state);

                            // Handle the new state right away, the guard is still held.
                            continue;
                        }

                        // Some sources are still not initialized, wait for the next drain request.
                        break;
                    }
                    case RUNNING: {
                        // Demand is read once per pass.
                        long requested = (long) REQUESTED.getAcquire(this);

                        // Emit loop.
                        while (!valuesQueue.isEmpty()) {
                            // Index of the source at the head of the queue.
                            int minIndex = valuesQueue.first();

                            // Empty slot means that the previous value of this source has been emitted already
                            // and the next one must be fetched before the sources can be compared again.
                            if (values[minIndex] == null) {
                                // Same order as during initialization: the flag is read before polling.
                                boolean done = subscribers[minIndex].done;
                                T val = subscribers[minIndex].queue.poll();

                                if (val != null) {
                                    values[minIndex] = val;
                                    valuesQueue.changed(); // Force the queue to move the new value to its place.
                                    minIndex = valuesQueue.first();
                                } else if (done) {
                                    // No more values to emit for the current source, remove it from queue.
                                    valuesQueue.dequeue();
                                    continue;
                                } else {
                                    // Nothing to do, value wasn't received yet.
                                    break;
                                }
                            }

                            // No outstanding demand, the current head value stays in its slot until the next pass.
                            if (emitted == requested) {
                                break;
                            }

                            downstream.onNext((T) values[minIndex]);
                            emitted++;

                            // Release the slot and ask the same source for a replacement item.
                            values[minIndex] = null;
                            subscribers[minIndex].request(1);
                        }

                        if (valuesQueue.isEmpty()) {
                            // All sources are exhausted. The result of the CAS is ignored, the next pass
                            // handles whatever state is current.
                            STATE.compareAndSet(this, State.RUNNING, State.COMPLETING);

                            continue;
                        }

                        break;
                    }
                    case COMPLETING: {
                        STATE.set(this, State.STOP);

                        // If subscription was cancelled, there is no need to notify downstream.
                        if (!(boolean) CANCELLED.getAcquire(this)) {
                            assert valuesQueue.isEmpty();

                            // NOTE(review): presumably delivers the terminal signal to downstream - helper is not visible here.
                            finish(downstream);
                        }

                        // Cleanup.
                        Arrays.fill(values, null);
                        for (OrderedMergeSubscriber<T> inner : subscribers) {
                            inner.queue.clear();
                        }
                        // No need to release guard. The counter stays non-zero, so subsequent calls return at the entry check.
                        return;
                    }
                    case STOP: {
                        // Terminal state. No need to release guard.
                        return;
                    }
                    default:
                        throw new IllegalStateException("Should never get here.");
                }

                // Publish the progress made during this pass.
                this.emitted = emitted;

                // Retry if any other thread has incremented the counter.
                if (guardCntr.decrementAndGet() == 0) {
                    break;
                }
                // Collapse all requests registered so far into a single extra pass.
                guardCntr.set(1);
            }
        }