in src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java [91:282]
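    /**
     * Rewrites a combinable pattern of the form
     * POLocalRearrange -> POGlobalRearrange -> POPackage -> POForEach (algebraic)
     * into a Spark reduceBy with map-side partial aggregation: a map-side
     * foreach runs the algebraic UDFs in Initial mode, and a POReduceBySpark
     * runs them in Intermediate mode ahead of the post-reduce foreach.
     */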
private void addCombiner(PhysicalPlan phyPlan) throws VisitorException, PlanException, CloneNotSupportedException {
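        // The optimization only applies to plans with a single leaf.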
List<PhysicalOperator> leaves = phyPlan.getLeaves();
if (leaves == null || leaves.size() != 1) {
return;
}
// Ensure there is grouping.
List<POGlobalRearrange> glrs = PlanHelper.getPhysicalOperators(phyPlan, POGlobalRearrange.class);
if (glrs == null || glrs.size() == 0) {
return;
}
for (POGlobalRearrange glr : glrs) {
List<PhysicalOperator> glrSuccessors = phyPlan.getSuccessors(glr);
if (glrSuccessors == null || glrSuccessors.isEmpty()) {
continue;
}
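            // The combinable pattern requires a POPackage directly after the
            // global rearrange.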
if (!(glrSuccessors.get(0) instanceof POPackage)) {
continue;
}
POPackage poPackage = (POPackage) glrSuccessors.get(0);
List<PhysicalOperator> poPackageSuccessors = phyPlan.getSuccessors(poPackage);
if (poPackageSuccessors == null || poPackageSuccessors.size() != 1) {
continue;
}
PhysicalOperator successor = poPackageSuccessors.get(0);
            // Retain the original successor; it is used later when modifying the plan.
PhysicalOperator packageSuccessor = successor;
if (successor instanceof POLimit) {
// POLimit is acceptable, as long as it has a single foreach as
// successor
List<PhysicalOperator> limitSucs = phyPlan.getSuccessors(successor);
if (limitSucs != null && limitSucs.size() == 1 &&
limitSucs.get(0) instanceof POForEach) {
// the code below will now further examine the foreach
successor = limitSucs.get(0);
}
}
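            // Only a foreach directly after the package (or behind a single
            // limit) is a combiner candidate.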
if (successor instanceof POForEach) {
POForEach foreach = (POForEach) successor;
List<PhysicalOperator> foreachSuccessors = phyPlan.getSuccessors(foreach);
                // Skip if the foreach is a leaf or has multiple successors (multi-query).
if (foreachSuccessors == null || foreachSuccessors.size() != 1) {
continue;
}
// Clone foreach so it can be modified to a post-reduce foreach.
POForEach postReduceFE = foreach.clone();
List<PhysicalPlan> feInners = postReduceFE.getInputPlans();
// find algebraic operators and also check if the foreach statement
// is suitable for combiner use
List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = CombinerOptimizerUtil.findAlgebraicOps
(feInners);
if (algebraicOps == null || algebraicOps.size() == 0) {
// the plan is not combinable or there is nothing to combine
// we're done
continue;
}
try {
List<PhysicalOperator> glrPredecessors = phyPlan.getPredecessors(glr);
// Exclude co-group from optimization
if (glrPredecessors == null || glrPredecessors.size() != 1) {
continue;
}
if (!(glrPredecessors.get(0) instanceof POLocalRearrange)) {
continue;
}
POLocalRearrange rearrange = (POLocalRearrange) glrPredecessors.get(0);
LOG.info("Algebraic operations found. Optimizing plan to use combiner.");
                    // Trim the global rearrange and its package from the plan.
convertToMapSideForEach(phyPlan, poPackage);
                    // Replace PODistinct->Project[*] with the distinct UDF (which is algebraic).
for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) {
if (!(op2plan.first instanceof PODistinct)) {
continue;
}
CombinerOptimizerUtil.DistinctPatcher distinctPatcher
= new CombinerOptimizerUtil.DistinctPatcher(op2plan.second);
distinctPatcher.visit();
if (distinctPatcher.getDistinct() == null) {
int errCode = 2073;
String msg = "Problem with replacing distinct operator with distinct built-in function.";
throw new PlanException(msg, errCode, PigException.BUG);
}
op2plan.first = distinctPatcher.getDistinct();
}
                    // Create the new map-side foreach.
POForEach mfe = CombinerOptimizerUtil.createForEachWithGrpProj(postReduceFE, poPackage.getPkgr()
.getKeyType());
Map<PhysicalOperator, Integer> op2newpos = Maps.newHashMap();
Integer pos = 1;
// create plan for each algebraic udf and add as inner plan in map-foreach
for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) {
PhysicalPlan udfPlan = CombinerOptimizerUtil.createPlanWithPredecessors(op2plan.first,
op2plan.second);
mfe.addInputPlan(udfPlan, false);
op2newpos.put(op2plan.first, pos++);
}
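                    // The map-side foreach invokes each algebraic UDF in Initial mode.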
CombinerOptimizerUtil.changeFunc(mfe, POUserFunc.INITIAL);
// since we will only be creating SingleTupleBag as input to
// the map foreach, we should flag the POProjects in the map
// foreach inner plans to also use SingleTupleBag
for (PhysicalPlan mpl : mfe.getInputPlans()) {
try {
new CombinerOptimizerUtil.fixMapProjects(mpl).visit();
} catch (VisitorException e) {
int errCode = 2089;
String msg = "Unable to flag project operator to use single tuple bag.";
throw new PlanException(msg, errCode, PigException.BUG, e);
}
}
// create new combine foreach
POForEach cfe = CombinerOptimizerUtil.createForEachWithGrpProj(postReduceFE, poPackage.getPkgr()
.getKeyType());
// add algebraic functions with appropriate projection
CombinerOptimizerUtil.addAlgebraicFuncToCombineFE(cfe, op2newpos);
                    // We have modified the foreach inner plans, so set them again
                    // on each foreach so that it can do any re-initialization
                    // around them.
mfe.setInputPlans(mfe.getInputPlans());
cfe.setInputPlans(cfe.getInputPlans());
                    // Tell the CombinerPackager which fields need to be projected
                    // and which placed in bags. The first field (the group key) is
                    // a simple project; the rest go into bags.
int numFields = algebraicOps.size() + 1; // algebraic funcs + group key
boolean[] bags = new boolean[numFields];
bags[0] = false;
for (int i = 1; i < numFields; i++) {
bags[i] = true;
}
                    // Use the CombinerPackager in the combine plan, as it needs
                    // to act differently from the regular package operator.
CombinerPackager pkgr = new CombinerPackager(poPackage.getPkgr(), bags);
POPackage combinePack = poPackage.clone();
combinePack.setPkgr(pkgr);
// A specialized local rearrange operator will replace
// the normal local rearrange in the map plan.
POLocalRearrange newRearrange = CombinerOptimizerUtil.getNewRearrange(rearrange);
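                    // The pre-combiner local rearrange emits (key, {value}) tuples
                    // directly, so the map-side foreach can aggregate before any shuffle.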
POPreCombinerLocalRearrange combinerLocalRearrange = CombinerOptimizerUtil.getPreCombinerLR
(rearrange);
phyPlan.replace(rearrange, combinerLocalRearrange);
// Create a reduceBy operator.
POReduceBySpark reduceOperator = new POReduceBySpark(cfe.getOperatorKey(), combinerLocalRearrange
.getRequestedParallelism(), cfe.getInputPlans(), cfe.getToBeFlattened(), combinePack,
newRearrange);
reduceOperator.setCustomPartitioner(glr.getCustomPartitioner());
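                    // Fix up the post-reduce foreach to consume the partially
                    // aggregated values, and run the reduceBy stage's UDFs in
                    // Intermediate mode.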
fixReduceSideFE(postReduceFE, algebraicOps);
CombinerOptimizerUtil.changeFunc(reduceOperator, POUserFunc.INTERMEDIATE);
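                    // Keep the reduceBy's packager consistent with the new local rearrange.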
updatePackager(reduceOperator, newRearrange);
// Add the new operators
phyPlan.add(reduceOperator);
phyPlan.add(mfe);
                    // Connect the new operators so that data flows:
                    // foreach (using algebraicOp.Initial)
                    //   -> reduceBy (using algebraicOp.Intermediate)
phyPlan.connect(mfe, reduceOperator);
// Insert the reduce stage between combiner rearrange and its successor.
phyPlan.disconnect(combinerLocalRearrange, packageSuccessor);
phyPlan.connect(reduceOperator, packageSuccessor);
phyPlan.connect(combinerLocalRearrange, mfe);
                    // Replace the original foreach with the post-reduce foreach.
phyPlan.add(postReduceFE);
phyPlan.replace(foreach, postReduceFE);
} catch (Exception e) {
int errCode = 2018;
String msg = "Internal error. Unable to introduce the combiner for optimization.";
throw new OptimizerException(msg, errCode, PigException.BUG, e);
}
}
}
}