in src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java [2085:2249]
/**
 * Builds the map-reduce operator that performs the actual sort.
 *
 * <p>The new operator is created with {@code startNew(lFile, quantJob)} and is
 * flagged as a global sort. Its plans are populated as follows:
 * <ul>
 *   <li><b>map</b>: a {@link POLocalRearrange} appended as leaf, keyed by the
 *       sort plans (or by a single star projection when {@code fields} is
 *       {@code null});</li>
 *   <li><b>combine</b> (only when the sort carries a limit, i.e.
 *       {@code sort.getLimit() != -1}): package, flattening foreach over
 *       column 1, {@link POLimit}, then a local rearrange keyed by the sort
 *       plans;</li>
 *   <li><b>reduce</b>: package, flattening foreach over column 1, and, when
 *       the sort carries a limit, a trailing {@link POLimit}.</li>
 * </ul>
 *
 * @param sort      sort operator supplying the limit, ascending flags, sort
 *                  plans, alias and original locations
 * @param quantJob  operator handed to {@code startNew} together with
 *                  {@code lFile} (presumably the preceding quantile job --
 *                  confirm against caller)
 * @param lFile     file spec handed to {@code startNew}
 * @param quantFile file spec whose file name is recorded on the new operator
 *                  via {@code setQuantFile}
 * @param rp        value assigned to the operator's requested parallelism
 * @param fields    sort columns; {@code null} is treated as "project *". A
 *                  {@code null} value or more than one entry makes the key
 *                  type {@link DataType#TUPLE}; otherwise the key type is
 *                  derived from the first sort plan
 * @return the newly created sort operator with its map plan marked done
 * @throws PlanException if the key type of the first sort plan cannot be
 *                       computed, or if the index cannot be set on a newly
 *                       created local rearrange
 */
private MapReduceOper getSortJob(
        POSort sort,
        MapReduceOper quantJob,
        FileSpec lFile,
        FileSpec quantFile,
        int rp,
        Pair<POProject, Byte>[] fields) throws PlanException{
    MapReduceOper mro = startNew(lFile, quantJob);
    mro.setQuantFile(quantFile.getFileName());
    mro.setGlobalSort(true);
    mro.requestedParallelism = rp;

    long limit = sort.getLimit();
    mro.limit = limit;

    List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();

    byte keyType = DataType.UNKNOWN;

    // Copy the per-column ascending flags onto the job, if the sort has any.
    List<Boolean> sortOrderList = sort.getMAscCols();
    if (sortOrderList != null) {
        boolean[] sortOrder = new boolean[sortOrderList.size()];
        for (int i = 0; i < sortOrderList.size(); ++i) {
            sortOrder[i] = sortOrderList.get(i);
        }
        mro.setSortOrder(sortOrder);
    }

    if (fields == null) {
        // This is project *
        PhysicalPlan ep = new PhysicalPlan();
        POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
        prj.setStar(true);
        prj.setOverloaded(false);
        prj.setResultType(DataType.TUPLE);
        ep.add(prj);
        eps1.add(ep);
    } else {
        // Attach the sort plans to the local rearrange to get the
        // projection.
        eps1.addAll(sort.getSortPlans());

        // Visit the first sort plan to figure out our key type. We only
        // have to visit the first because if we have more than one plan,
        // then the key type will be tuple.
        try {
            FindKeyTypeVisitor fktv =
                new FindKeyTypeVisitor(sort.getSortPlans().get(0));
            fktv.visit();
            keyType = fktv.keyType;
        } catch (VisitorException ve) {
            int errCode = 2035;
            String msg = "Internal error. Could not compute key type of sort operator.";
            throw new PlanException(msg, errCode, PigException.BUG, ve);
        }
    }

    // Key type shared by every rearrange/packager in this job. Computed once
    // with the null guard so the combiner branch below cannot dereference a
    // null 'fields' (project *), and so that map, combine and reduce always
    // agree on the key type.
    final byte sortKeyType =
        (fields == null || fields.length > 1) ? DataType.TUPLE : keyType;

    // ---- map plan -------------------------------------------------------
    POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
    try {
        lr.setIndex(0);
    } catch (ExecException e) {
        int errCode = 2058;
        String msg = "Unable to set index on newly created POLocalRearrange.";
        throw new PlanException(msg, errCode, PigException.BUG, e);
    }
    lr.setKeyType(sortKeyType);
    lr.setPlans(eps1);
    lr.setResultType(DataType.TUPLE);
    lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
    mro.mapPlan.addAsLeaf(lr);

    mro.setMapDone(true);

    // ---- combine plan (only for a limited sort) -------------------------
    if (limit != -1) {
        POPackage pkg_c = new POPackage(new OperatorKey(scope,
                nig.getNextNodeId(scope)));
        LitePackager pkgr_c = new LitePackager();
        pkgr_c.setKeyType(sortKeyType);
        pkg_c.setPkgr(pkgr_c);
        pkg_c.setNumInps(1);
        mro.combinePlan.add(pkg_c);

        // Flatten column 1 of the packaged tuple.
        List<PhysicalPlan> eps_c1 = new ArrayList<PhysicalPlan>();
        List<Boolean> flat_c1 = new ArrayList<Boolean>();
        PhysicalPlan ep_c1 = new PhysicalPlan();
        POProject prj_c1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
        prj_c1.setColumn(1);
        prj_c1.setOverloaded(false);
        prj_c1.setResultType(DataType.BAG);
        ep_c1.add(prj_c1);
        eps_c1.add(ep_c1);
        flat_c1.add(true);
        POForEach fe_c1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),
                -1, eps_c1, flat_c1);
        fe_c1.setResultType(DataType.TUPLE);
        mro.combinePlan.addAsLeaf(fe_c1);

        POLimit pLimit = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
        pLimit.setLimit(limit);
        mro.combinePlan.addAsLeaf(pLimit);

        // Re-key the limited records with the sort plans.
        List<PhysicalPlan> eps_c2 = new ArrayList<PhysicalPlan>();
        eps_c2.addAll(sort.getSortPlans());

        POLocalRearrange lr_c2 = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
        try {
            lr_c2.setIndex(0);
        } catch (ExecException e) {
            int errCode = 2058;
            String msg = "Unable to set index on newly created POLocalRearrange.";
            throw new PlanException(msg, errCode, PigException.BUG, e);
        }
        lr_c2.setKeyType(sortKeyType);
        lr_c2.setPlans(eps_c2);
        lr_c2.setResultType(DataType.TUPLE);
        mro.combinePlan.addAsLeaf(lr_c2);
    }

    // ---- reduce plan ----------------------------------------------------
    POPackage pkg = new POPackage(new OperatorKey(scope,
            nig.getNextNodeId(scope)));
    LitePackager pkgr = new LitePackager();
    pkgr.setKeyType(sortKeyType);
    pkg.setPkgr(pkgr);
    pkg.setNumInps(1);
    mro.reducePlan.add(pkg);

    // Flatten column 1 of the packaged tuple.
    PhysicalPlan ep = new PhysicalPlan();
    POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
    prj.setColumn(1);
    prj.setOverloaded(false);
    prj.setResultType(DataType.BAG);
    ep.add(prj);
    List<PhysicalPlan> eps2 = new ArrayList<PhysicalPlan>();
    eps2.add(ep);
    List<Boolean> flattened = new ArrayList<Boolean>();
    flattened.add(true);
    POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps2,flattened);
    mro.reducePlan.add(nfe1);
    mro.reducePlan.connect(pkg, nfe1);
    mro.phyToMRMap.put(sort, nfe1);

    if (limit != -1) {
        POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
        pLimit2.setLimit(limit);
        mro.reducePlan.addAsLeaf(pLimit2);
        mro.phyToMRMap.put(sort, pLimit2);
    }

    return mro;
}