protected Void execute()

in core/src/main/java/org/apache/oozie/command/coord/CoordActionStartXCommand.java [187:316]


    protected Void execute() throws CommandException {
        boolean makeFail = true;
        String errCode = "";
        String errMsg = "";
        ParamChecker.notEmpty(user, "user");

        log.debug("actionid=" + actionId + ", status=" + coordAction.getStatus());
        if (coordAction.getStatus() == CoordinatorAction.Status.SUBMITTED) {
            try {
                // log.debug("getting.. job id: " + coordAction.getJobId());
                // create merged runConf to pass to WF Engine
                Configuration runConf = mergeConfig(coordAction);
                coordAction.setRunConf(XmlUtils.prettyPrint(runConf).toString());
                // log.debug("%%% merged runconf=" +
                // XmlUtils.prettyPrint(runConf).toString());
                DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);

                Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf()));
                SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(),
                        Status.STARTED,
                        SlaAppType.COORDINATOR_ACTION, log);
                if(slaEvent != null) {
                    insertList.add(slaEvent);
                }
                if (OozieJobInfo.isJobInfoEnabled()) {
                    conf.set(OozieJobInfo.COORD_ID, actionId);
                    conf.set(OozieJobInfo.COORD_NAME, appName);
                    conf.set(OozieJobInfo.COORD_NOMINAL_TIME, coordAction.getNominalTimestamp().toString());
                }
                // Normalize workflow appPath here;
                JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
                if (coordAction.getExternalId() != null) {
                    conf.setBoolean(OozieClient.RERUN_FAIL_NODES, true);
                    dagEngine.reRun(coordAction.getExternalId(), conf);
                } else {
                    // Pushing the nominal time in conf to use for launcher tag search
                    conf.set(OOZIE_COORD_ACTION_NOMINAL_TIME,String.valueOf(coordAction.getNominalTime().getTime()));
                    String wfId = dagEngine.submitJobFromCoordinator(conf, actionId);
                    coordAction.setExternalId(wfId);
                }
                coordAction.setStatus(CoordinatorAction.Status.RUNNING);
                coordAction.incrementAndGetPending();

                //store.updateCoordinatorAction(coordAction);
                JPAService jpaService = Services.get().get(JPAService.class);
                if (jpaService != null) {
                    log.debug("Updating WF record for WFID :" + coordAction.getExternalId() + " with parent id: " + actionId);
                    WorkflowJobBean wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STARTTIME,
                            coordAction.getExternalId());
                    wfJob.setParentId(actionId);
                    wfJob.setLastModifiedTime(new Date());
                    BatchQueryExecutor executor = BatchQueryExecutor.getInstance();
                    updateList.add(new UpdateEntry<WorkflowJobQuery>(
                            WorkflowJobQuery.UPDATE_WORKFLOW_PARENT_MODIFIED, wfJob));
                    updateList.add(new UpdateEntry<CoordActionQuery>(
                            CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction));
                    try {
                        executor.executeBatchInsertUpdateDelete(insertList, updateList, null);
                        queue(new CoordActionNotificationXCommand(coordAction), 100);
                        if (EventHandlerService.isEnabled()) {
                            generateEvent(coordAction, user, appName, wfJob.getStartTime());
                        }
                    }
                    catch (JPAExecutorException je) {
                        throw new CommandException(je);
                    }
                }
                else {
                    log.error(ErrorCode.E0610);
                }

                makeFail = false;
            }
            catch (DagEngineException dee) {
                errMsg = dee.getMessage();
                errCode = dee.getErrorCode().toString();
                log.warn("can not create DagEngine for submitting jobs", dee);
            }
            catch (CommandException ce) {
                errMsg = ce.getMessage();
                errCode = ce.getErrorCode().toString();
                log.warn("command exception occurred ", ce);
            }
            catch (java.io.IOException ioe) {
                errMsg = ioe.getMessage();
                errCode = "E1005";
                log.warn("Configuration parse error. read from DB :" + coordAction.getRunConf(), ioe);
            }
            catch (Exception ex) {
                errMsg = ex.getMessage();
                errCode = "E1005";
                log.warn("can not create DagEngine for submitting jobs", ex);
            }
            finally {
                if (makeFail == true) { // No DB exception occurs
                    log.error("Failing the action " + coordAction.getId() + ". Because " + errCode + " : " + errMsg);
                    coordAction.setStatus(CoordinatorAction.Status.FAILED);
                    if (errMsg.length() > 254) { // Because table column size is 255
                        errMsg = errMsg.substring(0, 255);
                    }
                    coordAction.setErrorMessage(errMsg);
                    coordAction.setErrorCode(errCode);

                    updateList = new ArrayList<UpdateEntry>();
                    updateList.add(new UpdateEntry<CoordActionQuery>(
                                    CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction));
                    insertList = new ArrayList<JsonBean>();

                    SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(),
                            Status.FAILED,
                            SlaAppType.COORDINATOR_ACTION, log);
                    if(slaEvent != null) {
                        insertList.add(slaEvent); //Update SLA events
                    }
                    try {
                        // call JPAExecutor to do the bulk writes
                        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
                        if (EventHandlerService.isEnabled()) {
                            generateEvent(coordAction, user, appName, null);
                        }
                    }
                    catch (JPAExecutorException je) {
                        throw new CommandException(je);
                    }
                    queue(new CoordActionReadyXCommand(coordAction.getJobId()));
                }
            }
        }
        return null;
    }