src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java [489:592]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private static class SecondaryKeyDiscover {
        // The nested plan being analysed; process() walks the chain below each of its roots.
        PhysicalPlan mPlan;

        // Sorts collected for removal. NOTE(review): presumably filled by processSort,
        // whose body is not visible here -- confirm.
        List<POSort> sortsToRemove = new ArrayList<POSort>();

        // Distincts recorded by processDistinct, either because their key matched an
        // existing key or because they established the secondary key.
        List<PODistinct> distinctsToChange = new ArrayList<PODistinct>();

        // Key infos supplied by the caller; processDistinct checks these first ("main key").
        List<SortKeyInfo> sortKeyInfos;

        // Secondary key supplied by the caller; may be null, in which case
        // processDistinct assigns it from the first distinct that matches no other key.
        SortKeyInfo secondarySortKeyInfo;

        // Column chain accumulated by processProject; process() replaces it with a
        // fresh instance before walking each root.
        ColumnChainInfo columnChainInfo = null;

        // PhysicalPlan here is foreach inner plan
        /**
         * Creates a discoverer bound to one nested plan.
         *
         * @param plan the nested physical plan whose roots are walked by process()
         * @param sortKeyInfos key infos that processDistinct compares against first
         * @param secondarySortKeyInfo secondary key known so far; when null,
         *        processDistinct may assign one
         */
        SecondaryKeyDiscover(PhysicalPlan plan,
                List<SortKeyInfo> sortKeyInfos,
                SortKeyInfo secondarySortKeyInfo) {
            this.secondarySortKeyInfo = secondarySortKeyInfo;
            this.sortKeyInfos = sortKeyInfos;
            this.mPlan = plan;
        }

        /**
         * Walks the operator chain below every root of the nested plan.
         *
         * @throws FrontendException propagated from processRoot
         */
        public void process() throws FrontendException {
            for (PhysicalOperator chainHead : mPlan.getRoots()) {
                // Each root starts with its own, empty column chain.
                columnChainInfo = new ColumnChainInfo();
                processRoot(chainHead);
            }
        }

        /**
         * Follows the single-successor chain that starts at {@code root}, handing
         * distinct, sort and project operators to their handlers.
         *
         * The walk ends when a handler returns true, when a POUserFunc, POUnion or
         * POForEach is reached, or when the current operator has no successors.
         *
         * @param root first operator of the chain
         * @throws FrontendException with error code 2215 if an operator in the
         *         chain has more than one successor, or propagated from a handler
         */
        public void processRoot(PhysicalOperator root) throws FrontendException {
            PhysicalOperator node = root;
            while (node != null) {
                boolean stopWalking;
                if (node instanceof PODistinct) {
                    stopWalking = processDistinct((PODistinct) node);
                } else if (node instanceof POSort) {
                    stopWalking = processSort((POSort) node);
                } else if (node instanceof POProject) {
                    stopWalking = processProject((POProject) node);
                } else {
                    // We don't process foreach, since foreach is too complex to get right
                    stopWalking = node instanceof POUserFunc
                            || node instanceof POUnion
                            || node instanceof POForEach;
                }

                if (stopWalking) {
                    return;
                }

                List<PhysicalOperator> following = mPlan.getSuccessors(node);
                if (following == null) {
                    // End of the chain.
                    return;
                }
                if (following.size() > 1) {
                    throw new FrontendException("See more than 1 successors in the nested plan for "+node,
                            2215);
                }
                node = following.get(0);
            }
        }

        // We see PODistinct, check which key it is using
        /**
         * Handles a PODistinct met while walking a nested chain: builds a key
         * from the column chain accumulated so far and checks it against the
         * known keys.
         *
         * The distinct is added to distinctsToChange when its key is covered by
         * one of sortKeyInfos, or by the current secondary key, or when no
         * secondary key exists yet (in which case this key becomes the
         * secondary key). Otherwise nothing is recorded.
         *
         * @param distinct the distinct operator being examined
         * @return always false, so the caller keeps walking the chain
         * @throws FrontendException if the accumulated column chain cannot be cloned
         */
        public boolean processDistinct(PODistinct distinct) throws FrontendException {
            SortKeyInfo keyInfos = new SortKeyInfo();
            try {
                // Work on a copy of the accumulated chain rather than the shared instance.
                keyInfos.insertColumnChainInfo(0,
                        (ColumnChainInfo) columnChainInfo.clone(), true);
            } catch (CloneNotSupportedException e) {
                // Not expected (the original code treats this as impossible), but
                // continuing with an empty key could drive a wrong optimization,
                // so fail loudly and keep the cause.
                throw new FrontendException(
                        "Unable to clone column chain info while processing " + distinct, e);
            }

            // if it is part of main key
            for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
                if (sortKeyInfo.moreSpecificThan(keyInfos)) {
                    distinctsToChange.add(distinct);
                    return false;
                }
            }

            // if it is part of secondary key
            if (secondarySortKeyInfo != null
                    && secondarySortKeyInfo.moreSpecificThan(keyInfos)) {
                distinctsToChange.add(distinct);
                return false;
            }

            // Now set the secondary key
            if (secondarySortKeyInfo == null) {
                distinctsToChange.add(distinct);
                secondarySortKeyInfo = keyInfos;
            }
            return false;
        }

        // Accumulate column info
        /**
         * Records a projection in the column chain of the root currently being walked.
         *
         * @param project the projection operator encountered
         * @return always false, so the caller keeps walking the chain
         * @throws FrontendException propagated from the column chain
         */
        public boolean processProject(POProject project) throws FrontendException {
            final boolean sawInvalidPhysicalOper = false;
            columnChainInfo.insertInReduce(project);
            return sawInvalidPhysicalOper;
        }

        // We see POSort, check which key it is using
        public boolean processSort(POSort sort) throws FrontendException{
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java [449:552]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private static class SecondaryKeyDiscover {
        // The nested plan being analysed; process() walks the chain below each of its roots.
        PhysicalPlan mPlan;
        
        // Sorts collected for removal. NOTE(review): presumably filled by processSort,
        // whose body is not visible here -- confirm.
        List<POSort> sortsToRemove = new ArrayList<POSort>();

        // Distincts recorded by processDistinct, either because their key matched an
        // existing key or because they established the secondary key.
        List<PODistinct> distinctsToChange = new ArrayList<PODistinct>();

        // Key infos supplied by the caller; processDistinct checks these first ("main key").
        List<SortKeyInfo> sortKeyInfos;

        // Secondary key supplied by the caller; may be null, in which case
        // processDistinct assigns it from the first distinct that matches no other key.
        SortKeyInfo secondarySortKeyInfo;

        // Column chain accumulated by processProject; process() replaces it with a
        // fresh instance before walking each root.
        ColumnChainInfo columnChainInfo = null;

        // PhysicalPlan here is foreach inner plan
        /**
         * Creates a discoverer bound to one nested plan.
         *
         * @param plan the nested physical plan whose roots are walked by process()
         * @param sortKeyInfos key infos that processDistinct compares against first
         * @param secondarySortKeyInfo secondary key known so far; when null,
         *        processDistinct may assign one
         */
        SecondaryKeyDiscover(PhysicalPlan plan,
                List<SortKeyInfo> sortKeyInfos,
                SortKeyInfo secondarySortKeyInfo) {
            this.secondarySortKeyInfo = secondarySortKeyInfo;
            this.sortKeyInfos = sortKeyInfos;
            this.mPlan = plan;
        }
        
        /**
         * Walks the operator chain below every root of the nested plan.
         *
         * @throws FrontendException propagated from processRoot
         */
        public void process() throws FrontendException {
            for (PhysicalOperator chainHead : mPlan.getRoots()) {
                // Each root starts with its own, empty column chain.
                columnChainInfo = new ColumnChainInfo();
                processRoot(chainHead);
            }
        }
        
        /**
         * Follows the single-successor chain that starts at {@code root}, handing
         * distinct, sort and project operators to their handlers.
         *
         * The walk ends when a handler returns true, when a POUserFunc, POUnion or
         * POForEach is reached, or when the current operator has no successors.
         *
         * @param root first operator of the chain
         * @throws FrontendException with error code 2215 if an operator in the
         *         chain has more than one successor, or propagated from a handler
         */
        public void processRoot(PhysicalOperator root) throws FrontendException {
            PhysicalOperator node = root;
            while (node != null) {
                boolean stopWalking;
                if (node instanceof PODistinct) {
                    stopWalking = processDistinct((PODistinct) node);
                } else if (node instanceof POSort) {
                    stopWalking = processSort((POSort) node);
                } else if (node instanceof POProject) {
                    stopWalking = processProject((POProject) node);
                } else {
                    // We don't process foreach, since foreach is too complex to get right
                    stopWalking = node instanceof POUserFunc
                            || node instanceof POUnion
                            || node instanceof POForEach;
                }

                if (stopWalking) {
                    return;
                }

                List<PhysicalOperator> following = mPlan.getSuccessors(node);
                if (following == null) {
                    // End of the chain.
                    return;
                }
                if (following.size() > 1) {
                    throw new FrontendException("See more than 1 successors in the nested plan for "+node,
                            2215);
                }
                node = following.get(0);
            }
        }

        // We see PODistinct, check which key it is using
        /**
         * Handles a PODistinct met while walking a nested chain: builds a key
         * from the column chain accumulated so far and checks it against the
         * known keys.
         *
         * The distinct is added to distinctsToChange when its key is covered by
         * one of sortKeyInfos, or by the current secondary key, or when no
         * secondary key exists yet (in which case this key becomes the
         * secondary key). Otherwise nothing is recorded.
         *
         * @param distinct the distinct operator being examined
         * @return always false, so the caller keeps walking the chain
         * @throws FrontendException if the accumulated column chain cannot be cloned
         */
        public boolean processDistinct(PODistinct distinct) throws FrontendException {
            SortKeyInfo keyInfos = new SortKeyInfo();
            try {
                // Work on a copy of the accumulated chain rather than the shared instance.
                keyInfos.insertColumnChainInfo(0,
                        (ColumnChainInfo) columnChainInfo.clone(), true);
            } catch (CloneNotSupportedException e) {
                // Not expected (the original code treats this as impossible), but
                // continuing with an empty key could drive a wrong optimization,
                // so fail loudly and keep the cause.
                throw new FrontendException(
                        "Unable to clone column chain info while processing " + distinct, e);
            }

            // if it is part of main key
            for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
                if (sortKeyInfo.moreSpecificThan(keyInfos)) {
                    distinctsToChange.add(distinct);
                    return false;
                }
            }

            // if it is part of secondary key
            if (secondarySortKeyInfo != null
                    && secondarySortKeyInfo.moreSpecificThan(keyInfos)) {
                distinctsToChange.add(distinct);
                return false;
            }

            // Now set the secondary key
            if (secondarySortKeyInfo == null) {
                distinctsToChange.add(distinct);
                secondarySortKeyInfo = keyInfos;
            }
            return false;
        }

        // Accumulate column info
        /**
         * Records a projection in the column chain of the root currently being walked.
         *
         * @param project the projection operator encountered
         * @return always false, so the caller keeps walking the chain
         * @throws FrontendException propagated from the column chain
         */
        public boolean processProject(POProject project) throws FrontendException {
            final boolean sawInvalidPhysicalOper = false;
            columnChainInfo.insertInReduce(project);
            return sawInvalidPhysicalOper;
        }

        // We see POSort, check which key it is using
        public boolean processSort(POSort sort) throws FrontendException{
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



