private void addCombiner()

in src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java [91:282]


    /**
     * Rewrites {@code phyPlan} in place to use a combiner where possible.
     *
     * <p>For each {@link POGlobalRearrange} whose subplan matches the shape
     * {@code POLocalRearrange -> POGlobalRearrange -> POPackage -> (optional POLimit ->) POForEach}
     * where the foreach contains only algebraic operations, the group/package pair is
     * replaced with:
     * <ul>
     *   <li>a map-side {@link POForEach} running the algebraic UDFs in INITIAL mode,</li>
     *   <li>a {@link POReduceBySpark} operator (fed a combiner-specific package and
     *       rearrange) running them in INTERMEDIATE mode,</li>
     *   <li>the original foreach, cloned and fixed up, as the post-reduce stage.</li>
     * </ul>
     * Plans that do not match (multiple leaves, no grouping, co-group with multiple
     * predecessors, multi-query foreach, or no algebraic ops) are left untouched.
     *
     * @param phyPlan the physical plan to optimize; mutated in place
     * @throws VisitorException if visiting inner plans fails
     * @throws PlanException if the plan rewrite cannot be completed
     * @throws CloneNotSupportedException if an operator clone fails
     */
    private void addCombiner(PhysicalPlan phyPlan) throws VisitorException, PlanException, CloneNotSupportedException {

        // Only consider plans with exactly one leaf; anything else is out of scope.
        List<PhysicalOperator> leaves = phyPlan.getLeaves();
        if (leaves == null || leaves.size() != 1) {
            return;
        }

        // Ensure there is grouping; no POGlobalRearrange means nothing to combine.
        List<POGlobalRearrange> glrs = PlanHelper.getPhysicalOperators(phyPlan, POGlobalRearrange.class);
        if (glrs == null || glrs.size() == 0) {
            return;
        }
        for (POGlobalRearrange glr : glrs) {
            List<PhysicalOperator> glrSuccessors = phyPlan.getSuccessors(glr);
            if (glrSuccessors == null || glrSuccessors.isEmpty()) {
                continue;
            }

            // The global rearrange must feed a POPackage for the combiner pattern to apply.
            if (!(glrSuccessors.get(0) instanceof POPackage)) {
                continue;
            }
            POPackage poPackage = (POPackage) glrSuccessors.get(0);

            List<PhysicalOperator> poPackageSuccessors = phyPlan.getSuccessors(poPackage);
            if (poPackageSuccessors == null || poPackageSuccessors.size() != 1) {
                continue;
            }
            PhysicalOperator successor = poPackageSuccessors.get(0);

            // Retaining the original successor to be used later in modifying the plan
            // (the reduce stage is spliced in ahead of it, whether it is the POLimit
            // or the POForEach).
            PhysicalOperator packageSuccessor = successor;

            if (successor instanceof POLimit) {
                // POLimit is acceptable, as long as it has a single foreach as
                // successor
                List<PhysicalOperator> limitSucs = phyPlan.getSuccessors(successor);
                if (limitSucs != null && limitSucs.size() == 1 &&
                        limitSucs.get(0) instanceof POForEach) {
                    // the code below will now further examine the foreach
                    successor = limitSucs.get(0);
                }
            }
            if (successor instanceof POForEach) {
                POForEach foreach = (POForEach) successor;
                List<PhysicalPlan> foreachSuccessors = phyPlan.getSuccessors(foreach);
                // multi-query: a foreach feeding multiple consumers is excluded
                // from this optimization.
                if (foreachSuccessors == null || foreachSuccessors.size() != 1) {
                    continue;
                }
                // Clone foreach so it can be modified to a post-reduce foreach
                // without disturbing the original until the rewrite succeeds.
                POForEach postReduceFE = foreach.clone();
                List<PhysicalPlan> feInners = postReduceFE.getInputPlans();

                // find algebraic operators and also check if the foreach statement
                // is suitable for combiner use
                List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = CombinerOptimizerUtil.findAlgebraicOps
                        (feInners);
                if (algebraicOps == null || algebraicOps.size() == 0) {
                    // the plan is not combinable or there is nothing to combine
                    // we're done
                    continue;
                }
                try {
                    List<PhysicalOperator> glrPredecessors = phyPlan.getPredecessors(glr);
                    // Exclude co-group from optimization: a co-group has multiple
                    // predecessors feeding the global rearrange.
                    if (glrPredecessors == null || glrPredecessors.size() != 1) {
                        continue;
                    }

                    if (!(glrPredecessors.get(0) instanceof POLocalRearrange)) {
                        continue;
                    }

                    POLocalRearrange rearrange = (POLocalRearrange) glrPredecessors.get(0);

                    LOG.info("Algebraic operations found. Optimizing plan to use combiner.");

                    // Remove the global rearrange / package pair from the plan so the
                    // combiner stages can be wired in their place.
                    // NOTE(review): presumably convertToMapSideForEach also handles the
                    // glr itself — confirm against its implementation.
                    convertToMapSideForEach(phyPlan, poPackage);

                    // replace PODistinct->Project[*] with distinct udf (which is Algebraic)
                    for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) {
                        if (!(op2plan.first instanceof PODistinct)) {
                            continue;
                        }
                        CombinerOptimizerUtil.DistinctPatcher distinctPatcher
                                = new CombinerOptimizerUtil.DistinctPatcher(op2plan.second);
                        distinctPatcher.visit();
                        if (distinctPatcher.getDistinct() == null) {
                            int errCode = 2073;
                            String msg = "Problem with replacing distinct operator with distinct built-in function.";
                            throw new PlanException(msg, errCode, PigException.BUG);
                        }
                        // Point the pair at the patched (algebraic) distinct UDF so the
                        // positions assigned below refer to the operator actually in the plan.
                        op2plan.first = distinctPatcher.getDistinct();
                    }

                    // create new map foreach -
                    POForEach mfe = CombinerOptimizerUtil.createForEachWithGrpProj(postReduceFE, poPackage.getPkgr()
                            .getKeyType());
                    // Records each algebraic op's output column position in the
                    // map foreach; position 0 is reserved for the group key projection.
                    Map<PhysicalOperator, Integer> op2newpos = Maps.newHashMap();
                    Integer pos = 1;
                    // create plan for each algebraic udf and add as inner plan in map-foreach
                    for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) {
                        PhysicalPlan udfPlan = CombinerOptimizerUtil.createPlanWithPredecessors(op2plan.first,
                                op2plan.second);
                        mfe.addInputPlan(udfPlan, false);
                        op2newpos.put(op2plan.first, pos++);
                    }
                    // Map side runs the algebraic UDFs in INITIAL mode.
                    CombinerOptimizerUtil.changeFunc(mfe, POUserFunc.INITIAL);

                    // since we will only be creating SingleTupleBag as input to
                    // the map foreach, we should flag the POProjects in the map
                    // foreach inner plans to also use SingleTupleBag
                    for (PhysicalPlan mpl : mfe.getInputPlans()) {
                        try {
                            new CombinerOptimizerUtil.fixMapProjects(mpl).visit();
                        } catch (VisitorException e) {
                            int errCode = 2089;
                            String msg = "Unable to flag project operator to use single tuple bag.";
                            throw new PlanException(msg, errCode, PigException.BUG, e);
                        }
                    }

                    // create new combine foreach
                    POForEach cfe = CombinerOptimizerUtil.createForEachWithGrpProj(postReduceFE, poPackage.getPkgr()
                            .getKeyType());
                    // add algebraic functions with appropriate projection
                    CombinerOptimizerUtil.addAlgebraicFuncToCombineFE(cfe, op2newpos);

                    // we have modified the foreach inner plans - so set them again
                    // for the foreach so that foreach can do any re-initialization
                    // around them.
                    mfe.setInputPlans(mfe.getInputPlans());
                    cfe.setInputPlans(cfe.getInputPlans());

                    // tell POCombinerPackage which fields need projected and which
                    // placed in bags. First field is simple project rest need to go
                    // into bags
                    int numFields = algebraicOps.size() + 1; // algebraic funcs + group key
                    boolean[] bags = new boolean[numFields];
                    bags[0] = false;
                    for (int i = 1; i < numFields; i++) {
                        bags[i] = true;
                    }

                    // Use the POCombiner package in the combine plan
                    // as it needs to act differently than the regular
                    // package operator.
                    CombinerPackager pkgr = new CombinerPackager(poPackage.getPkgr(), bags);
                    POPackage combinePack = poPackage.clone();
                    combinePack.setPkgr(pkgr);

                    // A specialized local rearrange operator will replace
                    // the normal local rearrange in the map plan.
                    POLocalRearrange newRearrange = CombinerOptimizerUtil.getNewRearrange(rearrange);
                    POPreCombinerLocalRearrange combinerLocalRearrange = CombinerOptimizerUtil.getPreCombinerLR
                            (rearrange);
                    phyPlan.replace(rearrange, combinerLocalRearrange);

                    // Create a reduceBy operator carrying the combine foreach's inner
                    // plans, the combiner package, and the new rearrange.
                    POReduceBySpark reduceOperator = new POReduceBySpark(cfe.getOperatorKey(), combinerLocalRearrange
                            .getRequestedParallelism(), cfe.getInputPlans(), cfe.getToBeFlattened(), combinePack,
                            newRearrange);
                    reduceOperator.setCustomPartitioner(glr.getCustomPartitioner());
                    // Adjust the post-reduce foreach to consume the combiner's output layout.
                    fixReduceSideFE(postReduceFE, algebraicOps);
                    // Reduce side runs the algebraic UDFs in INTERMEDIATE mode; the FINAL
                    // stage remains in the post-reduce foreach.
                    CombinerOptimizerUtil.changeFunc(reduceOperator, POUserFunc.INTERMEDIATE);
                    updatePackager(reduceOperator, newRearrange);

                    // Add the new operators
                    phyPlan.add(reduceOperator);
                    phyPlan.add(mfe);
                    // Connect the new operators as follows:
                    // reduceBy (using algebraicOp.Intermediate)
                    //      -> foreach (using algebraicOp.Initial)
                     phyPlan.connect(mfe, reduceOperator);

                    // Insert the reduce stage between combiner rearrange and its successor:
                    // combinerLocalRearrange -> mfe -> reduceOperator -> packageSuccessor.
                    phyPlan.disconnect(combinerLocalRearrange, packageSuccessor);
                    phyPlan.connect(reduceOperator, packageSuccessor);
                    phyPlan.connect(combinerLocalRearrange, mfe);

                    // Replace foreach with post reduce foreach
                    phyPlan.add(postReduceFE);
                    phyPlan.replace(foreach, postReduceFE);
                } catch (Exception e) {
                    int errCode = 2018;
                    String msg = "Internal error. Unable to introduce the combiner for optimization.";
                    throw new OptimizerException(msg, errCode, PigException.BUG, e);
                }
            }
        }
    }