in src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java [282:391]
/**
 * Produces the next rearranged output for this operator.
 *
 * <p>Inputs are pulled from {@code processInput()} until one arrives whose
 * status is neither {@code STATUS_NULL} (skipped) nor {@code STATUS_EOP} /
 * {@code STATUS_ERR} (handed back to the caller unchanged). The input tuple is
 * attached to every key plan and, when present, every secondary key plan. The
 * leaf expressions are then evaluated and their results are passed, together
 * with the input tuple, to {@code constructLROutput}.
 *
 * @return one of:
 *         <ul>
 *         <li>a fresh {@code STATUS_OK} result carrying the constructed output;</li>
 *         <li>the input result itself when the input reports
 *             {@code STATUS_EOP} or {@code STATUS_ERR};</li>
 *         <li>the key expression's own result when a primary key expression
 *             does not return {@code STATUS_OK};</li>
 *         <li>{@code ERR_RESULT} when a primary key expression has an
 *             unsupported result type;</li>
 *         <li>a default-constructed {@code Result} when a secondary key
 *             expression returns neither {@code STATUS_OK} nor
 *             {@code STATUS_NULL}, or has an unsupported result type.</li>
 *         </ul>
 * @throws ExecException if reading the input or evaluating an expression fails
 */
public Result getNextTuple() throws ExecException {
    inp = null;
    while (true) {
        inp = processInput();
        if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR) {
            break;
        }
        if (inp.returnStatus == POStatus.STATUS_NULL) {
            continue;
        }

        for (PhysicalPlan ep : plans) {
            ep.attachInput((Tuple) inp.result);
        }
        if (secondaryPlans != null) {
            for (PhysicalPlan ep : secondaryPlans) {
                ep.attachInput((Tuple) inp.result);
            }
        }

        List<Result> resLst = new ArrayList<Result>();
        for (ExpressionOperator op : leafOps) {
            if (!hasSupportedKeyType(op)) {
                log.error("Invalid result type: " + DataType.findType(op.getResultType()));
                // Fail explicitly: falling through here would re-use the result
                // of the previous key expression as if it belonged to this one.
                return ERR_RESULT;
            }
            Result keyRes = op.getNext(op.getResultType());
            if (keyRes.returnStatus != POStatus.STATUS_OK) {
                return keyRes;
            }
            resLst.add(keyRes);
        }

        List<Result> secondaryResLst = null;
        if (secondaryLeafOps != null) {
            secondaryResLst = new ArrayList<Result>();
            for (ExpressionOperator op : secondaryLeafOps) {
                if (!hasSupportedKeyType(op)) {
                    log.error("Invalid result type: " + DataType.findType(op.getResultType()));
                    // Same reasoning as above; mirrors this loop's error return.
                    return new Result();
                }
                Result keyRes = op.getNext(op.getResultType());
                // allow null as group by key
                if (keyRes.returnStatus != POStatus.STATUS_OK
                        && keyRes.returnStatus != POStatus.STATUS_NULL) {
                    return new Result();
                }
                secondaryResLst.add(keyRes);
            }
        }

        // Build the output in a fresh Result so that neither the last key
        // expression's Result nor the shared ERR_RESULT is overwritten.
        Result res = new Result();
        // If we are using secondary sort key, our new key is:
        // (nullable, index, (key, secondary key), value)
        res.result = constructLROutput(resLst, secondaryResLst, (Tuple) inp.result);
        res.returnStatus = POStatus.STATUS_OK;

        detachPlans(plans);
        if (secondaryPlans != null) {
            detachPlans(secondaryPlans);
        }

        res.result = illustratorMarkup(inp.result, res.result, 0);
        return res;
    }
    return inp;
}

/**
 * Tells whether a key expression's result type is one this operator evaluates.
 *
 * @param op the leaf expression to check
 * @return {@code true} if {@code op.getResultType()} is one of the types listed
 *         below, {@code false} otherwise
 */
private boolean hasSupportedKeyType(ExpressionOperator op) {
    switch (op.getResultType()) {
    case DataType.BAG:
    case DataType.BOOLEAN:
    case DataType.BYTEARRAY:
    case DataType.CHARARRAY:
    case DataType.DOUBLE:
    case DataType.FLOAT:
    case DataType.INTEGER:
    case DataType.LONG:
    case DataType.BIGINTEGER:
    case DataType.BIGDECIMAL:
    case DataType.DATETIME:
    case DataType.MAP:
    case DataType.TUPLE:
        return true;
    default:
        return false;
    }
}