public PigStats launchPig()

in src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java [110:304]


    /**
     * Compiles the given physical plan into a container of Tez DAGs, runs the
     * DAGs one after another, and returns the collected script statistics.
     *
     * <p>Before launching, this method adjusts the job configuration:
     * <ul>
     *   <li>in local exec mode it enables Tez local mode and related settings;</li>
     *   <li>it raises the MapReduce counter limits to at least the configured
     *       Tez counter limits and re-initializes {@code Limits};</li>
     *   <li>it defaults the Tez AM staging directory to Pig's Tez resources
     *       directory when none is configured.</li>
     * </ul>
     *
     * <p>A plan consisting of a single {@link NativeTezOper} is run directly on
     * the calling thread. Every other plan is compiled into a job and run on
     * this launcher's single-thread executor while the calling thread polls it
     * once a second to emit start and progress notifications.
     *
     * <p>After all DAGs have run, or after execution stopped early because
     * {@code stop.on.failure} is enabled, each output's store function gets
     * its {@code cleanupOnSuccess} or {@code cleanupOnFailure} callback.
     *
     * @param php     the physical plan to compile and run
     * @param grpName group name (not referenced by this method)
     * @param pc      Pig context supplying properties and exec type; its
     *                {@code defaultParallel} is set to 1 when it is -1 and Tez
     *                auto-parallelism is disabled
     * @return the statistics for this script run
     * @throws ExecException if a store-function cleanup fails with an
     *         {@link IOException}, or (error code 6017) if execution stopped
     *         on a failed DAG because {@code stop.on.failure} is enabled
     * @throws Exception any other failure from plan compilation, job
     *         submission, or interruption while waiting for a DAG
     */
    public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
        // Lazily create the executor that runs compiled Tez jobs off the caller's thread.
        synchronized (this) {
            if (executor == null) {
                executor = Executors.newSingleThreadExecutor(namedThreadFactory);
            }
        }
        if (pc.getExecType().isLocal()) {
            pc.getProperties().setProperty(TezConfiguration.TEZ_LOCAL_MODE, "true");
            pc.getProperties().setProperty(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "true");
            pc.getProperties().setProperty(TezConfiguration.TEZ_IGNORE_LIB_URIS, "true");
            pc.getProperties().setProperty(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, DAGSchedulerNaturalOrderControlled.class.getName());
        }
        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
        // Make sure MR counter does not exceed limit: raise the MR limits to at
        // least the Tez limits when the latter are configured.
        if (conf.get(TezConfiguration.TEZ_COUNTERS_MAX) != null) {
            conf.setInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTERS_MAX_KEY, Math.max(
                    conf.getInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTERS_MAX_KEY, 0),
                    conf.getInt(TezConfiguration.TEZ_COUNTERS_MAX, 0)));
        }
        if (conf.get(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS) != null) {
            conf.setInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTER_GROUPS_MAX_KEY, Math.max(
                    conf.getInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTER_GROUPS_MAX_KEY, 0),
                    conf.getInt(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS, 0)));
        }

        // This is hacky, but Limits cannot be initialized twice: clear its
        // private "isInited" flag via reflection so init(conf) takes effect.
        // Best-effort only; a failure is logged and launching continues.
        try {
            Field f = Limits.class.getDeclaredField("isInited");
            f.setAccessible(true);
            f.setBoolean(null, false);
            Limits.init(conf);
        } catch (Throwable e) {
            log.warn("Error when setting counter limit: " + e.getMessage());
        }

        if (pc.defaultParallel == -1 && !conf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM, true)) {
            pc.defaultParallel = 1;
        }
        aggregateWarning = conf.getBoolean("aggregate.warning", false);

        TezResourceManager tezResourceManager = TezResourceManager.getInstance();
        tezResourceManager.init(pc, conf);

        String stagingDir = conf.get(TezConfiguration.TEZ_AM_STAGING_DIR);
        String resourcesDir = tezResourceManager.getResourcesDir().toString();
        if (stagingDir == null) {
            // If not set in tez-site.xml, use Pig's tez resources directory as staging directory
            // instead of TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT
            stagingDir = resourcesDir;
            conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, resourcesDir);
        }
        log.info("Tez staging directory is " + stagingDir + " and resources directory is " + resourcesDir);

        List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>();

        tezScriptState = TezScriptState.get();
        tezStats = new TezPigScriptStats(pc);
        PigStats.start(tezStats);

        conf.setIfUnset(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true");
        TezJobCompiler jc = new TezJobCompiler(pc, conf);
        TezPlanContainer tezPlanContainer = compile(php, pc);

        tezStats.initialize(tezPlanContainer);
        tezScriptState.emitInitialPlanNotification(tezPlanContainer);
        tezScriptState.emitLaunchStartedNotification(tezPlanContainer.size()); //number of DAGs to Launch

        boolean stopOnFailure =
                Boolean.parseBoolean(pc.getProperties().getProperty("stop.on.failure", "false"));
        boolean stoppedOnFailure = false;

        TezPlanContainerNode tezPlanContainerNode;
        TezOperPlan tezPlan;
        int processedDAGs = 0;
        // getNextPlan hands out plans whose dependencies are all in processedPlans.
        while ((tezPlanContainerNode = tezPlanContainer.getNextPlan(processedPlans)) != null) {
            tezPlan = tezPlanContainerNode.getTezOperPlan();
            processLoadAndParallelism(tezPlan, pc);
            processedPlans.add(tezPlan);
            ProgressReporter reporter = new ProgressReporter(tezPlanContainer.size(), processedDAGs);
            if (tezPlan.size()==1 && tezPlan.getRoots().get(0) instanceof NativeTezOper) {
                // Native Tez Plan: run synchronously on this thread.
                NativeTezOper nativeOper = (NativeTezOper)tezPlan.getRoots().get(0);
                tezScriptState.emitJobsSubmittedNotification(1);
                nativeOper.runJob(tezPlanContainerNode.getOperatorKey().toString());
            } else {
                TezPOPackageAnnotator pkgAnnotator = new TezPOPackageAnnotator(tezPlan);
                pkgAnnotator.visit();

                runningJob = jc.compile(tezPlanContainerNode, tezPlanContainer);
                //TODO: Exclude vertex groups from numVerticesToLaunch ??
                tezScriptState.dagLaunchNotification(runningJob.getName(), tezPlan, tezPlan.size());
                runningJob.setPigStats(tezStats);

                // Set the thread UDFContext so registered classes are available.
                final UDFContext udfContext = UDFContext.getUDFContext();
                Runnable task = new Runnable() {
                    @Override
                    public void run() {
                        Thread.currentThread().setContextClassLoader(PigContext.getClassLoader());
                        UDFContext.setUdfContext(udfContext.clone());
                        runningJob.run();
                    }
                };

                // Mark the times that the jobs were submitted so it's reflected in job
                // history props. TODO: Fix this. unused now
                long scriptSubmittedTimestamp = System.currentTimeMillis();
                // Job.getConfiguration returns the shared configuration object
                Configuration jobConf = runningJob.getConfiguration();
                jobConf.set("pig.script.submitted.timestamp",
                        Long.toString(scriptSubmittedTimestamp));
                jobConf.set("pig.job.submitted.timestamp",
                        Long.toString(System.currentTimeMillis()));

                Future<?> future = executor.submit(task);
                tezScriptState.emitJobsSubmittedNotification(1);

                boolean jobStarted = false;

                // Poll once a second until the job finishes, emitting the
                // "started" notifications the first time an application id is seen.
                // NOTE(review): a DAG that completes within one poll interval may
                // never have its application id observed here, so the started
                // notification / HadoopJobId log line would be skipped — confirm.
                while (!future.isDone()) {
                    if (!jobStarted && runningJob.getApplicationId() != null) {
                        jobStarted = true;
                        String appId = runningJob.getApplicationId().toString();
                        //For Oozie Pig action job id matching compatibility with MR mode
                        log.info("HadoopJobId: "+ appId.replace("application", "job"));
                        tezScriptState.emitJobStartedNotification(appId);
                        tezScriptState.dagStartedNotification(runningJob.getName(), appId);
                    }
                    reporter.notifyUpdate();
                    Thread.sleep(1000);
                }
                // For tez_local mode where PigProcessor destroys all UDFContext
                UDFContext.setUdfContext(udfContext);
                try {
                    // In case of FutureTask there is no uncaught exception
                    // Need to do future.get() to get any exception
                    future.get();
                } catch (ExecutionException e) {
                    setJobException(e.getCause());
                }
            }
            processedDAGs++;
            // Report the percentage of DAGs completed so far. Multiply before
            // dividing so integer division does not truncate the fraction to 0.
            int totalDAGs = tezPlanContainer.size();
            int progress = (totalDAGs <= 0 || processedDAGs >= totalDAGs)
                    ? 100
                    : (processedDAGs * 100) / totalDAGs;
            tezScriptState.emitProgressUpdatedNotification(progress);
            handleUnCaughtException(pc);
            boolean tezDAGSucceeded = reporter.notifyFinishedOrFailed();
            tezPlanContainer.updatePlan(tezPlan, tezDAGSucceeded);
            // if stop.on.failure is enabled, we need to stop immediately when any job has failed
            if (!tezDAGSucceeded) {
                if (stopOnFailure) {
                    stoppedOnFailure = true;
                    break;
                } else {
                    log.warn("Ooops! Some job has failed! Specify -stop_on_failure if you "
                            + "want Pig to stop immediately on failure.");
                }
            }
        }

        tezStats.finish();
        tezScriptState.emitLaunchCompletedNotification(tezStats.getNumberSuccessfulJobs());

        // Give every store function its success/failure cleanup callback.
        for (OutputStats output : tezStats.getOutputStats()) {
            POStore store = output.getPOStore();
            try {
                if (!output.isSuccessful()) {
                    store.getStoreFunc().cleanupOnFailure(
                            store.getSFile().getFileName(),
                            Job.getInstance(output.getConf()));
                } else {
                    store.getStoreFunc().cleanupOnSuccess(
                            store.getSFile().getFileName(),
                            Job.getInstance(output.getConf()));
                }
            } catch (IOException e) {
                throw new ExecException(e);
            } catch (AbstractMethodError nsme) {
                // Just swallow it.  This means we're running against an
                // older instance of a StoreFunc that doesn't implement
                // this method.
            }
        }

        if (stoppedOnFailure) {
            throw new ExecException("Stopping execution on job failure with -stop_on_failure option", 6017,
                    PigException.REMOTE_ENVIRONMENT);
        }

        return tezStats;
    }