in src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java [1142:1347]
/**
 * Compiles a {@link POMergeJoin} into the Tez plan.
 *
 * <p>The join is placed in the Tez operator that produced the join's first (left) input.
 * The right input's Tez operator must still be open and its plan must have exactly one
 * {@link POLoad} root. Any operators after that load are cloned into a separate "right
 * pipeline" plan handed to the join, and the right plan is trimmed back to the load alone.
 *
 * <ul>
 *   <li>If the right loader is an {@link IndexableLoadFunc}, the join reads the right side
 *       itself: the right Tez operator is removed from the plan, and every join key plan
 *       must consist only of {@link POProject} operators (no expressions).</li>
 *   <li>Otherwise the join must not be merge-sparse and the right loader must be an
 *       {@link OrderedLoadFunc}. The right load is rewritten to go through
 *       {@link MergeJoinIndexer}, a new vertex requesting exactly one task is connected after
 *       it to collect the index, and that vertex's output is broadcast to the join's vertex.
 *       The join is replaced by a {@code POMergeJoinTez} copy that reads the right side via
 *       {@code TezIndexableLoader}.</li>
 * </ul>
 *
 * <p>In both cases the join's Tez operator is marked as a merge (or merge-sparse) join and
 * small-split combination is disabled for it, because there is no way to guarantee that
 * combined splits stay sorted.
 *
 * <p>NOTE(review): in the non-indexable branch {@code joinOp} is reassigned to the
 * {@code POMergeJoinTez} copy, so the copy (not the visited operator) is what is added to the
 * Tez plan and registered in {@code phyToTezOpMap}. Confirm that downstream lookups do not
 * expect the original operator instance.
 *
 * @param joinOp the merge join operator being visited
 * @throws VisitorException a {@code TezCompilerException} for an invalid input shape, an
 *     unsupported right loader, or non-projection join keys with an indexable loader.
 *     PlanException, IOException and CloneNotSupportedException raised while compiling are
 *     wrapped with error codes 2034, 3000 and 2127 respectively. NOTE(review): if
 *     TezCompilerException is itself an IOException subtype, errors thrown inside the try are
 *     re-wrapped by the IOException handler, so callers must walk the cause chain for the
 *     original error code -- confirm.
 */
public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
    try {
        if (compiledInputs.length != 2 || joinOp.getInputs().size() != 2) {
            int errCode = 1101;
            // Report both counts: either one may be the reason the check failed.
            throw new TezCompilerException("Merge Join must have exactly two inputs. Found : "
                    + compiledInputs.length + " compiled input(s) and "
                    + joinOp.getInputs().size() + " join input(s)", errCode);
        }
        // The join is materialized in the Tez operator producing the left (first) input.
        curTezOp = phyToTezOpMap.get(joinOp.getInputs().get(0));
        TezOperator rightTezOpr = curTezOp.equals(compiledInputs[0])
                ? compiledInputs[1]
                : compiledInputs[0];
        TezOperator rightTezOprAggr = null;

        // We will first operate on the right side, which is the indexer job.
        if (rightTezOpr.closed) {
            int errCode = 2022;
            String msg = "Right input plan have been closed. This is unexpected while compiling.";
            throw new PlanException(msg, errCode, PigException.BUG);
        }
        PhysicalPlan rightPlan = rightTezOpr.plan;
        if (rightPlan.getRoots().size() != 1) {
            int errCode = 2171;
            String errMsg = "Expected exactly one root physical operator in physical plan. Found : "
                    + rightPlan.getRoots().size();
            throw new TezCompilerException(errMsg, errCode, PigException.BUG);
        }
        PhysicalOperator rightRoot = rightPlan.getRoots().get(0);
        if (!(rightRoot instanceof POLoad)) {
            int errCode = 2172;
            String errMsg = "Expected physical operator at root to be POLoad. Found : "
                    + rightRoot.getClass().getCanonicalName();
            throw new TezCompilerException(errMsg, errCode);
        }
        // Yank whatever follows the right-side load into its own plan; it becomes the
        // join's right pipeline, and the right operator's plan is trimmed to just the load.
        PhysicalPlan rightPipelinePlan;
        List<PhysicalOperator> rightRootSuccs = rightPlan.getSuccessors(rightRoot);
        if (rightRootSuccs == null || rightRootSuccs.isEmpty()) {
            // Load - Join case: nothing between the load and the join.
            rightPipelinePlan = null;
        } else {
            rightPipelinePlan = rightPlan.clone();
            PhysicalOperator root = rightPipelinePlan.getRoots().get(0);
            rightPipelinePlan.disconnect(root, rightPipelinePlan.getSuccessors(root).get(0));
            rightPipelinePlan.remove(root);
            rightPlan.trimBelow(rightRoot);
        }
        joinOp.setupRightPipeline(rightPipelinePlan);

        // At this point the right input's plan contains nothing other than a POLoad.
        POLoad rightLoader = (POLoad) rightTezOpr.plan.getRoots().get(0);
        joinOp.setSignature(rightLoader.getSignature());
        LoadFunc rightLoadFunc = rightLoader.getLoadFunc();
        List<String> udfs = new ArrayList<String>();
        if (rightLoadFunc instanceof IndexableLoadFunc) {
            joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
            joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
            udfs.add(rightLoader.getLFile().getFuncSpec().toString());
            // We don't need the right TezOper since the right loader is an
            // IndexableLoadFunc which can handle the index itself.
            tezPlan.remove(rightTezOpr);
            if (rightTezOpr == compiledInputs[0]) {
                compiledInputs[0] = null;
            } else if (rightTezOpr == compiledInputs[1]) {
                compiledInputs[1] = null;
            }
            rightTezOpr = null;
            // Validate that the join keys in merge join are only simple column projections
            // or '*' and not expressions - expressions cannot be handled when the index is
            // built by the storage layer on the sorted data when the sorted data (and
            // corresponding index) is written. So merge join is restricted to not have
            // expressions as join keys.
            int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
            for (int i = 0; i < numInputs; i++) {
                List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
                for (PhysicalPlan keyPlan : keyPlans) {
                    for (PhysicalOperator op : keyPlan) {
                        if (!(op instanceof POProject)) {
                            int errCode = 1106;
                            String errMsg = "Merge join is possible only for simple column or '*' join keys when using " +
                                    rightLoader.getLFile().getFuncSpec() + " as the loader";
                            throw new TezCompilerException(errMsg, errCode, PigException.INPUT);
                        }
                    }
                }
            }
        } else {
            joinOp = new POMergeJoinTez(joinOp);
            // Replacing POLoad with indexer is disabled for 'merge-sparse' joins. While this
            // feature would be useful, the current implementation of DefaultIndexableLoader
            // is not designed to handle multiple calls to seekNear. Specifically, it rereads
            // the entire index for each call. Some refactoring of this class is required -
            // and then the check below could be removed.
            if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
                int errCode = 1104;
                String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " +
                        "The specified loader " + rightLoadFunc + " doesn't implement it";
                throw new TezCompilerException(errMsg, errCode);
            }
            // Replace POLoad with indexer.
            if (!(rightLoadFunc instanceof OrderedLoadFunc)) {
                int errCode = 1104;
                String errMsg = "Right input of merge-join must implement " +
                        "OrderedLoadFunc interface. The specified loader "
                        + rightLoadFunc + " doesn't implement it";
                throw new TezCompilerException(errMsg, errCode);
            }
            String[] indexerArgs = new String[6];
            List<PhysicalPlan> rightInpPlans = joinOp.getInnerPlansOf(1);
            FileSpec origRightLoaderFileSpec = rightLoader.getLFile();
            indexerArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
            indexerArgs[1] = ObjectSerializer.serialize((Serializable) rightInpPlans);
            indexerArgs[2] = ObjectSerializer.serialize(rightPipelinePlan);
            indexerArgs[3] = rightLoader.getSignature();
            indexerArgs[4] = rightLoader.getOperatorKey().scope;
            indexerArgs[5] = Boolean.toString(true);
            FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),
                    new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
            rightLoader.setLFile(lFile);
            // Loader of operator will return a tuple of form -
            // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
            rightTezOprAggr = getTezOp();
            tezPlan.add(rightTezOprAggr);
            TezCompilerUtil.simpleConnectTwoVertex(tezPlan, rightTezOpr, rightTezOprAggr, scope, nig);
            rightTezOprAggr.setRequestedParallelism(1); // we need exactly one task for indexing job.
            rightTezOprAggr.setDontEstimateParallelism(true);
            // Convert the index as a broadcast input
            POValueOutputTez rightTezOprAggrOutput = new POValueOutputTez(OperatorKey.genOpKey(scope));
            rightTezOprAggr.plan.addAsLeaf(rightTezOprAggrOutput);
            rightTezOprAggrOutput.addOutputKey(curTezOp.getOperatorKey().toString());
            TezEdgeDescriptor edge = new TezEdgeDescriptor();
            TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
            TezCompilerUtil.connect(tezPlan, rightTezOprAggr, curTezOp, edge);
            ((POMergeJoinTez) joinOp).setInputKey(rightTezOprAggr.getOperatorKey().toString());
            // Set up the TezIndexableLoader for the join operator.
            String[] tezIndexableLoaderArgs = new String[3];
            tezIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
            tezIndexableLoaderArgs[1] = joinOp.getOperatorKey().scope;
            tezIndexableLoaderArgs[2] = origRightLoaderFileSpec.getFileName();
            joinOp.setRightLoaderFuncSpec(
                    (new FuncSpec(TezIndexableLoader.class.getName(), tezIndexableLoaderArgs)));
            joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
            udfs.add(origRightLoaderFileSpec.getFuncSpec().toString());
        }
        if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
            curTezOp.markMergeSparseJoin();
        } else {
            curTezOp.markMergeJoin();
        }
        // We are done with the right side. Let's work on the left now.
        // The join will be materialized in the left Tez operator.
        if (!curTezOp.isClosed()) { // Life is easy
            curTezOp.plan.addAsLeaf(joinOp);
        } else {
            int errCode = 2022;
            String msg = "Input plan has been closed. This is unexpected while compiling.";
            throw new PlanException(msg, errCode, PigException.BUG);
        }
        if (rightTezOprAggr != null) {
            rightTezOprAggr.markIndexer();
        }
        phyToTezOpMap.put(joinOp, curTezOp);
        // No combination of small splits as there is currently no way to guarantee the
        // sortedness of the combined splits.
        curTezOp.noCombineSmallSplits();
        curTezOp.UDFs.addAll(udfs);
    }
    catch (PlanException e) {
        int errCode = 2034;
        String msg = "Error compiling operator " + joinOp.getClass().getCanonicalName();
        throw new TezCompilerException(msg, errCode, PigException.BUG, e);
    }
    catch (IOException e) {
        int errCode = 3000;
        String errMsg = "IOException caught while compiling POMergeJoin";
        throw new TezCompilerException(errMsg, errCode, e);
    }
    catch (CloneNotSupportedException e) {
        int errCode = 2127;
        String errMsg = "Cloning exception caught while compiling POMergeJoin";
        throw new TezCompilerException(errMsg, errCode, PigException.BUG, e);
    }
}