static int run()

in src/org/apache/pig/Main.java [178:688]


    static int run(String args[], PigProgressNotificationListener listener) {
        DateTime startTime = new DateTime();
        int rc = 1;
        boolean verbose = false;
        boolean gruntCalled = false;
        boolean deleteTempFiles = true;
        String logFileName = null;
        boolean printScriptRunTime = true;
        PigContext pigContext = null;

        try {
            Configuration conf = new Configuration(false);
            GenericOptionsParser parser = new GenericOptionsParser(conf, args);
            conf = parser.getConfiguration();

            Properties properties = new Properties();
            PropertiesUtil.loadDefaultProperties(properties);
            properties.putAll(ConfigurationUtil.toProperties(conf));

            if (listener == null) {
                listener = makeListener(properties);
            }
            String[] pigArgs = parser.getRemainingArgs();

            boolean userSpecifiedLog = false;
            boolean checkScriptOnly = false;

            BufferedReader pin = null;
            boolean debug = false;
            boolean dryrun = false;
            boolean embedded = false;
            List<String> params = new ArrayList<String>();
            List<String> paramFiles = new ArrayList<String>();
            HashSet<String> disabledOptimizerRules = new HashSet<String>();

            CmdLineParser opts = new CmdLineParser(pigArgs);
            opts.registerOpt('4', "log4jconf", CmdLineParser.ValueExpected.REQUIRED);
            opts.registerOpt('b', "brief", CmdLineParser.ValueExpected.NOT_ACCEPTED);
            opts.registerOpt('c', "check", CmdLineParser.ValueExpected.NOT_ACCEPTED);
            opts.registerOpt('d', "debug", CmdLineParser.ValueExpected.REQUIRED);
            opts.registerOpt('e', "execute", CmdLineParser.ValueExpected.NOT_ACCEPTED);
            opts.registerOpt('f', "file", CmdLineParser.ValueExpected.REQUIRED);
            opts.registerOpt('g', "embedded", CmdLineParser.ValueExpected.REQUIRED);
            opts.registerOpt('h', "help", CmdLineParser.ValueExpected.OPTIONAL);
            opts.registerOpt('i', "version", CmdLineParser.ValueExpected.OPTIONAL);
            opts.registerOpt('l', "logfile", CmdLineParser.ValueExpected.REQUIRED);
            opts.registerOpt('m', "param_file", CmdLineParser.ValueExpected.OPTIONAL);
            opts.registerOpt('p', "param", CmdLineParser.ValueExpected.OPTIONAL);
            opts.registerOpt('r', "dryrun", CmdLineParser.ValueExpected.NOT_ACCEPTED);
            opts.registerOpt('t', "optimizer_off", CmdLineParser.ValueExpected.REQUIRED);
            opts.registerOpt('v', "verbose", CmdLineParser.ValueExpected.NOT_ACCEPTED);
            opts.registerOpt('w', "warning", CmdLineParser.ValueExpected.NOT_ACCEPTED);
            opts.registerOpt('x', "exectype", CmdLineParser.ValueExpected.REQUIRED);
            opts.registerOpt('F', "stop_on_failure", CmdLineParser.ValueExpected.NOT_ACCEPTED);
            opts.registerOpt('M', "no_multiquery", CmdLineParser.ValueExpected.NOT_ACCEPTED);
            opts.registerOpt('N', "no_fetch", CmdLineParser.ValueExpected.NOT_ACCEPTED);
            opts.registerOpt('P', "propertyFile", CmdLineParser.ValueExpected.REQUIRED);

            ExecMode mode = ExecMode.UNKNOWN;
            String file = null;
            String engine = null;

            // set up client side system properties in UDF context
            UDFContext.getUDFContext().setClientSystemProps(properties);

            char opt;
            while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {
                switch (opt) {
                case '4':
                    String log4jconf = opts.getValStr();
                    if(log4jconf != null){
                        properties.setProperty(LOG4J_CONF, log4jconf);
                    }
                    break;

                case 'b':
                    properties.setProperty(BRIEF, "true");
                    break;

                case 'c':
                    checkScriptOnly = true;
                    break;

                case 'd':
                    String logLevel = opts.getValStr();
                    if (logLevel != null) {
                        properties.setProperty(DEBUG, logLevel);
                    }
                    debug = true;
                    break;

                case 'e':
                    mode = ExecMode.STRING;
                    break;

                case 'f':
                    mode = ExecMode.FILE;
                    file = opts.getValStr();
                    break;

                case 'g':
                    embedded = true;
                    engine = opts.getValStr();
                    break;

                case 'F':
                    properties.setProperty("stop.on.failure", ""+true);
                    break;

                case 'h':
                    String topic = opts.getValStr();
                    if (topic != null)
                        if (topic.equalsIgnoreCase("properties"))
                            printProperties();
                        else{
                            System.out.println("Invalide help topic - " + topic);
                            usage();
                        }
                    else
                        usage();
                    return ReturnCode.SUCCESS;

                case 'i':
                    printScriptRunTime = false;
                    System.out.println(getVersionString());
                    return ReturnCode.SUCCESS;

                case 'l':
                    //call to method that validates the path to the log file
                    //and sets up the file to store the client side log file
                    String logFileParameter = opts.getValStr();
                    if (logFileParameter != null && logFileParameter.length() > 0) {
                        logFileName = validateLogFile(logFileParameter, null);
                    } else {
                        logFileName = validateLogFile(logFileName, null);
                    }
                    userSpecifiedLog = true;
                    properties.setProperty("pig.logfile", (logFileName == null? "": logFileName));
                    break;

                case 'm':
                    paramFiles.add(opts.getValStr());
                    break;

                case 'M':
                    // turns off multiquery optimization
                    properties.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY,""+false);
                    break;

                case 'N':
                    properties.setProperty(PigConfiguration.PIG_OPT_FETCH,""+false);
                    break;

                case 'p':
                    params.add(opts.getValStr());
                    break;

                case 'r':
                    // currently only used for parameter substitution
                    // will be extended in the future
                    dryrun = true;
                    break;

                case 't':
                    disabledOptimizerRules.add(opts.getValStr());
                    break;

                case 'v':
                    properties.setProperty(VERBOSE, ""+true);
                    verbose = true;
                    break;

                case 'w':
                    properties.setProperty("aggregate.warning", ""+false);
                    break;

                case 'x':
                    properties.setProperty("exectype", opts.getValStr());
                    if (opts.getValStr().toLowerCase().contains("local")) {
                        UserGroupInformation.setConfiguration(new Configuration(false));
                    }
                    break;

                case 'P':
                {
                    InputStream inputStream = null;
                    try {
                        FileLocalizer.FetchFileRet localFileRet = FileLocalizer.fetchFile(properties, opts.getValStr());
                        inputStream = new BufferedInputStream(new FileInputStream(localFileRet.file));
                        properties.load(inputStream) ;
                    } catch (IOException e) {
                        throw new RuntimeException("Unable to parse properties file '" + opts.getValStr() + "'");
                    } finally {
                        if (inputStream != null) {
                            try {
                                inputStream.close();
                            } catch (IOException e) {
                            }
                        }
                    }
                }
                break;

                default: {
                    Character cc = Character.valueOf(opt);
                    throw new AssertionError("Unhandled option " + cc.toString());
                         }
                }
            }

            // create the context with the parameter
            pigContext = new PigContext(properties);

            // create the static script state object
            ScriptState scriptState = pigContext.getExecutionEngine().instantiateScriptState();
            String commandLine = LoadFunc.join((AbstractList<String>)Arrays.asList(args), " ");
            scriptState.setCommandLine(commandLine);
            if (listener != null) {
                scriptState.registerListener(listener);
            }
            ScriptState.start(scriptState);

            pigContext.getProperties().setProperty("pig.cmd.args", commandLine);

            if(logFileName == null && !userSpecifiedLog) {
                logFileName = validateLogFile(properties.getProperty("pig.logfile"), null);
            }

            pigContext.getProperties().setProperty("pig.logfile", (logFileName == null? "": logFileName));

            // configure logging
            configureLog4J(properties, pigContext);

            log.info(getVersionString().replace("\n", ""));

            if(logFileName != null) {
                log.info("Logging error messages to: " + logFileName);
            }

            deleteTempFiles = Boolean.valueOf(properties.getProperty(
                    PigConfiguration.PIG_DELETE_TEMP_FILE, "true"));

            pigContext.getProperties().setProperty(PigImplConstants.PIG_OPTIMIZER_RULES_KEY,
                    ObjectSerializer.serialize(disabledOptimizerRules));

            PigContext.setClassLoader(pigContext.createCl(null));

            // construct the parameter substitution preprocessor
            Grunt grunt = null;
            BufferedReader in;
            String substFile = null;

            paramFiles = fetchRemoteParamFiles(paramFiles, properties);
            pigContext.setParams(params);
            pigContext.setParamFiles(paramFiles);

            switch (mode) {

            case FILE: {
                String remainders[] = opts.getRemainingArgs();
                if (remainders != null) {
                    pigContext.getProperties().setProperty(PigContext.PIG_CMD_ARGS_REMAINDERS,
                            ObjectSerializer.serialize(remainders));
                }
                FileLocalizer.FetchFileRet localFileRet = FileLocalizer.fetchFile(properties, file);
                if (localFileRet.didFetch) {
                    properties.setProperty("pig.jars.relative.to.dfs", "true");
                }

                scriptState.setFileName(file);

                if (embedded) {
                    return runEmbeddedScript(pigContext, localFileRet.file.getPath(), engine);
                } else {
                    SupportedScriptLang type = determineScriptType(localFileRet.file.getPath());
                    if (type != null) {
                        return runEmbeddedScript(pigContext, localFileRet.file
                                    .getPath(), type.name().toLowerCase());
                    }
                }
                //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available
                in = new BufferedReader(new InputStreamReader(Utils.getCompositeStream(new FileInputStream(localFileRet.file), properties)));

                // run parameter substitution preprocessor first
                substFile = file + ".substituted";

                pin = runParamPreprocessor(pigContext, in, substFile, debug || dryrun || checkScriptOnly);
                if (dryrun) {
                    if (dryrun(substFile, pigContext)) {
                        log.info("Dry run completed. Substituted pig script is at "
                                + substFile
                                + ". Expanded pig script is at "
                                + file + ".expanded");
                    } else {
                        log.info("Dry run completed. Substituted pig script is at "
                                    + substFile);
                    }
                    return ReturnCode.SUCCESS;
                }


                logFileName = validateLogFile(logFileName, localFileRet.file);
                pigContext.getProperties().setProperty("pig.logfile", (logFileName == null? "": logFileName));

                // Set job name based on name of the script
                pigContext.getProperties().setProperty(PigContext.JOB_NAME,
                                                       "PigLatin:" +new File(file).getName()
                );
                if (!debug) {
                    new File(substFile).deleteOnExit();
                }

                scriptState.setScript(localFileRet.file);

                grunt = new Grunt(pin, pigContext);
                gruntCalled = true;

                if(checkScriptOnly) {
                    grunt.checkScript(substFile);
                    System.err.println(file + " syntax OK");
                    rc = ReturnCode.SUCCESS;
                } else {
                    int results[] = grunt.exec();
                    rc = getReturnCodeForStats(results);
                }

                return rc;
            }

            case STRING: {
                if(checkScriptOnly) {
                    System.err.println("ERROR:" +
                            "-c (-check) option is only valid " +
                            "when executing pig with a pig script file)");
                    return ReturnCode.ILLEGAL_ARGS;
                }
                // Gather up all the remaining arguments into a string and pass them into
                // grunt.
                StringBuffer sb = new StringBuffer();
                String remainders[] = opts.getRemainingArgs();
                for (int i = 0; i < remainders.length; i++) {
                    if (i != 0) sb.append(' ');
                    sb.append(remainders[i]);
                }

                sb.append('\n');

                scriptState.setScript(sb.toString());

                in = new BufferedReader(new StringReader(sb.toString()));

                grunt = new Grunt(in, pigContext);
                gruntCalled = true;
                int results[] = grunt.exec();
                return getReturnCodeForStats(results);
            }

            default:
                break;
            }

            // If we're here, we don't know yet what they want.  They may have just
            // given us a jar to execute, they might have given us a pig script to
            // execute, or they might have given us a dash (or nothing) which means to
            // run grunt interactive.
            String remainders[] = opts.getRemainingArgs();
            if (remainders == null) {
                if(checkScriptOnly) {
                    System.err.println("ERROR:" +
                            "-c (-check) option is only valid " +
                            "when executing pig with a pig script file)");
                    return ReturnCode.ILLEGAL_ARGS;
                }
                // Interactive
                mode = ExecMode.SHELL;
                //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available
                ConsoleReader reader = new ConsoleReader(Utils.getCompositeStream(System.in, properties), System.out);
                reader.setExpandEvents(false);
                reader.setPrompt("grunt> ");
                final String HISTORYFILE = ".pig_history";
                String historyFile = System.getProperty("user.home") + File.separator  + HISTORYFILE;
                reader.setHistory(new FileHistory(new File(historyFile)));
                ConsoleReaderInputStream inputStream = new ConsoleReaderInputStream(reader);
                grunt = new Grunt(new BufferedReader(new InputStreamReader(inputStream)), pigContext);
                grunt.setConsoleReader(reader);
                gruntCalled = true;
                grunt.run();
                return ReturnCode.SUCCESS;
            } else {
                pigContext.getProperties().setProperty(PigContext.PIG_CMD_ARGS_REMAINDERS, ObjectSerializer.serialize(remainders));

                // They have a pig script they want us to run.
                mode = ExecMode.FILE;

                FileLocalizer.FetchFileRet localFileRet = FileLocalizer.fetchFile(properties, remainders[0]);
                if (localFileRet.didFetch) {
                    properties.setProperty("pig.jars.relative.to.dfs", "true");
                }

                scriptState.setFileName(remainders[0]);

                if (embedded) {
                    return runEmbeddedScript(pigContext, localFileRet.file.getPath(), engine);
                } else {
                    SupportedScriptLang type = determineScriptType(localFileRet.file.getPath());
                    if (type != null) {
                        return runEmbeddedScript(pigContext, localFileRet.file
                                    .getPath(), type.name().toLowerCase());
                    }
                }
                //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available
                InputStream seqInputStream = Utils.getCompositeStream(new FileInputStream(localFileRet.file), properties);
                in = new BufferedReader(new InputStreamReader(seqInputStream));

                // run parameter substitution preprocessor first
                substFile = remainders[0] + ".substituted";
                pin = runParamPreprocessor(pigContext, in, substFile, debug || dryrun || checkScriptOnly);
                if (dryrun) {
                    if (dryrun(substFile, pigContext)) {
                        log.info("Dry run completed. Substituted pig script is at "
                                + substFile
                                + ". Expanded pig script is at "
                                + remainders[0] + ".expanded");
                    } else {
                        log.info("Dry run completed. Substituted pig script is at "
                                + substFile);
                    }
                    return ReturnCode.SUCCESS;
                }

                logFileName = validateLogFile(logFileName, localFileRet.file);
                pigContext.getProperties().setProperty("pig.logfile", (logFileName == null? "": logFileName));

                if (!debug) {
                    new File(substFile).deleteOnExit();
                }

                // Set job name based on name of the script
                pigContext.getProperties().setProperty(PigContext.JOB_NAME,
                                                       "PigLatin:" +new File(remainders[0]).getName()
                );

                scriptState.setScript(localFileRet.file);

                grunt = new Grunt(pin, pigContext);
                gruntCalled = true;

                if(checkScriptOnly) {
                    grunt.checkScript(substFile);
                    System.err.println(remainders[0] + " syntax OK");
                    rc = ReturnCode.SUCCESS;
                } else {
                    int results[] = grunt.exec();
                    rc = getReturnCodeForStats(results);
                }
                return rc;
            }

            // Per Utkarsh and Chris invocation of jar file via pig depricated.
        } catch (ParseException e) {
            usage();
            rc = ReturnCode.PARSE_EXCEPTION;
            PigStatsUtil.setErrorMessage(e.getMessage());
            PigStatsUtil.setErrorThrowable(e);
        } catch (org.apache.pig.tools.parameters.ParseException e) {
           // usage();
            rc = ReturnCode.PARSE_EXCEPTION;
            PigStatsUtil.setErrorMessage(e.getMessage());
            PigStatsUtil.setErrorThrowable(e);
        } catch (IOException e) {
            if (e instanceof PigException) {
                PigException pe = (PigException)e;
                rc = (pe.retriable()) ? ReturnCode.RETRIABLE_EXCEPTION
                        : ReturnCode.PIG_EXCEPTION;
                PigStatsUtil.setErrorMessage(pe.getMessage());
                PigStatsUtil.setErrorCode(pe.getErrorCode());
            } else {
                rc = ReturnCode.IO_EXCEPTION;
                PigStatsUtil.setErrorMessage(e.getMessage());
            }
            PigStatsUtil.setErrorThrowable(e);

            if(!gruntCalled) {
                LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
            }
            killRunningJobsIfInterrupted(e, pigContext);
        } catch (Throwable e) {
            rc = ReturnCode.THROWABLE_EXCEPTION;
            PigStatsUtil.setErrorMessage(e.getMessage());
            PigStatsUtil.setErrorThrowable(e);

            if(!gruntCalled) {
                LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
            }
            killRunningJobsIfInterrupted(e, pigContext);
        } finally {
            if (printScriptRunTime) {
                printScriptRunTime(startTime);
            }
            if (deleteTempFiles) {
                // clear temp files
                FileLocalizer.deleteTempFiles();
            }
            if (pigContext != null) {
                pigContext.getExecutionEngine().destroy();
            }
            PerformanceTimerFactory.getPerfTimerFactory().dumpTimers();
        }

        return rc;
    }