public PigStats launchPig()

in src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java [163:562]


    /**
     * Compiles the given physical plan into a map-reduce plan and runs it to
     * completion, one batch of jobs at a time.
     *
     * <p>Each pass of the main loop asks the {@code JobControlCompiler} for the
     * next {@code JobControl}. If none is returned, the native map-reduce roots
     * of the plan are run directly. Otherwise the job control is started on its
     * own thread and polled until all of its jobs have finished, reporting
     * progress and accumulating statistics along the way. After the loop,
     * intermediate data is optionally deleted, failures are recorded, success
     * markers are created for non-temporary stores, and the store functions'
     * cleanup hooks are invoked.
     *
     * @param php     the physical plan to compile and execute
     * @param grpName the group name handed to the job control compiler
     * @param pc      the Pig context supplying properties and the execution engine
     * @return the {@code PigStats} for this run, carrying a return code of
     *         {@code SUCCESS}, {@code PARTIAL_FAILURE} or {@code FAILURE}
     * @throws ExecException if a native map-reduce job fails, or execution was
     *         stopped on a job failure, while {@code stop.on.failure} is enabled;
     *         if the job control thread reported a non-Pig exception; or if a
     *         store cleanup hook throws an {@code IOException}
     * @throws Exception any other error raised while compiling, launching or
     *         monitoring the jobs (including a {@code PigException} reported by
     *         the job control thread)
     */
    public PigStats launchPig(PhysicalPlan php,
            String grpName,
            PigContext pc) throws PlanException,
            VisitorException,
            IOException,
            ExecException,
            JobCreationException,
            Exception {
        // polling interval (ms) used when waiting on the job control thread
        long sleepTime = 500;
        aggregateWarning = Boolean.parseBoolean(pc.getProperties().getProperty("aggregate.warning"));
        MROperPlan mrp = compile(php, pc);

        ConfigurationValidator.validatePigProperties(pc.getProperties());
        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());

        MRExecutionEngine exe = (MRExecutionEngine) pc.getExecutionEngine();
        Properties defaultProperties = new Properties();
        JobConf defaultJobConf = exe.getLocalConf();
        Utils.recomputeProperties(defaultJobConf, defaultProperties);

        // This is a generic JobClient for checking progress of the jobs
        JobClient statsJobClient = new JobClient(exe.getJobConf());

        JobControlCompiler jcc = new JobControlCompiler(pc, conf, ConfigurationUtil.toConfiguration(defaultProperties));

        MRScriptState.get().addWorkflowAdjacenciesToConf(mrp, conf);

        // start collecting statistics
        PigStats.start(pc.getExecutionEngine().instantiatePigStats());
        MRPigStatsUtil.startCollection(pc, statsJobClient, jcc, mrp);

        // Find all the intermediate data stores. The plan will be destroyed during compile/execution
        // so this needs to be done before.
        MRIntermediateDataVisitor intermediateVisitor = new MRIntermediateDataVisitor(mrp);
        intermediateVisitor.visit();

        List<Job> failedJobs = new LinkedList<Job>();
        List<NativeMapReduceOper> failedNativeMR = new LinkedList<NativeMapReduceOper>();
        List<Job> completeFailedJobsInThisRun = new LinkedList<Job>();
        List<Job> succJobs = new LinkedList<Job>();
        // totalMRJobs is fixed up front; it is the denominator for progress reporting
        int totalMRJobs = mrp.size();
        int numMRJobsCompl = 0;
        double lastProg = -1;
        long scriptSubmittedTimestamp = System.currentTimeMillis();

        //create the exception handler for the job control thread
        //and register the handler with the job control thread
        JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();

        boolean stop_on_failure =
            Boolean.parseBoolean(pc.getProperties().getProperty("stop.on.failure", "false"));
        boolean stoppedOnFailure = false;

        // Keep compiling and running batches until the plan is drained or we stopped on failure.
        while(mrp.size() != 0 && !stoppedOnFailure) {
            jc = jcc.compile(mrp, grpName);
            if(jc == null) {
                // No job control was produced for a non-empty plan: run the native
                // map-reduce roots directly and then try compiling again.
                // NOTE(review): if none of the roots is a NativeMapReduceOper the plan
                // is not modified here and this loop would not make progress -
                // presumably jcc.compile only returns null when a native root exists; confirm.
                // Copy the roots since the plan is modified while iterating.
                List<MapReduceOper> roots = new LinkedList<MapReduceOper>();
                roots.addAll(mrp.getRoots());

                // run the native mapreduce roots first then run the rest of the roots
                for(MapReduceOper mro: roots) {
                    if(mro instanceof NativeMapReduceOper) {
                        NativeMapReduceOper natOp = (NativeMapReduceOper)mro;
                        try {
                            MRScriptState.get().emitJobsSubmittedNotification(1);
                            natOp.runJob();
                            numMRJobsCompl++;
                        } catch (IOException e) {

                            // drop everything that depends on the failed native job
                            mrp.trimBelow(natOp);
                            failedNativeMR.add(natOp);

                            String msg = "Error running native mapreduce" +
                                    " operator job :" + natOp.getJobId() + e.getMessage();

                            String stackTrace = Utils.getStackStraceStr(e);
                            LogUtils.writeLog(msg,
                                    stackTrace,
                                    pc.getProperties().getProperty("pig.logfile"),
                                    log
                                    );
                            log.info(msg);

                            if (stop_on_failure) {
                                int errCode = 6017;

                                // keep the original IOException as the cause so the
                                // root failure is not lost to callers
                                throw new ExecException(msg, errCode,
                                        PigException.REMOTE_ENVIRONMENT, e);
                            }

                        }
                        double prog = ((double)numMRJobsCompl)/totalMRJobs;
                        notifyProgress(prog, lastProg);
                        lastProg = prog;
                        mrp.remove(natOp);
                    }
                }
                continue;
            }
            // Initially, all jobs are in wait state.
            List<Job> jobsWithoutIds = jc.getWaitingJobs();
            log.info(jobsWithoutIds.size() +" map-reduce job(s) waiting for submission.");
            //notify listeners about jobs submitted
            MRScriptState.get().emitJobsSubmittedNotification(jobsWithoutIds.size());

            // update Pig stats' job DAG with just compiled jobs
            MRPigStatsUtil.updateJobMroMap(jcc.getJobMroMap());

            // determine job tracker url
            // NOTE(review): jobTrackerLoc is computed but not read again within this method.
            String jobTrackerLoc;
            JobConf jobConf = jobsWithoutIds.get(0).getJobConf();
            try {
                String port = jobConf.get(MRConfiguration.JOB_TRACKER_HTTP_ADDRESS);
                String jobTrackerAdd = jobConf.get(MRConfiguration.JOB_TRACKER);

                jobTrackerLoc = jobTrackerAdd.substring(0,jobTrackerAdd.indexOf(":"))
                        + port.substring(port.indexOf(":"));
            }
            catch(Exception e){
                // Could not get the job tracker location, most probably we are running in local mode.
                // If it is the case, we don't print out job tracker location,
                // because it is meaningless for local mode.
                jobTrackerLoc = null;
                log.debug("Failed to get job tracker location.");
            }

            completeFailedJobsInThisRun.clear();

            // Set the thread UDFContext so registered classes are available.
            final UDFContext udfContext = UDFContext.getUDFContext();
            Thread jcThread = new Thread(jc, "JobControl") {
                @Override
                public void run() {
                    UDFContext.setUdfContext(udfContext.clone()); //PIG-2576
                    super.run();
                }
            };

            jcThread.setUncaughtExceptionHandler(jctExceptionHandler);

            jcThread.setContextClassLoader(PigContext.getClassLoader());

            // mark the times that the jobs were submitted so it's reflected in job history props
            for (Job job : jc.getWaitingJobs()) {
                JobConf jobConfCopy = job.getJobConf();
                jobConfCopy.set("pig.script.submitted.timestamp",
                        Long.toString(scriptSubmittedTimestamp));
                jobConfCopy.set("pig.job.submitted.timestamp",
                        Long.toString(System.currentTimeMillis()));
                job.setJobConf(jobConfCopy);
            }

            //All the setup done, now lets launch the jobs.
            jcThread.start();

            try {
                // a flag whether to warn failure during the loop below, so users can notice failure earlier.
                boolean warn_failure = true;

                // Now wait, till we are finished.
                while(!jc.allFinished()){

                    // wait at most sleepTime ms for the job control thread, then poll its state
                    jcThread.join(sleepTime);

                    List<Job> jobsAssignedIdInThisRun = new ArrayList<Job>();

                    for(Job job : jobsWithoutIds){
                        // jobs that still have no assigned id are simply revisited on the next poll
                        if (job.getAssignedJobID() != null){

                            jobsAssignedIdInThisRun.add(job);
                            log.info("HadoopJobId: "+job.getAssignedJobID());

                            // display the aliases being processed
                            MapReduceOper mro = jcc.getJobMroMap().get(job);
                            if (mro != null) {
                                String alias = MRScriptState.get().getAlias(mro);
                                log.info("Processing aliases " + alias);
                                String aliasLocation = MRScriptState.get().getAliasLocation(mro);
                                log.info("detailed locations: " + aliasLocation);
                            }

                            // update statistics for this job so jobId is set
                            MRPigStatsUtil.addJobStats(job);
                            MRScriptState.get().emitJobStartedNotification(
                                    job.getAssignedJobID().toString());
                        }
                    }
                    // removal is deferred until after the iteration above
                    jobsWithoutIds.removeAll(jobsAssignedIdInThisRun);

                    double prog = (numMRJobsCompl+calculateProgress(jc))/totalMRJobs;
                    if (notifyProgress(prog, lastProg)) {
                        List<Job> runnJobs = jc.getRunningJobs();
                        if (runnJobs != null) {
                            // build "id1,id2,...," and turn the trailing comma into the closing ']'
                            StringBuilder msg = new StringBuilder();
                            for (Job j : runnJobs) {
                                if (j != null) {
                                    msg.append(j.getAssignedJobID()).append(",");
                                }
                            }
                            if (msg.length() > 0) {
                                msg.setCharAt(msg.length() - 1, ']');
                                log.info("Running jobs are [" + msg);
                            }
                        }
                        lastProg = prog;
                    }

                    // collect job stats by frequently polling of completed jobs (PIG-1829)
                    MRPigStatsUtil.accumulateStats(jc);

                    // if stop_on_failure is enabled, we need to stop immediately when any job has failed
                    stoppedOnFailure = stopJobsOnFailure(stop_on_failure);
                    // otherwise, we just display a warning message if there's any failure
                    if (!stop_on_failure && warn_failure && !jc.getFailedJobs().isEmpty()) {
                        // we don't warn again for this group of jobs
                        warn_failure = false;
                        log.warn("Ooops! Some job has failed! Specify -stop_on_failure if you "
                                + "want Pig to stop immediately on failure.");
                    }
                }

                //check for the jobControlException first
                //if the job controller fails before launching the jobs then there are
                //no jobs to check for failure
                if (jobControlException != null) {
                    if (jobControlException instanceof PigException) {
                        if (jobControlExceptionStackTrace != null) {
                            LogUtils.writeLog("Error message from job controller",
                                    jobControlExceptionStackTrace, pc
                                    .getProperties().getProperty(
                                            "pig.logfile"), log);
                        }
                        throw jobControlException;
                    } else {
                        int errCode = 2117;
                        String msg = "Unexpected error when launching map reduce job.";
                        throw new ExecException(msg, errCode, PigException.BUG,
                                jobControlException);
                    }
                }

                if (!jc.getFailedJobs().isEmpty() ) {
                    // stop if stop_on_failure is enabled
                    stoppedOnFailure = stopJobsOnFailure(stop_on_failure);

                    if (!stoppedOnFailure) {
                        // If we only have one store and that job fail, then we sure
                        // that the job completely fail, and we shall stop dependent jobs
                        for (Job job : jc.getFailedJobs()) {
                            completeFailedJobsInThisRun.add(job);
                            log.info("job " + job.getAssignedJobID() + " has failed! Stop running all dependent jobs");
                        }
                    }
                    failedJobs.addAll(jc.getFailedJobs());
                }

                // the count returned by updateMROpPlan is added to the completed-job counter
                int removedMROp = jcc.updateMROpPlan(completeFailedJobsInThisRun);

                numMRJobsCompl += removedMROp;

                List<Job> jobs = jc.getSuccessfulJobs();
                jcc.moveResults(jobs);
                succJobs.addAll(jobs);

                // collecting final statistics
                MRPigStatsUtil.accumulateStats(jc);

            }
            finally {
                // always stop the job control, whether this batch succeeded or threw
                jc.stop();
            }
        }

        MRScriptState.get().emitProgressUpdatedNotification(100);

        log.info( "100% complete");

        boolean failed = !failedNativeMR.isEmpty();

        if (Boolean.parseBoolean(pc.getProperties().getProperty(PigConfiguration.PIG_DELETE_TEMP_FILE, "true"))) {
            // Clean up all the intermediate data
            for (String path : intermediateVisitor.getIntermediate()) {
                // Skip non-file system paths such as hbase, see PIG-3617
                if (HadoopShims.hasFileSystemImpl(new Path(path), conf)) {
                    FileLocalizer.delete(path, pc);
                }
            }
        }

        // Look to see if any jobs failed.  If so, we need to report that.
        if (!failedJobs.isEmpty()) {

            for (Job fj : failedJobs) {
                // Scoped per job so that an exception recorded for one failed job
                // is never attributed to a later one.
                Exception backendException = null;
                try {
                    getStats(fj, true, pc);
                } catch (Exception e) {
                    backendException = e;
                }
                List<POStore> sts = jcc.getStores(fj);
                for (POStore st: sts) {
                    failureMap.put(st.getSFile(), backendException);
                }
                MRPigStatsUtil.setBackendException(fj, backendException);
            }
            failed = true;
        }

        // stats collection is done, log the results
        MRPigStatsUtil.stopCollection(true);

        // PigStatsUtil.stopCollection also computes the return code based on
        // total jobs to run, jobs successful and jobs failed
        failed = failed || !PigStats.get().isSuccessful();

        Map<Enum, Long> warningAggMap = new HashMap<Enum, Long>();

        for (Job job : succJobs) {
            List<POStore> sts = jcc.getStores(job);
            for (POStore st : sts) {
                if (!st.isTmpStore()) {
                    // create an "_SUCCESS" file in output location if
                    // output location is a filesystem dir
                    createSuccessFile(job, st);
                } else {
                    log.debug("Successfully stored result in: \""
                            + st.getSFile().getFileName() + "\"");
                }
            }

            getStats(job, false, pc);
            if (aggregateWarning) {
                computeWarningAggregate(job, warningAggMap);
            }
        }

        if(aggregateWarning) {
            CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log) ;
        }

        if (!failed) {
            log.info("Success!");
        } else {
            if (!succJobs.isEmpty()) {
                log.info("Some jobs have failed! Stop running all dependent jobs");
            } else {
                log.info("Failed!");
            }
        }
        jcc.reset();

        // a failure with at least one successful job is reported as a partial failure
        int ret = failed ? (!succJobs.isEmpty()
                ? ReturnCode.PARTIAL_FAILURE
                        : ReturnCode.FAILURE)
                        : ReturnCode.SUCCESS;

        PigStats pigStats = PigStatsUtil.getPigStats(ret);
        // run cleanup for all of the stores
        for (OutputStats output : pigStats.getOutputStats()) {
            POStore store = output.getPOStore();
            try {
                if (!output.isSuccessful()) {
                    store.getStoreFunc().cleanupOnFailure(
                            store.getSFile().getFileName(),
                            new org.apache.hadoop.mapreduce.Job(output.getConf()));
                } else {
                    store.getStoreFunc().cleanupOnSuccess(
                            store.getSFile().getFileName(),
                            new org.apache.hadoop.mapreduce.Job(output.getConf()));
                }
            } catch (IOException e) {
                throw new ExecException(e);
            } catch (AbstractMethodError nsme) {
                // Just swallow it.  This means we're running against an
                // older instance of a StoreFunc that doesn't implement
                // this method.
            }
        }

        // raised only after stats, logging and store cleanup above have run
        if (stoppedOnFailure) {
            throw new ExecException("Stopping execution on job failure with -stop_on_failure option", 6017,
                    PigException.REMOTE_ENVIRONMENT);
        }
        return pigStats;
    }