public void visitMROp()

in src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java [144:378]


    /**
     * Inspects one map-reduce operator and, when its shape allows it, rewrites
     * the nested sort/distinct operators of the reduce-side {@code POForEach}
     * so that they can rely on a secondary sort key.
     *
     * <p>The operator is left untouched (the method simply returns) when any of
     * the following holds:
     * <ul>
     *   <li>the job is a global sort or already has a custom partitioner;</li>
     *   <li>the map plan does not have exactly one leaf, or that leaf is neither
     *       a {@code POLocalRearrange} nor a {@code POUnion} whose predecessors
     *       are all {@code POLocalRearrange};</li>
     *   <li>no sort key information can be derived from a rearrange;</li>
     *   <li>the reduce plan is empty, does not have a single {@code POPackage}
     *       root, or no {@code POForEach} is reachable from that root through a
     *       single-successor chain of package/filter/limit operators.</li>
     * </ul>
     *
     * <p>Otherwise every inner plan of the foreach is examined by
     * {@code SecondaryKeyDiscover}; the reported distincts are replaced by
     * {@code POSortedDistinct}, the reported sorts are removed (or replaced by a
     * {@code PORelationToExprProject}), and, if a secondary sort key was found,
     * the map-reduce operator, the rearrange(s) and the package are switched to
     * secondary-key mode.
     *
     * @param mr the map-reduce operator to inspect; it is modified in place
     * @throws VisitorException if an inner plan cannot be processed (2213), a
     *         plan edit fails (2202), the secondary key columns come from
     *         different inputs (2203), or no rearrange with the required index
     *         is found below a {@code POUnion} (2214)
     */
    public void visitMROp(MapReduceOper mr) throws VisitorException {
        log.trace("Entering SecondaryKeyOptimizer.visitMROp");
        List<SortKeyInfo> sortKeyInfos = new ArrayList<SortKeyInfo>();
        SortKeyInfo secondarySortKeyInfo = null;

        // Global-sort jobs are never optimized (original note: only the
        // cogroup case is optimized)
        if (mr.isGlobalSort())
            return;

        // Don't optimize when we already have a custom partitioner
        if (mr.getCustomPartitioner()!=null)
            return;

        List<PhysicalOperator> mapLeaves = mr.mapPlan.getLeaves();
        if (mapLeaves == null || mapLeaves.size() != 1) {
            log
                    .debug("Expected map to have single leaf! Skip secondary key optimizing");
            return;
        }
        PhysicalOperator mapLeaf = mapLeaves.get(0);

        // Figure out the main key of the map-reduce job from POLocalRearrange
        try {
            if (mapLeaf instanceof POLocalRearrange) {
                SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange) mapLeaf);
                if (sortKeyInfo == null) {
                    log
                            .debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
                    return;
                }
                sortKeyInfos.add(sortKeyInfo);
            } else if (mapLeaf instanceof POUnion) {
                List<PhysicalOperator> preds = mr.mapPlan
                        .getPredecessors(mapLeaf);
                if (preds == null) {
                    log.debug("POUnion in map leaf has no predecessors, skip secondary key optimizing");
                    return;
                }
                for (PhysicalOperator pred : preds) {
                    // Every predecessor is cast to POLocalRearrange further
                    // down once a secondary key has been found, so reject
                    // any other operator here, before anything is modified.
                    if (!(pred instanceof POLocalRearrange)) {
                        log.debug("Expected every POUnion predecessor to be a POLocalRearrange, skip secondary key optimizing");
                        return;
                    }
                    SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange) pred);
                    if (sortKeyInfo == null) {
                        log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
                        return;
                    }
                    sortKeyInfos.add(sortKeyInfo);
                }
            } else {
                log.debug("Cannot find POLocalRearrange or POUnion in map leaf, skip secondary key optimizing");
                return;
            }
        } catch (ExecException e) {
            log
                    .debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
            return;
        }

        if (mr.reducePlan.isEmpty()) {
            log.debug("Reduce plan is empty, skip secondary key optimizing");
            return;
        }

        List<PhysicalOperator> reduceRoots = mr.reducePlan.getRoots();
        if (reduceRoots.size() != 1) {
            log
                    .debug("Expected reduce to have single root, skip secondary key optimizing");
            return;
        }

        PhysicalOperator root = reduceRoots.get(0);
        if (!(root instanceof POPackage)) {
            log
                    .debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
            return;
        }

        // Walk down from the package to the POForEach of the reduce plan.
        // Only a POPackage whose packager is not a JoinPackager, a POFilter
        // or a POLimit may sit in between; anything else ends the attempt.
        PhysicalOperator currentNode = root;
        POForEach foreach = null;
        while (currentNode != null) {
            if ((currentNode instanceof POPackage
                    && !(((POPackage) currentNode).getPkgr() instanceof JoinPackager))
                    || currentNode instanceof POFilter
                    || currentNode instanceof POLimit) {
                List<PhysicalOperator> succs = mr.reducePlan
                        .getSuccessors(currentNode);
                if (succs == null) // We didn't find POForEach
                    return;
                if (succs.size() != 1) {
                    log.debug("See multiple output for " + currentNode
                            + " in reduce plan, skip secondary key optimizing");
                    return;
                }
                currentNode = succs.get(0);
            } else if (currentNode instanceof POForEach) {
                foreach = (POForEach) currentNode;
                break;
            } else { // Skip optimization
                return;
            }
        }

        // We do not find a foreach (we shall not come here, a trick to fool findbugs)
        if (foreach==null)
            return;

        List<POToChange> sortsToRemove = new ArrayList<POToChange>();
        List<POToChange> distinctsToChange = new ArrayList<POToChange>();

        for (PhysicalPlan innerPlan : foreach.getInputPlans()) {
            // visit inner plans to figure out the sort order for distinct /
            // sort. The secondary key found so far is handed to the next
            // discoverer and replaced by whatever that discoverer reports.
            SecondaryKeyDiscover innerPlanDiscover = new SecondaryKeyDiscover(
                    innerPlan, sortKeyInfos, secondarySortKeyInfo);
            try {
                innerPlanDiscover.process();
            } catch (FrontendException e) {
                int errorCode = 2213;
                throw new VisitorException("Error visiting inner plan for ForEach", errorCode, e);
            }
            secondarySortKeyInfo = innerPlanDiscover.getSecondarySortKeyInfo();
            if (innerPlanDiscover.getSortsToRemove() != null) {
                for (POSort sort : innerPlanDiscover.getSortsToRemove()) {
                    sortsToRemove.add(new POToChange(sort, innerPlan, foreach));
                }
            }
            if (innerPlanDiscover.getDistinctsToChange() != null) {
                for (PODistinct distinct : innerPlanDiscover
                        .getDistinctsToChange()) {
                    distinctsToChange.add(new POToChange(distinct, innerPlan,
                            foreach));
                }
            }
        }

        try {
            // Change PODistinct to use POSortedDistinct, which assume the input
            // data is sorted
            for (POToChange distinctToChange : distinctsToChange) {
                numDistinctChanged++;
                PODistinct oldDistinct = (PODistinct) distinctToChange.oper;
                String scope = oldDistinct.getOperatorKey().scope;
                POSortedDistinct newDistinct = new POSortedDistinct(
                        new OperatorKey(scope, NodeIdGenerator.getGenerator()
                                .getNextNodeId(scope)), oldDistinct
                                .getRequestedParallelism(), oldDistinct
                                .getInputs());
                newDistinct.setInputs(oldDistinct.getInputs());
                newDistinct.setResultType(oldDistinct.getResultType());
                distinctToChange.plan.replace(oldDistinct, newDistinct);
                // Return value unused; presumably refreshes the foreach's
                // view of its inner-plan leaves after the edit - TODO confirm
                distinctToChange.forEach.getLeaves();
            }
            // Removed POSort, if the successor require a databag, we need to
            // add a PORelationToExprProject
            // to convert tuples into databag
            for (POToChange sortToRemove : sortsToRemove) {
                numSortRemoved++;
                POSort oldSort = (POSort) sortToRemove.oper;
                String scope = oldSort.getOperatorKey().scope;
                List<PhysicalOperator> preds = sortToRemove.plan
                        .getPredecessors(sortToRemove.oper);
                List<PhysicalOperator> succs = sortToRemove.plan
                        .getSuccessors(sortToRemove.oper);
                POProject project = null;
                // A projection takes the sort's place when the sort has no
                // predecessor or turned a non-bag input into a bag, unless
                // the successor already is a PORelationToExprProject.
                if ((preds == null
                        || (preds.get(0).getResultType() != DataType.BAG
                                && oldSort.getResultType() == DataType.BAG)) // sort to remove do change the result type
                        && (succs == null || !(succs.get(0) instanceof PORelationToExprProject))) // successor is not PORelationToExprProject
                {
                    project = new PORelationToExprProject(new OperatorKey(
                            scope, NodeIdGenerator.getGenerator()
                                    .getNextNodeId(scope)), oldSort
                            .getRequestedParallelism());
                    project.setInputs(oldSort.getInputs());
                    project.setResultType(DataType.BAG);
                    project.setStar(true);
                }
                if (project == null)
                    sortToRemove.plan.removeAndReconnect(sortToRemove.oper);
                else
                    sortToRemove.plan.replace(oldSort, project);
                // Return value unused; presumably refreshes the foreach's
                // view of its inner-plan leaves after the edit - TODO confirm
                sortToRemove.forEach.getLeaves();
            }
        } catch (PlanException e) {
            int errorCode = 2202;
            throw new VisitorException(
                    "Error change distinct/sort to use secondary key optimizer",
                    errorCode, e);
        }
        if (secondarySortKeyInfo != null) {
            // Adjust POLocalRearrange, POPackage, MapReduceOper to use the
            // secondary key
            numMRUseSecondaryKey++;
            mr.setUseSecondaryKey(true);
            mr.setSecondarySortOrder(secondarySortKeyInfo.getAscs());
            // The first column of every column chain must be the same value;
            // it is compared with POLocalRearrange.getIndex() below to pick
            // the rearrange that receives the secondary plan.
            int indexOfRearrangeToChange = -1;
            for (ColumnChainInfo columnChainInfo : secondarySortKeyInfo
                    .getColumnChains()) {
                ColumnInfo currentColumn = columnChainInfo.getColumnInfos()
                        .get(0);
                int index = currentColumn.columns.get(0);
                if (indexOfRearrangeToChange == -1)
                    indexOfRearrangeToChange = index;
                else if (indexOfRearrangeToChange != index) {
                    int errorCode = 2203;
                    throw new VisitorException("Sort on columns from different inputs.", errorCode);
                }
            }
            if (mapLeaf instanceof POLocalRearrange) {
                ((POLocalRearrange) mapLeaf).setUseSecondaryKey(true);
                setSecondaryPlan(mr.mapPlan, (POLocalRearrange) mapLeaf,
                        secondarySortKeyInfo);
            } else if (mapLeaf instanceof POUnion) {
                List<PhysicalOperator> preds = mr.mapPlan
                        .getPredecessors(mapLeaf);
                boolean found = false;
                for (PhysicalOperator pred : preds) {
                    // All predecessors were checked to be POLocalRearrange
                    // when the sort key infos were collected.
                    POLocalRearrange rearrange = (POLocalRearrange) pred;
                    rearrange.setUseSecondaryKey(true);
                    if (rearrange.getIndex() == indexOfRearrangeToChange) {
                        // Try to find the POLocalRearrange for the secondary key
                        found = true;
                        setSecondaryPlan(mr.mapPlan, rearrange, secondarySortKeyInfo);
                    }
                }
                if (!found)
                {
                    int errorCode = 2214;
                    throw new VisitorException("Cannot find POLocalRearrange to set secondary plan", errorCode);
                }
            }
            POPackage pack = (POPackage) root;
            pack.getPkgr().setUseSecondaryKey(true);
        }
    }