public void visit()

in src/org/apache/pig/newplan/logical/visitor/ProjectStarExpander.java [253:458]


    /**
     * Expands a lone project-star / project-range expression in the generate
     * of a foreach into one expression plan (and one LOInnerLoad) per column.
     * <p>
     * The inner plan is walked first. Then, for every output plan of the
     * single LOGenerate sink:
     * <ul>
     *   <li>if {@code getProjectLonelyStar} reports a project expression and
     *       the column range can be determined, the expression plan and its
     *       LOInnerLoad are replaced by one plan/LOInnerLoad pair per column
     *       in the range; the flatten flag is repeated for every new plan and
     *       the user defined schema (if any) is split field by field;</li>
     *   <li>otherwise the plan is kept unchanged.</li>
     * </ul>
     * Finally the input number of every tracked project expression is reset
     * to the current position of its input relation among the predecessors
     * of the LOGenerate, and the schemas of the LOGenerate and the foreach
     * are reset.
     * <p>
     * Nothing is changed when the LOGenerate has no predecessors in the
     * inner plan.
     *
     * @param foreach the foreach operator whose inner plan is processed
     * @throws FrontendException if the inner plan does not contain exactly
     *         one LOGenerate sink (reported as a VisitorException with error
     *         code 2266), or if one of the invoked plan operations fails
     */
    public void visit(LOForEach foreach) throws FrontendException{
        //in case of LOForeach , expand when inner plan has a single project-star
        // and its input LOInnerLoad also is a project-star
        // then Reset the input number in project expressions

        LogicalPlan innerPlan = foreach.getInnerPlan();

        //visit the inner plan first
        PlanWalker newWalker = currentWalker.spawnChildWalker(innerPlan);
        pushWalker(newWalker);
        currentWalker.walk(this);
        popWalker();

        //get the LOGenerate; exactly one is expected among the sinks
        String singleGenMsg = "Expected single LOGenerate output in innerplan of foreach";
        List<Operator> feOutputs = innerPlan.getSinks();
        LOGenerate gen = null;
        for( Operator op  : feOutputs){
            if(op instanceof LOGenerate){
                if(gen != null){
                    throw new VisitorException(foreach,
                            singleGenMsg,
                            2266,
                            PigException.BUG
                    );
                }
                gen = (LOGenerate) op;
            }
        }
        if(gen == null){
            // no LOGenerate sink at all: report it the same way as the
            // "more than one" case instead of failing with a
            // NullPointerException on the first use of gen below
            throw new VisitorException(foreach,
                    singleGenMsg,
                    2266,
                    PigException.BUG
            );
        }

        //work on the generate plan, flatten and user schema
        List<LogicalExpressionPlan> expPlans = gen.getOutputPlans();
        List<LogicalExpressionPlan> newExpPlans = new ArrayList<LogicalExpressionPlan>();

        List<Operator> loGenPreds = innerPlan.getPredecessors(gen);

        if(loGenPreds == null){
            // there are no LOInnerLoads , must be working on just constants
            // no project-star expansion to be done
            return;
        }

        // the new user schema list is only built when the generate has one
        List<LogicalSchema> userSchema = gen.getUserDefinedSchema();
        List<LogicalSchema> newUserSchema = null;
        if(userSchema != null){
            newUserSchema = new ArrayList<LogicalSchema>();
        }

        boolean[] flattens = gen.getFlattenFlags();
        List<Boolean> newFlattens = new ArrayList<Boolean>(flattens.length);

        //get mapping of LOGenerate predecessor current position to object
        Map<Integer, LogicalRelationalOperator> oldPos2Rel =
            new HashMap<Integer, LogicalRelationalOperator>();

        for(int i=0; i<loGenPreds.size(); i++){
            oldPos2Rel.put(i, (LogicalRelationalOperator) loGenPreds.get(i));
        }

        //get schema of predecessor, project-star expansion needs a schema
        LogicalRelationalOperator pred =
            (LogicalRelationalOperator) foreach.getPlan().getPredecessors(foreach).get(0);
        LogicalSchema inpSch = pred.getSchema();

        //store mapping between the projection in inner plans of
        // of LOGenerate to the input relation object
        Map<ProjectExpression, LogicalRelationalOperator> proj2InpRel =
            new HashMap<ProjectExpression, LogicalRelationalOperator>();


        for(int i=0; i<expPlans.size(); i++){
            LogicalExpressionPlan expPlan = expPlans.get(i);
            ProjectExpression projStar = getProjectLonelyStar(expPlan, oldPos2Rel);

            boolean foundExpandableProject = false;
            if(projStar != null){
                //there is a project-star to be expanded

                // user specified schema for this expression, if any
                LogicalSchema userStarSch =
                    (userSchema != null) ? userSchema.get(i) : null;

                //the range values are set in the project in LOInnerLoad
                LOInnerLoad oldLOInnerLoad =
                    (LOInnerLoad) oldPos2Rel.get(projStar.getInputNum());
                ProjectExpression loInnerProj = oldLOInnerLoad.getProjection();

                // for a plain project-star both bounds stay 0 here; the last
                // column is derived from the input schema below
                int firstProjCol = 0;
                int lastProjCol = 0;

                if(loInnerProj.isRangeProject()){
                    loInnerProj.setColumnNumberFromAlias();
                    firstProjCol = loInnerProj.getStartCol();
                    lastProjCol = loInnerProj.getEndCol();
                }

                // an end column of -1 on a range project is treated as
                // "project to the last column"
                boolean isProjectToEnd = loInnerProj.isProjectStar() ||
                    (loInnerProj.isRangeProject() && lastProjCol == -1);

                //can't expand if there is no input schema, and this is
                // as project star or project-range-to-end
                if( !(inpSch == null && isProjectToEnd) ){

                    foundExpandableProject = true;

                    if(isProjectToEnd){
                        // inpSch is non-null here because of the enclosing check
                        lastProjCol = inpSch.size() - 1;
                    }

                    //replacing the existing project star with new ones
                    expPlan.remove(projStar);

                    //remove the LOInnerLoad with star
                    innerPlan.disconnect(oldLOInnerLoad, gen);
                    innerPlan.remove(oldLOInnerLoad);


                    //generate new exp plan, inner load for each field in schema
                    for(int j = firstProjCol; j <= lastProjCol; j++){

                        //add new LOInnerLoad
                        LOInnerLoad newInLoad = new LOInnerLoad(innerPlan, foreach, j);
                        innerPlan.add(newInLoad);
                        innerPlan.connect(newInLoad, gen);


                        // new expression plan and proj
                        LogicalExpressionPlan newExpPlan = new LogicalExpressionPlan();
                        newExpPlans.add(newExpPlan);

                        // NOTE(review): -2 looks like a placeholder input
                        // number; the real one is assigned through
                        // setInputNum once all predecessors are in place
                        ProjectExpression newProj =
                            new ProjectExpression(newExpPlan, -2, -1, gen);

                        proj2InpRel.put(newProj, newInLoad);

                        newFlattens.add(flattens[i]);
                        if(newUserSchema != null ){
                            //index into user specified schema
                            int schIdx = j - firstProjCol;
                            if(userStarSch != null
                                    && userStarSch.getFields().size() > schIdx
                                    && userStarSch.getField(schIdx) != null){

                                //if the project-star field has user specified schema, use the
                                // j'th field for this column
                                LogicalSchema sch = new LogicalSchema();
                                sch.addField(new LogicalFieldSchema(userStarSch.getField(schIdx)));
                                newUserSchema.add(sch);
                            }
                            else{
                                // keep the lists aligned: one entry per new plan
                                newUserSchema.add(null);
                            }
                        }
                    }
                }
            }

            if(!foundExpandableProject){ //no project-star that could be expanded

                //get all projects in here
                FindProjects findProjs = new FindProjects(expPlan);
                findProjs.visit();
                List<ProjectExpression> projs = findProjs.getProjs();

                //create a mapping of project expression to their inputs
                for(ProjectExpression proj : projs){
                    proj2InpRel.put(proj, oldPos2Rel.get(proj.getInputNum()));
                }

                newExpPlans.add(expPlan);

                newFlattens.add(flattens[i]);
                if(newUserSchema != null){
                    newUserSchema.add(userSchema.get(i));
                }

            }
        }

        //get mapping of LoGenerate input relation to current position
        Map<LogicalRelationalOperator, Integer> rel2pos = new HashMap<LogicalRelationalOperator, Integer>();
        List<Operator> newGenPreds = innerPlan.getPredecessors(gen);
        int numNewGenPreds = 0;
        if(newGenPreds != null){
            numNewGenPreds = newGenPreds.size();
        }

        for(int i=0; i<numNewGenPreds; i++){
            rel2pos.put((LogicalRelationalOperator) newGenPreds.get(i),i);
        }

        //correct the input num for projects
        for(Entry<ProjectExpression, LogicalRelationalOperator> projAndInp : proj2InpRel.entrySet()){
           ProjectExpression proj = projAndInp.getKey();
           LogicalRelationalOperator rel = projAndInp.getValue();
           proj.setInputNum(rel2pos.get(rel));
        }

        // set the new lists
        gen.setOutputPlans(newExpPlans);
        gen.setFlattenFlags(Booleans.toArray(newFlattens));
        gen.setUserDefinedSchema(newUserSchema);

        // the output plans changed, so cached schemas must be recomputed
        gen.resetSchema();
        foreach.resetSchema();

    }