protected Void execute()

in core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java [73:193]


    /**
     * Processes the READY actions of this coordinator job.
     * <p>
     * The method runs in three phases:
     * <ol>
     *   <li>work out how many actions may be started from the job's concurrency setting and the number of
     *       actions currently counted as running;</li>
     *   <li>for LAST_ONLY and NONE execution, queue a {@code CoordActionSkipXCommand} for every READY action
     *       whose start window has passed and drop it from the candidate list;</li>
     *   <li>set the remaining candidates to SUBMITTED, persist that status and start them, up to the limit
     *       computed in phase 1.</li>
     * </ol>
     * Reaching the concurrency limit intentionally does not return early: the READY actions are still loaded
     * so that the skip checks of phase 2 run; only phase 3 becomes a no-op.
     *
     * @return always {@code null}
     * @throws CommandException if one of the JPA executors fails (the {@code JPAExecutorException} is wrapped),
     *         or if starting an action throws it
     */
    protected Void execute() throws CommandException {
        // execution setting for this job (e.g. FIFO, LIFO, LAST_ONLY, NONE)
        CoordinatorJob.Execution jobExecution = coordJob.getExecutionOrder();
        // number of actions to start (-1 means start ALL)
        int numActionsToStart = computeNumActionsToStart(jobExecution);

        // get list of actions that are READY, ordered according to the execution setting
        List<CoordinatorActionBean> actions;
        try {
            actions = jpaService.execute(new CoordJobGetReadyActionsJPAExecutor(jobId, jobExecution.name()));
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        log.debug("Number of READY actions = " + actions.size());

        // One timestamp for the whole run so every action is judged against the same "now".
        Date now = new Date();
        // If we're using LAST_ONLY or NONE, we should check if any of these need to be SKIPPED instead of SUBMITTED
        if (jobExecution.equals(CoordinatorJobBean.Execution.LAST_ONLY)) {
            skipExpiredLastOnlyActions(actions, now);
        }
        else if (jobExecution.equals(CoordinatorJobBean.Execution.NONE)) {
            skipExpiredNoneActions(actions, now);
        }

        submitAndStartActions(actions, numActionsToStart);
        return null;
    }

    /**
     * Computes how many READY actions may be started under the job's concurrency setting.
     *
     * @param jobExecution execution setting of the job; only used for the debug log line
     * @return {@code -1} if the concurrency setting is negative (unlimited), otherwise the concurrency minus the
     *         count returned by {@code CoordJobGetRunningActionsCountJPAExecutor}, clamped at {@code 0}
     * @throws CommandException if the running-actions count cannot be read
     */
    private int computeNumActionsToStart(CoordinatorJob.Execution jobExecution) throws CommandException {
        int jobConcurrency = coordJob.getConcurrency();
        // if less than 0, then UNLIMITED concurrency
        if (jobConcurrency < 0) {
            return -1;
        }

        int numRunningJobs;
        try {
            numRunningJobs = jpaService.execute(new CoordJobGetRunningActionsCountJPAExecutor(jobId));
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }

        // More actions may already be running than the limit allows; never report a negative slot count.
        int numActionsToStart = Math.max(0, jobConcurrency - numRunningJobs);
        log.debug("concurrency=" + jobConcurrency + ", execution=" + jobExecution + ", numRunningJobs="
                + numRunningJobs + ", numLeftover=" + numActionsToStart);
        if (numActionsToStart == 0) {
            // Informational only: the caller still performs the LAST_ONLY / NONE skip checks.
            log.info("Not starting any additional actions because max concurrency [{0}]" +
                    " for coordinator [{1}] has been reached.", jobConcurrency, jobId);
        }
        return numActionsToStart;
    }

    /**
     * LAST_ONLY execution: queues a skip command for every action whose successor's nominal time is already in
     * the past and removes it from {@code actions}.
     * <p>
     * An action is kept when no next nominal time is available ({@code null}) or when computing it fails; in
     * the latter case the error is logged and the action remains a start candidate.
     *
     * @param actions READY actions; modified in place
     * @param now reference time the nominal times are compared against
     * @throws CommandException declared for the calls made here; not thrown by this method itself
     */
    private void skipExpiredLastOnlyActions(List<CoordinatorActionBean> actions, Date now) throws CommandException {
        for (Iterator<CoordinatorActionBean> it = actions.iterator(); it.hasNext(); ) {
            CoordinatorActionBean action = it.next();
            try {
                Date nextNominalTime = CoordCommandUtils.computeNextNominalTime(coordJob, action);
                if (nextNominalTime == null) {
                    continue;
                }
                // If the current time is after the next action's nominal time, then we've passed the window where
                // this action should be started; so set it to SKIPPED
                if (now.after(nextNominalTime)) {
                    LOG.info("LAST_ONLY execution: Preparing to skip action [{0}] because the current time [{1}] is later "
                            + "than the nominal time [{2}] of the next action]", action.getId(),
                            DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
                    queue(new CoordActionSkipXCommand(action, coordJob.getUser(), coordJob.getAppName()));
                    it.remove();
                } else {
                    LOG.debug("LAST_ONLY execution: Not skipping action [{0}] because the current time [{1}] is earlier "
                            + "than the nominal time [{2}] of the next action]", action.getId(),
                            DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
                }
            } catch (ParseException | JDOMException e) {
                // Best effort: the action stays in the list and is handled like any other READY action.
                LOG.error("Failed to calculate next nominal time", e);
            }
        }
    }

    /**
     * NONE execution: queues a skip command for every action whose nominal time plus the configured tolerance
     * (in minutes) is already in the past and removes it from {@code actions}.
     *
     * @param actions READY actions; modified in place
     * @param now reference time the deadlines are compared against
     * @throws CommandException declared for the calls made here; not thrown by this method itself
     */
    private void skipExpiredNoneActions(List<CoordinatorActionBean> actions, Date now) throws CommandException {
        if (actions.isEmpty()) {
            // Nothing to check; also avoids the configuration lookup below.
            return;
        }
        // Loop-invariant: the tolerance and the job's calendar are the same for every action.
        int tolerance = ConfigurationService.getInt(CoordActionInputCheckXCommand.COORD_EXECUTION_NONE_TOLERANCE);
        Calendar cal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));

        for (Iterator<CoordinatorActionBean> it = actions.iterator(); it.hasNext(); ) {
            CoordinatorActionBean action = it.next();
            // If the current time is after the nominal time of this action plus some tolerance,
            // then we've passed the window where this action should be started; so set it to SKIPPED.
            // setTime() resets the calendar to this action's nominal time, so the instance can be reused.
            cal.setTime(action.getNominalTime());
            cal.add(Calendar.MINUTE, tolerance);
            if (now.after(cal.getTime())) {
                LOG.info("NONE execution: Preparing to skip action [{0}] because the current time [{1}] is more than [{2}]"
                                + " minutes later than the nominal time [{3}] of the current action]", action.getId(),
                        DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(action.getNominalTime()));
                queue(new CoordActionSkipXCommand(action, coordJob.getUser(), coordJob.getAppName()));
                it.remove();
            } else {
                LOG.debug("NONE execution: Not skipping action [{0}] because the current time [{1}] is earlier than [{2}]"
                                + " minutes later than the nominal time [{3}] of the current action]", action.getId(),
                        DateUtils.formatDateOozieTZ(now), tolerance, DateUtils.formatDateOozieTZ(action.getNominalTime()));
            }
        }
    }

    /**
     * Sets actions to SUBMITTED, persists the new status and starts them, in list order.
     *
     * @param actions candidate actions, already filtered and ordered
     * @param numActionsToStart maximum number of actions to start; a negative value means no limit
     * @throws CommandException if persisting the status fails (wrapped {@code JPAExecutorException}) or if
     *         starting an action throws it; actions later in the list are then left untouched
     */
    private void submitAndStartActions(List<CoordinatorActionBean> actions, int numActionsToStart)
            throws CommandException {
        int started = 0;
        for (CoordinatorActionBean action : actions) {
            // stop once the limit is reached; a negative limit means "start everything"
            if (numActionsToStart >= 0 && started >= numActionsToStart) {
                break;
            }
            log.debug("Set status to SUBMITTED for id: " + action.getId());
            // change state of action to SUBMITTED
            action.setStatus(CoordinatorAction.Status.SUBMITTED);
            try {
                CoordActionQueryExecutor.getInstance().executeUpdate(
                        CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }
            // start action
            new CoordActionStartXCommand(action.getId(), coordJob.getUser(), coordJob.getAppName(),
                    action.getJobId()).call();
            started++;
        }
    }