public Result getNextTuple()

in src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java [313:562]


    /**
     * Returns the next output of this sort-merge join.
     *
     * <p>This is a state machine driven by operator fields rather than a single pass:
     * <ul>
     *   <li>On the first call ({@code firstTime}) it reads one left tuple, seeks the right
     *       stream to that tuple's key and starts the buffer of left tuples.</li>
     *   <li>While {@code doingJoin} is set it emits, one per call, the concatenation of a
     *       buffered left tuple ({@code leftTuples}, consumed from the end) with the current
     *       right tuple ({@code curJoiningRightTup}); left fields come first, then right
     *       fields. Once the buffer is exhausted it reads the next right tuple to see whether
     *       it matches the same key again.</li>
     *   <li>Otherwise it reads the next left tuple. Tuples with the same key as the previous
     *       one are buffered; when the key increases (or left input ends) the right stream is
     *       advanced until it matches, moves past the buffered key, or runs out.</li>
     * </ul>
     * Tuples whose join key is {@code null} are dropped on both sides.
     *
     * @return a joined tuple with {@link POStatus#STATUS_OK}; a result whose status is
     *         {@code endOfRecordMark} when this call consumed input without producing output;
     *         {@link POStatus#STATUS_EOP} when, at end of all input, the right side has moved
     *         past the last buffered key; or a non-OK left/right input result passed through
     *         unchanged
     * @throws ExecException if either input is found not to be sorted on the join key
     *         (error code 1102); failures while seeking in the right stream are handed to
     *         {@code throwProcessingException}
     */
    public Result getNextTuple() throws ExecException {

        Object curLeftKey;
        Result curLeftInp;

        if(firstTime){
            prepareTupleFactories();
            leftTuples = newLeftTupleArray();

            // Do initial setup.
            curLeftInp = processInput();
            if(curLeftInp.returnStatus != POStatus.STATUS_OK)
                return curLeftInp;       // Return because we want to fetch next left tuple.

            curLeftKey = extractKeysFromTuple(curLeftInp, 0);
            if(null == curLeftKey) // We drop the tuples which have null keys.
                return new Result(endOfRecordMark, null);

            try {
                // Position the right stream at (or near) the first left key.
                seekInRightStream(curLeftKey);
            } catch (IOException e) {
                throwProcessingException(true, e);
            } catch (ClassCastException e) {
                throwProcessingException(true, e);
            }
            leftTuples.add((Tuple)curLeftInp.result);
            firstTime = false;
            prevLeftKey = curLeftKey;
            return new Result(endOfRecordMark, null);
        }

        if(doingJoin){
            // We matched on keys. Time to do the join.

            if(counter > 0){    // We have left tuples to join with current right tuple.
                Tuple joiningLeftTup = leftTuples.get(--counter);
                leftTupSize = joiningLeftTup.size();
                Tuple joinedTup = mergedTupleMaker.newTuple(leftTupSize + rightTupSize);

                // Output layout: all left fields, followed by all right fields.
                for(int i=0; i<leftTupSize; i++) {
                    joinedTup.set(i, joiningLeftTup.get(i));
                }

                for(int i=0; i < rightTupSize; i++) {
                    joinedTup.set(i+leftTupSize, curJoiningRightTup.get(i));
                }

                return new Result(POStatus.STATUS_OK, joinedTup);
            }
            // Join with current right input has ended. But bag of left tuples
            // may still join with next right tuple.

            doingJoin = false;

            while(true){
                Result rightInp = getNextRightInp();
                if(rightInp.returnStatus != POStatus.STATUS_OK){
                    prevRightInp = null;
                    return rightInp;
                }
                else{
                    Object rightKey = extractKeysFromTuple(rightInp, 1);
                    if(null == rightKey) // If we see tuple having null keys in stream, we drop them
                        continue;       // and fetch next tuple.

                    int cmpval = ((Comparable)rightKey).compareTo(curJoinKey);
                    if (cmpval == 0){
                        // Matched the very next right tuple.
                        curJoiningRightTup = (Tuple)rightInp.result;
                        rightTupSize = curJoiningRightTup.size();
                        counter = leftTuples.size();
                        doingJoin = true;
                        // Re-enter to emit the first joined tuple for this right tuple.
                        return this.getNextTuple();

                    }
                    else if(cmpval > 0){    // We got ahead on right side. Store currently read right tuple.
                        if(!(this.parentPlan.endOfAllInput|| leftInputConsumedInSpark)){
                            prevRightKey = rightKey;
                            prevRightInp = rightInp;
                            // There cant be any more join on this key.
                            leftTuples = newLeftTupleArray();

                            // Restart the buffer with the left tuple that ended the previous key group.
                            leftTuples.add((Tuple)prevLeftInp.result);
                            return new Result(endOfRecordMark, null);
                        }

                        else{           // This is end of all input and this is last join output.
                            // Right loader in this case wouldn't get a chance to close input stream. So, we close it ourself.
                            try {
                                ((IndexableLoadFunc)rightLoader).close();
                            } catch (IOException e) {
                                // Non-fatal error. We can continue; keep the stack trace for diagnosis.
                                log.error("Received exception while trying to close right side file: " + e.getMessage(), e);
                            }
                            return new Result(POStatus.STATUS_EOP, null);
                        }
                    }
                    else{   // At this point right side can't be behind.
                        int errCode = 1102;
                        String errMsg = "Data is not sorted on right side. \n" +
                            keyOrderReminder +
                            "Last two tuples encountered were: \n"+
                        curJoiningRightTup+ "\n" + (Tuple)rightInp.result ;
                        throw new ExecException(errMsg,errCode);
                    }
                }
            }
        }

        curLeftInp = processInput();
        switch(curLeftInp.returnStatus){

        case POStatus.STATUS_OK:
            curLeftKey = extractKeysFromTuple(curLeftInp, 0);
            if(null == curLeftKey) // We drop the tuples which have null keys.
                return new Result(endOfRecordMark, null);

            int cmpVal = ((Comparable)curLeftKey).compareTo(prevLeftKey);
            if(cmpVal == 0){
                // Keep on accumulating.
                leftTuples.add((Tuple)curLeftInp.result);
                return new Result(endOfRecordMark, null);
            }
            else if(cmpVal > 0){ // Filled with left bag. Move on.
                curJoinKey = prevLeftKey;
                break;
            }
            else{   // Current key < Prev Key
                int errCode = 1102;
                String errMsg = "Data is not sorted on left side. \n" +
                            keyOrderReminder +
                            "Last two tuples encountered were: \n" +
                prevLeftKey+ "\n" + curLeftKey ;
                throw new ExecException(errMsg,errCode);
            }

        case POStatus.STATUS_EOP:
            if(this.parentPlan.endOfAllInput || isEndOfInput()){
                // We hit the end on left input.
                // Tuples in bag may still possibly join with right side.
                curJoinKey = prevLeftKey;
                curLeftKey = null;
                if (isEndOfInput()) {
                    leftInputConsumedInSpark = true;
                }
                break;
            }
            else    // Fetch next left input.
                return curLeftInp;

        default:    // If encountered with ERR / NULL on left side, we send it down.
            return curLeftInp;
        }

        if((null != prevRightKey)
                && !(this.parentPlan.endOfAllInput || leftInputConsumedInSpark)
                && ((Comparable)prevRightKey).compareTo(curLeftKey) >= 0){

            // This will happen when we accumulated inputs on left side and moved on, but are still behind the right side
            // In that case, throw away the tuples accumulated till now and add the one we read in this function call.
            leftTuples = newLeftTupleArray();
            leftTuples.add((Tuple)curLeftInp.result);
            prevLeftInp = curLeftInp;
            prevLeftKey = curLeftKey;
            return new Result(endOfRecordMark, null);
        }

        // Accumulated tuples with same key on left side.
        // But since we are reading ahead we still haven't checked the read ahead right tuple.
        // Accumulated left tuples may potentially join with that. So, lets check that first.
        // Compare with compareTo, like every other key comparison in this method: for some key
        // types (e.g. BigDecimal 1.0 vs 1.00) equals() is stricter than compareTo(), and a false
        // negative here would silently skip the read-ahead right tuple. The null guard keeps the
        // previous equals(null) == false outcome.
        if((null != prevRightKey) && (null != prevLeftKey)
                && ((Comparable)prevRightKey).compareTo(prevLeftKey) == 0){

            curJoiningRightTup = (Tuple)prevRightInp.result;
            counter = leftTuples.size();
            rightTupSize = curJoiningRightTup.size();
            doingJoin = true;
            prevLeftInp = curLeftInp;
            prevLeftKey = curLeftKey;
            return this.getNextTuple();
        }

        // Reached when there is no read-ahead right tuple, or it does not match the buffered
        // left key; advance the right stream looking for a match.
        boolean slidingToNextRecord = false;
        while(true){
            // Start moving on right stream to find the tuple whose key is same as with current left bag key.
            Result rightInp;
            if (slidingToNextRecord) {
                rightInp = getNextRightInp();
                slidingToNextRecord = false;
            } else
                rightInp = getNextRightInp(prevLeftKey);

            if(rightInp.returnStatus != POStatus.STATUS_OK)
                return rightInp;

            Object extractedRightKey = extractKeysFromTuple(rightInp, 1);

            if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them
                continue;       // and fetch next tuple.

            Comparable rightKey = (Comparable)extractedRightKey;

            if( prevRightKey != null && rightKey.compareTo(prevRightKey) < 0){
                // Sanity check.
                int errCode = 1102;
                String errMsg = "Data is not sorted on right side. \n" +
                            keyOrderReminder +
                            "Last two tuples encountered were: \n"+
                prevRightKey+ "\n" + rightKey ;
                throw new ExecException(errMsg,errCode);
            }

            if (prevLeftKey != null && rightKey.compareTo(prevLeftKey) < 0) {    // still behind the left side, do nothing, fetch next right tuple.
                slidingToNextRecord = true;
                continue;
            }

            else if (prevLeftKey != null && rightKey.compareTo(prevLeftKey) == 0){  // Found matching tuple. Time to do join.

                curJoiningRightTup = (Tuple)rightInp.result;
                counter = leftTuples.size();
                rightTupSize = curJoiningRightTup.size();
                doingJoin = true;
                prevLeftInp = curLeftInp;
                prevLeftKey = curLeftKey;
                return this.getNextTuple();
            }

            else{    // We got ahead on right side. Store currently read right tuple.
                prevRightKey = rightKey;
                prevRightInp = rightInp;
                // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call.
                leftTuples = newLeftTupleArray();
                leftTuples.add((Tuple)curLeftInp.result);
                prevLeftInp = curLeftInp;
                prevLeftKey = curLeftKey;
                if(this.parentPlan.endOfAllInput || leftInputConsumedInSpark){  // This is end of all input and this is last time we will read right input.
                    // Right loader in this case wouldn't get a chance to close input stream. So, we close it ourself.
                    try {
                        ((IndexableLoadFunc)rightLoader).close();
                    } catch (IOException e) {
                        // Non-fatal error. We can continue; keep the stack trace for diagnosis.
                        log.error("Received exception while trying to close right side file: " + e.getMessage(), e);
                    }
                }
                return new Result(endOfRecordMark, null);
            }
        }
    }