public void visitMergeJoin()

in src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java [772:923]


    public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
        try {
            if (compiledInputs.length != 2 || joinOp.getInputs().size() != 2) {
                int errCode = 1101;
                throw new SparkCompilerException("Merge Join must have exactly two inputs. Found: "
                        + compiledInputs.length, errCode);
            }

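            // The Spark operator carrying the join's first (left) input stays as the
            // current operator; the other compiled input becomes the right side.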
            curSparkOp = phyToSparkOpMap.get(joinOp.getInputs().get(0));
            SparkOperator rightSparkOp;
            if(curSparkOp.equals(compiledInputs[0])) {
                rightSparkOp = compiledInputs[1];
            } else {
                rightSparkOp = compiledInputs[0];
            }

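            // The right branch must be rooted at a single POLoad: anything above the
            // load is yanked into the join's inner pipeline, and the load itself may
            // be replaced with an indexer below.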
            PhysicalPlan rightPipelinePlan;
            PhysicalPlan rightPhyPlan = rightSparkOp.physicalPlan;
            if (rightPhyPlan.getRoots().size() != 1) {
                int errCode = 2171;
                String errMsg = "Expected exactly one root physical operator in physical plan but found "
                        + rightPhyPlan.getRoots().size() + ".";
                throw new SparkCompilerException(errMsg, errCode);
            }
            PhysicalOperator rightPhyLoader = rightPhyPlan.getRoots().get(0);
            if (!(rightPhyLoader instanceof POLoad)) {
                int errCode = 2172;
                String errMsg = "Expected physical operator at root to be POLoad. Found : "+rightPhyLoader.getClass().getCanonicalName();
                throw new SparkCompilerException(errMsg,errCode);
            }
            if (rightPhyPlan.getSuccessors(rightPhyLoader) == null || rightPhyPlan.getSuccessors(rightPhyLoader).isEmpty()) {
                // Load - Join case.
                rightPipelinePlan = null;
            } else { // We got something on the right side. Yank it out and set it as the inner plan of the right input.
                rightPipelinePlan = rightPhyPlan.clone();
                PhysicalOperator root = rightPipelinePlan.getRoots().get(0);
                rightPipelinePlan.disconnect(root, rightPipelinePlan.getSuccessors(root).get(0));
                rightPipelinePlan.remove(root);
                rightPhyPlan.trimBelow(rightPhyLoader);
            }

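            // Hand the yanked pipeline (or null in the plain load-join case) to the join operator.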
            joinOp.setupRightPipeline(rightPipelinePlan);
            rightSparkOp.setRequestedParallelism(1); // for indexing job

            POLoad rightLoader = (POLoad)rightSparkOp.physicalPlan.getRoots().get(0);
            joinOp.setSignature(rightLoader.getSignature());
            LoadFunc rightLoadFunc = rightLoader.getLoadFunc();

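            // Case 1: the right loader can seek to a key by itself, so no separate
            // indexing job is needed.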
            if(IndexableLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass())) {
                joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
                joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
                curSparkOp.UDFs.add(rightLoader.getLFile().getFuncSpec().toString());

                // we don't need the rightSparkOp since the right loader is an
                // IndexableLoadFunc, which can handle the index itself
                sparkPlan.remove(rightSparkOp);
                if(rightSparkOp == compiledInputs[0]) {
                    compiledInputs[0] = null;
                } else if(rightSparkOp == compiledInputs[1]) {
                    compiledInputs[1] = null;
                }

                // Validate that the join keys in merge join are only simple column
                // projections or '*', not expressions. Expressions cannot be handled
                // when the index is built by the storage layer as the sorted data
                // (and the corresponding index) is written. So merge join is
                // restricted to not have expressions as join keys.
                int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
                for(int i = 0; i < numInputs; i++) {
                    List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
                    for (PhysicalPlan keyPlan : keyPlans) {
                        for(PhysicalOperator op : keyPlan) {
                            if(!(op instanceof POProject)) {
                                int errCode = 1106;
                                String errMsg = "Merge join is possible only for simple column or '*' join keys when using " +
                                        rightLoader.getLFile().getFuncSpec() + " as the loader";
                                throw new SparkCompilerException(errMsg, errCode, PigException.INPUT);
                            }
                        }
                    }
                }

            } else {
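                // Case 2: a plain loader on the right. Build an index in a separate
                // job and read it back through DefaultIndexableLoader at join time.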
                //Replacing POLoad with the indexer is disabled for 'merge-sparse' joins. While
                //this feature would be useful, the current implementation of DefaultIndexableLoader
                //is not designed to handle multiple calls to seekNear. Specifically, it rereads the entire index
                //for each call. Some refactoring of this class is required before the check below can be removed.
                if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
                    int errCode = 1104;
                    String errMsg = "Right input of a merge-sparse join must implement IndexableLoadFunc. " +
                            "The specified loader " + rightLoadFunc + " doesn't implement it";
                    throw new SparkCompilerException(errMsg, errCode);
                }

                // Replace POLoad with the indexer.
                if (!(OrderedLoadFunc.class.isAssignableFrom(rightLoadFunc.getClass()))) {
                    int errCode = 1104;
                    String errMsg = "Right input of merge-join must implement " +
                            "OrderedLoadFunc interface. The specified loader "
                            + rightLoadFunc + " doesn't implement it";
                    throw new SparkCompilerException(errMsg, errCode);
                }

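                // Arguments handed to the MergeJoinIndexer constructor, in order:
                // loader func spec, serialized join-key plans, serialized right
                // pipeline, load signature, operator scope, and a boolean flag.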
                String[] indexerArgs = new String[6];
                List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
                FileSpec origRightLoaderFileSpec = rightLoader.getLFile();

                indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
                indexerArgs[1] = ObjectSerializer.serialize((Serializable)rightInpPlans);
                indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
                indexerArgs[3] = rightLoader.getSignature();
                indexerArgs[4] = rightLoader.getOperatorKey().scope;
                indexerArgs[5] = Boolean.toString(true);

                FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),
                        new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
                rightLoader.setLFile(lFile);

                // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
                rightSparkOp.useTypedComparator(true);
                POStore idxStore = getStore();
                FileSpec idxStrFile = getTempFileSpec();
                idxStore.setSFile(idxStrFile);
                rightSparkOp.physicalPlan.addAsLeaf(idxStore);
                rightSparkOp.markIndexer();

                curSparkOp.UDFs.add(origRightLoaderFileSpec.getFuncSpec().toString());

                // We want to ensure indexing job runs prior to actual join job.
                // So, connect them in order.
                sparkPlan.connect(rightSparkOp, curSparkOp);

                // set up the DefaultIndexableLoader for the join operator
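                // args, in order: original loader spec, index file name, index storage
                // func spec, operator scope, and the original input file name.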
                String[] defaultIndexableLoaderArgs = new String[5];
                defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
                defaultIndexableLoaderArgs[1] = idxStrFile.getFileName();
                defaultIndexableLoaderArgs[2] = idxStrFile.getFuncSpec().toString();
                defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
                defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName();
                joinOp.setRightLoaderFuncSpec(new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs));
                joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());

                joinOp.setIndexFile(idxStrFile.getFileName());
            }

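            // The join itself executes inside the left pipeline; record the mapping
            // so later visits can resolve this join to its Spark operator.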
            curSparkOp.physicalPlan.addAsLeaf(joinOp);
            phyToSparkOpMap.put(joinOp, curSparkOp);

        } catch (Exception e) {
            int errCode = 2034;
            String msg = "Error compiling operator "
                    + joinOp.getClass().getSimpleName();
            throw new SparkCompilerException(msg, errCode, PigException.BUG, e);
        }
    }