in hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeRangeSearchCursor.java [78:178]
/**
 * Works on {@code outputPriorityQueue} until its head is an element the loop decides to
 * stop on, or until the queue is empty and no element is waiting to be re-pushed.
 *
 * <p>State used across iterations (all are fields declared outside this method):
 * <ul>
 * <li>{@code outputElement} - the element most recently polled because it was a deleted
 * key; {@code null} when there is no such pending element.</li>
 * <li>{@code needPush} - set when {@code outputElement} still has to be handed back to
 * {@code pushIntoPriorityQueue}.</li>
 * <li>{@code proceed} - gates the {@code searchCallback.proceed(...)} check; it is cleared
 * while a deleted element is pending and set again once that element is discarded.</li>
 * </ul>
 *
 * <p>Each iteration with a non-empty queue has two phases: first the callback phase
 * (proceed / reconcile / complete / cancel on the head tuple), then the comparison of the
 * head against {@code outputElement}.
 *
 * @throws HyracksDataException declared by this method; presumably propagated from the
 *         cursor, search or callback calls made here - TODO confirm
 * @throws IndexException declared by this method; presumably propagated from the
 *         cursor, search or callback calls made here - TODO confirm
 */
protected void checkPriorityQueue() throws HyracksDataException, IndexException {
// Keep going while there is a head to examine or a pending element to push back.
while (!outputPriorityQueue.isEmpty() || needPush == true) {
if (!outputPriorityQueue.isEmpty()) {
// Only peek: the head stays in the queue unless a branch below polls it.
PriorityQueueElement checkElement = outputPriorityQueue.peek();
// The callback is consulted only when 'proceed' is set; a false return from
// searchCallback.proceed(...) sends us down the reconcile path below.
if (proceed && !searchCallback.proceed(checkElement.getTuple())) {
if (includeMutableComponent) {
PriorityQueueElement mutableElement = null;
boolean mutableElementFound = false;
// scan the PQ for the mutable component's element (the one with cursor index 0)
// and take it out of the queue. mutableElement doubles as the loop variable, so
// it is only meaningful afterwards when mutableElementFound is true.
Iterator<PriorityQueueElement> it = outputPriorityQueue.iterator();
while (it.hasNext()) {
mutableElement = it.next();
if (mutableElement.getCursorIndex() == 0) {
mutableElementFound = true;
it.remove();
break;
}
}
if (mutableElementFound) {
// copy the in-mem tuple (tupleBuilder is created lazily on first use).
// NOTE(review): the copy is taken before rangeCursors[0].reset() below,
// presumably because the reset invalidates the tuple the element references
// - confirm. The third argument presumably limits the copy to the key fields.
if (tupleBuilder == null) {
tupleBuilder = new ArrayTupleBuilder(cmp.getKeyFieldCount());
}
TupleUtils.copyTuple(tupleBuilder, mutableElement.getTuple(), cmp.getKeyFieldCount());
copyTuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
// unlatch/unpin
rangeCursors[0].reset();
// reconcile: when the head itself is the mutable element, reconcile on the copy
// (complete/cancel for it are decided after the re-search below); otherwise
// reconcile and complete on the head tuple right away.
if (checkElement.getCursorIndex() == 0) {
searchCallback.reconcile(copyTuple);
} else {
searchCallback.reconcile(checkElement.getTuple());
searchCallback.complete(checkElement.getTuple());
}
// retraverse: search component 0 again starting from the copied tuple
// (the 'true' flag is presumably "low key inclusive" - TODO confirm) and hand the
// removed element back to pushIntoPriorityQueue.
reusablePred.setLowKey(copyTuple, true);
btreeAccessors[0].search(rangeCursors[0], reusablePred);
// NOTE(review): the comparison below relies on pushIntoPriorityQueue refreshing
// mutableElement's tuple from the re-positioned cursor and returning false when
// that cursor has nothing left - confirm against that helper.
boolean isNotExhaustedCursor = pushIntoPriorityQueue(mutableElement);
if (checkElement.getCursorIndex() == 0) {
// The head was the mutable element: if the cursor is exhausted or the tuple found
// after the re-search differs from the copy, complete and cancel on the copy and
// restart the loop so the new queue head is examined from scratch.
if (!isNotExhaustedCursor || cmp.compare(copyTuple, mutableElement.getTuple()) != 0) {
searchCallback.complete(copyTuple);
searchCallback.cancel(copyTuple);
continue;
}
// Same tuple is still there: just complete on the copy and fall through.
searchCallback.complete(copyTuple);
}
} else {
// the mutable cursor is exhausted (no element with cursor index 0 in the PQ),
// so reconcile directly on the head tuple
searchCallback.reconcile(checkElement.getTuple());
}
} else {
// No mutable component is part of this search: reconcile directly on the head tuple.
searchCallback.reconcile(checkElement.getTuple());
}
}
// If there is no previous tuple or the previous tuple can be ignored
if (outputElement == null) {
if (isDeleted(checkElement) && !returnDeletedTuples) {
// If the key has been deleted then pop it and set needPush to true.
// We cannot push immediately because the tuple may be
// modified if hasNext() is called
outputElement = outputPriorityQueue.poll();
searchCallback.cancel(checkElement.getTuple());
needPush = true;
// While the deleted element is pending, skip the searchCallback.proceed(...) check
// at the top of the loop.
proceed = false;
} else {
// The head is not a deleted key to be hidden: stop here and leave it at the head
// of the queue.
break;
}
} else {
// Compare the previous tuple and the head tuple in the PQ
if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
// If the previous tuple and the head tuple are
// identical
// then pop the head tuple and push the next tuple from
// the tree of head tuple
// the head element of PQ is useless now
// (the boolean returned by pushIntoPriorityQueue is intentionally not used here)
PriorityQueueElement e = outputPriorityQueue.poll();
pushIntoPriorityQueue(e);
} else {
// If the previous tuple and the head tuple are different
// the info of previous tuple is useless
if (needPush == true) {
pushIntoPriorityQueue(outputElement);
needPush = false;
}
// Re-enable the callback check and drop the pending element; the next iteration
// re-examines the head with outputElement == null.
proceed = true;
outputElement = null;
}
}
} else {
// the priority queue is empty and needPush (the loop condition guarantees needPush
// is set here): push the pending element and reset the state flags
pushIntoPriorityQueue(outputElement);
needPush = false;
outputElement = null;
proceed = true;
}
}
}