in ambari-server/src/main/java/org/apache/ambari/server/stack/upgrade/orchestrate/UpgradeHelper.java [288:561]
/**
 * Builds the ordered list of {@link UpgradeGroupHolder}s that make up the
 * orchestration for the given upgrade pack in the direction of the supplied
 * context.
 * <p>
 * For every group returned by the upgrade pack this method:
 * <ul>
 * <li>skips the group when it is out of scope for the context or when its
 * condition is present and not satisfied;</li>
 * <li>copies the group's attributes to a new holder (every group is marked
 * skippable on downgrade);</li>
 * <li>resolves, per service/component, the {@link ProcessingComponent} and
 * the hosts to run on, and feeds them to the group's
 * {@link StageWrapperBuilder};</li>
 * <li>adds the holder to the result when the builder produced at least one
 * stage, merging a service check group into an immediately preceding service
 * check group instead of adding it.</li>
 * </ul>
 * Finally, service check groups which precede the first processing group are
 * removed from the result.
 * <p>
 * Note that for a {@code NON_ROLLING} upgrade pack this method sets
 * {@code performServiceCheck} to {@code false} on the group objects of the
 * upgrade pack itself.
 *
 * @param upgradePack
 *          the upgrade pack which supplies the groups, the processing tasks
 *          and the add-component tasks.
 * @param context
 *          the upgrade context which supplies the cluster, the host resolver,
 *          the direction and the scope/service filters.
 * @return the group holders to schedule, in order; never {@code null}.
 * @throws AmbariException
 *           if one of the underlying calls made while building the sequence
 *           fails.
 */
public List<UpgradeGroupHolder> createSequence(UpgradePack upgradePack,
    UpgradeContext context) throws AmbariException {

  Cluster cluster = context.getCluster();
  MasterHostResolver mhr = context.getResolver();
  Map<String, AddComponentTask> addedComponentsDuringUpgrade = upgradePack.getAddComponentTasks();

  // Note, only a Rolling Upgrade uses processing tasks.
  Map<String, Map<String, ProcessingComponent>> allTasks = upgradePack.getTasks();
  List<UpgradeGroupHolder> groups = new ArrayList<>();

  UpgradeGroupHolder previousGroupHolder = null;
  for (Grouping group : upgradePack.getGroups(context.getDirection())) {

    // !!! grouping is not scoped to context
    if (!context.isScoped(group.scope)) {
      continue;
    }

    // if there is a condition on the group, evaluate it and skip scheduling
    // of this group if the condition has not been satisfied
    if (null != group.condition && !group.condition.isSatisfied(context)) {
      LOG.info("Skipping {} while building upgrade orchestration due to {}", group, group.condition );
      continue;
    }

    UpgradeGroupHolder groupHolder = new UpgradeGroupHolder();
    groupHolder.name = group.name;
    groupHolder.title = group.title;
    groupHolder.groupClass = group.getClass();
    groupHolder.skippable = group.skippable;
    groupHolder.supportsAutoSkipOnFailure = group.supportsAutoSkipOnFailure;
    groupHolder.allowRetry = group.allowRetry;
    groupHolder.processingGroup = group.isProcessingGroup();

    // !!! all downgrades are skippable
    if (context.getDirection().isDowngrade()) {
      groupHolder.skippable = true;
    }

    // Attempt to get the function of the group, during a NonRolling Upgrade
    Task.Type functionName = null;
    if (group instanceof UpgradeFunction) {
      functionName = ((UpgradeFunction) group).getFunction();
    }

    // NonRolling defaults to not performing service checks on a group.
    // Of course, a Service Check Group does indeed run them.
    if (upgradePack.getType() == UpgradeType.NON_ROLLING) {
      group.performServiceCheck = false;
    }

    StageWrapperBuilder builder = group.getBuilder();

    List<UpgradePack.OrderService> services = group.services;

    // Rolling Downgrade must reverse the order of services. The reversal is
    // done on a copy so that the group's own list is left untouched.
    if (upgradePack.getType() == UpgradeType.ROLLING) {
      if (context.getDirection().isDowngrade() && !services.isEmpty()) {
        List<UpgradePack.OrderService> reverse = new ArrayList<>(services);
        Collections.reverse(reverse);
        services = reverse;
      }
    }

    // !!! cluster and service checks are empty here
    for (UpgradePack.OrderService service : services) {

      if (!context.isServiceSupported(service.serviceName)) {
        continue;
      }

      // a rolling upgrade can only orchestrate services which have
      // processing tasks defined in the upgrade pack
      if (upgradePack.getType() == UpgradeType.ROLLING && !allTasks.containsKey(service.serviceName)) {
        continue;
      }

      for (String component : service.components) {
        // Rolling Upgrade has exactly one task for a Component.
        // NonRolling Upgrade has several tasks for the same component, since it must first call Stop, perform several
        // other tasks, and then Start on that Component.
        if (upgradePack.getType() == UpgradeType.ROLLING && !allTasks.get(service.serviceName).containsKey(component)) {
          continue;
        }

        // if a function name is present, build the tasks dynamically;
        // otherwise use the tasks defined in the upgrade pack processing
        ProcessingComponent pc = null;
        if (null == functionName) {
          // the service is only guaranteed to be present in the task map for
          // a rolling upgrade (checked above); look it up null-safely so a
          // missing entry is reported and skipped below instead of raising a
          // NullPointerException
          Map<String, ProcessingComponent> serviceTasks = allTasks.get(service.serviceName);
          if (null != serviceTasks) {
            pc = serviceTasks.get(component);
          }
        } else {
          // Construct a processing task on-the-fly if it is a "stop" group.
          if (functionName == Type.STOP) {
            pc = new ProcessingComponent();
            pc.name = component;
            pc.tasks = new ArrayList<>();
            pc.tasks.add(new StopTask());
          } else {
            // For Start and Restart, make a best attempt at finding
            // Processing Components.
            // If they don't exist, make one on the fly.
            if (allTasks.containsKey(service.serviceName)
                && allTasks.get(service.serviceName).containsKey(component)) {
              pc = allTasks.get(service.serviceName).get(component);
            } else {
              // Construct a processing task on-the-fly so that the Upgrade
              // Pack is less verbose.
              pc = new ProcessingComponent();
              pc.name = component;
              pc.tasks = new ArrayList<>();

              if (functionName == Type.START) {
                pc.tasks.add(new StartTask());
              }

              if (functionName == Type.RESTART) {
                pc.tasks.add(new RestartTask());
              }
            }
          }
        }

        if (pc == null) {
          // parameterized logging is used on purpose: with MessageFormat the
          // apostrophe in "Couldn't" quotes the rest of the pattern, so the
          // service and component names would never be substituted
          LOG.error("Couldn't create a processing component for service {} and component {}.",
              service.serviceName, component);
          continue;
        }

        HostsType hostsType = mhr.getMasterAndHosts(service.serviceName, component);

        // only worry about adding future commands if this is a start/restart task
        boolean taskIsRestartOrStart = functionName == null || functionName == Type.START
            || functionName == Type.RESTART;

        // see if this component has an add component task which will indicate
        // we need to dynamically schedule some more tasks by predicting where
        // the components will be installed
        String serviceAndComponentHash = service.serviceName + "/" + component;
        if (taskIsRestartOrStart && addedComponentsDuringUpgrade.containsKey(serviceAndComponentHash)) {
          AddComponentTask task = addedComponentsDuringUpgrade.get(serviceAndComponentHash);

          Collection<Host> candidateHosts = MasterHostResolver.getCandidateHosts(cluster,
              task.hosts, task.hostService, task.hostComponent);

          // if we have candidate hosts, then we can create a structure to be
          // scheduled
          if (!candidateHosts.isEmpty()) {
            if (null == hostsType) {
              // a null hosts type usually means that the component is not
              // installed in the cluster - but it's possible that it's going
              // to be added as part of the upgrade. If this is the case, then
              // we need to schedule tasks assuming the add works
              hostsType = HostsType.normal(
                  candidateHosts.stream().map(Host::getHostName).collect(
                      Collectors.toCollection(LinkedHashSet::new)));
            } else {
              // it's possible that we're adding components that may already
              // exist in the cluster - in this case, we must append the
              // structure instead of creating a new one
              Set<String> hostsForTask = hostsType.getHosts();
              for (Host host : candidateHosts) {
                hostsForTask.add(host.getHostName());
              }
            }
          }
        }

        // if we still have no hosts, then truly skip this component
        if (null == hostsType) {
          continue;
        }

        if (!hostsType.unhealthy.isEmpty()) {
          context.addUnhealthy(hostsType.unhealthy);
        }

        Service svc = cluster.getService(service.serviceName);

        setDisplayNames(context, service.serviceName, component);

        // Special case for NAMENODE when there are multiple
        if (service.serviceName.equalsIgnoreCase("HDFS") && component.equalsIgnoreCase("NAMENODE")) {

          // Rolling Upgrade requires first upgrading the Standby, then the Active NameNode.
          // Whereas NonRolling needs to do the following:
          // NameNode HA: Pick one to be the active, and the other the standby.
          // Non-NameNode HA: Upgrade first the SECONDARY, then the primary NAMENODE
          //
          // Any other upgrade type adds nothing to the builder for NAMENODE here.
          switch (upgradePack.getType()) {
            case ROLLING:
              if (!hostsType.getHosts().isEmpty() && hostsType.hasMastersAndSecondaries()) {
                // The order is important, first do the standby, then the active namenode.
                hostsType.arrangeHostSecondariesFirst();
                builder.add(context, hostsType, service.serviceName,
                    svc.isClientOnlyService(), pc, null);
              } else {
                LOG.warn("Could not orchestrate NameNode. Hosts could not be resolved: hosts={}, active={}, standby={}",
                    StringUtils.join(hostsType.getHosts(), ','), hostsType.getMasters(), hostsType.getSecondaries());
              }
              break;
            case NON_ROLLING:
              boolean isNameNodeHA = mhr.isNameNodeHA();
              if (isNameNodeHA && hostsType.hasMastersAndSecondaries()) {
                // This could be any order, but the NameNodes have to know what role they are going to take.
                // So need to make 2 stages, and add different parameters to each one.
                builder.add(context, HostsType.normal(hostsType.getMasters()), service.serviceName,
                    svc.isClientOnlyService(), pc, nameNodeRole("active"));

                builder.add(context, HostsType.normal(hostsType.getSecondaries()), service.serviceName,
                    svc.isClientOnlyService(), pc, nameNodeRole("standby"));
              } else {
                // If no NameNode HA, then don't need to change hostsType.hosts since there should be exactly one.
                builder.add(context, hostsType, service.serviceName,
                    svc.isClientOnlyService(), pc, null);
              }
              break;
          }
        } else {
          builder.add(context, hostsType, service.serviceName,
              svc.isClientOnlyService(), pc, null);
        }
      }
    }

    List<StageWrapper> proxies = builder.build(context);

    // a group which produced no stages is dropped entirely and does not
    // become the "previous" group for the service check merge below
    if (CollectionUtils.isNotEmpty(proxies)) {

      groupHolder.items = proxies;
      postProcess(context, groupHolder);

      // !!! prevent service checks from running twice. merge the stage wrappers
      if (ServiceCheckGrouping.class.isInstance(group)) {
        if (null != previousGroupHolder && ServiceCheckGrouping.class.equals(previousGroupHolder.groupClass)) {
          mergeServiceChecks(groupHolder, previousGroupHolder);
        } else {
          groups.add(groupHolder);
        }
      } else {
        groups.add(groupHolder);
      }

      // NOTE(review): after a merge, this points at a holder which was not
      // added to groups; a third consecutive service check group would then
      // be merged against it - confirm against mergeServiceChecks that this
      // cannot lose service checks.
      previousGroupHolder = groupHolder;
    }
  }

  if (LOG.isDebugEnabled()) {
    for (UpgradeGroupHolder group : groups) {
      LOG.debug(group.name);

      int i = 0;
      for (StageWrapper proxy : group.items) {
        LOG.debug(" Stage {}", Integer.valueOf(i++));
        int j = 0;

        for (TaskWrapper task : proxy.getTasks()) {
          LOG.debug(" Task {} {}", Integer.valueOf(j++), task);
        }
      }
    }
  }

  // !!! strip off the first service check if nothing has been processed:
  // every service check group found before the first processing group is
  // removed from the result
  Iterator<UpgradeGroupHolder> iterator = groups.iterator();
  boolean canServiceCheck = false;
  while (iterator.hasNext()) {
    UpgradeGroupHolder holder = iterator.next();

    if (ServiceCheckGrouping.class.equals(holder.groupClass) && !canServiceCheck) {
      iterator.remove();
    }

    canServiceCheck |= holder.processingGroup;
  }

  return groups;
}