public void visit()

in src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java [291:441]


    public void visit(LOForEach foreach) throws FrontendException {
        if (!columnPrune) {
            return;
        }

        // get column numbers from input uids
        Set<Long> inputUids = (Set<Long>)foreach.getAnnotation(ColumnPruneHelper.INPUTUIDS);

        // Get all top level projects
        LogicalPlan innerPlan = foreach.getInnerPlan();
        List<LOInnerLoad> innerLoads= new ArrayList<LOInnerLoad>();
        List<Operator> sources = innerPlan.getSources();
        for (Operator s : sources) {
            if (s instanceof LOInnerLoad)
                innerLoads.add((LOInnerLoad)s);
        }

        // If project of the innerLoad is not in INPUTUIDS, remove this innerLoad
        Set<LOInnerLoad> innerLoadsToRemove = new HashSet<LOInnerLoad>();
        for (LOInnerLoad innerLoad: innerLoads) {
            ProjectExpression project = innerLoad.getProjection();
            if (project.isProjectStar()) {
                LogicalSchema.LogicalFieldSchema tupleFS = project.getFieldSchema();
                // Check the first component of the star projection
                long uid = tupleFS.schema.getField(0).uid;
                if (!inputUids.contains(uid))
                    innerLoadsToRemove.add(innerLoad);
            }
            else {
                if (!inputUids.contains(project.getFieldSchema().uid))
                    innerLoadsToRemove.add(innerLoad);
            }
        }

        // Find the logical operator immediate precede LOGenerate which should be removed (the whole branch)
        Set<LogicalRelationalOperator> branchHeadToRemove = new HashSet<LogicalRelationalOperator>();
        for (LOInnerLoad innerLoad : innerLoadsToRemove) {
            Operator op = innerLoad;
            while (!(innerPlan.getSuccessors(op).get(0) instanceof LOGenerate)) {
                op = innerPlan.getSuccessors(op).get(0);
            }
            branchHeadToRemove.add((LogicalRelationalOperator)op);
        }

        // Find the expression plan to remove
        LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
        List<LogicalExpressionPlan> genPlansToRemove = new ArrayList<LogicalExpressionPlan>();

        List<LogicalExpressionPlan> genPlans = gen.getOutputPlans();
        for (int i=0;i<genPlans.size();i++) {
            LogicalExpressionPlan expPlan = genPlans.get(i);
            List<Operator> expSources = expPlan.getSinks();

            for (Operator expSrc : expSources) {
                if (expSrc instanceof ProjectExpression) {
                    LogicalRelationalOperator reference = ((ProjectExpression)expSrc).findReferent();
                    if (branchHeadToRemove.contains(reference)) {
                        genPlansToRemove.add(expPlan);
                    }
                }
            }
        }

        // Build the temporary structure based on genPlansToRemove, which include:
        // * flattenList
        // * outputPlanSchemas
        // * uidOnlySchemas
        // * inputsRemoved
        //     We first construct inputsNeeded, and inputsRemoved = (all inputs) - inputsNeeded.
        //     We cannot figure out inputsRemoved directly since the inputs may be used by other output plan.
        //     We can only get inputsRemoved after visiting all output plans.
        List<Boolean> flattenList = new ArrayList<Boolean>();
        Set<Integer> inputsNeeded = new HashSet<Integer>();
        Set<Integer> inputsRemoved = new HashSet<Integer>();
        List<LogicalSchema> outputPlanSchemas = new ArrayList<LogicalSchema>();
        List<LogicalSchema> uidOnlySchemas = new ArrayList<LogicalSchema>();
        List<LogicalSchema> userDefinedSchemas = null;

        if (gen.getUserDefinedSchema()!=null)
            userDefinedSchemas = new ArrayList<LogicalSchema>();

        for (int i=0;i<genPlans.size();i++) {
            LogicalExpressionPlan genPlan = genPlans.get(i);
            if (!genPlansToRemove.contains(genPlan)) {
                flattenList.add(gen.getFlattenFlags()[i]);
                outputPlanSchemas.add(gen.getOutputPlanSchemas().get(i));
                uidOnlySchemas.add(gen.getUidOnlySchemas().get(i));
                if (gen.getUserDefinedSchema()!=null) {
                    userDefinedSchemas.add(gen.getUserDefinedSchema().get(i));
                }
                List<Operator> sinks = genPlan.getSinks();
                for(Operator s: sinks) {
                    if (s instanceof ProjectExpression) {
                        inputsNeeded.add(((ProjectExpression)s).getInputNum());
                    }
                }
            }
        }

        List<Operator> preds = innerPlan.getPredecessors(gen);

        if (preds!=null) {  // otherwise, all gen plan are based on constant, no need to adjust
            for (int i=0;i<preds.size();i++) {
                if (!inputsNeeded.contains(i))
                    inputsRemoved.add(i);
            }
        }


        // Change LOGenerate: remove unneeded output expression plan
        // change flatten flag, outputPlanSchema, uidOnlySchemas
        boolean[] flatten = new boolean[flattenList.size()];
        for (int i=0;i<flattenList.size();i++)
            flatten[i] = flattenList.get(i);

        gen.setFlattenFlags(flatten);
        gen.setOutputPlanSchemas(outputPlanSchemas);
        gen.setUidOnlySchemas(uidOnlySchemas);
        gen.setUserDefinedSchema(userDefinedSchemas);

        for (LogicalExpressionPlan genPlanToRemove : genPlansToRemove) {
            genPlans.remove(genPlanToRemove);
        }

        // shift project input
        if (!inputsRemoved.isEmpty()) {
            for (LogicalExpressionPlan genPlan : genPlans) {
                List<Operator> sinks = genPlan.getSinks();
                for(Operator s: sinks) {
                    if (s instanceof ProjectExpression) {
                        int input = ((ProjectExpression)s).getInputNum();
                        int numToShift = 0;
                        for (int i :inputsRemoved) {
                            if (i<input)
                                numToShift++;
                        }
                        ((ProjectExpression)s).setInputNum(input-numToShift);
                    }
                }
            }
        }

        // Prune unneeded LOInnerLoad
        List<LogicalRelationalOperator> predToRemove = new ArrayList<LogicalRelationalOperator>();
        for (int i : inputsRemoved) {
            predToRemove.add((LogicalRelationalOperator)preds.get(i));
        }
        for (LogicalRelationalOperator pred : predToRemove) {
            removeSubTree(pred);
        }
    }