in modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java [493:633]
/**
 * Runs the merge loop over the buffered left and right rows and pushes joined rows downstream.
 *
 * <p>Every left row is emitted at least once: rows whose key compares equal to a right row are
 * projected together with that right row, and a left row that found no equal right row is
 * projected together with a row freshly created by {@code rightRowFactory} (this looks like
 * left-outer-join semantics; presumably the created row is empty/null-filled - confirm against
 * the row factory).
 *
 * <p>When several consecutive right rows compare equal to the current left row they are collected
 * in {@code rightMaterialization}; subsequent left rows are then replayed against that list
 * ("drain" mode, flagged by {@code drainMaterialization}) until a left row compares greater than
 * the materialized rows.
 *
 * <p>All progress is kept in outer variables ({@code left}, {@code right}, {@code matched},
 * {@code rightIdx}, ...), so the method can leave the loop at any point and pick up from the same
 * state when it is invoked again.
 *
 * @throws Exception whatever the invoked collaborators throw; nothing is caught here, the
 *         {@code finally} block only resets {@code inLoop}.
 */
protected void join() throws Exception {
// Number of loop iterations performed by this invocation; used only for the yield check below.
int processed = 0;
// Raised for the duration of the loop and always cleared in the finally block.
// NOTE(review): presumably consulted elsewhere to detect re-entrant calls - confirm.
inLoop = true;
try {
// Keep going while downstream still wants rows, a left row is available (current or buffered),
// and the right side either has something to offer (current row, buffered rows, materialized
// rows) or waitingRight == NOT_WAITING (presumably "right input has ended" - confirm).
while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty()
|| rightMaterialization != null || waitingRight == NOT_WAITING)) {
// After more than inBufSize iterations, re-submit this method through execute() and return.
// The finally block still runs, so inLoop is cleared before the rescheduled call.
if (processed++ > inBufSize) {
// Allow others to do their job.
execute(this::join);
return;
}
// Pull the next left row; a fresh left row has not been matched with anything yet.
if (left == null) {
left = leftInBuf.remove();
matched = false;
}
if (right == null) {
// Nothing buffered on the right and waitingRight is not NOT_WAITING: leave the loop with the
// current state intact (presumably more right rows may still arrive - confirm).
if (rightInBuf.isEmpty() && waitingRight != NOT_WAITING) {
break;
}
if (!rightInBuf.isEmpty()) {
right = rightInBuf.remove();
}
}
// No right row could be obtained, but equal right rows were materialized earlier: switch to
// drain mode and drop the current left row. While not draining, the left row is only replaced
// when no materialization exists, so this left row is the one that was paired with each
// materialized row as it was collected.
if (right == null && rightMaterialization != null && !drainMaterialization) {
drainMaterialization = true;
left = null;
continue;
}
RowT row;
if (!drainMaterialization) {
// Regular mode: compare the current left row with the current right row.
if (right == null) {
// No right row at all (the materialized case was handled above): emit the left row
// together with a newly created right row and advance the left side.
row = outputProjection.project(context(), left, rightRowFactory.create());
requested--;
downstream().push(row);
left = null;
continue;
}
int cmp = comp.compare(left, right);
if (cmp < 0) {
// Left sorts before right: this left row is finished. If it never matched, emit it
// together with a newly created right row.
if (!matched) {
row = outputProjection.project(context(), left, rightRowFactory.create());
requested--;
downstream().push(row);
}
left = null;
rightIdx = 0;
// Replay previously collected equal right rows against the next left row.
if (rightMaterialization != null) {
drainMaterialization = true;
}
continue;
} else if (cmp > 0) {
// Left sorts after right: drop this right row and any materialized rows, then retry
// with the next right row.
right = null;
rightIdx = 0;
rightMaterialization = null;
continue;
}
// cmp == 0: the rows match.
matched = true;
// Not materializing yet, and further right rows are buffered or waitingRight is not
// NOT_WAITING: decide whether the right side holds a run of equal rows.
if (rightMaterialization == null && (!rightInBuf.isEmpty() || waitingRight != NOT_WAITING)) {
// The next right row is not available yet: leave the loop before emitting anything;
// left and right stay set, so the same pair is examined again on the next invocation.
if (rightInBuf.isEmpty()) {
break;
}
// The next right row also equals the left row: start collecting the run.
if (comp.compare(left, rightInBuf.peek()) == 0) {
rightMaterialization = new ArrayList<>();
}
}
row = outputProjection.project(context(), left, right);
if (rightMaterialization != null) {
// Remember the right row for later left rows and clear it so that the next iteration
// pulls the following right row; the left row is kept.
rightMaterialization.add(right);
right = null;
} else {
// Single match: the left row is consumed, the right row is kept for the next left row.
left = null;
}
} else {
// Drain mode: pair the current left row with the materialized right rows one at a time.
if (rightIdx >= rightMaterialization.size()) {
// Every materialized row has been visited for this left row: rewind and advance left.
rightIdx = 0;
left = null;
continue;
}
// This local shadows the outer `right` used elsewhere in the method; the outer one is not
// modified in drain mode.
RowT right = rightMaterialization.get(rightIdx++);
int cmp = comp.compare(left, right);
if (cmp > 0) {
// Left has moved past the materialized rows: discard them and return to regular mode,
// keeping the current left row.
rightIdx = 0;
rightMaterialization = null;
drainMaterialization = false;
continue;
}
row = outputProjection.project(context(), left, right);
}
// Account for the produced row first, then hand it downstream.
requested--;
downstream().push(row);
}
} finally {
// Runs on normal exit, on the early return taken when rescheduling, and on exceptions.
inLoop = false;
}
// Downstream still wants rows, but there is no current left row, nothing buffered on the left,
// and waitingLeft == NOT_WAITING (presumably "left input has ended" - confirm): reset the demand
// and signal the end downstream.
if (requested > 0 && waitingLeft == NOT_WAITING && left == null && leftInBuf.isEmpty()) {
requested = 0;
downstream().end();
return;
}
// A waiting counter of 0 triggers a request for another inBufSize rows from that source; the
// counter is set to the requested amount in the same expression.
if (waitingRight == 0) {
rightSource().request(waitingRight = inBufSize);
}
if (waitingLeft == 0) {
leftSource().request(waitingLeft = inBufSize);
}
}