in src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java [76:182]
public boolean check(OperatorPlan matched) throws FrontendException {
    // Decides whether a FOREACH containing a FLATTEN may be pushed below its
    // single successor.
    //
    // Supported successors:
    //  - ORDER BY: sorting before the flatten means fewer records to sort, and
    //    the sort does not alter the records it processes.
    //  - CROSS / JOIN: for the same reason fewer records are processed. The
    //    rewrite has to unflatten this foreach and insert a new flattening
    //    foreach after the cross or join. Column values are not altered by a
    //    cross or join; only their positions may change.
    // Not supported:
    //  - UNION: the columns are transformed, so the flatten cannot be pushed.
    //  - DISTINCT: the results may differ. Take a single input record holding
    //    the bag {(1), (1)}. Flatten-then-distinct yields one record, (1),
    //    while distinct-then-flatten yields two records, (1) and (1).
    LOForEach foreach = (LOForEach) matched.getSources().get(0);
    LOGenerate gen = OptimizerUtils.findGenerate(foreach);
    if (!OptimizerUtils.hasFlatten(gen)) {
        return false;
    }

    // If a foreach contains a nondeterministic udf, we shouldn't push it down.
    for (LogicalExpressionPlan p : gen.getOutputPlans()) {
        if (OptimizerUtils.planHasNonDeterministicUdf(p)) {
            return false;
        }
    }

    List<Operator> succs = currentPlan.getSuccessors(foreach);
    if (succs == null || succs.size() != 1) {
        return false;
    }

    List<Long> uids = getNonFlattenFieldUids(gen);
    Operator succ = succs.get(0);
    if (!(succ instanceof LOSort || succ instanceof LOJoin || succ instanceof LOCross)) {
        return false;
    }

    if (succ instanceof LOSort) {
        return canPushBelowSort(gen, (LOSort) succ, uids);
    }
    return canPushBelowCrossOrJoin(foreach, succ, uids);
}

// Checks whether the ORDER BY case qualifies. Every generate output must be a
// pure projection (flattened fields included). Every sort key must project a
// field whose uid is in `uids`, the non-flattened fields of the foreach.
private boolean canPushBelowSort(LOGenerate gen, LOSort sort, List<Long> uids)
        throws FrontendException {
    for (LogicalExpressionPlan expr : gen.getOutputPlans()) {
        if (!isPureProjection(expr)) {
            return false;
        }
    }
    for (LogicalExpressionPlan exp : sort.getSortColPlans()) {
        Operator first = exp.getOperators().next();
        // Decline the rewrite instead of failing with a ClassCastException
        // when the sort key is not a plain projection.
        if (!(first instanceof ProjectExpression)) {
            return false;
        }
        ProjectExpression proj = (ProjectExpression) first;
        if (!uids.contains(proj.getFieldSchema().uid)) {
            return false;
        }
    }
    return true;
}

// Checks whether the CROSS / JOIN case qualifies. No other input may be a
// flattening foreach, and the successor must have a schema. For a JOIN, the
// foreach's input must be an inner side, and its join keys must project only
// fields whose uids are in `uids`.
private boolean canPushBelowCrossOrJoin(LOForEach foreach, Operator succ, List<Long> uids)
        throws FrontendException {
    List<Operator> preds = currentPlan.getPredecessors(succ);
    // We do not optimize if a peer input is a ForEach with flatten. This is
    // a simplification, may change in the future.
    for (Operator op : preds) {
        if (op != foreach
                && op instanceof LOForEach
                && OptimizerUtils.hasFlatten(OptimizerUtils.findGenerate((LOForEach) op))) {
            return false;
        }
    }
    if (((LogicalRelationalOperator) succ).getSchema() == null) {
        return false;
    }
    if (succ instanceof LOCross) {
        return true;
    }

    LOJoin join = (LOJoin) succ;
    for (int i = 0; i < preds.size(); i++) {
        // Identity comparison: the foreach's position among the predecessors
        // selects its inner flag and join plan.
        if (preds.get(i) != foreach) {
            continue;
        }
        // Don't optimize if the flattened side is the outer side of an outer
        // join. See PIG-3826.
        if (!join.getInnerFlags()[i]) {
            return false;
        }
        for (LogicalExpressionPlan expr : join.getJoinPlan(i)) {
            for (ProjectExpression proj : getProjectExpressions(expr)) {
                if (!uids.contains(proj.getFieldSchema().uid)) {
                    return false;
                }
            }
        }
        break;
    }
    return true;
}