in src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java [1810:1978]
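/**
 * Compiles a skewed join. The first (left) input is materialized to a temp
 * file and sampled to build a key-distribution ("partition") file. The join
 * then runs as a reduce-side join in which the skewed keys of the left input
 * are split across several reducers, while the matching rows of the right
 * input are replicated to each of those reducers.
 */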
public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
    try {
        if (compiledInputs.length != 2) {
            int errCode = 2255;
            throw new VisitorException("POSkewedJoin operator has " + compiledInputs.length + " inputs. It should have 2.", errCode);
        }

        // change plan to store the first join input into a temp file
        FileSpec fSpec = getTempFileSpec();
        MapReduceOper mro = compiledInputs[0];
        POStore str = getStore();
        str.setSFile(fSpec);
        if (!mro.isMapDone()) {
            mro.mapPlan.addAsLeaf(str);
            mro.setMapDoneSingle(true);
        } else if (mro.isMapDone() && !mro.isReduceDone()) {
            mro.reducePlan.addAsLeaf(str);
            mro.setReduceDone(true);
        } else {
            int errCode = 2022;
            String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
            throw new PlanException(msg, errCode, PigException.BUG);
        }
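
        // Sample the stored left input to build the key-distribution
        // ("partition") file. The sampler also estimates how many reducers
        // the join should use.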
        FileSpec partitionFile = getTempFileSpec();
        int rp = op.getRequestedParallelism();
        Pair<MapReduceOper, Integer> sampleJobPair = getSkewedJoinSampleJob(op, mro, fSpec, partitionFile, rp);
        rp = sampleJobPair.second;

        // Set the parallelism of the skewed join to the value calculated by
        // the sampling job: if "parallel" was specified in the join statement,
        // rp already equals that number; otherwise use the value the sampling
        // job derived from the default.
        op.setRequestedParallelism(rp);

        // load the temp file for the first table as input of the join
        MapReduceOper[] joinInputs = new MapReduceOper[] {startNew(fSpec, sampleJobPair.first), compiledInputs[1]};
        MapReduceOper[] rearrangeOutputs = new MapReduceOper[2];

        compiledInputs = new MapReduceOper[] {joinInputs[0]};
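
        // Each rearrange tags its tuples with an input index (0 = left,
        // 1 = right) so that the packager can separate records by input on
        // the reduce side.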
        // run POLocalRearrange for first join table
        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope, nig.getNextNodeId(scope)), rp);
        try {
            lr.setIndex(0);
        } catch (ExecException e) {
            int errCode = 2058;
            String msg = "Unable to set index on newly created POLocalRearrange.";
            throw new PlanException(msg, errCode, PigException.BUG, e);
        }
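
        // The join-key expression plans are stored per physical predecessor;
        // fetch the plans for the first (left) input.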
        List<PhysicalOperator> l = plan.getPredecessors(op);
        MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
        List<PhysicalPlan> groups = joinPlans.get(l.get(0));

        // Determine the type of the join key: with more than one key field the
        // composite key is a TUPLE; with a single field, use that field's type.
        byte type = DataType.TUPLE;
        if (groups.size() == 1) {
            type = groups.get(0).getLeaves().get(0).getResultType();
        }

        lr.setKeyType(type);
        lr.setPlans(groups);
        lr.setResultType(DataType.TUPLE);
        lr.visit(this);
        if (lr.getRequestedParallelism() > curMROp.requestedParallelism)
            curMROp.requestedParallelism = lr.getRequestedParallelism();
        rearrangeOutputs[0] = curMROp;
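
        // Compile the second (right) join input in the same way.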
        compiledInputs = new MapReduceOper[] {joinInputs[1]};

        // if the map for current input is already closed, then start a new job
        if (compiledInputs[0].isMapDone() && !compiledInputs[0].isReduceDone()) {
            FileSpec f = getTempFileSpec();
            POStore s = getStore();
            s.setSFile(f);
            compiledInputs[0].reducePlan.addAsLeaf(s);
            compiledInputs[0].setReduceDone(true);
            compiledInputs[0] = startNew(f, compiledInputs[0]);
        }

        // run POPartitionRearrange for second join table
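        // POPartitionRearrange consults the partition file built by the
        // sampling job: a tuple whose key is skewed is replicated to every
        // reducer that owns a slice of that key, which is why its result
        // type is a BAG of output tuples rather than a single TUPLE.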
        POPartitionRearrange pr =
                new POPartitionRearrange(new OperatorKey(scope, nig.getNextNodeId(scope)), rp);
        pr.setPigContext(pigContext);
        lr = pr;
        try {
            lr.setIndex(1);
        } catch (ExecException e) {
            int errCode = 2058;
            String msg = "Unable to set index on newly created POLocalRearrange.";
            throw new PlanException(msg, errCode, PigException.BUG, e);
        }

        groups = joinPlans.get(l.get(1));
        lr.setPlans(groups);
        lr.setKeyType(type);
        lr.setResultType(DataType.BAG);
        lr.visit(this);
        if (lr.getRequestedParallelism() > curMROp.requestedParallelism)
            curMROp.requestedParallelism = lr.getRequestedParallelism();
        rearrangeOutputs[1] = curMROp;

        compiledInputs = rearrangeOutputs;

        // create POGlobalRearrange
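        // The global rearrange merges the two tagged streams; instead of the
        // default hash partitioner, the job is configured with Pig's
        // skew-aware partitioner (see setSkewedJoinPartitionFile below),
        // which routes keys according to the sampled distribution.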
        POGlobalRearrange gr = new POGlobalRearrange(new OperatorKey(scope, nig.getNextNodeId(scope)), rp);
        // Skewed join has its own special partitioner
        gr.setResultType(DataType.TUPLE);
        gr.visit(this);
        if (gr.getRequestedParallelism() > curMROp.requestedParallelism)
            curMROp.requestedParallelism = gr.getRequestedParallelism();
        compiledInputs = new MapReduceOper[] {curMROp};
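
        // Package tuples by key into one bag per input; the inner flags mark
        // the inputs whose bag must be non-empty for a key to produce output.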
        // create POPackage
        POPackage pkg = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)), rp);
        Packager pkgr = pkg.getPkgr();
        pkgr.setKeyType(type);
        pkg.setResultType(DataType.TUPLE);
        pkg.setNumInps(2);
        boolean[] inner = op.getInnerFlags();
        pkgr.setInner(inner);
        pkg.visit(this);
        compiledInputs = new MapReduceOper[] {curMROp};
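
        // Project the two bags out of the package output (column 0 is the key,
        // columns 1 and 2 are the per-input bags) and flatten both of them.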
        // create POForEach
        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
        List<Boolean> flat = new ArrayList<Boolean>();
        PhysicalPlan ep;

        // Add corresponding POProjects
        for (int i = 0; i < 2; i++) {
            ep = new PhysicalPlan();
            POProject prj = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
            prj.setColumn(i + 1);
            prj.setOverloaded(false);
            prj.setResultType(DataType.BAG);
            ep.add(prj);
            eps.add(ep);
            if (!inner[i]) {
                // Add an empty bag for outer join.
                if (i == 0) {
                    // For a right outer join, also add the IsFirstReduceOfKey UDF:
                    // right-input tuples with skewed keys are replicated to several
                    // reducers, so only the first reducer of a key may emit the
                    // null-padded row, or the output would contain duplicates.
                    CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKey.class.getName());
                } else {
                    CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), false, IsFirstReduceOfKey.class.getName());
                }
            }
            flat.add(true);
        }
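
        // Flattening both bags yields the cross product of the matching tuples
        // for each key, i.e. the join output.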
        POForEach fe = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), -1, eps, flat);
        fe.setResultType(DataType.TUPLE);
        fe.visit(this);
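
        // Register the partition file with the job so the skew-aware
        // partitioner (SkewedPartitioner) can load it at runtime.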
        curMROp.setSkewedJoinPartitionFile(partitionFile.getFileName());
        phyToMROpMap.put(op, curMROp);
    } catch (PlanException e) {
        int errCode = 2034;
        String msg = "Error compiling operator " + op.getClass().getSimpleName();
        throw new MRCompilerException(msg, errCode, PigException.BUG, e);
    } catch (IOException e) {
        int errCode = 2034;
        String msg = "Error compiling operator " + op.getClass().getSimpleName();
        throw new MRCompilerException(msg, errCode, PigException.BUG, e);
    }
}