in src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java [291:441]
/**
 * Prunes the inner plan of a foreach. Output expression plans of the inner
 * generate that project from a branch whose inner load is not covered by the
 * {@code ColumnPruneHelper.INPUTUIDS} annotation are dropped, the generate's
 * per-plan metadata is rebuilt for the surviving plans, the input numbers of
 * the surviving projections are renumbered, and the generate inputs that no
 * surviving plan reads are removed via {@code removeSubTree}.
 * <p>
 * Returns immediately when {@code columnPrune} is false.
 *
 * @param foreach the foreach operator whose inner plan is rewritten in place
 * @throws FrontendException if one of the plan or schema calls made here throws it
 */
public void visit(LOForEach foreach) throws FrontendException {
    if (!columnPrune) {
        return;
    }

    // Uids annotated on this foreach; an inner load whose uid is absent is treated as unused.
    Set<Long> inputUids = (Set<Long>)foreach.getAnnotation(ColumnPruneHelper.INPUTUIDS);
    LogicalPlan innerPlan = foreach.getInnerPlan();

    // Phase 1: among the inner plan's sources, pick the inner loads whose projection is unused.
    Set<LOInnerLoad> unusedLoads = new HashSet<LOInnerLoad>();
    for (Operator source : innerPlan.getSources()) {
        if (!(source instanceof LOInnerLoad)) {
            continue;
        }
        LOInnerLoad load = (LOInnerLoad)source;
        ProjectExpression projection = load.getProjection();
        boolean star = projection.isProjectStar();
        LogicalSchema.LogicalFieldSchema probe = projection.getFieldSchema();
        if (star) {
            // A star projection is judged by the first field of its tuple schema.
            probe = probe.schema.getField(0);
        }
        if (!inputUids.contains(probe.uid)) {
            unusedLoads.add(load);
        }
    }

    // Phase 2: follow each unused load along its first successor until the operator
    // that sits immediately before LOGenerate; that operator heads the branch to drop.
    Set<LogicalRelationalOperator> unusedHeads = new HashSet<LogicalRelationalOperator>();
    for (LOInnerLoad load : unusedLoads) {
        Operator head = load;
        Operator next = innerPlan.getSuccessors(head).get(0);
        while (!(next instanceof LOGenerate)) {
            head = next;
            next = innerPlan.getSuccessors(head).get(0);
        }
        unusedHeads.add((LogicalRelationalOperator)head);
    }

    // Phase 3: mark every output plan with a projection leaf that refers to a dropped branch.
    LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
    List<LogicalExpressionPlan> outputPlans = gen.getOutputPlans();
    List<LogicalExpressionPlan> doomedPlans = new ArrayList<LogicalExpressionPlan>();
    for (LogicalExpressionPlan outputPlan : outputPlans) {
        for (Operator leaf : outputPlan.getSinks()) {
            if (!(leaf instanceof ProjectExpression)) {
                continue;
            }
            LogicalRelationalOperator referent = ((ProjectExpression)leaf).findReferent();
            if (unusedHeads.contains(referent)) {
                doomedPlans.add(outputPlan);
            }
        }
    }

    // Phase 4: for the surviving plans, gather the flatten flags, output plan schemas,
    // uid-only schemas and (when present) user defined schemas, and record which
    // generate inputs they read. The removed inputs can only be derived once all
    // plans have been seen, because one input may feed several output plans.
    List<LogicalSchema> declaredUserSchemas = gen.getUserDefinedSchema();
    List<LogicalSchema> keptUserSchemas = null;
    if (declaredUserSchemas != null) {
        keptUserSchemas = new ArrayList<LogicalSchema>();
    }
    List<Boolean> keptFlatten = new ArrayList<Boolean>();
    List<LogicalSchema> keptPlanSchemas = new ArrayList<LogicalSchema>();
    List<LogicalSchema> keptUidOnlySchemas = new ArrayList<LogicalSchema>();
    Set<Integer> neededInputs = new HashSet<Integer>();
    for (int idx = 0; idx < outputPlans.size(); idx++) {
        LogicalExpressionPlan outputPlan = outputPlans.get(idx);
        if (doomedPlans.contains(outputPlan)) {
            continue;
        }
        keptFlatten.add(gen.getFlattenFlags()[idx]);
        keptPlanSchemas.add(gen.getOutputPlanSchemas().get(idx));
        keptUidOnlySchemas.add(gen.getUidOnlySchemas().get(idx));
        if (declaredUserSchemas != null) {
            keptUserSchemas.add(declaredUserSchemas.get(idx));
        }
        for (Operator leaf : outputPlan.getSinks()) {
            if (leaf instanceof ProjectExpression) {
                neededInputs.add(((ProjectExpression)leaf).getInputNum());
            }
        }
    }

    // removed inputs = (all generate inputs) - (needed inputs)
    List<Operator> genInputs = innerPlan.getPredecessors(gen);
    Set<Integer> removedInputs = new HashSet<Integer>();
    if (genInputs != null) { // otherwise, all gen plans are based on constants, nothing to adjust
        for (int idx = 0; idx < genInputs.size(); idx++) {
            if (!neededInputs.contains(idx)) {
                removedInputs.add(idx);
            }
        }
    }

    // Phase 5: install the rebuilt metadata on LOGenerate, then drop the marked output plans.
    boolean[] keptFlattenFlags = new boolean[keptFlatten.size()];
    int slot = 0;
    for (Boolean flag : keptFlatten) {
        keptFlattenFlags[slot++] = flag;
    }
    gen.setFlattenFlags(keptFlattenFlags);
    gen.setOutputPlanSchemas(keptPlanSchemas);
    gen.setUidOnlySchemas(keptUidOnlySchemas);
    gen.setUserDefinedSchema(keptUserSchemas);
    for (LogicalExpressionPlan doomed : doomedPlans) {
        outputPlans.remove(doomed);
    }

    // Phase 6: renumber surviving projections; each one moves down by the number of
    // removed inputs that had a smaller index.
    if (!removedInputs.isEmpty()) {
        for (LogicalExpressionPlan outputPlan : outputPlans) {
            for (Operator leaf : outputPlan.getSinks()) {
                if (!(leaf instanceof ProjectExpression)) {
                    continue;
                }
                ProjectExpression projection = (ProjectExpression)leaf;
                int oldInput = projection.getInputNum();
                int newInput = oldInput;
                for (int removed : removedInputs) {
                    if (removed < oldInput) {
                        newInput--;
                    }
                }
                projection.setInputNum(newInput);
            }
        }
    }

    // Phase 7: prune the unneeded branches. The operators are collected first and removed
    // afterwards; presumably removeSubTree alters the predecessor list being indexed
    // here -- TODO confirm.
    List<LogicalRelationalOperator> unusedBranches = new ArrayList<LogicalRelationalOperator>();
    for (int removed : removedInputs) {
        unusedBranches.add((LogicalRelationalOperator)genInputs.get(removed));
    }
    for (LogicalRelationalOperator branch : unusedBranches) {
        removeSubTree(branch);
    }
}