in modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java [635:777]
/**
 * Runs the merge-join loop over the currently buffered left and right rows and pushes
 * the produced rows downstream while {@code requested > 0}.
 *
 * <p>Every right row is emitted at least once: a right row that never compared equal to a
 * left row is concatenated with a freshly created left row ({@code leftRowFactory.create()},
 * presumably an all-null placeholder, i.e. right-outer semantics - confirm against the
 * enclosing class).
 *
 * <p>Right rows sharing the current key are collected in {@code rightMaterialization} so that
 * subsequent left rows can be joined against them again; {@code drainMaterialization} is
 * {@code true} while such a replay is in progress and {@code rightIdx} is the replay position.
 *
 * <p>After the loop, another batch is requested from each input whose {@code waiting*} counter
 * dropped to zero, and the downstream is ended once the right side is fully consumed.
 *
 * @throws Exception if {@code checkState()} or any upstream/downstream call fails.
 */
protected void join() throws Exception {
    inLoop = true;
    try {
        // Keep going while rows are requested, the left side is not merely "empty but still
        // expecting rows", and there is something on the right side to process.
        while (requested > 0 && !(left == null && leftInBuf.isEmpty() && waitingLeft != NOT_WAITING)
                && (right != null || !rightInBuf.isEmpty() || rightMaterialization != null)) {
            checkState();

            if (left == null && !leftInBuf.isEmpty()) {
                left = leftInBuf.remove();
            }

            if (right == null) {
                if (rightInBuf.isEmpty() && waitingRight != NOT_WAITING) {
                    // No right row available yet; wait for the next batch.
                    break;
                }

                if (!rightInBuf.isEmpty()) {
                    right = rightInBuf.remove();
                    // A freshly taken right row has not been matched with any left row yet.
                    matched = false;
                }
            }

            // The right side is exhausted but a materialized group remains: the current left row
            // has already been joined with it, so drop that row and replay the group for the
            // following left rows.
            if (right == null && rightMaterialization != null && !drainMaterialization) {
                drainMaterialization = true;
                left = null;

                continue;
            }

            RowT row;
            if (!drainMaterialization) {
                if (left == null) {
                    // No left rows remain: emit the right row with a placeholder left part
                    // unless it has already been matched.
                    if (!matched) {
                        row = handler.concat(leftRowFactory.create(), right);

                        requested--;
                        downstream().push(row);
                    }

                    right = null;

                    continue;
                }

                int cmp = comp.compare(left, right);

                if (cmp < 0) {
                    // Left row is behind the right row: advance the left side. If a group was
                    // materialized, the next left row must be replayed against it first.
                    left = null;
                    rightIdx = 0;

                    if (rightMaterialization != null) {
                        drainMaterialization = true;
                    }

                    continue;
                } else if (cmp > 0) {
                    // Right row is behind the left row: emit it as unmatched if needed and
                    // advance the right side; any materialized group is no longer relevant.
                    if (!matched) {
                        row = handler.concat(leftRowFactory.create(), right);

                        requested--;
                        downstream().push(row);
                    }

                    right = null;
                    rightIdx = 0;
                    rightMaterialization = null;

                    continue;
                }

                // Keys are equal. Decide whether the right group must be materialized, which
                // requires looking at the next right row.
                if (rightMaterialization == null && (!rightInBuf.isEmpty() || waitingRight != NOT_WAITING)) {
                    if (rightInBuf.isEmpty()) {
                        // The next right row is not available yet, so the decision has to wait.
                        break;
                    }

                    if (comp.compare(left, rightInBuf.peek()) == 0) {
                        rightMaterialization = new ArrayList<>();
                    }
                }

                matched = true;

                row = handler.concat(left, right);

                if (rightMaterialization != null) {
                    // Remember the right row for replay and move on to the next right row.
                    rightMaterialization.add(right);

                    right = null;
                } else {
                    left = null;
                }
            } else {
                if (left == null) {
                    if (waitingLeft == NOT_WAITING) {
                        // The left side is exhausted, so the materialized group can never be
                        // replayed again. Leave drain mode as well: keeping drainMaterialization
                        // set would make every later iteration land here and break immediately,
                        // so a pending right row (and all rows after it) would never be emitted
                        // and the downstream would never be ended.
                        rightMaterialization = null;
                        drainMaterialization = false;
                        rightIdx = 0;

                        // Resume in normal mode: remaining right rows are emitted as unmatched,
                        // or the loop condition ends the loop if the right side is done too.
                        continue;
                    }

                    break;
                }

                if (rightIdx >= rightMaterialization.size()) {
                    // The whole group was replayed for this left row; take the next left row.
                    rightIdx = 0;
                    left = null;

                    continue;
                }

                RowT materializedRight = rightMaterialization.get(rightIdx++);

                int cmp = comp.compare(left, materializedRight);

                if (cmp > 0) {
                    // The left row moved past the materialized key: drop the group and go back
                    // to normal processing with the same left row.
                    rightIdx = 0;
                    rightMaterialization = null;
                    drainMaterialization = false;

                    continue;
                }

                row = handler.concat(left, materializedRight);
            }

            requested--;
            downstream().push(row);
        }
    } finally {
        inLoop = false;
    }

    // Ask for the next batch from each input whose outstanding-row counter reached zero.
    if (waitingRight == 0) {
        rightSource().request(waitingRight = inBufSize);
    }

    if (waitingLeft == 0) {
        leftSource().request(waitingLeft = inBufSize);
    }

    // Nothing more can be produced once the right side is fully consumed.
    if (requested > 0 && waitingRight == NOT_WAITING && right == null && rightInBuf.isEmpty() && rightMaterialization == null) {
        requested = 0;
        downstream().end();
    }
}