in modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/NestedLoopJoinNode.java [469:553]
protected void join() throws Exception {
if (waitingRight == NOT_WAITING) {
if (rightNotMatchedIndexes == null) {
rightNotMatchedIndexes = new BitSet(rightMaterialized.size());
rightNotMatchedIndexes.set(0, rightMaterialized.size());
}
inLoop = true;
try {
while (requested > 0 && (left != null || !leftInBuf.isEmpty())) {
if (left == null) {
left = leftInBuf.remove();
}
while (requested > 0 && rightIdx < rightMaterialized.size()) {
checkState();
RowT right = rightMaterialized.get(rightIdx++);
if (!cond.test(left, right)) {
continue;
}
requested--;
rightNotMatchedIndexes.clear(rightIdx - 1);
RowT joined = handler.concat(left, right);
downstream().push(joined);
}
if (rightIdx == rightMaterialized.size()) {
left = null;
rightIdx = 0;
}
}
} finally {
inLoop = false;
}
}
if (waitingLeft == NOT_WAITING && requested > 0 && (rightNotMatchedIndexes != null && !rightNotMatchedIndexes.isEmpty())) {
assert lastPushedInd >= 0;
inLoop = true;
try {
for (lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd); ;
lastPushedInd = rightNotMatchedIndexes.nextSetBit(lastPushedInd + 1)
) {
checkState();
if (lastPushedInd < 0) {
break;
}
RowT row = handler.concat(leftRowFactory.create(), rightMaterialized.get(lastPushedInd));
rightNotMatchedIndexes.clear(lastPushedInd);
requested--;
downstream().push(row);
if (lastPushedInd == Integer.MAX_VALUE || requested <= 0) {
break;
}
}
} finally {
inLoop = false;
}
}
if (waitingRight == 0) {
rightSource().request(waitingRight = inBufSize);
}
if (waitingLeft == 0 && leftInBuf.isEmpty()) {
leftSource().request(waitingLeft = inBufSize);
}
if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && left == null
&& leftInBuf.isEmpty() && rightNotMatchedIndexes.isEmpty()) {
requested = 0;
downstream().end();
}
}