in src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java [1146:1445]
/**
 * Translates a logical {@code LOJoin} into physical operators in {@code currentPlan}.
 *
 * <p>Every join input is first resolved to its already-translated physical operator. Each
 * input's join-key expression plans are translated to physical plans, and the result type of
 * each key expression is collected per input. The physical shape then depends on
 * {@code loj.getJoinType()}:
 * <ul>
 *   <li>{@code SKEWED}: a single {@code POSkewedJoin}. Every non-inner input must have a
 *       schema, which is attached to the operator; inner inputs get a {@code null}
 *       placeholder.</li>
 *   <li>{@code REPLICATED}: a single {@code POFRJoin} with fragment index 0. Input and key
 *       schemas are registered for schema-tuple code generation where possible. For a left
 *       outer join the right input (index 1) must have a schema, so that a null tuple of the
 *       same width can be built.</li>
 *   <li>{@code MERGE}/{@code MERGESPARSE}, once {@code MapSideMergeValidator} accepts the
 *       inputs: a {@code POMergeJoin} for a two-way inner join. Otherwise a merge cogroup
 *       (see {@code compileToMergeCogrp}) followed by a flattening {@code POForEach}.</li>
 *   <li>{@code HASH}/{@code BLOOM}: the trio built by {@code compileToLR_GR_PackTrio},
 *       followed by a flattening {@code POForEach}. The package type is set to
 *       {@code BLOOMJOIN} or {@code JOIN} accordingly.</li>
 * </ul>
 * The operator that produces the join output is recorded in {@code logToPhyMap} under
 * {@code loj}. Soft-link predecessors of {@code loj} are then translated for every join type
 * (including merge joins).
 *
 * @param loj the logical join to translate
 * @throws FrontendException if a physical operator cannot be created or connected; if an input
 *         whose schema is required for an outer join has none; or if a BLOOM join is requested
 *         as a two-way FULL OUTER join
 */
public void visit(LOJoin loj) throws FrontendException {
    String scope = DEFAULT_SCOPE;
    // Logical inputs of the join, in join order: input i pairs with innerFlags[i] and
    // loj.getJoinPlan(i).
    List<Operator> inputs = loj.getPlan().getPredecessors(loj);
    // Mapping of each input's physical operator to the physical plans of its join keys.
    MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = new LinkedMultiMap<PhysicalOperator, PhysicalPlan>();
    // Outer list is indexed by input; inner list holds that input's key expression plans.
    List<List<PhysicalPlan>> ppLists = new ArrayList<List<PhysicalPlan>>();
    // Physical operator of each input, in the same order as 'inputs'.
    List<PhysicalOperator> inp = new ArrayList<PhysicalOperator>();
    // Outer list is indexed by input; inner list holds the result type of each key expression.
    List<List<Byte>> keyTypes = new ArrayList<List<Byte>>();
    boolean[] innerFlags = loj.getInnerFlags();
    String alias = loj.getAlias();
    SourceLocation location = loj.getLocation();
    int parallel = loj.getRequestedParallelism();
    for (int i = 0; i < inputs.size(); i++) {
        Operator op = inputs.get(i);
        PhysicalOperator physOp = logToPhyMap.get(op);
        inp.add(physOp);
        List<LogicalExpressionPlan> plans = (List<LogicalExpressionPlan>) loj.getJoinPlan(i);
        List<PhysicalPlan> exprPlans = translateExpressionPlans(loj, plans);
        ppLists.add(exprPlans);
        joinPlans.put(physOp, exprPlans);
        // The key may be a tuple, so record the result type of every key expression plan.
        List<Byte> tupleKeyMemberTypes = new ArrayList<Byte>();
        for (PhysicalPlan exprPlan : exprPlans) {
            tupleKeyMemberTypes.add(exprPlan.getLeaves().get(0).getResultType());
        }
        keyTypes.add(tupleKeyMemberTypes);
    }

    if (loj.getJoinType() == LOJoin.JOINTYPE.SKEWED) {
        POSkewedJoin skj;
        try {
            skj = new POSkewedJoin(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
                    parallel, inp, innerFlags);
            skj.addOriginalLocation(alias, location);
            skj.setJoinPlans(joinPlans);
        }
        catch (Exception e) {
            int errCode = 2015;
            String msg = "Skewed Join creation failed";
            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
        }
        skj.setResultType(DataType.TUPLE);
        // One schema slot per input: real schemas for outer inputs, null for inner ones.
        for (int i = 0; i < inputs.size(); i++) {
            Operator op = inputs.get(i);
            if (!innerFlags[i]) {
                try {
                    LogicalSchema s = ((LogicalRelationalOperator) op).getSchema();
                    // if the schema cannot be determined
                    if (s == null) {
                        throw new FrontendException(loj, "Cannot determine skewed join schema", 2247);
                    }
                    skj.addSchema(Util.translateSchema(s));
                } catch (FrontendException e) {
                    int errCode = 2015;
                    String msg = "Couldn't set the schema for outer join";
                    throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
                }
            } else {
                // This will never be retrieved. It just guarantees that the index will be valid when
                // MRCompiler is trying to read the schema
                skj.addSchema(null);
            }
        }
        currentPlan.add(skj);
        for (Operator op : inputs) {
            try {
                currentPlan.connect(logToPhyMap.get(op), skj);
            } catch (PlanException e) {
                int errCode = 2015;
                String msg = "Invalid physical operators in the physical plan";
                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
            }
        }
        logToPhyMap.put(loj, skj);
    }
    else if (loj.getJoinType() == LOJoin.JOINTYPE.REPLICATED) {
        // Per-input value and key schemas for schema-tuple code generation. An entry stays null
        // when the input has no schema or (for keys) a key member type is null or complex.
        Schema[] inputSchemas = new Schema[inputs.size()];
        Schema[] keySchemas = new Schema[inputs.size()];
        outer: for (int i = 0; i < inputs.size(); i++) {
            LogicalSchema logicalSchema = ((LogicalRelationalOperator) inputs.get(i)).getSchema();
            if (logicalSchema == null) {
                continue;
            }
            Schema toGen = Schema.getPigSchema(new ResourceSchema(logicalSchema));
            // This registers the value piece
            SchemaTupleFrontend.registerToGenerateIfPossible(toGen, false, GenContext.FR_JOIN);
            inputSchemas[i] = toGen;
            Schema keyToGen = new Schema();
            for (Byte byt : keyTypes.get(i)) {
                // We cannot generate any nested code because that information is thrown away
                if (byt == null || DataType.isComplex(byt.byteValue())) {
                    continue outer;
                }
                keyToGen.add(new FieldSchema(null, byt));
            }
            SchemaTupleFrontend.registerToGenerateIfPossible(keyToGen, false, GenContext.FR_JOIN);
            keySchemas[i] = keyToGen;
        }
        int fragment = 0;
        POFRJoin pfrj;
        try {
            // We dont check for bounds issue as we assume that a join
            // involves atleast two inputs
            boolean isLeftOuter = !innerFlags[1];
            Tuple nullTuple = null;
            if (isLeftOuter) {
                try {
                    // We know that in a Left outer join its only a two way
                    // join, so we assume index of 1 for the right input
                    LogicalSchema inputSchema = ((LogicalRelationalOperator) inputs.get(1)).getSchema();
                    // We check if we have a schema before the join
                    if (inputSchema == null) {
                        int errCode = 1109;
                        String msg = "Input (" + ((LogicalRelationalOperator) inputs.get(1)).getAlias() + ") " +
                                "on which outer join is desired should have a valid schema";
                        throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.INPUT);
                    }
                    // Using the schema we decide the number of columns/fields
                    // in the nullTuple
                    nullTuple = TupleFactory.getInstance().newTuple(inputSchema.size());
                    for (int j = 0; j < inputSchema.size(); j++) {
                        nullTuple.set(j, null);
                    }
                } catch (FrontendException e) {
                    int errCode = 2104;
                    String msg = "Error while determining the schema of input";
                    throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
                }
            }
            pfrj = new POFRJoin(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
                    parallel,
                    inp,
                    ppLists,
                    keyTypes,
                    null,
                    fragment,
                    isLeftOuter,
                    nullTuple,
                    inputSchemas,
                    keySchemas);
            pfrj.addOriginalLocation(alias, location);
        } catch (ExecException e1) {
            // NOTE(review): this also catches ExecException from nullTuple.set(...) above, not
            // only from building the POFRJoin; the message below describes the latter case.
            int errCode = 2058;
            String msg = "Unable to set index on newly create POLocalRearrange.";
            throw new VisitorException(msg, errCode, PigException.BUG, e1);
        }
        pfrj.setResultType(DataType.TUPLE);
        currentPlan.add(pfrj);
        for (Operator op : inputs) {
            try {
                currentPlan.connect(logToPhyMap.get(op), pfrj);
            } catch (PlanException e) {
                int errCode = 2015;
                String msg = "Invalid physical operators in the physical plan";
                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
            }
        }
        logToPhyMap.put(loj, pfrj);
    } else if ((loj.getJoinType() == LOJoin.JOINTYPE.MERGE || loj.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE)
            && (new MapSideMergeValidator().validateMapSideMerge(inputs, loj.getPlan()))) {
        PhysicalOperator smj;
        // POMergeJoin only handles a two-way inner join; everything else goes through merge cogroup.
        boolean usePOMergeJoin = inputs.size() == 2 && innerFlags[0] && innerFlags[1];
        if (usePOMergeJoin) {
            // We register the merge join schema information for code generation
            LogicalSchema logicalSchema = ((LogicalRelationalOperator) inputs.get(0)).getSchema();
            Schema leftSchema = null;
            if (logicalSchema != null) {
                leftSchema = Schema.getPigSchema(new ResourceSchema(logicalSchema));
            }
            logicalSchema = ((LogicalRelationalOperator) inputs.get(1)).getSchema();
            Schema rightSchema = null;
            if (logicalSchema != null) {
                rightSchema = Schema.getPigSchema(new ResourceSchema(logicalSchema));
            }
            logicalSchema = loj.getSchema();
            Schema mergedSchema = null;
            if (logicalSchema != null) {
                mergedSchema = Schema.getPigSchema(new ResourceSchema(logicalSchema));
            }
            if (leftSchema != null) {
                SchemaTupleFrontend.registerToGenerateIfPossible(leftSchema, false, GenContext.MERGE_JOIN);
            }
            if (rightSchema != null) {
                SchemaTupleFrontend.registerToGenerateIfPossible(rightSchema, false, GenContext.MERGE_JOIN);
            }
            if (mergedSchema != null) {
                SchemaTupleFrontend.registerToGenerateIfPossible(mergedSchema, false, GenContext.MERGE_JOIN);
            }
            // inner join on two sorted inputs. We have less restrictive
            // implementation here in a form of POMergeJoin which doesn't
            // require loaders to implement collectable interface.
            try {
                smj = new POMergeJoin(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
                        parallel,
                        inp,
                        joinPlans,
                        keyTypes,
                        loj.getJoinType(),
                        leftSchema,
                        rightSchema,
                        mergedSchema);
            }
            catch (PlanException e) {
                int errCode = 2042;
                String msg = "Merge Join creation failed";
                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
            }
            logToPhyMap.put(loj, smj);
        } else {
            // in all other cases we fall back to POMergeCogroup + Flattening FEs
            smj = compileToMergeCogrp(loj, loj.getExpressionPlans());
        }
        smj.setResultType(DataType.TUPLE);
        currentPlan.add(smj);
        smj.addOriginalLocation(alias, location);
        for (Operator op : inputs) {
            try {
                currentPlan.connect(logToPhyMap.get(op), smj);
            } catch (PlanException e) {
                int errCode = 2015;
                String msg = "Invalid physical operators in the physical plan";
                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
            }
        }
        if (!usePOMergeJoin) {
            // Now create and configure foreach which will flatten the output
            // of cogroup.
            POForEach fe = compileFE4Flattening(innerFlags, scope, parallel, alias, location, inputs);
            currentPlan.add(fe);
            try {
                currentPlan.connect(smj, fe);
            } catch (PlanException e) {
                throw new LogicalToPhysicalTranslatorException(e.getMessage(), e.getErrorCode(), e.getErrorSource(), e);
            }
            logToPhyMap.put(loj, fe);
        }
        // No early return here: merge joins must also reach translateSoftLinks(loj) below,
        // like every other join type.
    }
    else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH || loj.getJoinType() == LOJoin.JOINTYPE.BLOOM) {
        boolean isBloom = loj.getJoinType() == LOJoin.JOINTYPE.BLOOM;
        // Reject an unsupported BLOOM FULL OUTER join before anything is added to currentPlan or
        // logToPhyMap, so a failed translation does not leave a half-built subplan behind.
        if (isBloom && innerFlags.length == 2 && !innerFlags[0] && !innerFlags[1]) {
            throw new LogicalToPhysicalTranslatorException(
                    "Error at " + loj.getLocation() + " with alias " + loj.getAlias() +
                    ". Bloom join cannot be used with a FULL OUTER join.",
                    1109,
                    PigException.INPUT);
        }
        POPackage poPackage = compileToLR_GR_PackTrio(loj, loj.getCustomPartitioner(), innerFlags, loj.getExpressionPlans());
        // Flatten the packaged output into joined tuples.
        POForEach fe = compileFE4Flattening(innerFlags, scope, parallel, alias, location, inputs);
        currentPlan.add(fe);
        try {
            currentPlan.connect(poPackage, fe);
        } catch (PlanException e) {
            throw new LogicalToPhysicalTranslatorException(e.getDetailedMessage(),
                    e.getErrorCode(), e.getErrorSource(), e);
        }
        logToPhyMap.put(loj, fe);
        poPackage.getPkgr().setPackageType(isBloom ? PackageType.BLOOMJOIN : PackageType.JOIN);
    }
    translateSoftLinks(loj);
}