public void visitTezOp()

in src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java [58:307]


    /**
     * Visits a Tez operator and, if it is a splitter, merges as many of its
     * successors ("splittees") as possible into the splitter itself.
     *
     * <p>The method works in three phases:
     * <ol>
     *   <li>Each successor of the splitter is examined and skipped when merging it
     *       would conflict with other successors/splittees: overlapping predecessor
     *       hierarchies, a different sample operator, more than one non-POPackage
     *       input reaching the same operator, or a shared successor that must not
     *       be merged (for example a union that is not optimizable).</li>
     *   <li>Successors that feed a union with an unsupported storefunc are kept in a
     *       tentative list and only accepted when every member of that union is in
     *       the tentative list for that union.</li>
     *   <li>The accepted splittees are folded into the splitter's physical plan:
     *       directly when there is exactly one splittee and one successor, otherwise
     *       as nested plans of a new {@code POSplit} that replaces the splitter's
     *       {@code POValueOutputTez} leaf.</li>
     * </ol>
     *
     * @param tezOp the operator being visited; nothing is done unless
     *              {@code tezOp.isSplitter()} is true
     * @throws VisitorException wrapping any {@code PlanException} raised while
     *                          inspecting or rewriting the plans
     */
    public void visitTezOp(TezOperator tezOp) throws VisitorException {
        try {
            // Only splitter operators are candidates for this optimization
            if (!tezOp.isSplitter()) {
                return;
            }

            // Using a list instead of set to have consistently ordered plans
            List<TezOperator> splittees = new ArrayList<TezOperator>();
            // Non-POPackage input successors collected from the successors processed so far
            // that passed all the checks (including the tentative ones, see below)
            Set<TezOperator> mergedNonPackageInputSuccessors = new HashSet<TezOperator>();

            // When there is a union successor with unsupported storefunc, those splittees
            // can only be merged into the split if all the union members will be from the split
            // This is to ensure that there are no vertex groups created with unsupported storefunc.
            // Maps the union operator to the keys of the successors tentatively accepted for it.
            Map<TezOperator, Set<OperatorKey>> tentativeMergeUnionMembers = new HashMap<TezOperator, Set<OperatorKey>>();

            List<TezOperator> successors = getPlan().getSuccessors(tezOp);

            // Keys of the splitter and of all its direct successors. Used further down to
            // check whether every member of a union comes from this split.
            Set<OperatorKey> splitterAndSuccessorKeys = new HashSet<OperatorKey>();
            splitterAndSuccessorKeys.add(tezOp.getOperatorKey());
            for (TezOperator successor : successors) {
                splitterAndSuccessorKeys.add(successor.getOperatorKey());
            }

            // Phase 1: decide for each successor whether it can be merged into the splitter
            for (TezOperator successor : successors) {
                // Direct predecessors of this successor other than the splitter itself
                HashSet<TezOperator> predecessors = new HashSet<TezOperator>(getPlan().getPredecessors(successor));
                predecessors.remove(tezOp);
                if (!predecessors.isEmpty()) {
                    // If has other dependency that conflicts with other splittees, don't merge into split
                    // For eg: self replicate join/skewed join
                    // But if replicate input is from a different operator allow it, but ensure
                    // that we don't have more than one input coming from that operator into the split

                    // Check if other splittees or its predecessors (till the root) are not present in
                    // the predecessors (till the root) of this splittee.
                    // Need to check the whole predecessors hierarchy till root as the conflict
                    // could be multiple levels up
                    for (TezOperator predecessor : getPlan().getPredecessors(successor)) {
                        if (predecessor != tezOp) {
                            // Already present in the set (it was seeded with the direct
                            // predecessors above); the add is a no-op
                            predecessors.add(predecessor);
                            TezCompilerUtil.addAllPredecessors(getPlan(), predecessor, predecessors);
                        }
                    }
                    // The other successors of the splitter plus the non-splitter predecessor
                    // hierarchy of every splittee accepted so far
                    Set<TezOperator> toMergeSuccPredecessors = new HashSet<TezOperator>(successors);
                    toMergeSuccPredecessors.remove(successor);
                    for (TezOperator splittee : splittees) {
                        for (TezOperator spliteePred : getPlan().getPredecessors(splittee)) {
                            if (spliteePred != tezOp) {
                                toMergeSuccPredecessors.add(spliteePred);
                                TezCompilerUtil.addAllPredecessors(getPlan(), spliteePred, toMergeSuccPredecessors);
                            }
                        }
                    }
                    // removeAll returns true only if something was removed, i.e. the two
                    // sets overlap. It is used here purely as an intersection test.
                    if (predecessors.removeAll(toMergeSuccPredecessors)) {
                        continue;
                    }
                }

                // Split contains right input of different skewed joins
                if (successor.getSampleOperator() != null
                        && tezOp.getSampleOperator() != null
                        && !successor.getSampleOperator().equals(
                                tezOp.getSampleOperator())) {
                    continue;
                }

                // Detect diamond shape into successor operator, we cannot merge it into split,
                // since Tez does not handle double edge between vertexes
                // Successor could be
                //    - union operator (if no union optimizer changing it to vertex group which supports multiple edges)
                //    - self replicate join, self skewed join or scalar
                //    - POPackage (Self hash joins can write to same output edge and is handled by POShuffleTezLoad)
                Set<TezOperator> mergedSuccessors = new HashSet<TezOperator>();
                // These successors should not be merged due to diamond shape
                Set<TezOperator> toNotMergeSuccessors = new HashSet<TezOperator>();
                // These successors can be merged
                Set<TezOperator> toMergeSuccessors = new HashSet<TezOperator>();
                // These successors (Scalar, POFRJoinTez) can be merged if they are the only input.
                // Only in case of POPackage(POShuffleTezLoad) multiple inputs can be handled from a Split
                Set<TezOperator> nonPackageInputSuccessors = new HashSet<TezOperator>();
                boolean canMerge = true;
                // Unions (with unsupported storefunc) following this successor whose members
                // all belong to the splitter or its direct successors
                Set<TezOperator> successorUnsupportedStoreUnions = new HashSet<TezOperator>();

                // Operators that would become successors of the splitter after merging:
                // its current successors plus the successors of the splittees accepted so far
                mergedSuccessors.addAll(successors);
                for (TezOperator splittee : splittees) {
                    if (getPlan().getSuccessors(splittee) != null) {
                        mergedSuccessors.addAll(getPlan().getSuccessors(splittee));
                    }
                }
                if (getPlan().getSuccessors(successor) != null) {
                    // NOTE(review): both sets were created empty just above in this same
                    // iteration, so these clear() calls have no effect
                    nonPackageInputSuccessors.clear();
                    toMergeSuccessors.clear();
                    for (TezOperator succSuccessor : getPlan().getSuccessors(successor)) {
                        if (succSuccessor.isUnion()) {
                            if (!(unionOptimizerOn &&
                                    UnionOptimizer.isOptimizable(succSuccessor))) {
                                toNotMergeSuccessors.add(succSuccessor);
                            } else {
                                if (unionOptimizerOn && !UnionOptimizer.isOptimizableStoreFunc(succSuccessor,unionSupportedStoreFuncs,unionUnsupportedStoreFuncs)) {
                                    // This optimization of using UnionOptimizer for unsupported storefunc
                                    // is only good for one level of split and does not handle multiple level of split.
                                    Set<OperatorKey> unionMembers = new HashSet<OperatorKey>(succSuccessor.getUnionMembers());
                                    unionMembers.removeAll(splitterAndSuccessorKeys);
                                    if(unionMembers.isEmpty()) {
                                        // Every union member is the splitter or one of its direct successors
                                        successorUnsupportedStoreUnions.add(succSuccessor);
                                    } else {
                                        // Some union member comes from outside this split; skip the
                                        // remaining handling of this union
                                        toNotMergeSuccessors.add(succSuccessor);
                                        continue;
                                    }
                                }
                                if (TezCompilerUtil.isNonPackageInput(successor.getOperatorKey().toString(), succSuccessor)) {
                                    // Output goes to scalar or POFRJoinTez in the union operator
                                    // We need to ensure it is the only one to avoid parallel edges
                                    // Set.add returns false when the operator is already in the set,
                                    // which turns canMerge off; once false it stays false and the
                                    // add is no longer evaluated.
                                    canMerge = canMerge ? nonPackageInputSuccessors.add(succSuccessor) : false;
                                } else {
                                    toMergeSuccessors.add(succSuccessor);
                                }
                                // Apply the same classification to the successors of the union
                                List<TezOperator> unionSuccessors = getPlan().getSuccessors(succSuccessor);
                                if (unionSuccessors != null) {
                                    for (TezOperator unionSuccessor : unionSuccessors) {
                                        if (TezCompilerUtil.isNonPackageInput(succSuccessor.getOperatorKey().toString(), unionSuccessor)) {
                                            canMerge = canMerge ? nonPackageInputSuccessors.add(unionSuccessor) : false;
                                        } else {
                                            toMergeSuccessors.add(unionSuccessor);
                                        }
                                    }
                                }
                            }
                        } else if (TezCompilerUtil.isNonPackageInput(successor.getOperatorKey().toString(), succSuccessor)) {
                            // Output goes to scalar or POFRJoinTez instead of POPackage
                            // POPackage/POShuffleTezLoad can handle multiple inputs from a Split.
                            // But if input is sent to any other operator like
                            // scalar, POFRJoinTez then we need to ensure it is the only one.
                            canMerge = canMerge ? nonPackageInputSuccessors.add(succSuccessor) : false;
                        } else {
                            toMergeSuccessors.add(succSuccessor);
                        }
                    }
                }

                if (canMerge) {
                    if (!nonPackageInputSuccessors.isEmpty() || !mergedNonPackageInputSuccessors.isEmpty()) {
                        // If a non-POPackage input successor is already merged or
                        // if there is a POPackage and non-POPackage to be merged,
                        // then skip as it will become diamond shape
                        // For eg: POFRJoinTez+Scalar, POFRJoinTez/Scalar+POPackage
                        // Each removeAll acts as an intersection test (true when the sets overlap).
                        // The || short-circuits, so a later removeAll only runs, and only mutates
                        // its set, when the earlier tests found no overlap.
                        if (nonPackageInputSuccessors.removeAll(mergedSuccessors)
                                || toMergeSuccessors.removeAll(mergedNonPackageInputSuccessors)
                                || toMergeSuccessors.removeAll(nonPackageInputSuccessors)) {
                            continue;
                        }
                    }
                } else {
                    continue;
                }

                // Keep only those would-be successors of the merged splitter that were
                // flagged as not mergeable for this successor
                mergedSuccessors.retainAll(toNotMergeSuccessors);
                if (mergedSuccessors.isEmpty()) { // no shared edge after merge
                    // Recorded for tentative successors as well, i.e. before it is known
                    // whether they will finally be merged
                    mergedNonPackageInputSuccessors.addAll(nonPackageInputSuccessors);
                    if (successorUnsupportedStoreUnions.isEmpty()) {
                        splittees.add(successor);
                    } else {
                        // If all other conditions were satisfied, but it had a successor union
                        // with unsupported storefunc keep it in the tentative list
                        for (TezOperator unionOp : successorUnsupportedStoreUnions) {
                            Set<OperatorKey> tentativeSuccessors = tentativeMergeUnionMembers.get(unionOp);
                            if (tentativeSuccessors == null) {
                                tentativeSuccessors = new HashSet<OperatorKey>();
                                tentativeMergeUnionMembers.put(unionOp, tentativeSuccessors);
                            }
                            tentativeSuccessors.add(successor.getOperatorKey());
                        }
                    }
                }
            }

            // Phase 2: resolve the tentative splittees. Removals are collected separately and
            // applied after the loop, so a successor rejected for one union is dropped even if
            // another union entry added it to the splittees.
            Set<TezOperator> spliteesToRemove = new HashSet<TezOperator>();

            for (Entry<TezOperator, Set<OperatorKey>> entry : tentativeMergeUnionMembers.entrySet()) {
                Set<OperatorKey> unionMembers = new HashSet<OperatorKey>(entry.getKey().getUnionMembers());
                if (entry.getValue().containsAll(unionMembers)) {
                    // If all the union members were tentative splittees then add them
                    for (OperatorKey key : entry.getValue()) {
                        TezOperator splittee = getPlan().getOperator(key);
                        if (!splittees.contains(splittee)) {
                            splittees.add(splittee);
                        }
                    }
                } else {
                    for (OperatorKey key : entry.getValue()) {
                        spliteesToRemove.add(getPlan().getOperator(key));
                    }
                }
            }

            for (TezOperator op : spliteesToRemove) {
                splittees.remove(op);
            }

            // Nothing can be merged
            if (splittees.size() == 0) {
                return;
            }

            // Phase 3: rewrite the splitter's physical plan
            if (splittees.size()==1 && successors.size()==1) {
                // We don't need a POSplit here, we can merge the splittee into the splitter
                PhysicalOperator firstNodeLeaf = tezOp.plan.getLeaves().get(0);
                PhysicalOperator firstNodeLeafPred  = tezOp.plan.getPredecessors(firstNodeLeaf).get(0);

                TezOperator singleSplitee = splittees.get(0);
                PhysicalOperator secondNodeRoot =  singleSplitee.plan.getRoots().get(0);
                PhysicalOperator secondNodeSucc = singleSplitee.plan.getSuccessors(secondNodeRoot).get(0);

                // Drop the splitter's leaf and the splittee's root, then stitch the two plans
                // together at the operators that were adjacent to them
                tezOp.plan.remove(firstNodeLeaf);
                singleSplitee.plan.remove(secondNodeRoot);

                tezOp.plan.merge(singleSplitee.plan);
                tezOp.plan.connect(firstNodeLeafPred, secondNodeSucc);

                // NOTE(review): both helpers are defined outside this block; presumably they
                // carry the splittee's properties over to the splitter and rewire/remove the
                // splittee in the Tez plan -- confirm against their implementations
                addSubPlanPropertiesToParent(tezOp, singleSplitee);

                removeSplittee(getPlan(), tezOp, singleSplitee);
            } else {
                // The first leaf of the splitter plan is cast to POValueOutputTez and replaced
                // by a POSplit created in the same scope
                POValueOutputTez valueOutput = (POValueOutputTez)tezOp.plan.getLeaves().get(0);
                POSplit split = new POSplit(OperatorKey.genOpKey(valueOutput.getOperatorKey().getScope()));
                split.copyAliasFrom(valueOutput);
                for (TezOperator splitee : splittees) {
                    // Each splittee plan, minus its first root, becomes a nested plan of the split
                    PhysicalOperator spliteeRoot =  splitee.plan.getRoots().get(0);
                    splitee.plan.remove(spliteeRoot);
                    split.addPlan(splitee.plan);

                    addSubPlanPropertiesToParent(tezOp, splitee);

                    removeSplittee(getPlan(), tezOp, splitee);
                    // The merged splittee no longer needs to be an output of valueOutput
                    valueOutput.removeOutputKey(splitee.getOperatorKey().toString());
                }
                if (valueOutput.getTezOutputs().length > 0) {
                    // We still need valueOutput
                    // It still has outputs left, so it is kept as one more nested plan of the split
                    PhysicalPlan phyPlan = new PhysicalPlan();
                    phyPlan.addAsLeaf(valueOutput);
                    split.addPlan(phyPlan);
                }
                // Swap valueOutput for the split in the splitter's plan
                PhysicalOperator pred = tezOp.plan.getPredecessors(valueOutput).get(0);
                tezOp.plan.disconnect(pred, valueOutput);
                tezOp.plan.remove(valueOutput);
                tezOp.plan.add(split);
                tezOp.plan.connect(pred, split);
            }
        } catch (PlanException e) {
            throw new VisitorException(e);
        }
    }