public void visitTezOp()

in src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java [172:381]


    /**
     * Attempts to eliminate a union operator from the Tez plan.
     *
     * <p>Operators that are not unions, or that {@code isOptimizable} rejects, are left
     * untouched. Otherwise one of two rewrites is applied:
     * <ul>
     *   <li>If every union member is the same operator (its plan is expected to end in a
     *       {@code POSplit}), the union plan is cloned into that operator once per
     *       occurrence in the member list and the union operator is removed.</li>
     *   <li>Otherwise a vertex group operator is created (or an existing vertex group
     *       successor is reused) for every {@code POStoreTez} and for every non-store
     *       output key of the union plan, the union plan is cloned into each member, and
     *       the union operator is removed.</li>
     * </ul>
     * The multi-member rewrite is skipped when the store funcs are not optimizable, when
     * a predecessor already has an edge into a successor, or when the union has more
     * predecessors than members.
     *
     * @param tezOp the operator being visited
     * @throws VisitorException if the single distinct member does not end in a
     *         {@code POSplit}, or if rewiring the plan fails
     */
    public void visitTezOp(TezOperator tezOp) throws VisitorException {
        if (!tezOp.isUnion() || !isOptimizable(tezOp)) {
            return;
        }

        final TezOperator unionOp = tezOp;
        final String keyScope = unionOp.getOperatorKey().scope;
        final PhysicalPlan unionPlan = unionOp.plan;

        // Work on copies of the neighbour lists, the plan itself is rewired further down.
        Set<OperatorKey> distinctMembers = new HashSet<OperatorKey>(unionOp.getUnionMembers());
        List<TezOperator> preds = new ArrayList<TezOperator>(tezPlan.getPredecessors(unionOp));
        List<TezOperator> succs = null;
        if (tezPlan.getSuccessors(unionOp) != null) {
            succs = new ArrayList<TezOperator>(tezPlan.getSuccessors(unionOp));
        }

        if (distinctMembers.size() == 1) {
            // No VertexGroup is required here: the several sub-plans of the Split can
            // write to the same MROutput or Tez LogicalOutput.
            OperatorKey splitPredKey = distinctMembers.iterator().next();
            TezOperator splitPredOp = tezPlan.getOperator(splitPredKey);
            PhysicalPlan splitPredPlan = splitPredOp.plan;

            // The leaf should always be a POSplit, but verify it regardless.
            if (!(splitPredPlan.getLeaves().get(0) instanceof POSplit)) {
                throw new VisitorException("Expected POSplit but found " + splitPredPlan.getLeaves().get(0));
            }

            for (TezOperator candidate : preds) {
                if (candidate.getOperatorKey().equals(splitPredKey)) {
                    continue;
                }
                // Give up if the single union member is a (recursive) predecessor of a
                // non-member predecessor; merging would create an illegal loop.
                Set<TezOperator> ancestors = new HashSet<TezOperator>();
                TezCompilerUtil.addAllPredecessors(tezPlan, candidate, ancestors);
                if (ancestors.contains(splitPredOp)) {
                    return;
                }
            }

            try {
                connectUnionNonMemberPredecessorsToSplit(unionOp, splitPredOp, preds);

                // Drop the POShuffledValueInputTez sitting at the root of the union plan.
                unionPlan.remove(unionPlan.getRoots().get(0));

                // One clone of the union plan per occurrence of the split in the member list.
                int clonesMade = 0;
                while (clonesMade < Collections.frequency(unionOp.getUnionMembers(), splitPredKey)) {
                    cloneAndMergeUnionPlan(unionOp, splitPredOp);
                    clonesMade++;
                }
                copyOperatorProperties(splitPredOp, unionOp);
                tezPlan.disconnect(splitPredOp, unionOp);

                connectSplitOpToUnionSuccessors(unionOp, splitPredOp, succs);
            } catch (PlanException e) {
                throw new VisitorException(e);
            }

            // The union operator is no longer part of the plan.
            tezPlan.remove(unionOp);
            return;
        }

        // More than one distinct member from here on.
        if (!isOptimizableStoreFunc(tezOp, supportedStoreFuncs, unsupportedStoreFuncs)) {
            return;
        }

        if (succs != null) {
            for (TezOperator succ : succs) {
                for (TezOperator pred : preds) {
                    // Turning the union into a vertex group would produce a parallel
                    // edge between pred and succ, so leave the plan alone.
                    if (succ.inEdges.containsKey(pred.getOperatorKey())) {
                        return;
                    }
                }
            }
        }

        // TODO: PIG-3856 Handle replicated join and skewed join sample.
        // The replicated join small table / skewed join sample broadcast to the union
        // vertex would have to be broadcast to every union predecessor instead. Wait for
        // shared edges, or write it multiple times? Until that is decided, do not
        // optimize (the Split case above is fine because it writes only once).
        if (preds.size() > unionOp.getUnionMembers().size()) {
            return;
        }

        // One vertex group operator per store. Union followed by Split followed by
        // Store may carry several stores.
        List<POStoreTez> stores = PlanHelper.getPhysicalOperators(unionPlan, POStoreTez.class);
        TezOperator[] storeGroups = new TezOperator[stores.size()];
        for (int idx = 0; idx < storeGroups.length; idx++) {
            TezOperator reusable = null;
            if (succs != null) {
                for (TezOperator succ : succs) {
                    if (succ.isVertexGroup() && stores.get(idx).getSFile().equals(succ.getVertexGroupInfo().getSFile())) {
                        reusable = succ;
                        break;
                    }
                }
            }

            if (reusable != null) {
                // A successor vertex group already targets this file: swap the union
                // for its members in that group.
                storeGroups[idx] = reusable;
                reusable.getVertexGroupMembers().remove(unionOp.getOperatorKey());
                reusable.getVertexGroupMembers().addAll(unionOp.getUnionMembers());
                reusable.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
                continue;
            }

            // With union + split + union + store, the stores inside the Split end up
            // writing to the same location once the second union is optimized, so while
            // optimizing the first union they must share a single vertex group.
            for (int prev = 0; prev < idx; prev++) {
                if (stores.get(idx).getSFile().equals(storeGroups[prev].getVertexGroupInfo().getSFile())) {
                    storeGroups[idx] = storeGroups[prev];
                    break;
                }
            }
            if (storeGroups[idx] != null) {
                continue;
            }

            TezOperator freshGroup = new TezOperator(OperatorKey.genOpKey(keyScope));
            storeGroups[idx] = freshGroup;
            freshGroup.setVertexGroupInfo(new VertexGroupInfo(stores.get(idx)));
            freshGroup.getVertexGroupInfo().setSFile(stores.get(idx).getSFile());
            freshGroup.setVertexGroupMembers(new ArrayList<OperatorKey>(unionOp.getUnionMembers()));
            tezPlan.add(freshGroup);
        }

        // One vertex group operator per output. Split, orderby, skewed join, rank, etc.
        // have multiple outputs, and a single TezOutput can feed several LogicalOutputs
        // (POCounterTez, POValueOutputTez, etc).
        List<TezOutput> tezOutputs = PlanHelper.getPhysicalOperators(unionPlan, TezOutput.class);
        List<String> unionOutputKeys = new ArrayList<String>();
        for (TezOutput tezOutput : tezOutputs) {
            if (!(tezOutput instanceof POStoreTez)) {
                for (String outputKey : tezOutput.getTezOutputs()) {
                    unionOutputKeys.add(outputKey);
                }
            }
        }

        TezOperator[] outputGroups = new TezOperator[unionOutputKeys.size()];
        String[] newOutputKeys = new String[unionOutputKeys.size()];
        for (int idx = 0; idx < outputGroups.length; idx++) {
            TezOperator reusable = null;
            if (succs != null) {
                for (TezOperator succ : succs) {
                    if (succ.isVertexGroup()
                            && unionOutputKeys.get(idx).equals(succ.getVertexGroupInfo().getOutput())) {
                        reusable = succ;
                        break;
                    }
                }
            }

            if (reusable != null) {
                outputGroups[idx] = reusable;
                reusable.getVertexGroupMembers().remove(unionOp.getOperatorKey());
                reusable.getVertexGroupMembers().addAll(unionOp.getUnionMembers());
                reusable.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
            } else {
                // Share the group already assigned to an earlier, identical output key.
                for (int prev = 0; prev < idx; prev++) {
                    if (unionOutputKeys.get(idx).equals(unionOutputKeys.get(prev))) {
                        outputGroups[idx] = outputGroups[prev];
                        break;
                    }
                }
                if (outputGroups[idx] != null) {
                    continue;
                }
                TezOperator freshGroup = new TezOperator(OperatorKey.genOpKey(keyScope));
                outputGroups[idx] = freshGroup;
                freshGroup.setVertexGroupInfo(new VertexGroupInfo());
                freshGroup.getVertexGroupInfo().setOutput(unionOutputKeys.get(idx));
                freshGroup.setVertexGroupMembers(new ArrayList<OperatorKey>(unionOp.getUnionMembers()));
            }
            newOutputKeys[idx] = outputGroups[idx].getOperatorKey().toString();
            tezPlan.add(outputGroups[idx]);
        }

        // Rewire Predecessors -> Union -> Successor(s) into
        // Predecessors -> Vertex Group(s) -> Successor(s).
        try {
            // Drop the POShuffledValueInputTez sitting at the root of the union plan.
            unionPlan.remove(unionPlan.getRoots().get(0));

            for (OperatorKey memberKey : unionOp.getUnionMembers()) {
                TezOperator member = tezPlan.getOperator(memberKey);
                PhysicalPlan mergedPlan = cloneAndMergeUnionPlan(unionOp, member);
                connectPredecessorsToVertexGroups(unionOp, member, mergedPlan,
                        storeGroups, outputGroups);
            }

            connectVertexGroupsToSuccessors(unionOp, succs,
                    unionOutputKeys, outputGroups);

            replaceSuccessorInputsAndDisconnect(unionOp, succs, unionOutputKeys, newOutputKeys);

            // The union operator is no longer part of the plan.
            tezPlan.remove(unionOp);
        } catch (VisitorException e) {
            throw e;
        } catch (Exception e) {
            throw new VisitorException(e);
        }

    }