public void visit()

in src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java [80:218]


    public void visit(LOLoad load) throws FrontendException {
        if(! requiredItems.containsKey( load ) ) {
            return;
        }

        Pair<Map<Integer,Set<String>>,Set<Integer>> required =
            requiredItems.get(load);

        RequiredFieldList requiredFields = new RequiredFieldList();

        LogicalSchema s = load.getSchema();
        for (int i=0;i<s.size();i++) {
            RequiredField requiredField = null;
            // As we have done processing ahead, we assume that 
            // a column is not present in both ColumnPruner and 
            // MapPruner
            if( required.first != null && required.first.containsKey(i) ) {
                requiredField = new RequiredField();
                requiredField.setIndex(i);
                requiredField.setAlias(s.getField(i).alias);
                requiredField.setType(s.getField(i).type);
                List<RequiredField> subFields = new ArrayList<RequiredField>();
                for( String key : required.first.get(i) ) {
                    RequiredField subField = new RequiredField(key,-1,null,DataType.BYTEARRAY);
                    subFields.add(subField);
                }
                requiredField.setSubFields(subFields);
                requiredFields.add(requiredField);
            }
            if( required.second != null && required.second.contains(i) ) {
                requiredField = new RequiredField();
                requiredField.setIndex(i);
                requiredField.setAlias(s.getField(i).alias);
                requiredField.setType(s.getField(i).type);
                requiredFields.add(requiredField);
            }
        }

        boolean[] columnRequired = new boolean[s.size()];
        for (RequiredField rf : requiredFields.getFields())
            columnRequired[rf.getIndex()] = true;

        List<Pair<Integer, Integer>> pruneList = new ArrayList<Pair<Integer, Integer>>();
        for (int i=0;i<columnRequired.length;i++)
        {
            if (!columnRequired[i])
                pruneList.add(new Pair<Integer, Integer>(0, i));
        }
        StringBuffer message = new StringBuffer();
        if (pruneList.size()!=0)
        {
            message.append("Columns pruned for " + load.getAlias() + ": ");
            for (int i=0;i<pruneList.size();i++)
            {
                message.append("$"+pruneList.get(i).second);
                if (i!=pruneList.size()-1)
                    message.append(", ");
            }
            log.info(message);
        }

        message = new StringBuffer();
        for(RequiredField rf: requiredFields.getFields()) {
            List<RequiredField> sub = rf.getSubFields();
            if (sub != null) {
                message.append("Map key required for " + load.getAlias() + ": $" + rf.getIndex() + "->" + sub + "\n");
            }
        }
        if (message.length()!=0)
            log.info(message);

        LoadPushDown.RequiredFieldResponse response = null;
        try {
            LoadFunc loadFunc = load.getLoadFunc();
            if (loadFunc instanceof LoadPushDown) {
                response = ((LoadPushDown)loadFunc).pushProjection(requiredFields);
            }

        } catch (FrontendException e) {
            log.warn("pushProjection on "+load+" throw an exception, skip it");
        }

        // Loader does not support column pruning, insert foreach
        if (columnPrune) {
            if (response==null || !response.getRequiredFieldResponse()) {
                LogicalPlan p = (LogicalPlan)load.getPlan();
                Operator next = p.getSuccessors(load).get(0);
                // if there is already a LOForEach after load, we don't need to
                // add another LOForEach
                if (next instanceof LOForEach) {
                    return;
                }

                LOForEach foreach = new LOForEach(load.getPlan());

                // add foreach to the base plan
                p.add(foreach);

                p.insertBetween(load, foreach, next);

                LogicalPlan innerPlan = new LogicalPlan();
                foreach.setInnerPlan(innerPlan);

                // build foreach inner plan
                List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
                LOGenerate gen = new LOGenerate(innerPlan, exps, new boolean[requiredFields.getFields().size()]);
                innerPlan.add(gen);

                for (int i=0; i<requiredFields.getFields().size(); i++) {
                    LoadPushDown.RequiredField rf = requiredFields.getFields().get(i);
                    LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, rf.getIndex());
                    innerPlan.add(innerLoad);
                    innerPlan.connect(innerLoad, gen);

                    LogicalExpressionPlan exp = new LogicalExpressionPlan();
                    ProjectExpression prj = new ProjectExpression(exp, i, -1, gen);
                    exp.add(prj);
                    exps.add(exp);
                }

            } else {
                // columns are pruned, reset schema for LOLoader
                List<Integer> requiredIndexes = new ArrayList<Integer>();
                List<LoadPushDown.RequiredField> fieldList = requiredFields.getFields();
                for (int i=0; i<fieldList.size(); i++) {
                    requiredIndexes.add(fieldList.get(i).getIndex());
                }

                load.setRequiredFields(requiredIndexes);

                LogicalSchema newSchema = new LogicalSchema();
                for (int i=0; i<fieldList.size(); i++) {
                    newSchema.addField(s.getField(fieldList.get(i).getIndex()));
                }

                load.setSchema(newSchema);
            }
        }
    }