in src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java [172:381]
/**
 * Tries to remove the union vertex {@code tezOp} from the Tez plan.
 * <p>
 * Two rewrites are used:
 * <ul>
 * <li>If every union member is the same operator (whose leaf must be a
 * {@code POSplit}), the union plan is cloned into that operator once per
 * occurrence in the member list. That operator is then connected directly to the
 * union's successors, and the union operator is removed.</li>
 * <li>Otherwise, the union plan is cloned into every member, and the members are
 * wired to vertex groups, which take the place of the union in front of its
 * successors. There is one vertex group per distinct store location and one per
 * distinct non-store output key. A matching vertex group that is already a
 * successor is reused.</li>
 * </ul>
 * The method returns without rewriting in several cases:
 * <ul>
 * <li>{@code tezOp} is not a union, or {@code isOptimizable} rejects it.</li>
 * <li>In the multi-member case, the store func is not optimizable.</li>
 * <li>A successor already has an edge from one of the predecessors, which would
 * create a parallel edge.</li>
 * <li>The union has a predecessor that is not a union member.</li>
 * <li>In the single-member case, the rewrite would create a loop.</li>
 * </ul>
 *
 * @param tezOp the operator being visited
 * @throws VisitorException if plan manipulation fails, or if the single union
 *         member's leaf is not a {@code POSplit}
 */
public void visitTezOp(TezOperator tezOp) throws VisitorException {
    if (!tezOp.isUnion()) {
        return;
    }

    if (!isOptimizable(tezOp)) {
        return;
    }

    TezOperator unionOp = tezOp;
    String scope = unionOp.getOperatorKey().scope;
    PhysicalPlan unionOpPlan = unionOp.plan;

    // The member list may repeat a key (a Split feeding the union more than once),
    // so keep a de-duplicated view as well.
    Set<OperatorKey> uniqueUnionMembers = new HashSet<OperatorKey>(unionOp.getUnionMembers());
    // Copy both lists so that the rewiring of tezPlan below does not affect them.
    List<TezOperator> predecessors = new ArrayList<TezOperator>(tezPlan.getPredecessors(unionOp));
    List<TezOperator> planSuccessors = tezPlan.getSuccessors(unionOp);
    List<TezOperator> successors = planSuccessors == null ? null
            : new ArrayList<TezOperator>(planSuccessors);

    if (uniqueUnionMembers.size() != 1) {

        if (!isOptimizableStoreFunc(tezOp, supportedStoreFuncs, unsupportedStoreFuncs)) {
            return;
        }

        if (successors != null) {
            for (TezOperator succ : successors) {
                for (TezOperator pred : predecessors) {
                    if (succ.inEdges.containsKey(pred.getOperatorKey())) {
                        // Stop here, we cannot convert the node into vertex group
                        // Otherwise, we will end up with a parallel edge between pred
                        // and succ
                        return;
                    }
                }
            }
        }

        // TODO: PIG-3856 Handle replicated join and skewed join sample.
        // Replicate join small table/skewed join sample that was broadcast to union vertex
        // now needs to be broadcast to all the union predecessors. How do we do that??
        // Wait for shared edge and do it or write multiple times??
        // For now don't optimize except in the case of Split where we need to write only once
        if (predecessors.size() > unionOp.getUnionMembers().size()) {
            return;
        }
        // The size comparison alone is not enough. getUnionMembers() counts a repeated
        // member once per occurrence, while predecessors may list each operator only
        // once. A repeated member could therefore hide a non-member (broadcast)
        // predecessor. Refuse explicitly whenever any predecessor is not a member.
        for (TezOperator pred : predecessors) {
            if (!uniqueUnionMembers.contains(pred.getOperatorKey())) {
                return;
            }
        }
    }

    if (uniqueUnionMembers.size() == 1) {
        // We actually don't need VertexGroup in this case. The multiple
        // sub-plans of Split can write to same MROutput or the Tez LogicalOutput
        OperatorKey splitPredKey = uniqueUnionMembers.iterator().next();
        TezOperator splitPredOp = tezPlan.getOperator(splitPredKey);
        PhysicalPlan splitPredPlan = splitPredOp.plan;
        if (splitPredPlan.getLeaves().get(0) instanceof POSplit) { //It has to be. But check anyways

            for (TezOperator op : predecessors) {
                if (!op.getOperatorKey().equals(splitPredKey)) {
                    Set<TezOperator> allNonMemberPredecessorsAncestors = new HashSet<TezOperator>();
                    TezCompilerUtil.addAllPredecessors(tezPlan, op, allNonMemberPredecessorsAncestors);
                    // If any of the nonMemberPredecessor's ancestors(recursive predecessor)
                    // is from the single unionmember, then we stop the merge effort to avoid creating
                    // an illegal loop.
                    if (allNonMemberPredecessorsAncestors.contains(splitPredOp)) {
                        return;
                    }
                }
            }

            try {
                connectUnionNonMemberPredecessorsToSplit(unionOp, splitPredOp, predecessors);

                // Remove POShuffledValueInputTez from union plan root
                unionOpPlan.remove(unionOpPlan.getRoots().get(0));
                // Clone union plan into split subplans: once per occurrence of the
                // split operator in the (possibly repeating) union member list.
                for (int i = 0; i < Collections.frequency(unionOp.getUnionMembers(), splitPredKey); i++) {
                    cloneAndMergeUnionPlan(unionOp, splitPredOp);
                }
                copyOperatorProperties(splitPredOp, unionOp);
                tezPlan.disconnect(splitPredOp, unionOp);

                connectSplitOpToUnionSuccessors(unionOp, splitPredOp, successors);
            } catch (PlanException e) {
                throw new VisitorException(e);
            }

            //Remove union operator from the plan
            tezPlan.remove(unionOp);
            return;
        } else {
            throw new VisitorException("Expected POSplit but found " + splitPredPlan.getLeaves().get(0));
        }
    }

    // Create vertex group operator for each store. Union followed by Split
    // followed by Store could have multiple stores
    List<POStoreTez> unionStoreOutputs = PlanHelper.getPhysicalOperators(unionOpPlan, POStoreTez.class);
    TezOperator[] storeVertexGroupOps = new TezOperator[unionStoreOutputs.size()];
    for (int i = 0; i < storeVertexGroupOps.length; i++) {
        // Reuse a vertex group that is already a successor and stores to the same file.
        TezOperator existingVertexGroup = null;
        if (successors != null) {
            for (TezOperator succ : successors) {
                if (succ.isVertexGroup() && unionStoreOutputs.get(i).getSFile().equals(succ.getVertexGroupInfo().getSFile())) {
                    existingVertexGroup = succ;
                    break;
                }
            }
        }
        if (existingVertexGroup == null) {
            // In the case of union + split + union + store, the different stores in the Split
            // will be writing to same location after second union operator is optimized.
            // So while optimizing the first union, we should just make it write to one vertex group
            for (int j = 0; j < i; j++) {
                if (unionStoreOutputs.get(i).getSFile().equals(storeVertexGroupOps[j].getVertexGroupInfo().getSFile())) {
                    storeVertexGroupOps[i] = storeVertexGroupOps[j];
                    break;
                }
            }
            if (storeVertexGroupOps[i] != null) {
                continue;
            }
            storeVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
            storeVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo(unionStoreOutputs.get(i)));
            storeVertexGroupOps[i].getVertexGroupInfo().setSFile(unionStoreOutputs.get(i).getSFile());
            storeVertexGroupOps[i].setVertexGroupMembers(new ArrayList<OperatorKey>(unionOp.getUnionMembers()));
            tezPlan.add(storeVertexGroupOps[i]);
        } else {
            // The existing group listed this union as a member. Replace it with the
            // union's own members, and drop the union as an input.
            storeVertexGroupOps[i] = existingVertexGroup;
            existingVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
            existingVertexGroup.getVertexGroupMembers().addAll(unionOp.getUnionMembers());
            existingVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
        }
    }

    // Create vertex group operator for each output. Case of split, orderby,
    // skewed join, rank, etc will have multiple outputs
    List<TezOutput> unionOutputs = PlanHelper.getPhysicalOperators(unionOpPlan, TezOutput.class);
    // One TezOutput can write to multiple LogicalOutputs (POCounterTez, POValueOutputTez, etc)
    List<String> unionOutputKeys = new ArrayList<String>();
    for (TezOutput output : unionOutputs) {
        // Stores were handled by the store vertex groups above.
        if (output instanceof POStoreTez) {
            continue;
        }
        for (String key : output.getTezOutputs()) {
            unionOutputKeys.add(key);
        }
    }

    TezOperator[] outputVertexGroupOps = new TezOperator[unionOutputKeys.size()];
    // Vertex group key replacing each union output key. Slots for a key that repeats
    // an earlier one are skipped by the `continue` below and stay null.
    // NOTE(review): assumes consumers resolve a key to its first occurrence — confirm.
    String[] newOutputKeys = new String[unionOutputKeys.size()];
    for (int i = 0; i < outputVertexGroupOps.length; i++) {
        TezOperator existingVertexGroup = null;
        if (successors != null) {
            for (TezOperator succ : successors) {
                if (succ.isVertexGroup()
                        && unionOutputKeys.get(i).equals(succ.getVertexGroupInfo().getOutput())) {
                    existingVertexGroup = succ;
                    break;
                }
            }
        }
        if (existingVertexGroup == null) {
            for (int j = 0; j < i; j++) {
                if (unionOutputKeys.get(i).equals(unionOutputKeys.get(j))) {
                    outputVertexGroupOps[i] = outputVertexGroupOps[j];
                    break;
                }
            }
            if (outputVertexGroupOps[i] != null) {
                continue;
            }
            outputVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
            outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo());
            outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i));
            outputVertexGroupOps[i].setVertexGroupMembers(new ArrayList<OperatorKey>(unionOp.getUnionMembers()));
        } else {
            outputVertexGroupOps[i] = existingVertexGroup;
            existingVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
            existingVertexGroup.getVertexGroupMembers().addAll(unionOp.getUnionMembers());
            existingVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
        }
        newOutputKeys[i] = outputVertexGroupOps[i].getOperatorKey().toString();
        // NOTE(review): also reached for an existing successor vertex group that is
        // already in the plan; assumes tezPlan.add tolerates re-adding — confirm.
        tezPlan.add(outputVertexGroupOps[i]);
    }

    // Change plan from Predecessors -> Union -> Successor(s) to
    // Predecessors -> Vertex Group(s) -> Successor(s)
    try {
        // Remove POShuffledValueInputTez from union plan root
        unionOpPlan.remove(unionOpPlan.getRoots().get(0));

        // One clone of the union plan per member occurrence.
        for (OperatorKey predKey : unionOp.getUnionMembers()) {
            TezOperator pred = tezPlan.getOperator(predKey);
            PhysicalPlan clonePlan = cloneAndMergeUnionPlan(unionOp, pred);
            connectPredecessorsToVertexGroups(unionOp, pred, clonePlan,
                    storeVertexGroupOps, outputVertexGroupOps);
        }

        connectVertexGroupsToSuccessors(unionOp, successors,
                unionOutputKeys, outputVertexGroupOps);

        replaceSuccessorInputsAndDisconnect(unionOp, successors, unionOutputKeys, newOutputKeys);

        //Remove union operator from the plan
        tezPlan.remove(unionOp);
    } catch (VisitorException e) {
        throw e;
    } catch (Exception e) {
        throw new VisitorException(e);
    }
}