in ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java [2870:3340]
/**
 * Creates the stage(s) needed to move the changed service component hosts to their new
 * desired states and, optionally, to run service checks, and adds them to the supplied
 * {@link RequestStageContainer}.
 * <p>
 * The method performs the following steps, in this order:
 * <ol>
 * <li>returns immediately if nothing changed;</li>
 * <li>verifies that all stack configs are present in the desired configs;</li>
 * <li>if Kerberos is enabled on the cluster, collects the components that are being installed
 * and (re)configures the affected services <em>before</em> any task is created;</li>
 * <li>creates one host action per changed service component host (hosts whose state is
 * {@code HEARTBEAT_LOST} are skipped);</li>
 * <li>adds a service check action for every service selected for a smoke test;</li>
 * <li>orders the commands through a {@link RoleGraph} and adds the resulting stages to
 * {@code requestStages};</li>
 * <li>asks the Kerberos helper to ensure identities for the components collected in step 3.</li>
 * </ol>
 *
 * @param requestStages      container the created stages are added to; also supplies the request
 *                           id and the last stage id
 * @param cluster            cluster the request applies to
 * @param changedServices    services grouped by their new state; may be {@code null}
 * @param changedComps       components grouped by their new state; only inspected to decide
 *                           whether there is anything to do; may be {@code null}
 * @param changedScHosts     service component hosts grouped by component name and then by their
 *                           new state; {@code null} is treated as an empty map
 * @param requestParameters  parameters handed to each created host action; may be {@code null}.
 *                           A non-null map is updated in place with targeted-component and
 *                           cluster-phase entries taken from {@code requestProperties}
 * @param requestProperties  properties of the request (request context, skip-failure flag,
 *                           cluster phase, targeted component names); must not be {@code null}
 * @param runSmokeTest       forwarded to the smoke-test service selection
 * @param reconfigureClients whether client-only hosts should be scheduled for a re-install
 * @param useLatestConfigs   forwarded to host action and service check creation
 * @param useClusterHostInfo whether the cluster host info JSON is set on {@code requestStages}
 * @return the {@code requestStages} instance that was passed in
 * @throws AmbariException          if a requested state transition is invalid or unsupported, if
 *                                  the Kerberos configuration is invalid, or if a called helper
 *                                  fails
 * @throws IllegalArgumentException if ensuring the Kerberos identities fails with a
 *                                  {@code KerberosOperationException}
 */
protected RequestStageContainer doStageCreation(RequestStageContainer requestStages,
    Cluster cluster,
    Map<State, List<Service>> changedServices,
    Map<State, List<ServiceComponent>> changedComps,
    Map<String, Map<State, List<ServiceComponentHost>>> changedScHosts,
    Map<String, String> requestParameters,
    Map<String, String> requestProperties,
    boolean runSmokeTest, boolean reconfigureClients, boolean useLatestConfigs, boolean useClusterHostInfo)
    throws AmbariException {

  // TODO handle different transitions?
  // Say HDFS to stopped and MR to started, what order should actions be done
  // in?

  // TODO additional validation?
  // verify all configs
  // verify all required components

  if ((changedServices == null || changedServices.isEmpty())
      && (changedComps == null || changedComps.isEmpty())
      && (changedScHosts == null || changedScHosts.isEmpty())) {
    LOG.info("Created 0 stages");
    return requestStages;
  }

  // The guard above accepts a null changedScHosts, but everything below dereferences it.
  // Normalize to an empty map so a request that only carries changed services or components
  // does not fail with a NullPointerException. The map must be mutable because client
  // re-install entries may be added to it below.
  if (changedScHosts == null) {
    changedScHosts = new HashMap<>();
  }

  // check all stack configs are present in desired configs
  configHelper.checkAllStageConfigsPresentInDesiredConfigs(cluster);

  // caching upgrade suspended
  boolean isUpgradeSuspended = cluster.isUpgradeSuspended();

  // caching database type
  DatabaseType databaseType = configs.getDatabaseType();

  // smoke test any service that goes from installed to started
  Set<String> smokeTestServices = getServicesForSmokeTests(cluster,
      changedServices, changedScHosts, runSmokeTest);

  if (reconfigureClients) {
    // Re-install client only hosts to reattach changed configs on service
    // restart
    addClientSchForReinstall(cluster, changedServices, changedScHosts);
  }

  if (!changedScHosts.isEmpty()
      || !smokeTestServices.isEmpty()) {
    long nowTimestamp = System.currentTimeMillis();

    // FIXME cannot work with a single stage
    // multiple stages may be needed for reconfigure
    Map<String, Set<String>> clusterHostInfo = StageUtils.getClusterHostInfo(cluster);
    String clusterHostInfoJson = StageUtils.getGson().toJson(clusterHostInfo);

    Stage stage = createNewStage(requestStages.getLastStageId(), cluster,
        requestStages.getId(), requestProperties.get(RequestResourceProvider.CONTEXT),
        "{}", null);

    // Boolean.parseBoolean is a case-insensitive comparison with "true" that also tolerates a
    // missing key or a null value.
    boolean skipFailure = Boolean.parseBoolean(requestProperties.get(Setting.SETTING_NAME_SKIP_FAILURE));

    stage.setAutoSkipFailureSupported(skipFailure);
    stage.setSkippable(skipFailure);

    Collection<ServiceComponentHost> componentsToEnableKerberos = new ArrayList<>();
    Set<String> hostsToForceKerberosOperations = new HashSet<>();

    /* *******************************************************************************************
     * If Kerberos is enabled, pre-process the changed components to update any configurations and
     * indicate which components may need to have principals or keytab files created.
     *
     * NOTE: Configurations need to be updated before tasks are created to install components
     *       so that any configuration changes are included before the task is queued.
     *
     *       Kerberos-related stages need to be inserted between the INSTALLED and STARTED states
     *       because some services need to set up the host (i.e., create user accounts, etc...)
     *       before Kerberos-related tasks can occur (like distribute keytabs)
     * **************************************************************************************** */
    if (kerberosHelper.isClusterKerberosEnabled(cluster)) {
      Collection<ServiceComponentHost> componentsToConfigureForKerberos = new ArrayList<>();

      for (Map<State, List<ServiceComponentHost>> changedScHostStates : changedScHosts.values()) {
        if (changedScHostStates != null) {
          for (Map.Entry<State, List<ServiceComponentHost>> changedScHostState : changedScHostStates.entrySet()) {
            State newState = changedScHostState.getKey();

            if (newState == State.INSTALLED) {
              List<ServiceComponentHost> scHosts = changedScHostState.getValue();

              if (scHosts != null) {
                for (ServiceComponentHost scHost : scHosts) {
                  State oldSchState = scHost.getState();

                  // If the state is transitioning from INIT TO INSTALLED and the cluster has Kerberos
                  // enabled, mark this ServiceComponentHost to see if anything needs to be done to
                  // make sure it is properly configured.
                  //
                  // If the component is transitioning from an INSTALL_FAILED to an INSTALLED state
                  // indicates a failure attempt on install followed by a new installation attempt and
                  // will also need consideration for Kerberos-related tasks
                  if ((oldSchState == State.INIT || oldSchState == State.INSTALL_FAILED)) {
                    // Check if the host component already exists, if it exists there is no need to
                    // reset Kerberos-related configs.
                    // Check if it's blueprint install. If it is, then do not configure this service
                    // at this time.
                    if (!hostComponentAlreadyExists(cluster, scHost) && !(CLUSTER_PHASE_INITIAL_INSTALL.equals(requestProperties.get(CLUSTER_PHASE_PROPERTY)))) {
                      componentsToConfigureForKerberos.add(scHost);
                    }

                    // Add the ServiceComponentHost to the componentsToEnableKerberos Set to indicate
                    // it may need Kerberos-related operations to be performed on its behalf.
                    // For example, creating principals and keytab files.
                    componentsToEnableKerberos.add(scHost);

                    if (Service.Type.KERBEROS.name().equalsIgnoreCase(scHost.getServiceName()) &&
                        Role.KERBEROS_CLIENT.name().equalsIgnoreCase(scHost.getServiceComponentName())) {
                      // Since the KERBEROS/KERBEROS_CLIENT is about to be moved from the INIT to the
                      // INSTALLED state (and it should be by the time the stages (in this request)
                      // that need to be executed), collect the relevant hostname to make sure the
                      // Kerberos logic does not skip operations for it.
                      hostsToForceKerberosOperations.add(scHost.getHostName());
                    }
                  }
                }
              }
            }
          }
        }
      }

      // If there are any components that may need Kerberos-related configuration changes, do it
      // here - before the INSTALL tasks get created so the configuration updates are set and
      // get included in the task details.
      if (!componentsToConfigureForKerberos.isEmpty()) {
        // Build service/component filter to declare what services and components are being added
        // so kerberosHelper.configureServices know which to work on.  Null indicates no filter
        // and all services and components will be (re)configured, however null will not be
        // passed in from here.
        Map<String, Collection<String>> serviceFilter = new HashMap<>();

        for (ServiceComponentHost scHost : componentsToConfigureForKerberos) {
          String serviceName = scHost.getServiceName();
          Collection<String> componentFilter = serviceFilter.get(serviceName);

          if (componentFilter == null) {
            componentFilter = new HashSet<>();
            serviceFilter.put(serviceName, componentFilter);
          }

          componentFilter.add(scHost.getServiceComponentName());
        }

        try {
          kerberosHelper.configureServices(cluster, serviceFilter);
        } catch (KerberosInvalidConfigurationException e) {
          throw new AmbariException(e.getMessage(), e);
        }
      }
    }

    // Create one host action per changed service component host. Null inner maps and lists
    // are skipped, consistent with the Kerberos pre-processing above.
    for (Map.Entry<String, Map<State, List<ServiceComponentHost>>> changedComponent : changedScHosts.entrySet()) {
      String compName = changedComponent.getKey();
      Map<State, List<ServiceComponentHost>> hostsByNewState = changedComponent.getValue();

      if (hostsByNewState == null) {
        continue;
      }

      for (Map.Entry<State, List<ServiceComponentHost>> hostsForNewState : hostsByNewState.entrySet()) {
        State newState = hostsForNewState.getKey();
        List<ServiceComponentHost> changedHosts = hostsForNewState.getValue();

        if (changedHosts == null) {
          continue;
        }

        for (ServiceComponentHost scHost : changedHosts) {

          Service service = cluster.getService(scHost.getServiceName());
          ServiceComponent serviceComponent = service.getServiceComponent(compName);

          // The stage-level host params are populated once, from the first component seen.
          if (StringUtils.isBlank(stage.getHostParamsStage())) {
            RepositoryVersionEntity repositoryVersion = serviceComponent.getDesiredRepositoryVersion();
            stage.setHostParamsStage(StageUtils.getGson().toJson(
                customCommandExecutionHelper.createDefaultHostParams(cluster, repositoryVersion.getStackId())));
          }

          // Do not create role command for hosts that are not responding
          if (scHost.getHostState().equals(HostState.HEARTBEAT_LOST)) {
            LOG.info("Command is not created for servicecomponenthost "
                + ", clusterName=" + cluster.getClusterName()
                + ", clusterId=" + cluster.getClusterId()
                + ", serviceName=" + scHost.getServiceName()
                + ", componentName=" + scHost.getServiceComponentName()
                + ", hostname=" + scHost.getHostName()
                + ", hostState=" + scHost.getHostState()
                + ", targetNewState=" + newState);
            continue;
          }

          RoleCommand roleCommand;
          State oldSchState = scHost.getState();
          ServiceComponentHostEvent event;

          // Map the (current state, new state) pair to a role command and a state machine event.
          switch (newState) {
            case INSTALLED:
              if (oldSchState == State.INIT
                  || oldSchState == State.UNINSTALLED
                  || oldSchState == State.INSTALLED
                  || oldSchState == State.INSTALLING
                  || oldSchState == State.UNKNOWN
                  || oldSchState == State.INSTALL_FAILED) {
                roleCommand = RoleCommand.INSTALL;

                if (scHost.isClientComponent() && oldSchState == State.INSTALLED) {
                  // Client reinstalls are executed to reattach changed configs on service.
                  // Do not transition a client component to INSTALLING state if it was installed.
                  // Prevents INSTALL_FAILED state if a command gets aborted.
                  event = new ServiceComponentHostOpInProgressEvent(
                      scHost.getServiceComponentName(), scHost.getHostName(),
                      nowTimestamp);
                } else {
                  event = new ServiceComponentHostInstallEvent(
                      scHost.getServiceComponentName(), scHost.getHostName(),
                      nowTimestamp,
                      serviceComponent.getDesiredStackId().getStackId());
                }
              } else if (oldSchState == State.STARTED
                  // TODO: oldSchState == State.INSTALLED is always false, looks like a bug
                  //|| oldSchState == State.INSTALLED
                  || oldSchState == State.STOPPING) {
                roleCommand = RoleCommand.STOP;
                event = new ServiceComponentHostStopEvent(
                    scHost.getServiceComponentName(), scHost.getHostName(),
                    nowTimestamp);
              } else if (oldSchState == State.UPGRADING) {
                roleCommand = RoleCommand.UPGRADE;
                event = new ServiceComponentHostUpgradeEvent(
                    scHost.getServiceComponentName(), scHost.getHostName(),
                    nowTimestamp, serviceComponent.getDesiredStackId().getStackId());
              } else {
                throw new AmbariException("Invalid transition for"
                    + " servicecomponenthost"
                    + ", clusterName=" + cluster.getClusterName()
                    + ", clusterId=" + cluster.getClusterId()
                    + ", serviceName=" + scHost.getServiceName()
                    + ", componentName=" + scHost.getServiceComponentName()
                    + ", hostname=" + scHost.getHostName()
                    + ", currentState=" + oldSchState
                    + ", newDesiredState=" + newState);
              }
              break;
            case STARTED:
              StackId stackId = serviceComponent.getDesiredStackId();
              ComponentInfo compInfo = ambariMetaInfo.getComponent(
                  stackId.getStackName(), stackId.getStackVersion(), scHost.getServiceName(),
                  scHost.getServiceComponentName());

              // NOTE(review): the trailing "true" makes this condition always hold, so the else
              // branch below is currently unreachable. It is kept because of the todo notes.
              if (oldSchState == State.INSTALLED ||
                  oldSchState == State.STARTING ||
                  //todo: after separating install and start, the install stage is no longer in request stage container
                  //todo: so projected state will not equal INSTALLED which causes an exception for invalid state transition
                  //todo: so for now disabling this check
                  //todo: this change breaks test AmbariManagementControllerTest.testServiceComponentHostUpdateRecursive()
                  true) {
                //   requestStages.getProjectedState(scHost.getHostName(),
                //        scHost.getServiceComponentName()) == State.INSTALLED) {
                roleCommand = RoleCommand.START;
                event = new ServiceComponentHostStartEvent(
                    scHost.getServiceComponentName(), scHost.getHostName(),
                    nowTimestamp);
              } else {
                String error = "Invalid transition for"
                    + " servicecomponenthost"
                    + ", clusterName=" + cluster.getClusterName()
                    + ", clusterId=" + cluster.getClusterId()
                    + ", serviceName=" + scHost.getServiceName()
                    + ", componentName=" + scHost.getServiceComponentName()
                    + ", hostname=" + scHost.getHostName()
                    + ", currentState=" + oldSchState
                    + ", newDesiredState=" + newState;
                if (compInfo.isMaster()) {
                  throw new AmbariException(error);
                } else {
                  LOG.info("Ignoring: " + error);
                  continue;
                }
              }
              break;
            case UNINSTALLED:
              if (oldSchState == State.INSTALLED
                  || oldSchState == State.UNINSTALLING) {
                roleCommand = RoleCommand.UNINSTALL;
                // NOTE(review): a *start* event is paired with the UNINSTALL command here.
                // This looks unintended - confirm against the host component state machine
                // before changing it.
                event = new ServiceComponentHostStartEvent(
                    scHost.getServiceComponentName(), scHost.getHostName(),
                    nowTimestamp);
              } else {
                throw new AmbariException("Invalid transition for"
                    + " servicecomponenthost"
                    + ", clusterName=" + cluster.getClusterName()
                    + ", clusterId=" + cluster.getClusterId()
                    + ", serviceName=" + scHost.getServiceName()
                    + ", componentName=" + scHost.getServiceComponentName()
                    + ", hostname=" + scHost.getHostName()
                    + ", currentState=" + oldSchState
                    + ", newDesiredState=" + newState);
              }
              break;
            case INIT:
              if (oldSchState == State.INSTALLED ||
                  oldSchState == State.INSTALL_FAILED ||
                  oldSchState == State.INIT) {
                // The state is reset directly; no command is created for this host component.
                scHost.setState(State.INIT);
                continue;
              } else {
                throw new AmbariException("Unsupported transition to INIT for"
                    + " servicecomponenthost"
                    + ", clusterName=" + cluster.getClusterName()
                    + ", clusterId=" + cluster.getClusterId()
                    + ", serviceName=" + scHost.getServiceName()
                    + ", componentName=" + scHost.getServiceComponentName()
                    + ", hostname=" + scHost.getHostName()
                    + ", currentState=" + oldSchState
                    + ", newDesiredState=" + newState);
              }
            default:
              throw new AmbariException("Unsupported state change operation"
                  + ", newState=" + newState);
          }

          if (LOG.isDebugEnabled()) {
            LOG.debug("Create a new host action, requestId={}, componentName={}, hostname={}, roleCommand={}",
                requestStages.getId(), scHost.getServiceComponentName(), scHost.getHostName(), roleCommand.name());
          }

          // any targeted information
          String keyName = scHost.getServiceComponentName().toLowerCase();
          if (requestProperties.containsKey(keyName)) {
            // in the case where the command is targeted, but the states
            // of the old and new are the same, the targeted component
            // may still need to get the command.  This is true for Flume.
            if (oldSchState == newState) {
              switch (oldSchState) {
                case INSTALLED:
                  roleCommand = RoleCommand.STOP;
                  event = new ServiceComponentHostStopEvent(
                      scHost.getServiceComponentName(), scHost.getHostName(),
                      nowTimestamp);
                  break;
                case STARTED:
                  roleCommand = RoleCommand.START;
                  event = new ServiceComponentHostStartEvent(
                      scHost.getServiceComponentName(), scHost.getHostName(),
                      nowTimestamp);
                  break;
                default:
                  break;
              }
            }

            if (null == requestParameters) {
              requestParameters = new HashMap<>();
            }
            requestParameters.put(keyName, requestProperties.get(keyName));
          }

          if (requestProperties.containsKey(CLUSTER_PHASE_PROPERTY)) {
            if (null == requestParameters) {
              requestParameters = new HashMap<>();
            }
            requestParameters.put(CLUSTER_PHASE_PROPERTY, requestProperties.get(CLUSTER_PHASE_PROPERTY));
          }

          Map<String, DesiredConfig> clusterDesiredConfigs = cluster.getDesiredConfigs();

          // Skip INSTALL task in case SysPrepped hosts and in case of server components. In case of server component
          // START task should run configuration script.
          if (newState == State.INSTALLED && skipInstallTaskForComponent(requestProperties, cluster, scHost)) {
            LOG.info("Skipping create of INSTALL task for {} on {}.", scHost.getServiceComponentName(), scHost.getHostName());
            // set state to INSTALLING, then immediately send an ServiceComponentHostOpSucceededEvent to allow
            // transitioning from INSTALLING --> INSTALLED.
            scHost.setState(State.INSTALLING);
            long now = System.currentTimeMillis();
            try {
              scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(scHost.getServiceComponentName(), scHost.getHostName(), now));
            } catch (InvalidStateTransitionException e) {
              // Best effort: the failure is logged and stage creation continues.
              LOG.error("Error transitioning ServiceComponentHost state to INSTALLED", e);
            }
          } else {
            // !!! can never be null
            RepositoryVersionEntity repoVersion = serviceComponent.getDesiredRepositoryVersion();

            createHostAction(cluster, stage, scHost,
                roleCommand, requestParameters, event, skipFailure, repoVersion, isUpgradeSuspended,
                databaseType, clusterDesiredConfigs, useLatestConfigs);
          }
        }
      }
    }

    for (String serviceName : smokeTestServices) { // Creates smoke test commands
      Service s = cluster.getService(serviceName);
      // find service component host
      ServiceComponent component = getClientComponentForRunningAction(cluster, s);
      String componentName = component != null ? component.getName() : null;
      String clientHost = getClientHostForRunningAction(cluster, s, component);
      String smokeTestRole = actionMetadata.getServiceCheckAction(serviceName);

      if (clientHost == null || smokeTestRole == null) {
        LOG.info("Nothing to do for service check as could not find role or"
            + " or host to run check on"
            + ", clusterName=" + cluster.getClusterName()
            + ", serviceName=" + serviceName
            + ", clientHost=" + clientHost
            + ", serviceCheckRole=" + smokeTestRole);
        continue;
      }

      if (StringUtils.isBlank(stage.getHostParamsStage())) {
        // NOTE(review): component is dereferenced here although it is treated as nullable above;
        // presumably a null component always yields a null clientHost - confirm.
        RepositoryVersionEntity repositoryVersion = component.getDesiredRepositoryVersion();
        stage.setHostParamsStage(StageUtils.getGson().toJson(
            customCommandExecutionHelper.createDefaultHostParams(cluster, repositoryVersion.getStackId())));
      }

      customCommandExecutionHelper.addServiceCheckAction(stage, clientHost, smokeTestRole,
          nowTimestamp, serviceName, componentName, null, false, false, useLatestConfigs);
    }

    // Let the role graph order the commands collected in the single stage and produce the
    // final list of stages.
    RoleCommandOrder rco = getRoleCommandOrder(cluster);
    RoleGraph rg = roleGraphFactory.createNew(rco);

    if (CommandExecutionType.DEPENDENCY_ORDERED == configs.getStageExecutionType() &&
        CLUSTER_PHASE_INITIAL_START.equals(requestProperties.get(CLUSTER_PHASE_PROPERTY))
    ) {
      LOG.info("Set DEPENDENCY_ORDERED CommandExecutionType on stage: {}", stage.getRequestContext());
      rg.setCommandExecutionType(CommandExecutionType.DEPENDENCY_ORDERED);
    }
    rg.build(stage);

    if (useClusterHostInfo) {
      requestStages.setClusterHostInfo(clusterHostInfoJson);
    }

    requestStages.addStages(rg.getStages());

    if (!componentsToEnableKerberos.isEmpty()) {
      Map<String, Collection<String>> serviceFilter = new HashMap<>();
      Set<String> hostFilter = new HashSet<>();

      for (ServiceComponentHost scHost : componentsToEnableKerberos) {
        String serviceName = scHost.getServiceName();
        Collection<String> componentFilter = serviceFilter.get(serviceName);

        if (componentFilter == null) {
          componentFilter = new HashSet<>();
          serviceFilter.put(serviceName, componentFilter);
        }

        componentFilter.add(scHost.getServiceComponentName());
        hostFilter.add(scHost.getHostName());
      }

      try {
        kerberosHelper.ensureIdentities(cluster, serviceFilter, hostFilter, null, hostsToForceKerberosOperations, requestStages,
            kerberosHelper.getManageIdentitiesDirective(requestProperties));
      } catch (KerberosOperationException e) {
        // NOTE(review): surfaced as an unchecked IllegalArgumentException, unlike the
        // configureServices failure above which is wrapped in AmbariException. Left as is
        // because callers may depend on the exception type.
        throw new IllegalArgumentException(e.getMessage(), e);
      }
    }

    List<Stage> stages = requestStages.getStages();
    LOG.debug("Created {} stages", ((stages != null) ? stages.size() : 0));

  } else {
    LOG.debug("Created 0 stages");
  }

  return requestStages;
}