protected Result processPlan()

in src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java [349:530]


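    /**
     * Produces one output tuple per call. On a fresh set of inputs it pulls a value
     * from the leaf of each inner plan, attaches iterators to the inputs that are to
     * be flattened, and then walks the cross product of those iterators on the
     * following calls until they are exhausted.
     */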
    protected Result processPlan() throws ExecException {
        if (schema != null && tupleMaker == null) {
            // Note here that if SchemaTuple is currently turned on, then any UDF's in the chain
            // must follow good practices. Namely, they should not append to the Tuple that comes
            // out of an iterator (a practice which is fairly common, but is not recommended).
            tupleMaker = SchemaTupleFactory.getInstance(schema, false, GenContext.FOREACH);
            if (tupleMaker != null) {
                knownSize = true;
            }
        }
        if (tupleMaker == null) {
            tupleMaker = TupleFactory.getInstance();
        }

        Result res = new Result();

        //Check whether all the databags have exhausted their tuples. If so, force the reading of new data by setting 'data' and 'its' to null
        if(its != null) {
            boolean restartIts = true;
            for(int i = 0; i < noItems; ++i) {
                if(its[i] != null && isToBeFlattenedArray[i]) {
                    restartIts &= !its[i].hasNext();
                }
            }
            //all the databags have reached their last elements, so we need to force reading of fresh databags
            if(restartIts) {
                its = null;
                data = null;
            }
        }


        if(its == null) {
            if (endOfAllInputProcessed) {
                return RESULT_EOP;
            }
            //getNext being called for the first time OR starting with a set of new data from inputs
            its = new Iterator[noItems];
            bags = new Object[noItems];
            earlyTermination = new BitSet(noItems);

            for(int i = 0; i < noItems; ++i) {
                //populate the input data; iterators for the flattened inputs are set up below
                Result inputData = null;
                switch(resultTypes[i]) {
                case DataType.BAG:
                case DataType.TUPLE :
                case DataType.BYTEARRAY :
                case DataType.MAP :
                case DataType.BOOLEAN :
                case DataType.INTEGER :
                case DataType.DOUBLE :
                case DataType.LONG :
                case DataType.FLOAT :
                case DataType.BIGINTEGER :
                case DataType.BIGDECIMAL :
                case DataType.DATETIME :
                case DataType.CHARARRAY :
                    inputData = planLeafOps[i].getNext(resultTypes[i]);
                    break;
                default: {
                    int errCode = 2080;
                    String msg = "Foreach currently does not handle type " + DataType.findTypeName(resultTypes[i]);
                    throw new ExecException(msg, errCode, PigException.BUG);
                }

                }

                //record which accumulators have terminated early;
                //once all of them have, we can finish
                if (inputData.returnStatus == POStatus.STATUS_EARLY_TERMINATION) {
                    earlyTermination.set(i);
                    continue;
                }

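                // accumulative processing: this input has only batched up data so far,
                // so there is nothing to iterate over yet; move on to the next item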
                if (inputData.returnStatus == POStatus.STATUS_BATCH_OK) {
                    continue;
                }

                if(inputData.returnStatus == POStatus.STATUS_EOP) {
                    //we are done with all the elements. Time to return.
                    its = null;
                    bags = null;
                    return inputData;
                }
                // if we see an error, just return it
                if(inputData.returnStatus == POStatus.STATUS_ERR) {
                    return inputData;
                }


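                // hold on to the raw result; flattened bags/maps get an iterator attached below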
                bags[i] = inputData.result;

                if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) {
                    its[i] = ((DataBag)bags[i]).iterator();
                } else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) {
                    its[i] = ((Map)bags[i]).entrySet().iterator();
                } else {
                    // This includes FLATTEN(null) for bag and map
                    // in addition to non-null values from other data types
                    its[i] = null;
                }
            }
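            // remember that the last round of input has been read so the next call can
            // return EOP once the current cross product is exhausted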
            if (parentPlan!=null && parentPlan.endOfAllInput && endOfAllInputProcessing) {
                endOfAllInputProcessed = true;
            }
        }

        // if accumulating, we haven't got the data for some fields yet, so just return
        if (isAccumulative() && isAccumStarted()) {
            if (earlyTermination.cardinality() < noItems) {
                res.returnStatus = POStatus.STATUS_BATCH_OK;
            } else {
                res.returnStatus = POStatus.STATUS_EARLY_TERMINATION;
            }
            return res;
        }

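        // Generate the cross product of the flattened inputs: on the first pass over a
        // fresh set of inputs the template 'data' array is seeded with one element from
        // each flattened iterator; on later calls the right-most flattened iterator that
        // still has elements is advanced, and exhausted iterators are reset so the
        // columns to their left can move forward.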
        while(true) {
            if(data == null) {
                //getNext being called for the first time or starting on new input data
                //we instantiate the template array and start populating it with data
                data = new Object[noItems];
                for(int i = 0; i < noItems; ++i) {
                    if(isToBeFlattenedArray[i] && (bags[i] instanceof DataBag || bags[i] instanceof Map)) {
                        if(its[i].hasNext()) {
                            data[i] = its[i].next();
                        } else {
                            //this flattened input is empty, so we return. This is
                            //caught above and this function is called again with
                            //new inputs.
                            its = null;
                            data = null;
                            res.returnStatus = POStatus.STATUS_NULL;
                            return res;
                        }
                    } else {
                        data[i] = bags[i];
                    }

                }
                if(getReporter()!=null) {
                    getReporter().progress();
                }
                res.result = createTuple(data);
                res.returnStatus = POStatus.STATUS_OK;
                return res;
            } else {
                //find the last expression which needs flattening and start iterating over it,
                //updating the template array as we go
                for(int index = noItems - 1; index >= 0; --index) {
                    if(its[index] != null && isToBeFlattenedArray[index]) {
                        if(its[index].hasNext()) {
                            data[index] =  its[index].next();
                            res.result = createTuple(data);
                            res.returnStatus = POStatus.STATUS_OK;
                            return res;
                        }
                        else{
                            // reset this index's iterator so cross product can be achieved
                            // we would be resetting this way only for the indexes from the end
                            // when the first index which needs to be flattened has reached the
                            // last element in its iterator, we won't come here - instead, we reset
                            // all iterators at the beginning of this method.
                            if(bags[index] instanceof DataBag) {
                                its[index] = ((DataBag)bags[index]).iterator();
                            } else { // bags[index] instanceof Map
                                its[index] = ((Map)bags[index]).entrySet().iterator();
                            }
                            data[index] = its[index].next();
                        }
                    }
                }
            }
        }

    }