in src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java [144:378]
/**
 * Tries to apply the secondary-key optimization to a single map-reduce operator.
 *
 * <p>The method proceeds in these steps and silently leaves {@code mr} untouched
 * (usually after a debug log) as soon as a precondition does not hold:
 * <ol>
 *   <li>Skip jobs that are a global sort or already have a custom partitioner.</li>
 *   <li>Require the map plan to have exactly one leaf that is either a
 *       {@link POLocalRearrange} or a {@link POUnion} whose inputs are all
 *       {@link POLocalRearrange}s, and collect the sort key of each rearrange.</li>
 *   <li>Require the reduce plan to have exactly one root, a {@link POPackage} whose
 *       packager is not a {@code JoinPackager}, and follow single successors through
 *       {@link POFilter}/{@link POLimit} until a {@link POForEach} is reached.</li>
 *   <li>Run a {@code SecondaryKeyDiscover} over every inner plan of that foreach to
 *       find sorts that can be removed, distincts that can be changed, and the
 *       secondary sort key (if any).</li>
 *   <li>Rewrite the inner plans, and if a secondary key was found, flag the
 *       map-reduce operator, the rearrange(s) and the packager to use it.</li>
 * </ol>
 *
 * @param mr the map-reduce operator to inspect and, when possible, rewrite in place
 * @throws VisitorException with error code 2213 if visiting an inner plan fails,
 *         2202 if rewriting a distinct/sort fails, 2203 if the secondary key refers
 *         to columns of different inputs, or 2214 if no rearrange of a union matches
 *         the secondary key's input index
 */
public void visitMROp(MapReduceOper mr) throws VisitorException {
    // Only announces entry; whether the optimization is skipped is decided below.
    log.trace("Entering SecondaryKeyOptimizer.visitMROp");

    // Only optimize for Cogroup case: jobs flagged as global sort are left alone
    if (mr.isGlobalSort()) {
        return;
    }

    // Don't optimize when we already have a custom partitioner
    if (mr.getCustomPartitioner() != null) {
        return;
    }

    List<PhysicalOperator> mapLeaves = mr.mapPlan.getLeaves();
    if (mapLeaves == null || mapLeaves.size() != 1) {
        log.debug("Expected map to have single leaf! Skip secondary key optimizing");
        return;
    }
    PhysicalOperator mapLeaf = mapLeaves.get(0);

    // Figure out the main key of the map-reduce job from POLocalRearrange
    List<SortKeyInfo> sortKeyInfos = new ArrayList<SortKeyInfo>();
    try {
        if (mapLeaf instanceof POLocalRearrange) {
            SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange) mapLeaf);
            if (sortKeyInfo == null) {
                log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
                return;
            }
            sortKeyInfos.add(sortKeyInfo);
        } else if (mapLeaf instanceof POUnion) {
            List<PhysicalOperator> unionInputs = mr.mapPlan.getPredecessors(mapLeaf);
            if (unionInputs == null || unionInputs.isEmpty()) {
                log.debug("POUnion in map leaf has no predecessors, skip secondary key optimizing");
                return;
            }
            for (PhysicalOperator unionInput : unionInputs) {
                // Every input is cast to POLocalRearrange further down once a
                // secondary key has been found, so bail out here, before anything
                // is rewritten, if that cast could not succeed.
                if (!(unionInput instanceof POLocalRearrange)) {
                    log.debug("Expected every input of POUnion to be a POLocalRearrange, skip secondary key optimizing");
                    return;
                }
                SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange) unionInput);
                if (sortKeyInfo == null) {
                    log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
                    return;
                }
                sortKeyInfos.add(sortKeyInfo);
            }
        } else {
            log.debug("Cannot find POLocalRearrange or POUnion in map leaf, skip secondary key optimizing");
            return;
        }
    } catch (ExecException e) {
        // Best effort: failing to derive the key only means we do not optimize.
        log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing", e);
        return;
    }

    if (mr.reducePlan.isEmpty()) {
        log.debug("Reduce plan is empty, skip secondary key optimizing");
        return;
    }

    List<PhysicalOperator> reduceRoots = mr.reducePlan.getRoots();
    if (reduceRoots.size() != 1) {
        log.debug("Expected reduce to have single root, skip secondary key optimizing");
        return;
    }

    PhysicalOperator root = reduceRoots.get(0);
    if (!(root instanceof POPackage)) {
        log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
        return;
    }

    // visit the POForEach of the reduce plan. We can have Limit and Filter
    // in the middle
    PhysicalOperator currentNode = root;
    POForEach foreach = null;
    while (currentNode != null) {
        // Operators we may step over on the way to the foreach. A POPackage
        // driven by a JoinPackager is not one of them, so such a job falls
        // through to the final "skip" branch.
        boolean canStepOver =
                (currentNode instanceof POPackage
                        && !(((POPackage) currentNode).getPkgr() instanceof JoinPackager))
                || currentNode instanceof POFilter
                || currentNode instanceof POLimit;
        if (canStepOver) {
            List<PhysicalOperator> succs = mr.reducePlan.getSuccessors(currentNode);
            if (succs == null) { // We didn't find POForEach
                return;
            }
            if (succs.size() != 1) {
                log.debug("See multiple output for " + currentNode
                        + " in reduce plan, skip secondary key optimizing");
                return;
            }
            currentNode = succs.get(0);
        } else if (currentNode instanceof POForEach) {
            foreach = (POForEach) currentNode;
            break;
        } else { // Skip optimization
            return;
        }
    }

    // We do not find a foreach (we shall not come here, a trick to fool findbugs)
    if (foreach == null) {
        return;
    }

    SortKeyInfo secondarySortKeyInfo = null;
    List<POToChange> sortsToRemove = new ArrayList<POToChange>();
    List<POToChange> distinctsToChange = new ArrayList<POToChange>();
    for (PhysicalPlan innerPlan : foreach.getInputPlans()) {
        // visit inner plans to figure out the sort order for distinct /
        // sort. The secondary key found so far is handed to the next
        // discover and replaced by whatever that discover reports back.
        SecondaryKeyDiscover innerPlanDiscover = new SecondaryKeyDiscover(
                innerPlan, sortKeyInfos, secondarySortKeyInfo);
        try {
            innerPlanDiscover.process();
        } catch (FrontendException e) {
            int errorCode = 2213;
            throw new VisitorException("Error visiting inner plan for ForEach", errorCode, e);
        }
        secondarySortKeyInfo = innerPlanDiscover.getSecondarySortKeyInfo();

        if (innerPlanDiscover.getSortsToRemove() != null) {
            for (POSort sort : innerPlanDiscover.getSortsToRemove()) {
                sortsToRemove.add(new POToChange(sort, innerPlan, foreach));
            }
        }
        if (innerPlanDiscover.getDistinctsToChange() != null) {
            for (PODistinct distinct : innerPlanDiscover.getDistinctsToChange()) {
                distinctsToChange.add(new POToChange(distinct, innerPlan, foreach));
            }
        }
    }

    try {
        // Change PODistinct to use POSortedDistinct, which assume the input
        // data is sorted
        for (POToChange distinctToChange : distinctsToChange) {
            numDistinctChanged++;
            PODistinct oldDistinct = (PODistinct) distinctToChange.oper;
            String scope = oldDistinct.getOperatorKey().scope;
            OperatorKey newKey = new OperatorKey(scope,
                    NodeIdGenerator.getGenerator().getNextNodeId(scope));
            POSortedDistinct newDistinct = new POSortedDistinct(newKey,
                    oldDistinct.getRequestedParallelism(), oldDistinct.getInputs());
            newDistinct.setInputs(oldDistinct.getInputs());
            newDistinct.setResultType(oldDistinct.getResultType());
            distinctToChange.plan.replace(oldDistinct, newDistinct);
            // Result is not used; presumably called so the foreach refreshes
            // its view of the inner plan leaves after the rewrite - TODO confirm
            distinctToChange.forEach.getLeaves();
        }

        // Removed POSort, if the successor require a databag, we need to
        // add a PORelationToExprProject
        // to convert tuples into databag
        for (POToChange sortToRemove : sortsToRemove) {
            numSortRemoved++;
            POSort oldSort = (POSort) sortToRemove.oper;
            String scope = oldSort.getOperatorKey().scope;
            List<PhysicalOperator> preds = sortToRemove.plan
                    .getPredecessors(sortToRemove.oper);
            List<PhysicalOperator> succs = sortToRemove.plan
                    .getSuccessors(sortToRemove.oper);

            // sort to remove do change the result type
            boolean sortChangesResultType = preds == null
                    || (preds.get(0).getResultType() != DataType.BAG
                            && oldSort.getResultType() == DataType.BAG);
            // successor is not PORelationToExprProject
            boolean successorIsNotRelationProject = succs == null
                    || !(succs.get(0) instanceof PORelationToExprProject);

            POProject project = null;
            if (sortChangesResultType && successorIsNotRelationProject) {
                project = new PORelationToExprProject(
                        new OperatorKey(scope,
                                NodeIdGenerator.getGenerator().getNextNodeId(scope)),
                        oldSort.getRequestedParallelism());
                project.setInputs(oldSort.getInputs());
                project.setResultType(DataType.BAG);
                project.setStar(true);
            }

            if (project == null) {
                sortToRemove.plan.removeAndReconnect(sortToRemove.oper);
            } else {
                sortToRemove.plan.replace(oldSort, project);
            }
            // Result is not used; see the note on the same call above.
            sortToRemove.forEach.getLeaves();
        }
    } catch (PlanException e) {
        int errorCode = 2202;
        throw new VisitorException(
                "Error change distinct/sort to use secondary key optimizer",
                errorCode, e);
    }

    if (secondarySortKeyInfo != null) {
        // Adjust POLocalRearrange, POPackage, MapReduceOper to use the
        // secondary key
        numMRUseSecondaryKey++;
        mr.setUseSecondaryKey(true);
        mr.setSecondarySortOrder(secondarySortKeyInfo.getAscs());

        // The first column of the first ColumnInfo of every chain is compared
        // with the rearrange index below; all chains must agree on it.
        // -1 means "not seen yet".
        int indexOfRearrangeToChange = -1;
        for (ColumnChainInfo columnChainInfo : secondarySortKeyInfo.getColumnChains()) {
            ColumnInfo currentColumn = columnChainInfo.getColumnInfos().get(0);
            int index = currentColumn.columns.get(0);
            if (indexOfRearrangeToChange == -1) {
                indexOfRearrangeToChange = index;
            } else if (indexOfRearrangeToChange != index) {
                int errorCode = 2203;
                throw new VisitorException("Sort on columns from different inputs.", errorCode);
            }
        }

        if (mapLeaf instanceof POLocalRearrange) {
            ((POLocalRearrange) mapLeaf).setUseSecondaryKey(true);
            setSecondaryPlan(mr.mapPlan, (POLocalRearrange) mapLeaf,
                    secondarySortKeyInfo);
        } else if (mapLeaf instanceof POUnion) {
            List<PhysicalOperator> preds = mr.mapPlan.getPredecessors(mapLeaf);
            boolean found = false;
            for (PhysicalOperator pred : preds) {
                // The cast relies on the check made while collecting the
                // sort keys: every union input is a POLocalRearrange.
                POLocalRearrange rearrange = (POLocalRearrange) pred;
                rearrange.setUseSecondaryKey(true);
                if (rearrange.getIndex() == indexOfRearrangeToChange) {
                    // Try to find the POLocalRearrange for the secondary key
                    found = true;
                    setSecondaryPlan(mr.mapPlan, rearrange, secondarySortKeyInfo);
                }
            }
            if (!found) {
                int errorCode = 2214;
                throw new VisitorException("Cannot find POLocalRearrange to set secondary plan", errorCode);
            }
        }

        POPackage pack = (POPackage) root;
        pack.getPkgr().setUseSecondaryKey(true);
    }
}