protected RequestStageContainer doStageCreation()

in ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java [2870:3340]


  /**
   * Builds the stage(s) needed to move the given service component hosts to their
   * requested states and appends them to {@code requestStages}.
   * <p>
   * In order, this method:
   * <ol>
   *   <li>returns immediately when nothing changed;</li>
   *   <li>when Kerberos is enabled for the cluster, updates configurations for components
   *       that are about to be installed for the first time;</li>
   *   <li>creates one host action (INSTALL / START / STOP / UPGRADE / UNINSTALL) per changed
   *       service component host, skipping hosts whose heartbeat is lost;</li>
   *   <li>adds service check actions for the services selected for smoke testing;</li>
   *   <li>lets the role graph split the single working stage into ordered stages;</li>
   *   <li>when Kerberos is enabled, asks the Kerberos helper to ensure identities for the
   *       newly installed components.</li>
   * </ol>
   *
   * @param requestStages      container the generated stages are added to; also supplies the
   *                           request id and the last used stage id
   * @param cluster            the cluster being operated on
   * @param changedServices    services grouped by target state; may be {@code null}
   * @param changedComps       components grouped by target state; may be {@code null}
   *                           (only inspected for emptiness here)
   * @param changedScHosts     component name -> target state -> service component hosts;
   *                           may be {@code null}, which is treated as "no host changes"
   * @param requestParameters  parameters handed to each created host action; may be
   *                           {@code null}. The map is created on demand and, if supplied,
   *                           is modified in place when targeted or cluster-phase
   *                           properties are present
   * @param requestProperties  request level properties (context, skip-failure flag, cluster
   *                           phase, per-component targeting)
   * @param runSmokeTest       passed to the smoke test service selection
   * @param reconfigureClients whether client-only hosts should be re-installed so that
   *                           changed configs get re-attached
   * @param useLatestConfigs   passed through to host action and service check creation
   * @param useClusterHostInfo whether the cluster host info JSON is stored on
   *                           {@code requestStages}
   * @return the same {@code requestStages} instance that was passed in
   * @throws AmbariException          on an invalid or unsupported state transition, or when
   *                                  the Kerberos configuration is invalid
   * @throws IllegalArgumentException when ensuring Kerberos identities fails
   */
  protected RequestStageContainer doStageCreation(RequestStageContainer requestStages,
      Cluster cluster,
      Map<State, List<Service>> changedServices,
      Map<State, List<ServiceComponent>> changedComps,
      Map<String, Map<State, List<ServiceComponentHost>>> changedScHosts,
      Map<String, String> requestParameters,
      Map<String, String> requestProperties,
      boolean runSmokeTest, boolean reconfigureClients, boolean useLatestConfigs, boolean useClusterHostInfo)
      throws AmbariException {


    // TODO handle different transitions?
    // Say HDFS to stopped and MR to started, what order should actions be done
    // in?

    // TODO additional validation?
    // verify all configs
    // verify all required components

    if ((changedServices == null || changedServices.isEmpty())
        && (changedComps == null || changedComps.isEmpty())
        && (changedScHosts == null || changedScHosts.isEmpty())) {
      LOG.info("Created 0 stages");
      return requestStages;
    }

    // check all stack configs are present in desired configs
    configHelper.checkAllStageConfigsPresentInDesiredConfigs(cluster);

    // caching upgrade suspended
    boolean isUpgradeSuspended = cluster.isUpgradeSuspended();

    // caching database type
    DatabaseType databaseType = configs.getDatabaseType();

    // smoke test any service that goes from installed to started
    Set<String> smokeTestServices = getServicesForSmokeTests(cluster,
      changedServices, changedScHosts, runSmokeTest);

    if (reconfigureClients) {
      // Re-install client only hosts to reattach changed configs on service
      // restart
      addClientSchForReinstall(cluster, changedServices, changedScHosts);
    }

    // The early-return check above accepts a null changedScHosts (when only services or
    // components changed), so everything below must tolerate it as well. Normalise once,
    // after the helper calls, so the helpers still see the caller's original argument.
    Map<String, Map<State, List<ServiceComponentHost>>> scHostsByComponent = changedScHosts;
    if (scHostsByComponent == null) {
      scHostsByComponent = new HashMap<>();
    }

    if (!scHostsByComponent.isEmpty()
        || !smokeTestServices.isEmpty()) {
      long nowTimestamp = System.currentTimeMillis();

      // FIXME cannot work with a single stage
      // multiple stages may be needed for reconfigure
      Map<String, Set<String>> clusterHostInfo = StageUtils.getClusterHostInfo(cluster);

      String clusterHostInfoJson = StageUtils.getGson().toJson(clusterHostInfo);

      Stage stage = createNewStage(requestStages.getLastStageId(), cluster,
          requestStages.getId(), requestProperties.get(RequestResourceProvider.CONTEXT),
          "{}", null);

      // Null-safe: a missing key or a null value both mean "do not skip failures".
      boolean skipFailure =
          "true".equalsIgnoreCase(requestProperties.get(Setting.SETTING_NAME_SKIP_FAILURE));
      stage.setAutoSkipFailureSupported(skipFailure);
      stage.setSkippable(skipFailure);

      Collection<ServiceComponentHost> componentsToEnableKerberos = new ArrayList<>();
      Set<String> hostsToForceKerberosOperations = new HashSet<>();

      /* *******************************************************************************************
       * If Kerberos is enabled, pre-process the changed components to update any configurations and
       * indicate which components may need to have principals or keytab files created.
       *
       * NOTE: Configurations need to be updated before tasks are created to install components
       *       so that any configuration changes are included before the task is queued.
       *
       *       Kerberos-related stages need to be inserted between the INSTALLED and STARTED states
       *       because some services need to set up the host (i.e. create user accounts, etc...)
       *       before Kerberos-related tasks can occur (like distribute keytabs)
       * **************************************************************************************** */
      if(kerberosHelper.isClusterKerberosEnabled(cluster)) {
        Collection<ServiceComponentHost> componentsToConfigureForKerberos = new ArrayList<>();

        for (Map<State, List<ServiceComponentHost>> changedScHostStates : scHostsByComponent.values()) {

          if (changedScHostStates != null) {
            for (Map.Entry<State, List<ServiceComponentHost>> changedScHostState : changedScHostStates.entrySet()) {
              State newState = changedScHostState.getKey();

              if (newState == State.INSTALLED) {
                List<ServiceComponentHost> scHosts = changedScHostState.getValue();

                if (scHosts != null) {
                  for (ServiceComponentHost scHost : scHosts) {
                    State oldSchState = scHost.getState();

                    // If the state is transitioning from INIT TO INSTALLED and the cluster has Kerberos
                    // enabled, mark this ServiceComponentHost to see if anything needs to be done to
                    // make sure it is properly configured.
                    //
                    // If the component is transitioning from an INSTALL_FAILED to an INSTALLED state
                    // indicates a failure attempt on install followed by a new installation attempt and
                    // will also need consideration for Kerberos-related tasks
                    if ((oldSchState == State.INIT || oldSchState == State.INSTALL_FAILED)) {
                      // Check if the host component already exists, if it exists there is no need to
                      // reset Kerberos-related configs.
                      // Check if it's blueprint install. If it is, then do not configure this service
                      // at this time.
                      if (!hostComponentAlreadyExists(cluster, scHost) && !(CLUSTER_PHASE_INITIAL_INSTALL.equals(requestProperties.get(CLUSTER_PHASE_PROPERTY)))) {
                        componentsToConfigureForKerberos.add(scHost);
                      }

                      // Add the ServiceComponentHost to the componentsToEnableKerberos Set to indicate
                      // it may need Kerberos-related operations to be performed on its behalf.
                      // For example, creating principals and keytab files.
                      componentsToEnableKerberos.add(scHost);

                      if (Service.Type.KERBEROS.name().equalsIgnoreCase(scHost.getServiceName()) &&
                          Role.KERBEROS_CLIENT.name().equalsIgnoreCase(scHost.getServiceComponentName())) {
                        // Since the KERBEROS/KERBEROS_CLIENT is about to be moved from the INIT to the
                        // INSTALLED state (and it should be by the time the stages (in this request)
                        // that need to be executed), collect the relevant hostname to make sure the
                        // Kerberos logic does not skip operations for it.
                        hostsToForceKerberosOperations.add(scHost.getHostName());
                      }
                    }
                  }
                }
              }
            }
          }
        }

        // If there are any components that may need Kerberos-related configuration changes, do it
        // here - before the INSTALL tasks get created so the configuration updates are set and
        // get included in the task details.
        if (!componentsToConfigureForKerberos.isEmpty()) {
          // Build service/component filter to declare what services and components are being added
          // so kerberosHelper.configureServices know which to work on.  Null indicates no filter
          // and all services and components will be (re)configured, however null will not be
          // passed in from here.
          Map<String, Collection<String>> serviceFilter = new HashMap<>();

          for (ServiceComponentHost scHost : componentsToConfigureForKerberos) {
            addComponentToServiceFilter(serviceFilter, scHost);
          }

          try {
            kerberosHelper.configureServices(cluster, serviceFilter);
          } catch (KerberosInvalidConfigurationException e) {
            throw new AmbariException(e.getMessage(), e);
          }
        }
      }

      for (Map.Entry<String, Map<State, List<ServiceComponentHost>>> componentEntry : scHostsByComponent.entrySet()) {
        String compName = componentEntry.getKey();
        Map<State, List<ServiceComponentHost>> scHostsByState = componentEntry.getValue();

        // Same null tolerance as the Kerberos pre-processing above.
        if (scHostsByState == null) {
          continue;
        }

        for (Map.Entry<State, List<ServiceComponentHost>> stateEntry : scHostsByState.entrySet()) {
          State newState = stateEntry.getKey();
          List<ServiceComponentHost> scHostsForState = stateEntry.getValue();

          if (scHostsForState == null) {
            continue;
          }

          for (ServiceComponentHost scHost : scHostsForState) {

            Service service = cluster.getService(scHost.getServiceName());
            ServiceComponent serviceComponent = service.getServiceComponent(compName);

            // Host params are stage-wide; they are derived from the first component processed.
            if (StringUtils.isBlank(stage.getHostParamsStage())) {
              RepositoryVersionEntity repositoryVersion = serviceComponent.getDesiredRepositoryVersion();
              stage.setHostParamsStage(StageUtils.getGson().toJson(
                  customCommandExecutionHelper.createDefaultHostParams(cluster, repositoryVersion.getStackId())));
            }


            // Do not create role command for hosts that are not responding
            if (scHost.getHostState().equals(HostState.HEARTBEAT_LOST)) {
              LOG.info("Command is not created for servicecomponenthost "
                  + ", clusterName=" + cluster.getClusterName()
                  + ", clusterId=" + cluster.getClusterId()
                  + ", serviceName=" + scHost.getServiceName()
                  + ", componentName=" + scHost.getServiceComponentName()
                  + ", hostname=" + scHost.getHostName()
                  + ", hostState=" + scHost.getHostState()
                  + ", targetNewState=" + newState);
              continue;
            }

            RoleCommand roleCommand;
            State oldSchState = scHost.getState();
            ServiceComponentHostEvent event;

            switch (newState) {
              case INSTALLED:
                if (oldSchState == State.INIT
                    || oldSchState == State.UNINSTALLED
                    || oldSchState == State.INSTALLED
                    || oldSchState == State.INSTALLING
                    || oldSchState == State.UNKNOWN
                    || oldSchState == State.INSTALL_FAILED) {
                  roleCommand = RoleCommand.INSTALL;

                  if (scHost.isClientComponent() && oldSchState == State.INSTALLED) {
                    // Client reinstalls are executed to reattach changed configs on service.
                    // Do not transition a client component to INSTALLING state if it was installed.
                    // Prevents INSTALL_FAILED state if a command gets aborted.
                    event = new ServiceComponentHostOpInProgressEvent(
                        scHost.getServiceComponentName(), scHost.getHostName(),
                        nowTimestamp);
                  } else {
                    event = new ServiceComponentHostInstallEvent(
                        scHost.getServiceComponentName(), scHost.getHostName(),
                        nowTimestamp,
                        serviceComponent.getDesiredStackId().getStackId());
                  }
                } else if (oldSchState == State.STARTED
                      // TODO: oldSchState == State.INSTALLED is always false, looks like a bug
                      //|| oldSchState == State.INSTALLED
                    || oldSchState == State.STOPPING) {
                  roleCommand = RoleCommand.STOP;
                  event = new ServiceComponentHostStopEvent(
                      scHost.getServiceComponentName(), scHost.getHostName(),
                      nowTimestamp);
                } else if (oldSchState == State.UPGRADING) {
                  roleCommand = RoleCommand.UPGRADE;
                  event = new ServiceComponentHostUpgradeEvent(
                      scHost.getServiceComponentName(), scHost.getHostName(),
                      nowTimestamp, serviceComponent.getDesiredStackId().getStackId());
                } else {
                  throw new AmbariException(describeScHostTransition("Invalid transition for",
                      cluster, scHost, oldSchState, newState));
                }
                break;
              case STARTED:
                StackId stackId = serviceComponent.getDesiredStackId();
                ComponentInfo compInfo = ambariMetaInfo.getComponent(
                    stackId.getStackName(), stackId.getStackVersion(), scHost.getServiceName(),
                    scHost.getServiceComponentName());

                // NOTE(review): the trailing "true" makes this condition always hold, so the
                // else branch below is currently dead code; see the todo notes.
                if (oldSchState == State.INSTALLED ||
                    oldSchState == State.STARTING ||
                    //todo: after separating install and start, the install stage is no longer in request stage container
                    //todo: so projected state will not equal INSTALLED which causes an exception for invalid state transition
                    //todo: so for now disabling this check
                    //todo: this change breaks test AmbariManagementControllerTest.testServiceComponentHostUpdateRecursive()
                    true) {
//                    requestStages.getProjectedState(scHost.getHostName(),
//                        scHost.getServiceComponentName()) == State.INSTALLED) {
                  roleCommand = RoleCommand.START;
                  event = new ServiceComponentHostStartEvent(
                      scHost.getServiceComponentName(), scHost.getHostName(),
                      nowTimestamp);
                } else {
                  String error = describeScHostTransition("Invalid transition for",
                      cluster, scHost, oldSchState, newState);
                  if (compInfo.isMaster()) {
                    throw new AmbariException(error);
                  } else {
                    LOG.info("Ignoring: " + error);
                    continue;
                  }
                }
                break;
              case UNINSTALLED:
                if (oldSchState == State.INSTALLED
                    || oldSchState == State.UNINSTALLING) {
                  roleCommand = RoleCommand.UNINSTALL;
                  // NOTE(review): a *Start* event is attached to the UNINSTALL command. This
                  // looks unintentional - confirm against the host component state machine
                  // before changing it.
                  event = new ServiceComponentHostStartEvent(
                      scHost.getServiceComponentName(), scHost.getHostName(),
                      nowTimestamp);
                } else {
                  throw new AmbariException(describeScHostTransition("Invalid transition for",
                      cluster, scHost, oldSchState, newState));
                }
                break;
              case INIT:
                if (oldSchState == State.INSTALLED ||
                    oldSchState == State.INSTALL_FAILED ||
                    oldSchState == State.INIT) {
                  // Resetting to INIT needs no agent command; update the state directly.
                  scHost.setState(State.INIT);
                  continue;
                } else  {
                  throw new AmbariException(describeScHostTransition("Unsupported transition to INIT for",
                      cluster, scHost, oldSchState, newState));
                }
              default:
                throw new AmbariException("Unsupported state change operation"
                    + ", newState=" + newState);
            }

            if (LOG.isDebugEnabled()) {
              LOG.debug("Create a new host action, requestId={}, componentName={}, hostname={}, roleCommand={}",
                requestStages.getId(), scHost.getServiceComponentName(), scHost.getHostName(), roleCommand.name());
            }

            // any targeted information
            String keyName = scHost.getServiceComponentName().toLowerCase();
            if (requestProperties.containsKey(keyName)) {
              // in the case where the command is targeted, but the states
              // of the old and new are the same, the targeted component
              // may still need to get the command.  This is true for Flume.
              if (oldSchState == newState) {
                switch (oldSchState) {
                  case INSTALLED:
                    roleCommand = RoleCommand.STOP;
                    event = new ServiceComponentHostStopEvent(
                        scHost.getServiceComponentName(), scHost.getHostName(),
                        nowTimestamp);
                    break;
                  case STARTED:
                    roleCommand = RoleCommand.START;
                    event = new ServiceComponentHostStartEvent(
                        scHost.getServiceComponentName(), scHost.getHostName(),
                        nowTimestamp);
                    break;
                  default:
                    break;
                }
              }

              if (null == requestParameters) {
                requestParameters = new HashMap<>();
              }
              requestParameters.put(keyName, requestProperties.get(keyName));
            }

            if (requestProperties.containsKey(CLUSTER_PHASE_PROPERTY)) {
              if (null == requestParameters) {
                requestParameters = new HashMap<>();
              }
              requestParameters.put(CLUSTER_PHASE_PROPERTY, requestProperties.get(CLUSTER_PHASE_PROPERTY));

            }

            Map<String, DesiredConfig> clusterDesiredConfigs = cluster.getDesiredConfigs();

            // Skip INSTALL task in case SysPrepped hosts and in case of server components. In case of server component
            // START task should run configuration script.
            if (newState == State.INSTALLED && skipInstallTaskForComponent(requestProperties, cluster, scHost)) {
              LOG.info("Skipping create of INSTALL task for {} on {}.", scHost.getServiceComponentName(), scHost.getHostName());
              // set state to INSTALLING, then immediately send an ServiceComponentHostOpSucceededEvent to allow
              // transitioning from INSTALLING --> INSTALLED.
              scHost.setState(State.INSTALLING);
              long now = System.currentTimeMillis();
              try {
                scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(scHost.getServiceComponentName(), scHost.getHostName(), now));
              } catch (InvalidStateTransitionException e) {
                // Best effort: log and carry on with the remaining hosts.
                LOG.error("Error transitioning ServiceComponentHost state to INSTALLED", e);
              }
            } else {
              // !!! can never be null
              RepositoryVersionEntity repoVersion = serviceComponent.getDesiredRepositoryVersion();

              createHostAction(cluster, stage, scHost,
                roleCommand, requestParameters, event, skipFailure, repoVersion, isUpgradeSuspended,
                databaseType, clusterDesiredConfigs, useLatestConfigs);
            }

          }
        }
      }

      for (String serviceName : smokeTestServices) { // Creates smoke test commands
        Service s = cluster.getService(serviceName);
        // find service component host
        ServiceComponent component = getClientComponentForRunningAction(cluster, s);
        String componentName = component != null ? component.getName() : null;
        String clientHost = getClientHostForRunningAction(cluster, s, component);
        String smokeTestRole = actionMetadata.getServiceCheckAction(serviceName);

        if (clientHost == null || smokeTestRole == null) {
          LOG.info("Nothing to do for service check as could not find role or"
              + " or host to run check on"
              + ", clusterName=" + cluster.getClusterName()
              + ", serviceName=" + serviceName
              + ", clientHost=" + clientHost
              + ", serviceCheckRole=" + smokeTestRole);
          continue;
        }

        if (StringUtils.isBlank(stage.getHostParamsStage())) {
          // NOTE(review): componentName above tolerates a null component, but this dereference
          // does not - confirm that a non-null clientHost implies a non-null component.
          RepositoryVersionEntity repositoryVersion = component.getDesiredRepositoryVersion();
          stage.setHostParamsStage(StageUtils.getGson().toJson(
              customCommandExecutionHelper.createDefaultHostParams(cluster, repositoryVersion.getStackId())));
        }

        customCommandExecutionHelper.addServiceCheckAction(stage, clientHost, smokeTestRole,
            nowTimestamp, serviceName, componentName, null, false, false, useLatestConfigs);
      }

      RoleCommandOrder rco = getRoleCommandOrder(cluster);
      RoleGraph rg = roleGraphFactory.createNew(rco);


      if (CommandExecutionType.DEPENDENCY_ORDERED == configs.getStageExecutionType() &&
        CLUSTER_PHASE_INITIAL_START.equals(requestProperties.get(CLUSTER_PHASE_PROPERTY))
      ) {
        LOG.info("Set DEPENDENCY_ORDERED CommandExecutionType on stage: {}", stage.getRequestContext());
        rg.setCommandExecutionType(CommandExecutionType.DEPENDENCY_ORDERED);
      }
      rg.build(stage);
      if (useClusterHostInfo) {
        requestStages.setClusterHostInfo(clusterHostInfoJson);
      }

      requestStages.addStages(rg.getStages());

      if (!componentsToEnableKerberos.isEmpty()) {
        Map<String, Collection<String>> serviceFilter = new HashMap<>();
        Set<String> hostFilter = new HashSet<>();

        for (ServiceComponentHost scHost : componentsToEnableKerberos) {
          addComponentToServiceFilter(serviceFilter, scHost);
          hostFilter.add(scHost.getHostName());
        }

        try {
          kerberosHelper.ensureIdentities(cluster, serviceFilter, hostFilter, null, hostsToForceKerberosOperations, requestStages,
              kerberosHelper.getManageIdentitiesDirective(requestProperties));
        } catch (KerberosOperationException e) {
          throw new IllegalArgumentException(e.getMessage(), e);
        }
      }

      List<Stage> stages = requestStages.getStages();
      LOG.debug("Created {} stages", ((stages != null) ? stages.size() : 0));

    } else {
      LOG.debug("Created 0 stages");
    }

    return requestStages;
  }

  /**
   * Formats the diagnostic text used when a service component host cannot be moved from
   * its current state to the requested one.
   *
   * @param prefix   leading phrase of the message, e.g. {@code "Invalid transition for"}
   * @param cluster  cluster the host component belongs to
   * @param scHost   the service component host being transitioned
   * @param oldState the current state of {@code scHost}
   * @param newState the requested state
   * @return the formatted message
   */
  private static String describeScHostTransition(String prefix, Cluster cluster,
      ServiceComponentHost scHost, State oldState, State newState) {
    return prefix
        + " servicecomponenthost"
        + ", clusterName=" + cluster.getClusterName()
        + ", clusterId=" + cluster.getClusterId()
        + ", serviceName=" + scHost.getServiceName()
        + ", componentName=" + scHost.getServiceComponentName()
        + ", hostname=" + scHost.getHostName()
        + ", currentState=" + oldState
        + ", newDesiredState=" + newState;
  }

  /**
   * Records the component of {@code scHost} under its service name in a Kerberos
   * service/component filter, creating the per-service component set on first use.
   *
   * @param serviceFilter service name -> component names; updated in place
   * @param scHost        the service component host to register
   */
  private static void addComponentToServiceFilter(Map<String, Collection<String>> serviceFilter,
      ServiceComponentHost scHost) {
    String serviceName = scHost.getServiceName();
    Collection<String> componentFilter = serviceFilter.get(serviceName);

    if (componentFilter == null) {
      componentFilter = new HashSet<>();
      serviceFilter.put(serviceName, componentFilter);
    }

    componentFilter.add(scHost.getServiceComponentName());
  }