public boolean check()

in src/org/apache/pig/newplan/logical/rules/PushDownForEachFlatten.java [76:182]


        public boolean check(OperatorPlan matched) throws FrontendException {
            // the foreach with flatten can be swapped with an order by
            // as the order by will have lesser number of records to sort
            // also the sort does not alter the records that are processed
            
            // the foreach with flatten can be pushed down a cross or a join
            // for the same reason. In this case the foreach has to be first
            // unflattened and then a new foreach has to be inserted after
            // the cross or join. In both cross and foreach the actual columns
            // from the foreach are not altered but positions might be changed
            
            // in the case of union the column is transformed and as a result
            // the foreach flatten cannot be pushed down
            
            // for distinct the output before flattening and the output
            // after flattening might be different. For example, consider
            // {(1), (1)}. Distinct of this bag is still {(1), (1)}.
            // distinct(flatten({(1), (1)})) is (1). However,
            // flatten(distinct({(1), (1)})) is (1), (1)
            
            // in both cases correctness is not affected
            
            LOForEach foreach = (LOForEach)matched.getSources().get(0);
            LOGenerate gen = OptimizerUtils.findGenerate( foreach );
            
            if( !OptimizerUtils.hasFlatten( gen ) )
                return false;
            
            // If a foreach contains a nondeterministic udf, we shouldn't push it down.
            for (LogicalExpressionPlan p : gen.getOutputPlans()) {
                if (OptimizerUtils.planHasNonDeterministicUdf(p))
                    return false;
            }
            
            List<Operator> succs = currentPlan.getSuccessors( foreach );
            if( succs == null || succs.size() != 1 )
                return false;
            
            List<Long> uids = getNonFlattenFieldUids( gen );

            Operator succ = succs.get( 0  );
            if( !( succ instanceof LOSort || succ instanceof LOJoin || succ instanceof LOCross ) )
                return false;
            
            if( succ instanceof LOSort ) {
                // Check if the expressions for the foreach generate are purely projection including flatten fields.
                List<LogicalExpressionPlan> exprs = gen.getOutputPlans();
                for( LogicalExpressionPlan expr : exprs ) {
                    if( !isPureProjection( expr ) )
                        return false;
                }

                // Check if flatten fields are required by the successor.
                LOSort sort = (LOSort)succ;
                List<LogicalExpressionPlan> exps = sort.getSortColPlans();
                for( int i = 0; i < exps.size(); i++ ) {
                    LogicalExpressionPlan exp = exps.get( i );
                    ProjectExpression proj = (ProjectExpression)exp.getOperators().next();
                    if( !uids.contains( proj.getFieldSchema().uid ) )
                        return false;
                }

                return true;
            } else {
                List<Operator> preds = currentPlan.getPredecessors( succ );
                
                // We do not optimize if peer is ForEach with flatten. This is 
                // a simplification, may change in the future.
                for( Operator op : preds ) {
                    if( op == foreach )
                        continue;
                    else if( op instanceof LOForEach && 
                            OptimizerUtils.hasFlatten( OptimizerUtils.findGenerate( (LOForEach)op ) ) )
                        return false;
                }
                
                if( ( (LogicalRelationalOperator)succ ).getSchema() == null )
                    return false;
                
                if( succ instanceof LOCross ) {
                    return true;
                } else {
                    LOJoin join = (LOJoin)succ;
                    for( int i = 0; i < preds.size(); i++ ) {
                        Operator op = preds.get( i );
                        if( op == foreach ) {
                            // Don't optimize if the flattened side is outer side of an outer join
                            // See PIG-3826
                            if (join.getInnerFlags()[i]==false) {
                                return false;
                            }
                            Collection<LogicalExpressionPlan> exprs = join.getJoinPlan( i );
                            for( LogicalExpressionPlan expr : exprs ) {
                                List<ProjectExpression> projs = getProjectExpressions( expr );
                                for( ProjectExpression proj : projs ) {
                                    if( !uids.contains( proj.getFieldSchema().uid ) ) {
                                        return false;
                                    }
                                }
                            }
                            break;
                        }
                    }
                    return true;
                }
            }
        }