protected void checkPriorityQueue()

in hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java [78:178]


    protected void checkPriorityQueue() throws HyracksDataException, IndexException {
        while (!outputPriorityQueue.isEmpty() || needPush == true) {
            if (!outputPriorityQueue.isEmpty()) {
                PriorityQueueElement checkElement = outputPriorityQueue.peek();
                if (proceed && !searchCallback.proceed(checkElement.getTuple())) {
                    if (includeMutableComponent) {
                        PriorityQueueElement mutableElement = null;
                        boolean mutableElementFound = false;
                        // scan the PQ for the mutable component's element
                        Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator();
                        while (it.hasNext()) {
                            mutableElement = it.next();
                            if (mutableElement.getCursorIndex() == 0) {
                                mutableElementFound = true;
                                it.remove();
                                break;
                            }
                        }
                        if (mutableElementFound) {
                            // copy the in-mem tuple
                            if (tupleBuilder == null) {
                                tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount());
                            }
                            TupleUtils.copyTuple(tupleBuilder, mutableElement.getTuple(), cmp.getKeyFieldCount());
                            copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());

                            // unlatch/unpin
                            rangeCursors[0].reset();

                            // reconcile
                            if (checkElement.getCursorIndex() == 0) {
                                searchCallback.reconcile(copyTuple);
                            } else {
                                searchCallback.reconcile(checkElement.getTuple());
                                searchCallback.complete(checkElement.getTuple());
                            }
                            // retraverse
                            reusablePred.setLowKey(copyTuple, true);
                            btreeAccessors[0].search(rangeCursors[0], reusablePred);
                            boolean isNotExhaustedCursor = pushIntoPriorityQueue(mutableElement);

                            if (checkElement.getCursorIndex() == 0) {
                                if (!isNotExhaustedCursor || cmp.compare(copyTuple, mutableElement.getTuple()) != 0) {
                                    searchCallback.complete(copyTuple);
                                    searchCallback.cancel(copyTuple);
                                    continue;
                                }
                                searchCallback.complete(copyTuple);
                            }
                        } else {
                            // the mutable cursor is exhausted
                            searchCallback.reconcile(checkElement.getTuple());
                        }
                    } else {
                        searchCallback.reconcile(checkElement.getTuple());
                    }
                }
                // If there is no previous tuple or the previous tuple can be ignored
                if (outputElement == null) {
                    if (isDeleted(checkElement) && !returnDeletedTuples) {
                        // If the key has been deleted then pop it and set needPush to true.
                        // We cannot push immediately because the tuple may be
                        // modified if hasNext() is called
                        outputElement = outputPriorityQueue.poll();
                        searchCallback.cancel(checkElement.getTuple());
                        needPush = true;
                        proceed = false;
                    } else {
                        break;
                    }
                } else {
                    // Compare the previous tuple and the head tuple in the PQ
                    if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
                        // If the previous tuple and the head tuple are
                        // identical
                        // then pop the head tuple and push the next tuple from
                        // the tree of head tuple

                        // the head element of PQ is useless now
                        PriorityQueueElement e = outputPriorityQueue.poll();
                        pushIntoPriorityQueue(e);
                    } else {
                        // If the previous tuple and the head tuple are different
                        // the info of previous tuple is useless
                        if (needPush == true) {
                            pushIntoPriorityQueue(outputElement);
                            needPush = false;
                        }
                        proceed = true;
                        outputElement = null;
                    }
                }
            } else {
                // the priority queue is empty and needPush
                pushIntoPriorityQueue(outputElement);
                needPush = false;
                outputElement = null;
                proceed = true;
            }
        }
    }