protected void join()

in modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinNode.java [450:582]


        protected void join() throws Exception {
            inLoop = true;
            try {
                while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty()
                        || rightMaterialization != null || waitingRight == NOT_WAITING)) {
                    checkState();

                    if (left == null) {
                        left = leftInBuf.remove();

                        matched = false;
                    }

                    if (right == null) {
                        if (rightInBuf.isEmpty() && waitingRight != NOT_WAITING) {
                            break;
                        }

                        if (!rightInBuf.isEmpty()) {
                            right = rightInBuf.remove();
                        }
                    }

                    if (right == null && rightMaterialization != null && !drainMaterialization) {
                        drainMaterialization = true;
                        left = null;

                        continue;
                    }

                    RowT row;
                    if (!drainMaterialization) {
                        if (right == null) {
                            row = handler.concat(left, rightRowFactory.create());

                            requested--;
                            downstream().push(row);

                            left = null;

                            continue;
                        }

                        int cmp = comp.compare(left, right);

                        if (cmp < 0) {
                            if (!matched) {
                                row = handler.concat(left, rightRowFactory.create());

                                requested--;
                                downstream().push(row);
                            }

                            left = null;
                            rightIdx = 0;

                            if (rightMaterialization != null) {
                                drainMaterialization = true;
                            }

                            continue;
                        } else if (cmp > 0) {
                            right = null;
                            rightIdx = 0;
                            rightMaterialization = null;

                            continue;
                        }

                        matched = true;

                        if (rightMaterialization == null && (!rightInBuf.isEmpty() || waitingRight != NOT_WAITING)) {
                            if (rightInBuf.isEmpty()) {
                                break;
                            }

                            if (comp.compare(left, rightInBuf.peek()) == 0) {
                                rightMaterialization = new ArrayList<>();
                            }
                        }

                        row = handler.concat(left, right);

                        if (rightMaterialization != null) {
                            rightMaterialization.add(right);

                            right = null;
                        } else {
                            left = null;
                        }
                    } else {
                        if (rightIdx >= rightMaterialization.size()) {
                            rightIdx = 0;
                            left = null;

                            continue;
                        }

                        RowT right = rightMaterialization.get(rightIdx++);

                        int cmp = comp.compare(left, right);

                        if (cmp > 0) {
                            rightIdx = 0;
                            rightMaterialization = null;
                            drainMaterialization = false;

                            continue;
                        }

                        row = handler.concat(left, right);
                    }

                    requested--;
                    downstream().push(row);
                }
            } finally {
                inLoop = false;
            }

            if (waitingRight == 0) {
                rightSource().request(waitingRight = inBufSize);
            }

            if (waitingLeft == 0) {
                leftSource().request(waitingLeft = inBufSize);
            }

            if (requested > 0 && waitingLeft == NOT_WAITING && left == null && leftInBuf.isEmpty()) {
                requested = 0;
                downstream().end();
            }
        }