src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java [599:685]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            SortKeyInfo keyInfo = new SortKeyInfo();
            for (int i = 0; i < sort.getSortPlans().size(); i++) {
                PhysicalPlan sortPlan = sort.getSortPlans().get(i);
                ColumnChainInfo sortChainInfo = null;
                try {
                    sortChainInfo = (ColumnChainInfo) columnChainInfo.clone();
                } catch (CloneNotSupportedException e) { // We implement
                                                         // Cloneable, impossible
                                                         // to get here
                }
                boolean r = false;
                try {
                    r = collectColumnChain(sortPlan, sortChainInfo);
                } catch (PlanException e) {
                    int errorCode = 2206;
                    throw new FrontendException("Error visiting POSort inner plan",
                            errorCode, e);
                }
                if (r==true) // if we saw physical operator other than project in sort plan
                {
                    return true;
                }
                keyInfo.insertColumnChainInfo(i, sortChainInfo, sort
                        .getMAscCols().get(i));
            }
            // if it is part of main key
            for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
                if (sortKeyInfo.moreSpecificThan(keyInfo)) {
                    sortsToRemove.add(sort);
                    return false;
                }
            }
            // if it is part of secondary key
            if (secondarySortKeyInfo != null
                    && secondarySortKeyInfo.moreSpecificThan(keyInfo)) {
                sortsToRemove.add(sort);
                return false;
            }

            // Now set the secondary key
            if (secondarySortKeyInfo == null) {
                sortsToRemove.add(sort);
                secondarySortKeyInfo = keyInfo;
            }
            return false;
        }

        /**
         * Returns the sort operators that have been recorded in
         * {@code sortsToRemove}.
         *
         * @return the {@code sortsToRemove} list itself (the field reference is
         *         returned directly, not a copy)
         */
        public List<POSort> getSortsToRemove() {
            return sortsToRemove;
        }

        /**
         * Returns the distinct operators that have been recorded in
         * {@code distinctsToChange}.
         *
         * @return the {@code distinctsToChange} list itself (the field reference
         *         is returned directly, not a copy)
         */
        public List<PODistinct> getDistinctsToChange() {
            return distinctsToChange;
        }

        /**
         * Returns the secondary sort key information held by this object.
         *
         * @return the current {@code secondarySortKeyInfo}; may be
         *         {@code null} when no secondary key has been assigned
         */
        public SortKeyInfo getSecondarySortKeyInfo() {
            return secondarySortKeyInfo;
        }
    }

    // Returns true if the plan does not have exactly one root, or if it
    // contains a physical operator other than POProject
    static private boolean collectColumnChain(PhysicalPlan plan,
            ColumnChainInfo columnChainInfo) throws PlanException {
        if (plan.getRoots().size() != 1) {
            return true;
        }

        PhysicalOperator currentNode = plan.getRoots().get(0);

        while (currentNode != null) {
            if (currentNode instanceof POProject) {
                POProject project = (POProject) currentNode;
                columnChainInfo.insertInReduce(project);
            } else {
                return true;
            }
            List<PhysicalOperator> succs = plan.getSuccessors(currentNode);
            if (succs == null)
                break;
            if (succs.size() != 1) {
                int errorCode = 2208;
                throw new PlanException(
                        "Exception visiting foreach inner plan", errorCode);
            }
            currentNode = succs.get(0);
        }
        return false;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java [553:639]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            SortKeyInfo keyInfo = new SortKeyInfo();
            for (int i = 0; i < sort.getSortPlans().size(); i++) {
                PhysicalPlan sortPlan = sort.getSortPlans().get(i);
                ColumnChainInfo sortChainInfo = null;
                try {
                    sortChainInfo = (ColumnChainInfo) columnChainInfo.clone();
                } catch (CloneNotSupportedException e) { // We implement
                                                         // Cloneable, impossible
                                                         // to get here
                }
                boolean r = false;
                try {
                    r = collectColumnChain(sortPlan, sortChainInfo);
                } catch (PlanException e) {
                    int errorCode = 2206;
                    throw new FrontendException("Error visiting POSort inner plan",
                            errorCode, e);
                }
                if (r==true) // if we saw physical operator other than project in sort plan
                {
                    return true;
                }
                keyInfo.insertColumnChainInfo(i, sortChainInfo, sort
                        .getMAscCols().get(i));
            }
            // if it is part of main key
            for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
                if (sortKeyInfo.moreSpecificThan(keyInfo)) {
                    sortsToRemove.add(sort);
                    return false;
                }
            }
            // if it is part of secondary key
            if (secondarySortKeyInfo != null
                    && secondarySortKeyInfo.moreSpecificThan(keyInfo)) {
                sortsToRemove.add(sort);
                return false;
            }

            // Now set the secondary key
            if (secondarySortKeyInfo == null) {
                sortsToRemove.add(sort);
                secondarySortKeyInfo = keyInfo;
            }
            return false;
        }

        /**
         * Returns the sort operators that have been recorded in
         * {@code sortsToRemove}.
         *
         * @return the {@code sortsToRemove} list itself (the field reference is
         *         returned directly, not a copy)
         */
        public List<POSort> getSortsToRemove() {
            return sortsToRemove;
        }

        /**
         * Returns the distinct operators that have been recorded in
         * {@code distinctsToChange}.
         *
         * @return the {@code distinctsToChange} list itself (the field reference
         *         is returned directly, not a copy)
         */
        public List<PODistinct> getDistinctsToChange() {
            return distinctsToChange;
        }

        /**
         * Returns the secondary sort key information held by this object.
         *
         * @return the current {@code secondarySortKeyInfo}; may be
         *         {@code null} when no secondary key has been assigned
         */
        public SortKeyInfo getSecondarySortKeyInfo() {
            return secondarySortKeyInfo;
        }
    }

    // Returns true if the plan does not have exactly one root, or if it
    // contains a physical operator other than POProject
    static private boolean collectColumnChain(PhysicalPlan plan,
            ColumnChainInfo columnChainInfo) throws PlanException {
        if (plan.getRoots().size() != 1) {
        	return true;
        }

        PhysicalOperator currentNode = plan.getRoots().get(0);

        while (currentNode != null) {
            if (currentNode instanceof POProject) {
                POProject project = (POProject) currentNode;
                columnChainInfo.insertInReduce(project);
            } else {
                return true;
            }
            List<PhysicalOperator> succs = plan.getSuccessors(currentNode);
            if (succs == null)
                break;
            if (succs.size() != 1) {
                int errorCode = 2208;
                throw new PlanException(
                        "Exception visiting foreach inner plan", errorCode);
            }
            currentNode = succs.get(0);
        }
        return false;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



