protected Map<String, RoleStats> processInProgressStage()

in ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java [729:894]


  /**
   * Walks every command of an in-progress stage and decides, per host/role, whether the command
   * must be aborted (its host component was deleted), failed/timed out, rescheduled after a
   * timeout, or scheduled for the first time, while accumulating per-role statistics.
   *
   * @param s                  the stage being processed
   * @param commandsToSchedule output list; commands to (re)send are appended to it
   * @param commandsToEnqueue  output multimap handed to {@code cancelCommandOnTimeout} when a
   *                           timed-out command is rescheduled (presumably receives cancel commands)
   * @return per-role statistics created by {@code initRoleStats(s)} and updated with the final
   *         status of every command visited
   * @throws AmbariException propagated from cluster/service lookups (e.g. {@code clusters.getCluster})
   *                         and other collaborators
   */
  protected Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionCommand> commandsToSchedule,
                                                          Multimap<Long, AgentCommand> commandsToEnqueue) throws AmbariException {
    LOG.debug("==> Collecting commands to schedule...");
    // Map to track role status
    Map<String, RoleStats> roleStats = initRoleStats(s);
    long now = System.currentTimeMillis();
    Set<RoleCommandPair> rolesCommandsInProgress = s.getHostRolesInProgress();

    Cluster cluster = null;
    if (null != s.getClusterName()) {
      cluster = clusters.getCluster(s.getClusterName());
    }

    for (String host : s.getHosts()) {

      List<ExecutionCommandWrapper> commandWrappers = s.getExecutionCommands(host);
      Host hostObj = null;
      try {
        hostObj = clusters.getHost(host);
      } catch (AmbariException e) {
        // hostObj stays null; per the log message this is expected for server side actions.
        LOG.debug("Host {} not found, stage is likely a server side action", host);
      }

      int taskIndex = 0;
      LOG.trace("===>host={}", host);

      for (ExecutionCommandWrapper wrapper : commandWrappers) {
        ExecutionCommand c = wrapper.getExecutionCommand();
        String roleStr = c.getRole();
        HostRoleStatus status = s.getHostRoleStatus(host, roleStr);
        taskIndex++;
        if (LOG.isTraceEnabled()) {
          LOG.trace("Host task {}) id = {} status = {} (role={}), roleCommand = {}", taskIndex, c.getTaskId(), status, roleStr, c.getRoleCommand());
        }

        // Only commands of a resolved cluster can refer to a (now deleted) host component.
        boolean hostDeleted = null != cluster && isHostComponentDeleted(cluster, c, host);

        // basic timeout for stage, optionally extended by the command's own timeout parameter
        long commandTimeout = resolveCommandTimeout(c);

        // Assigned inside the else-if condition below; because of the short-circuit ||, it is
        // only evaluated (and can only become true) when no plain timeout was detected.
        boolean hostStateUnknown = false;
        if (hostDeleted) {
          // Check that service host component is not deleted
          String message = String.format(
            "Host not found when trying to schedule an execution command. " +
              "The most probable reason for that is that host or host component " +
              "has been deleted recently. The command has been aborted and dequeued. " +
              "Execution command details: " +
              "cmdId: %s; taskId: %s; roleCommand: %s",
            c.getCommandId(), c.getTaskId(), c.getRoleCommand());
          LOG.warn("Host {} has been detected as non-available. {}", host, message);
          // Abort the command itself
          // We don't need to send CANCEL_COMMANDs in this case
          db.abortHostRole(host, s.getRequestId(), s.getStageId(), roleStr, message);
          if (c.getRoleCommand() == RoleCommand.ACTIONEXECUTE) {
            // cluster is non-null here: hostDeleted can only be true for a resolved cluster
            processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
          }
          status = HostRoleStatus.ABORTED;
        } else if (timeOutActionNeeded(status, s, hostObj, roleStr, now, commandTimeout)
          || (hostStateUnknown = isHostStateUnknown(s, hostObj, roleStr))) {
          // Process command timeouts
          if (s.getAttemptCount(host, roleStr) >= maxAttempts || hostStateUnknown) {
            LOG.warn("Host: {}, role: {}, actionId: {} expired and will be failed", host, roleStr,
              s.getActionId());

            // determine if the task should be auto skipped
            boolean isSkipSupported = s.isAutoSkipOnFailureSupported();
            HostRoleCommand hostRoleCommand = s.getHostRoleCommand(c.getTaskId());
            if (isSkipSupported && null != hostRoleCommand) {
              isSkipSupported = hostRoleCommand.isFailureAutoSkipped();
            }

            db.timeoutHostRole(host, s.getRequestId(), s.getStageId(), roleStr, isSkipSupported, hostStateUnknown);
            // Reinitialize status from the stage after the DB update
            status = s.getHostRoleStatus(host, roleStr);

            if (null != cluster) {
              RoleCommand roleCommand = c.getRoleCommand();
              if (roleCommand != RoleCommand.CUSTOM_COMMAND
                && roleCommand != RoleCommand.SERVICE_CHECK
                && roleCommand != RoleCommand.ACTIONEXECUTE) {
                // commands above don't affect host component state (e.g. no in_progress state in process), transition will fail
                transitionToFailedState(cluster.getClusterName(), c.getServiceName(), roleStr, host, now, false);
              }
              if (roleCommand == RoleCommand.ACTIONEXECUTE) {
                processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
              }
            }

            // Dequeue command
            LOG.info("Removing command from queue, host={}, commandId={} ", host, c.getCommandId());
          } else {
            cancelCommandOnTimeout(Collections.singletonList(s.getHostRoleCommand(host, roleStr)), commandsToEnqueue);

            LOG.info("Host: {}, role: {}, actionId: {} timed out and will be rescheduled", host,
              roleStr, s.getActionId());

            // reschedule command
            commandsToSchedule.add(c);
            LOG.trace("===> commandsToSchedule(reschedule)={}", commandsToSchedule.size());
          }
        } else if (status == HostRoleStatus.PENDING) {
          // in case of DEPENDENCY_ORDERED stage command can be scheduled only if all of its dependencies are
          // already finished
          if (CommandExecutionType.STAGE == s.getCommandExecutionType() ||
                (CommandExecutionType.DEPENDENCY_ORDERED == s.getCommandExecutionType() &&
                  CommandExecutionType.DEPENDENCY_ORDERED == configuration.getStageExecutionType() &&
                  areCommandDependenciesFinished(c, s, rolesCommandsInProgress))) {

            // Need to schedule first time
            commandsToSchedule.add(c);
            LOG.trace("===>commandsToSchedule(first_time)={}", commandsToSchedule.size());
          }
        }

        updateRoleStats(status, roleStats.get(roleStr));
        if (status == HostRoleStatus.FAILED) {
          LOG.info("Role {} on host {} was failed", roleStr, host);
        }

      }
    }
    LOG.debug("Collected {} commands to schedule in this wakeup.", commandsToSchedule.size());
    return roleStats;
  }

  /**
   * Checks whether the service component targeted by {@code c} exists but its host-component map
   * no longer contains {@code host}, logging a warning when that is the case.
   * Commands without a service name, with an unresolved service, or whose role is not a service
   * component (treated as an action) are never reported as deleted.
   *
   * @param cluster the cluster the stage belongs to; must not be null
   * @param c       the execution command being inspected
   * @param host    the host the command is scheduled on
   * @return true if the host component mapping for {@code host} is missing
   * @throws AmbariException propagated from {@code cluster.getService}
   */
  private boolean isHostComponentDeleted(Cluster cluster, ExecutionCommand c, String host)
      throws AmbariException {
    String serviceName = c.getServiceName();
    if (serviceName == null || serviceName.isEmpty()) {
      return false;
    }
    Service svc = cluster.getService(serviceName);
    if (svc == null) {
      return false;
    }

    String roleStr = c.getRole();
    ServiceComponent svcComp;
    Map<String, ServiceComponentHost> scHosts;
    try {
      svcComp = svc.getServiceComponent(roleStr);
      scHosts = svcComp.getServiceComponentHosts();
    } catch (ServiceComponentNotFoundException scnex) {
      LOG.debug("{} is not a service component, assuming it's an action", roleStr);
      return false;
    }

    if (scHosts == null || scHosts.containsKey(host)) {
      return false;
    }
    LOG.warn("Host component information has not been found. Details: " +
        "cluster={}; host={}; service={}; component={}",
      c.getClusterName(), host, svcComp.getServiceName(), svcComp.getName());
    return true;
  }

  /**
   * Computes the timeout, in milliseconds, to apply to {@code c}: the base {@code actionTimeout},
   * extended by the command's {@code COMMAND_TIMEOUT} parameter (given in seconds) when
   * {@code taskTimeoutAdjustment} is enabled. A missing or non-numeric parameter is logged and
   * the base timeout is returned instead of failing the whole stage.
   *
   * @param c the execution command whose parameters are inspected
   * @return the effective timeout for {@code c}
   */
  private long resolveCommandTimeout(ExecutionCommand c) {
    long commandTimeout = actionTimeout;
    if (!taskTimeoutAdjustment) {
      return commandTimeout;
    }

    Map<String, String> commandParams = c.getCommandParams();
    String timeoutKey = ExecutionCommand.KeyNames.COMMAND_TIMEOUT;
    if (commandParams == null || !commandParams.containsKey(timeoutKey)) {
      LOG.error("Execution command has no timeout parameter {}", c);
      return commandTimeout;
    }

    String timeoutStr = commandParams.get(timeoutKey);
    try {
      return commandTimeout + Long.parseLong(timeoutStr) * 1000; // Converting seconds to milliseconds
    } catch (NumberFormatException e) {
      LOG.error("Execution command has invalid timeout parameter '{}', using base timeout {}: {}",
        timeoutStr, commandTimeout, c, e);
      return commandTimeout;
    }
  }