in src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java [349:530]
/**
 * Produces the next {@link Result} of this foreach, one per call.
 *
 * <p>The method is re-entrant over instance state: {@code bags} holds the
 * current value of every generated item, {@code its} holds an iterator for each
 * item that is being flattened (bag or map), and {@code data} is the template
 * row from which the last output tuple was built. Successive calls walk the
 * cross product of the flattened items, the last flattened item varying
 * fastest. Once every flattened iterator is exhausted the state is dropped and
 * a fresh value is pulled from each entry of {@code planLeafOps}.
 *
 * @return a result whose {@code returnStatus} is
 *         <ul>
 *         <li>{@code STATUS_OK} with a tuple from {@code createTuple(data)};</li>
 *         <li>{@code STATUS_NULL} when a flattened bag/map of the current
 *             input has no elements;</li>
 *         <li>{@code STATUS_BATCH_OK} or {@code STATUS_EARLY_TERMINATION} while
 *             accumulating;</li>
 *         <li>the leaf operator's own result on {@code STATUS_EOP} or
 *             {@code STATUS_ERR};</li>
 *         <li>{@code RESULT_EOP} once {@code endOfAllInputProcessed} is set and
 *             new input would otherwise be read.</li>
 *         </ul>
 * @throws ExecException with error code 2080 if an entry of
 *         {@code resultTypes} is not one of the handled data types, or if
 *         propagated from the calls made here
 */
protected Result processPlan() throws ExecException{
// Lazily choose the tuple factory: try a schema-specific factory first when a
// schema is available, remembering via knownSize that one was obtained.
if (schema != null && tupleMaker == null) {
// Note here that if SchemaTuple is currently turned on, then any UDF's in the chain
// must follow good practices. Namely, they should not append to the Tuple that comes
// out of an iterator (a practice which is fairly common, but is not recommended).
tupleMaker = SchemaTupleFactory.getInstance(schema, false, GenContext.FOREACH);
if (tupleMaker != null) {
knownSize = true;
}
}
// No schema, or no schema-specific factory was returned: use the default factory.
if (tupleMaker == null) {
tupleMaker = TupleFactory.getInstance();
}
Result res = new Result();
// Check whether all flattened iterators have run out of elements. If so, force
// the reading of new data by setting data and its to null.
if(its != null) {
// Stays true only if every non-null iterator of a flattened item reports no
// more elements (trivially true when there is no such iterator). hasNext()
// is deliberately evaluated on each of them; there is no short-circuit.
boolean restartIts = true;
for(int i = 0; i < noItems; ++i) {
if(its[i] != null && isToBeFlattenedArray[i] == true) {
restartIts &= !its[i].hasNext();
}
}
// All flattened iterators have reached their last element, so the current
// input is fully consumed and fresh values must be read below.
if(restartIts) {
its = null;
data = null;
}
}
if(its == null) {
// A previous refill already observed the end of all input: nothing more to read.
if (endOfAllInputProcessed) {
return RESULT_EOP;
}
// First call, or starting over with a new set of data from the inputs.
its = new Iterator[noItems];
bags = new Object[noItems];
// Recreated on every refill; bit i is set when item i reports early termination.
earlyTermination = new BitSet(noItems);
for(int i = 0; i < noItems; ++i) {
// Pull the next value for item i from its leaf operator.
Result inputData = null;
// Every handled type goes through the same getNext(type) call; the switch
// only serves to reject types that are not listed.
switch(resultTypes[i]) {
case DataType.BAG:
case DataType.TUPLE :
case DataType.BYTEARRAY :
case DataType.MAP :
case DataType.BOOLEAN :
case DataType.INTEGER :
case DataType.DOUBLE :
case DataType.LONG :
case DataType.FLOAT :
case DataType.BIGINTEGER :
case DataType.BIGDECIMAL :
case DataType.DATETIME :
case DataType.CHARARRAY :
inputData = planLeafOps[i].getNext(resultTypes[i]);
break;
default: {
int errCode = 2080;
String msg = "Foreach currently does not handle type " + DataType.findTypeName(resultTypes[i]);
throw new ExecException(msg, errCode, PigException.BUG);
}
}
// Accrue information about which accumulators have terminated early;
// when all of them have, the accumulative branch below reports it.
if (inputData.returnStatus == POStatus.STATUS_EARLY_TERMINATION) {
if (!earlyTermination.get(i))
earlyTermination.set(i);
continue;
}
// No value for this item yet; bags[i] and its[i] stay null.
if (inputData.returnStatus == POStatus.STATUS_BATCH_OK) {
continue;
}
if(inputData.returnStatus == POStatus.STATUS_EOP) {
// Done with all the elements: drop the cached state and hand the
// end-of-processing result back to the caller.
its = null;
bags = null;
return inputData;
}
// If we see an error, just return it.
// NOTE(review): unlike the EOP path, its/bags are left as filled so far;
// presumably the caller does not call again after an error - confirm.
if(inputData.returnStatus == POStatus.STATUS_ERR) {
return inputData;
}
// Any other status: keep the value and, for a flattened bag or map, open
// an iterator over its elements / entries.
bags[i] = inputData.result;
if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) {
its[i] = ((DataBag)bags[i]).iterator();
} else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) {
its[i] = ((Map)bags[i]).entrySet().iterator();
} else {
// This includes FLATTEN(null) for bag and map
// in addition to non-null values from other data types
its[i] = null;
}
}
// Remember that the end of all input was seen while this set of inputs was
// read, so that the next refill returns RESULT_EOP instead of reading again.
if (parentPlan!=null && parentPlan.endOfAllInput && endOfAllInputProcessing) {
endOfAllInputProcessed = true;
}
}
// While accumulating there is no output tuple yet: report BATCH_OK as long as
// at least one item has not terminated early, EARLY_TERMINATION once all have.
if (isAccumulative() && isAccumStarted()) {
if (earlyTermination.cardinality() < noItems) {
res.returnStatus = POStatus.STATUS_BATCH_OK;
} else {
res.returnStatus = POStatus.STATUS_EARLY_TERMINATION;
}
return res;
}
// Loops until a tuple (or STATUS_NULL) can be returned; there is no break.
while(true) {
if(data == null) {
// First tuple for a new set of inputs: instantiate the template array
// and populate it, taking the first element of every flattened bag/map
// and the value itself for everything else.
data = new Object[noItems];
for(int i = 0; i < noItems; ++i) {
if(isToBeFlattenedArray[i] && (bags[i] instanceof DataBag || bags[i] instanceof Map)) {
if(its[i].hasNext()) {
data[i] = its[i].next();
} else {
// This flattened bag/map has no elements, so no tuple is produced
// for this input. Drop the cached state and return STATUS_NULL;
// because its is null, the next call reads new inputs.
its = null;
data = null;
res.returnStatus = POStatus.STATUS_NULL;
return res;
}
} else {
data[i] = bags[i];
}
}
// Progress is reported here only, i.e. once per new set of inputs.
if(getReporter()!=null) {
getReporter().progress();
}
res.result = createTuple(data);
res.returnStatus = POStatus.STATUS_OK;
return res;
} else {
// Find the last item that needs flattening and still has elements,
// advance it, and update the template array accordingly.
for(int index = noItems - 1; index >= 0; --index) {
if(its[index] != null && isToBeFlattenedArray[index]) {
if(its[index].hasNext()) {
data[index] = its[index].next();
res.result = createTuple(data);
res.returnStatus = POStatus.STATUS_OK;
return res;
}
else{
// reset this index's iterator so cross product can be achieved
// we would be resetting this way only for the indexes from the end
// when the first index which needs to be flattened has reached the
// last element in its iterator, we won't come here - instead, we reset
// all iterators at the beginning of this method.
if(bags[index] instanceof DataBag) {
its[index] = ((DataBag)bags[index]).iterator();
} else { // otherwise bags[index] is a Map, since its[index] is only set for a DataBag or a Map
its[index] = ((Map)bags[index]).entrySet().iterator();
}
// Restart this item at its first element, then continue with the
// next flattened item to the left.
data[index] = its[index].next();
}
}
}
}
}
// Not reached: the while(true) loop above has no break, so the method only
// exits through a return statement or an exception.
}