public void visitMergeJoin()

in src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java [1142:1347]


    public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
        try{
            if (compiledInputs.length != 2 || joinOp.getInputs().size() != 2) {
                int errCode=1101;
                throw new TezCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);
            }

            curTezOp = phyToTezOpMap.get(joinOp.getInputs().get(0));
            TezOperator rightTezOpr = null;
            TezOperator rightTezOprAggr = null;
            if (curTezOp.equals(compiledInputs[0])) {
                rightTezOpr = compiledInputs[1];
            } else {
                rightTezOpr = compiledInputs[0];
            }

            // We will first operate on right side which is indexer job.
            // First yank plan of the compiled right input and set that as an inner plan of right operator.
            PhysicalPlan rightPipelinePlan;
            if(!rightTezOpr.closed){
                PhysicalPlan rightPlan = rightTezOpr.plan;
                if(rightPlan.getRoots().size() != 1){
                    int errCode = 2171;
                    String errMsg = "Expected one but found more then one root physical operator in physical plan.";
                    throw new TezCompilerException(errMsg,errCode,PigException.BUG);
                }

                PhysicalOperator rightLoader = rightPlan.getRoots().get(0);
                if(! (rightLoader instanceof POLoad)){
                    int errCode = 2172;
                    String errMsg = "Expected physical operator at root to be POLoad. Found : "+rightLoader.getClass().getCanonicalName();
                    throw new TezCompilerException(errMsg,errCode);
                }

                if (rightPlan.getSuccessors(rightLoader) == null || rightPlan.getSuccessors(rightLoader).isEmpty())
                    // Load - Join case.
                    rightPipelinePlan = null;

                else{ // We got something on right side. Yank it and set it as inner plan of right input.
                    rightPipelinePlan = rightPlan.clone();
                    PhysicalOperator root = rightPipelinePlan.getRoots().get(0);
                    rightPipelinePlan.disconnect(root, rightPipelinePlan.getSuccessors(root).get(0));
                    rightPipelinePlan.remove(root);
                    rightPlan.trimBelow(rightLoader);
                }
            }
            else{
                int errCode = 2022;
                String msg = "Right input plan have been closed. This is unexpected while compiling.";
                throw new PlanException(msg, errCode, PigException.BUG);
            }

            joinOp.setupRightPipeline(rightPipelinePlan);

            // At this point, we must be operating on input plan of right input and it would contain nothing else other then a POLoad.
            POLoad rightLoader = (POLoad)rightTezOpr.plan.getRoots().get(0);
            joinOp.setSignature(rightLoader.getSignature());
            LoadFunc rightLoadFunc = rightLoader.getLoadFunc();
            List<String> udfs = new ArrayList<String>();
            if(IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) {
                joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
                joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
                udfs.add(rightLoader.getLFile().getFuncSpec().toString());

                // we don't need the right TezOper since
                // the right loader is an IndexableLoadFunc which can handle the index
                // itself
                tezPlan.remove(rightTezOpr);
                if(rightTezOpr == compiledInputs[0]) {
                    compiledInputs[0] = null;
                } else if(rightTezOpr == compiledInputs[1]) {
                    compiledInputs[1] = null;
                }
                rightTezOpr = null;

                // validate that the join keys in merge join are only
                // simple column projections or '*' and not expression - expressions
                // cannot be handled when the index is built by the storage layer on the sorted
                // data when the sorted data (and corresponding index) is written.
                // So merge join will be restricted not have expressions as
                // join keys
                int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
                for(int i = 0; i < numInputs; i++) {
                    List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
                    for (PhysicalPlan keyPlan : keyPlans) {
                        for(PhysicalOperator op : keyPlan) {
                            if(!(op instanceof POProject)) {
                                int errCode = 1106;
                                String errMsg = "Merge join is possible only for simple column or '*' join keys when using " +
                                rightLoader.getLFile().getFuncSpec() + " as the loader";
                                throw new TezCompilerException(errMsg, errCode, PigException.INPUT);
                            }
                        }
                    }
                }
            } else {
                joinOp = new POMergeJoinTez(joinOp);
                LoadFunc loadFunc = rightLoader.getLoadFunc();
                //Replacing POLoad with indexer is disabled for 'merge-sparse' joins.  While
                // this feature would be useful, the current implementation of
                // DefaultIndexableLoader
                //is not designed to handle multiple calls to seekNear.  Specifically, it rereads the entire index
                //for each call.  Some refactoring of this class is required - and then the check below could be removed.
                if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
                    int errCode = 1104;
                    String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " +
                    "The specified loader " + loadFunc + " doesn't implement it";
                    throw new TezCompilerException(errMsg,errCode);
                }

                // Replace POLoad with  indexer.

                if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
                    int errCode = 1104;
                    String errMsg = "Right input of merge-join must implement " +
                    "OrderedLoadFunc interface. The specified loader "
                    + loadFunc + " doesn't implement it";
                    throw new TezCompilerException(errMsg,errCode);
                }

                String[] indexerArgs = new String[6];
                List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
                FileSpec origRightLoaderFileSpec = rightLoader.getLFile();

                indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
                indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
                indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
                indexerArgs[3] = rightLoader.getSignature();
                indexerArgs[4] = rightLoader.getOperatorKey().scope;
                indexerArgs[5] = Boolean.toString(true);

                FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
                rightLoader.setLFile(lFile);

                // Loader of operator will return a tuple of form -
                // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer

                rightTezOprAggr = getTezOp();
                tezPlan.add(rightTezOprAggr);
                TezCompilerUtil.simpleConnectTwoVertex(tezPlan, rightTezOpr, rightTezOprAggr, scope, nig);
                rightTezOprAggr.setRequestedParallelism(1); // we need exactly one task for indexing job.
                rightTezOprAggr.setDontEstimateParallelism(true);

                // Convert the index as a broadcast input
                POValueOutputTez rightTezOprAggrOutput = new POValueOutputTez(OperatorKey.genOpKey(scope));
                rightTezOprAggr.plan.addAsLeaf(rightTezOprAggrOutput);
                rightTezOprAggrOutput.addOutputKey(curTezOp.getOperatorKey().toString());

                TezEdgeDescriptor edge = new TezEdgeDescriptor();
                TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
                TezCompilerUtil.connect(tezPlan, rightTezOprAggr, curTezOp, edge);

                ((POMergeJoinTez) joinOp).setInputKey(rightTezOprAggr.getOperatorKey().toString());
                // set up the TezIndexableLoader for the join operator
                String[] tezIndexableLoaderArgs = new String[3];
                tezIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
                tezIndexableLoaderArgs[1] = joinOp.getOperatorKey().scope;
                tezIndexableLoaderArgs[2] = origRightLoaderFileSpec.getFileName();
                joinOp.setRightLoaderFuncSpec(
                        (new FuncSpec(TezIndexableLoader.class.getName(), tezIndexableLoaderArgs)));
                joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
                udfs.add(origRightLoaderFileSpec.getFuncSpec().toString());
            }

            if(joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
                curTezOp.markMergeSparseJoin();
            } else {
                curTezOp.markMergeJoin();
            }
            // We are done with right side. Lets work on left now.
            // Join will be materialized in leftTezOper.
            if (!curTezOp.isClosed()) {// Life is easy
                curTezOp.plan.addAsLeaf(joinOp);
            }
            else{
                int errCode = 2022;
                String msg = "Input plan has been closed. This is unexpected while compiling.";
                throw new PlanException(msg, errCode, PigException.BUG);
            }
            if(rightTezOprAggr != null) {
                rightTezOprAggr.markIndexer();
            }

            phyToTezOpMap.put(joinOp, curTezOp);
            // no combination of small splits as there is currently no way to guarantee the sortness
            // of the combined splits.
            curTezOp.noCombineSmallSplits();
            curTezOp.UDFs.addAll(udfs);

        }
        catch(PlanException e){
            int errCode = 2034;
            String msg = "Error compiling operator " + joinOp.getClass().getCanonicalName();
            throw new TezCompilerException(msg, errCode, PigException.BUG, e);
        }
       catch (IOException e){
           int errCode = 3000;
           String errMsg = "IOException caught while compiling POMergeJoin";
            throw new TezCompilerException(errMsg, errCode,e);
        }
       catch(CloneNotSupportedException e){
           int errCode = 2127;
           String errMsg = "Cloning exception caught while compiling POMergeJoin";
           throw new TezCompilerException(errMsg, errCode, PigException.BUG, e);
       }
    }