private Vertex newVertex(TezOperator tezOp)

in src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java [614:1054]


    private Vertex newVertex(TezOperator tezOp) throws IOException,
            ClassNotFoundException, InterruptedException {

        ProcessorDescriptor procDesc = ProcessorDescriptor.create(
                tezOp.getProcessorName());

        // Pass physical plans to vertex as user payload.
        JobConf payloadConf = new JobConf(pigContextConf);

        // We do this so that dag.getCredentials(), job.getCredentials(),
        // job.getConfiguration().getCredentials() all reference the same Credentials object
        // Unfortunately there is no setCredentials() on Job
        payloadConf.setCredentials(dag.getCredentials());
        // We won't actually use this job, but we need it to talk to the Load/Store funcs
        @SuppressWarnings("deprecation")
        Job job = new Job(payloadConf);
        payloadConf = (JobConf) job.getConfiguration();
        //TODO: Investigate. Setting as map writes empty output.
        //payloadConf.setBoolean(MRConfig.IS_MAP_PROCESSOR, tezOp.isUseMRMapSettings());
        payloadConf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
        payloadConf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
        payloadConf.setClass(MRConfiguration.INPUTFORMAT_CLASS,
                PigInputFormatTez.class, InputFormat.class);
        setOutputFormat(job);
        payloadConf.set("udf.import.list", serializedUDFImportList);
        payloadConf.set("exectype", "TEZ");
        payloadConf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal());
        payloadConf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties()));

        DateTimeWritable.setupAvailableZoneIds();

        // Process stores
        LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);

        // Process UserFuncs
        processUserFuncs(tezOp, job);

        Configuration inputPayLoad = null;
        Configuration outputPayLoad = null;

        if (!stores.isEmpty()) {
            outputPayLoad = new Configuration(payloadConf);
            outputPayLoad.set(JobControlCompiler.PIG_MAP_STORES,
                    ObjectSerializer.serialize(new ArrayList<POStore>()));
        }

        if (!(tezOp.getLoaderInfo().getLoads().isEmpty())) {
            payloadConf.set(PigInputFormat.PIG_LOADS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getLoads()));
            payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
            payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
            inputPayLoad = new Configuration(payloadConf);
        }

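        // Tell PigProcessor which vertex produces the sample consumed by this vertex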
        if (tezOp.getSampleOperator() != null) {
            payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.getSampleOperator().getOperatorKey().toString());
        }

        if (tezOp.getSortOperator() != null) {
            // Required by Sample Aggregation job for estimating quantiles
            payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.getSortOperator().getOperatorKey().toString());
            // PIG-4162: Order by/skewed join in an intermediate stage.
            // Increasing order by parallelism may not be required as it is
            // usually followed by a limit rather than a store, but it would
            // benefit cases like a skewed join followed by a group by.
            if (tezOp.getSortOperator().getEstimatedParallelism() != -1
                    && tezOp.getSortOperator().isIntermediateReducer()) {
                payloadConf.setLong(
                        InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                        intermediateTaskInputSize);
            }

        }

        // Set parent plan for all operators in the Tez plan.
        new PhyPlanSetter(tezOp.plan).visit();

        // Set the endOfAllInput flag on the physical plan if certain operators that
        // use this property (such as STREAM) are present in the plan.
        EndOfAllInputSetter.EndOfAllInputChecker checker =
                new EndOfAllInputSetter.EndOfAllInputChecker(tezOp.plan);
        checker.visit();
        if (checker.isEndOfAllInputPresent()) {
            payloadConf.set(JobControlCompiler.END_OF_INP_IN_MAP, "true");
        }

        // Configure the classes for incoming shuffles to this TezOp
        // TODO: Refactor out resetting input keys, PIG-3957
        List<PhysicalOperator> roots = tezOp.plan.getRoots();
        if (roots.size() == 1 && roots.get(0) instanceof POPackage) {
            POPackage pack = (POPackage) roots.get(0);

            List<PhysicalOperator> succsList = tezOp.plan.getSuccessors(pack);
            if (succsList != null) {
                succsList = new ArrayList<PhysicalOperator>(succsList);
            }
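            // Replace the POPackage root with a POShuffleTezLoad that reads the shuffled
            // input directly; the original package is serialized into the conf below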
            byte keyType = pack.getPkgr().getKeyType();
            tezOp.plan.remove(pack);
            payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));

            POShuffleTezLoad newPack = new POShuffleTezLoad(pack);
            if (tezOp.isSkewedJoin()) {
                newPack.setSkewedJoins(true);
            }
            tezOp.plan.add(newPack);

            boolean isMergedInput = false;
            // Set input keys for POShuffleTezLoad. This is used to identify
            // the inputs that are attached to the POShuffleTezLoad in the
            // backend.
            Map<Integer, String> localRearrangeMap = new TreeMap<Integer, String>();
            TezOperator from = null;
            for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
                if (tezOp.getSampleOperator() != null && tezOp.getSampleOperator() == pred) {
                    // skip sample vertex input
                } else {
                    String inputKey = pred.getOperatorKey().toString();
                    boolean isVertexGroup = false;
                    if (pred.isVertexGroup()) {
                        isVertexGroup = true;
                        pred = mPlan.getOperator(pred.getVertexGroupMembers().get(0));
                    }
                    LinkedList<POLocalRearrangeTez> lrs =
                            PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class);
                    for (POLocalRearrangeTez lr : lrs) {
                        if (lr.isConnectedToPackage()
                                && lr.containsOutputKey(tezOp.getOperatorKey().toString())) {
                            localRearrangeMap.put((int) lr.getIndex(), inputKey);
                            if (isVertexGroup) {
                                isMergedInput = true;
                            }
                            from = pred;
                        }
                    }
                }
            }
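            // TreeMap keys are the LocalRearrange indexes, so input keys are added in index order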
            for (Map.Entry<Integer, String> entry : localRearrangeMap.entrySet()) {
                newPack.addInputKey(entry.getValue());
            }

            if (succsList != null) {
                for (PhysicalOperator succs : succsList) {
                    tezOp.plan.connect(newPack, succs);
                }
            }

            // POShuffleTezLoad accesses the comparator setting
            selectKeyComparator(keyType, payloadConf, tezOp, isMergedInput);

            if (tezOp.isUseSecondaryKey()) {
                TezEdgeDescriptor edge = tezOp.inEdges.get(from.getOperatorKey());
                // Currently only PigSecondaryKeyGroupingComparator is used in POShuffleTezLoad.
                // Once PIG-4685 (SecondaryKeyOptimizerTez does not optimize cogroup) is fixed,
                // PigSecondaryKeyComparator will have to be used, and it will require this setting.
                payloadConf.set("pig.secondarySortOrder", ObjectSerializer
                        .serialize(edge.getSecondarySortOrder()));
            }

        }

        // Set parent plan in all operators. Currently the parent plan is really
        // used only when POStream or POSplit are present in the plan.
        new PhyPlanSetter(tezOp.plan).visit();

        // Serialize the execution plan
        payloadConf.set(PigProcessor.PLAN,
                ObjectSerializer.serialize(tezOp.plan));

        udfContextSeparator.serializeUDFContext(payloadConf, tezOp);

        if (!pc.inIllustrator) {
            for (POStore store : stores) {
                // Unset inputs for POStore; otherwise the map/reduce plan will be unnecessarily deserialized
                store.setInputs(null);
                store.setParentPlan(null);
            }
            // We put them in the reduce because PigOutputCommitter checks the
            // ID of the task to see if it's a map, and if not, calls the reduce
            // committers.
            payloadConf.set(JobControlCompiler.PIG_MAP_STORES,
                    ObjectSerializer.serialize(new ArrayList<POStore>()));
            payloadConf.set(JobControlCompiler.PIG_REDUCE_STORES,
                    ObjectSerializer.serialize(stores));
        }

        if (tezOp.isNeedEstimateParallelism()) {
            payloadConf.setBoolean(PigProcessor.ESTIMATE_PARALLELISM, true);
            log.info("Estimate quantile for sample aggregation vertex " + tezOp.getOperatorKey().toString());
        }

        // Set the various parallelism values in the job conf for later analysis (PIG-2779)
        payloadConf.setInt(PigImplConstants.REDUCER_DEFAULT_PARALLELISM, pc.defaultParallel);
        payloadConf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, tezOp.getRequestedParallelism());
        payloadConf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, tezOp.getEstimatedParallelism());

        TezScriptState ss = TezScriptState.get();
        ss.addVertexSettingsToConf(dag.getName(), tezOp, payloadConf);

        // Take our assembled configuration and create a vertex
        UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
        TezDAGScriptInfo dagScriptInfo = TezScriptState.get().getDAGScriptInfo(dag.getName());
        String alias = dagScriptInfo.getAlias(tezOp);
        String aliasLocation = dagScriptInfo.getAliasLocation(tezOp);
        String features = dagScriptInfo.getPigFeatures(tezOp);
        String vertexInfo = aliasLocation + " (" + features + ")";
        procDesc.setUserPayload(userPayload).setHistoryText(TezUtils.convertToHistoryText(vertexInfo, payloadConf));

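        // Scan the incoming edges to determine which VertexManagerPlugin this vertex needs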
        String vmPluginName = null;
        Configuration vmPluginConf = null;
        boolean containScatterGather = false;
        boolean containCustomPartitioner = false;
        for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
            if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
                containScatterGather = true;
            }
            if (edge.partitionerClass != null) {
                containCustomPartitioner = true;
            }
        }

        if (containScatterGather) {
            vmPluginName = ShuffleVertexManager.class.getName();
            vmPluginConf = new Configuration(shuffleVertexManagerBaseConf);
        }

        // Set the right VertexManagerPlugin
        if (tezOp.getEstimatedParallelism() != -1) {
            boolean autoParallelism = false;
            if (tezOp.isGlobalSort() || tezOp.isSkewedJoin()) {
                if (tezOp.getVertexParallelism() == -1 && (
                        tezOp.isGlobalSort() && getPlan().getPredecessors(tezOp).size() == 1 ||
                        tezOp.isSkewedJoin() && getPlan().getPredecessors(tezOp).size() == 2)) {
                    // Set VertexManagerPlugin to PartitionerDefinedVertexManager, which is able
                    // to decrease/increase parallelism of sorting vertex dynamically
                    // based on the numQuantiles calculated by sample aggregation vertex
                    vmPluginName = PartitionerDefinedVertexManager.class.getName();
                    autoParallelism = true;
                    log.info("Set VertexManagerPlugin to PartitionerDefinedVertexManager for vertex " + tezOp.getOperatorKey().toString());
                }
            } else {
                if (containScatterGather && !containCustomPartitioner) {

                    // For intermediate reduce, set the bytes per reducer to the block size.
                    long bytesPerReducer = intermediateTaskInputSize;
                    // If there are store statements, use the BYTES_PER_REDUCER_PARAM configured by the user.
                    // If not, default to 384MB for group bys and 256MB for joins. We do not use the
                    // default 1G, as that value suited the MapReduce logic where numReducers = (map input size / bytesPerReducer).
                    // In Tez, numReducers = (map output size / bytesPerReducer), so we need lower values to avoid skew in the reduce,
                    // as map input sizes are almost always high compared to map output.
                    if (stores.size() > 0) {
                        if (pigContextConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
                            bytesPerReducer = pigContextConf.getLong(
                                            InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                                            InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
                        } else if (tezOp.isGroupBy()) {
                            bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT;
                        } else {
                            bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
                        }
                    }

                    // Use auto-parallelism feature of ShuffleVertexManager to dynamically
                    // reduce the parallelism of the vertex. Use PigGraceShuffleVertexManager
                    // instead of ShuffleVertexManager if pig.tez.grace.parallelism is turned on
                    if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
                            && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()
                            && tezOp.getCrossKeys() == null) {
                        vmPluginName = PigGraceShuffleVertexManager.class.getName();
                        tezOp.setUseGraceParallelism(true);
                        vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
                        vmPluginConf.set(PigImplConstants.PIG_CONTEXT, serializedPigContext);
                        vmPluginConf.setLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, bytesPerReducer);
                    }
                    vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
                    vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer);
                    autoParallelism = true;
                    log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
                }
            }
            if (globalConf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY, false) && autoParallelism) {
                disableDAGRecovery = true;
            }
        }
        if (tezOp.isLimit() && (vmPluginName == null || vmPluginName.equals(PigGraceShuffleVertexManager.class.getName()) ||
                vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
            if (tezOp.inEdges.values().iterator().next().inputClassName.equals(UnorderedKVInput.class.getName())) {
                // Setting SRC_FRACTION to 0.00001 so that even if there are 100K source tasks,
                // the limit job starts when 1 source task finishes.
                // If the limit is part of a group by or join, their parallelism is 1,
                // so we should leave the configuration at the defaults.
                vmPluginConf = (vmPluginConf == null) ? new Configuration(pigContextConf) : vmPluginConf;
                vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, "0.00001");
                vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, "0.00001");
                log.info("Set " + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION + " to 0.00001 for limit vertex " + tezOp.getOperatorKey().toString());
            }
        }

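        // With grace parallelism the vertex manager determines the parallelism at
        // runtime, so leave it undefined (-1) here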
        int parallel = tezOp.getVertexParallelism();
        if (tezOp.isUseGraceParallelism()) {
            parallel = -1;
        }
        Resource resource = tezOp.isUseMRMapSettings() ? mapTaskResource : reduceTaskResource;

        Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), procDesc, parallel, resource);

        if (tezOp.isUseMRMapSettings()) {
            vertex.setTaskLaunchCmdOpts(mapTaskLaunchCmdOpts);
            vertex.setTaskEnvironment(mapTaskEnv);
        } else {
            vertex.setTaskLaunchCmdOpts(reduceTaskLaunchCmdOpts);
            vertex.setTaskEnvironment(reduceTaskEnv);
        }

        MRToTezHelper.setVertexConfig(vertex, tezOp.isUseMRMapSettings(), globalConf);

        log.info("For vertex - " + tezOp.getOperatorKey().toString()
                + ": parallelism=" + tezOp.getVertexParallelism()
                + ", memory=" + vertex.getTaskResource().getMemory()
                + ", java opts=" + vertex.getTaskLaunchCmdOpts()
                );
        log.info("Processing aliases: " + alias);
        log.info("Detailed locations: " + aliasLocation);
        log.info("Pig features in the vertex: " + features);
        // Right now there can only be one of each of these. Will need to be
        // more generic when there can be more.
        for (POLoad ld : tezOp.getLoaderInfo().getLoads()) {

            // TODO: These should get the globalConf, or a merged version that
            // keeps settings like pig.maxCombinedSplitSize
            Builder userPayLoadBuilder = MRRuntimeProtos.MRInputUserPayloadProto.newBuilder();

            InputSplitInfo inputSplitInfo = tezOp.getLoaderInfo().getInputSplitInfo();
            Map<String, LocalResource> additionalLocalResources = null;
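            // Splits whose serialized size exceeds this threshold are written to disk
            // instead of being sent to the AM in the user payload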
            int spillThreshold = payloadConf
                    .getInt(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD,
                            PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD_DEFAULT);

            // Currently inputSplitInfo is always InputSplitInfoMem at this point
            if (inputSplitInfo instanceof InputSplitInfoMem) {
                MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
                Pair<Long, Boolean> serializationInfo = TezInputHelper.createSplitsProto(inputSplitInfo, pigContextConf, splitsBuilder,
                        spillThreshold);
                MRSplitsProto splitsProto = splitsBuilder.build();
                if (!serializationInfo.second) {
                    // Write splits to disk instead of sending them via events
                    inputPayLoad.setBoolean(
                            org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
                            false);
                    Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
                    log.info("Writing input splits to " + inputSplitsDir
                            + " for vertex " + vertex.getName()
                            + " as the partially serialized size in memory is "
                            + serializationInfo.first + ". Configured "
                            + PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD
                            + " is " + spillThreshold);
                    inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk(
                            (InputSplitInfoMem) inputSplitInfo, inputSplitsDir, payloadConf, fs, splitsProto);
                    additionalLocalResources = new HashMap<String, LocalResource>();
                    MRToTezHelper.updateLocalResourcesForInputSplits(
                            fs, inputSplitInfo,
                            additionalLocalResources);
                    inputSplitInDiskVertices.add(vertex.getName());
                } else {
                    // Send splits via RPC to the AM
                    userPayLoadBuilder.setSplits(splitsProto);
                }
                // Free up memory
                tezOp.getLoaderInfo().setInputSplitInfo(null);
            }

            udfContextSeparator.serializeUDFContext(inputPayLoad, tezOp, UDFType.LOADFUNC);
            userPayLoadBuilder.setConfigurationBytes(TezUtils.createByteStringFromConf(inputPayLoad));

            vertex.setLocationHint(VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()));
            vertex.addDataSource(ld.getOperatorKey().toString(),
                    DataSourceDescriptor.create(InputDescriptor.create(MRInput.class.getName())
                            .setUserPayload(UserPayload.create(userPayLoadBuilder.build().toByteString().asReadOnlyByteBuffer())),
                    InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()),
                    inputSplitInfo.getNumTasks(),
                    dag.getCredentials(),
                    null,
                    additionalLocalResources));
        }

        // Union within a split can have multiple stores writing to same output
        Set<String> uniqueStoreOutputs = new HashSet<String>();
        for (POStore store : stores) {

            ArrayList<POStore> singleStore = new ArrayList<POStore>();
            singleStore.add(store);

            Configuration outPayLoad = new Configuration(outputPayLoad);
            udfContextSeparator.serializeUDFContext(outPayLoad, tezOp, store);
            outPayLoad.set(JobControlCompiler.PIG_REDUCE_STORES,
                    ObjectSerializer.serialize(singleStore));

            OutputDescriptor storeOutDescriptor = OutputDescriptor.create(
                    MROutput.class.getName()).setUserPayload(TezUtils
                    .createUserPayloadFromConf(outPayLoad));
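            // A store that belongs to a union vertex group only records its output
            // descriptor here; the data sink is added when the vertex group is built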
            if (tezOp.getVertexGroupStores() != null) {
                OperatorKey vertexGroupKey = tezOp.getVertexGroupStores().get(store.getOperatorKey());
                if (vertexGroupKey != null) {
                    getPlan().getOperator(vertexGroupKey).getVertexGroupInfo()
                            .setStoreOutputDescriptor(storeOutDescriptor);
                    continue;
                }
            }
            String outputKey = ((POStoreTez) store).getOutputKey();
            if (!uniqueStoreOutputs.contains(outputKey)) {
                vertex.addDataSink(outputKey,
                        DataSinkDescriptor.create(storeOutDescriptor,
                        OutputCommitterDescriptor.create(MROutputCommitter.class.getName()),
                        dag.getCredentials()));
                uniqueStoreOutputs.add(outputKey);
            }
        }

        // LoadFunc and StoreFunc add delegation tokens to the Job Credentials in
        // setLocation and setStoreLocation respectively (e.g. HBaseStorage).
        // InputFormat adds delegation tokens in getSplits and OutputFormat in
        // checkOutputSpecs (e.g. FileInputFormat and FileOutputFormat).
        if (stores.size() > 0) {
            new PigOutputFormat().checkOutputSpecs(job);
        }

        // else if(tezOp.isLimitAfterSort())
        // TODO: PIG-4049 If standalone Limit we need a new VertexManager or new input
        // instead of ShuffledMergedInput. For limit part of the sort (order by parallel 1) itself
        // need to enhance PartitionerDefinedVertexManager

        if (vmPluginName != null) {
            VertexManagerPluginDescriptor vmPluginDescriptor = VertexManagerPluginDescriptor.create(vmPluginName);
            if (vmPluginConf != null) {
                vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf));
            }
            vertex.setVertexManagerPlugin(vmPluginDescriptor);
        }
        // Reset udfcontext jobconf. It is not supposed to be set in the front end
        UDFContext.getUDFContext().addJobConf(null);
        return vertex;
    }