in ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java [729:894]
protected Map<String, RoleStats> processInProgressStage(Stage s, List<ExecutionCommand> commandsToSchedule,
Multimap<Long, AgentCommand> commandsToEnqueue) throws AmbariException {
LOG.debug("==> Collecting commands to schedule...");
// Map to track role status
Map<String, RoleStats> roleStats = initRoleStats(s);
long now = System.currentTimeMillis();
Set<RoleCommandPair> rolesCommandsInProgress = s.getHostRolesInProgress();
Cluster cluster = null;
if (null != s.getClusterName()) {
cluster = clusters.getCluster(s.getClusterName());
}
for (String host : s.getHosts()) {
List<ExecutionCommandWrapper> commandWrappers = s.getExecutionCommands(host);
Host hostObj = null;
try {
hostObj = clusters.getHost(host);
} catch (AmbariException e) {
LOG.debug("Host {} not found, stage is likely a server side action", host);
}
int i_my = 0;
LOG.trace("===>host={}", host);
for (ExecutionCommandWrapper wrapper : commandWrappers) {
ExecutionCommand c = wrapper.getExecutionCommand();
String roleStr = c.getRole();
HostRoleStatus status = s.getHostRoleStatus(host, roleStr);
i_my++;
if (LOG.isTraceEnabled()) {
LOG.trace("Host task {}) id = {} status = {} (role={}), roleCommand = {}", i_my, c.getTaskId(), status, roleStr, c.getRoleCommand());
}
boolean hostDeleted = false;
if (null != cluster) {
Service svc = null;
if (c.getServiceName() != null && !c.getServiceName().isEmpty()) {
svc = cluster.getService(c.getServiceName());
}
ServiceComponent svcComp = null;
Map<String, ServiceComponentHost> scHosts = null;
try {
if (svc != null) {
svcComp = svc.getServiceComponent(roleStr);
scHosts = svcComp.getServiceComponentHosts();
}
} catch (ServiceComponentNotFoundException scnex) {
String msg = String.format(
"%s is not not a service component, assuming its an action",
roleStr);
LOG.debug(msg);
}
hostDeleted = (scHosts != null && !scHosts.containsKey(host));
if (hostDeleted) {
String message = String.format(
"Host component information has not been found. Details:" +
"cluster=%s; host=%s; service=%s; component=%s; ",
c.getClusterName(), host,
svcComp == null ? "null" : svcComp.getServiceName(),
svcComp == null ? "null" : svcComp.getName());
LOG.warn(message);
}
}
//basic timeout for stage
long commandTimeout = actionTimeout;
if (taskTimeoutAdjustment) {
Map<String, String> commandParams = c.getCommandParams();
String timeoutKey = ExecutionCommand.KeyNames.COMMAND_TIMEOUT;
if (commandParams != null && commandParams.containsKey(timeoutKey)) {
String timeoutStr = commandParams.get(timeoutKey);
commandTimeout += Long.parseLong(timeoutStr) * 1000; // Converting to milliseconds
} else {
LOG.error("Execution command has no timeout parameter" +
c);
}
}
// Check that service host component is not deleted
boolean isHostStateUnknown = false;
if (hostDeleted) {
String message = String.format(
"Host not found when trying to schedule an execution command. " +
"The most probable reason for that is that host or host component " +
"has been deleted recently. The command has been aborted and dequeued." +
"Execution command details: " +
"cmdId: %s; taskId: %s; roleCommand: %s",
c.getCommandId(), c.getTaskId(), c.getRoleCommand());
LOG.warn("Host {} has been detected as non-available. {}", host, message);
// Abort the command itself
// We don't need to send CANCEL_COMMANDs in this case
db.abortHostRole(host, s.getRequestId(), s.getStageId(), c.getRole(), message);
if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
}
status = HostRoleStatus.ABORTED;
} else if (timeOutActionNeeded(status, s, hostObj, roleStr, now, commandTimeout)
|| (isHostStateUnknown = isHostStateUnknown(s, hostObj, roleStr))) {
// Process command timeouts
if (s.getAttemptCount(host, roleStr) >= maxAttempts || isHostStateUnknown) {
LOG.warn("Host: {}, role: {}, actionId: {} expired and will be failed", host, roleStr,
s.getActionId());
// determine if the task should be auto skipped
boolean isSkipSupported = s.isAutoSkipOnFailureSupported();
HostRoleCommand hostRoleCommand = s.getHostRoleCommand(c.getTaskId());
if (isSkipSupported && null != hostRoleCommand) {
isSkipSupported = hostRoleCommand.isFailureAutoSkipped();
}
db.timeoutHostRole(host, s.getRequestId(), s.getStageId(), c.getRole(), isSkipSupported, isHostStateUnknown);
//Reinitialize status
status = s.getHostRoleStatus(host, roleStr);
if (null != cluster) {
if (!RoleCommand.CUSTOM_COMMAND.equals(c.getRoleCommand())
&& !RoleCommand.SERVICE_CHECK.equals(c.getRoleCommand())
&& !RoleCommand.ACTIONEXECUTE.equals(c.getRoleCommand())) {
//commands above don't affect host component state (e.g. no in_progress state in process), transition will fail
transitionToFailedState(cluster.getClusterName(), c.getServiceName(), roleStr, host, now, false);
}
if (c.getRoleCommand().equals(RoleCommand.ACTIONEXECUTE)) {
processActionDeath(cluster.getClusterName(), c.getHostname(), roleStr);
}
}
// Dequeue command
LOG.info("Removing command from queue, host={}, commandId={} ", host, c.getCommandId());
} else {
cancelCommandOnTimeout(Collections.singletonList(s.getHostRoleCommand(host, roleStr)), commandsToEnqueue);
LOG.info("Host: {}, role: {}, actionId: {} timed out and will be rescheduled", host,
roleStr, s.getActionId());
// reschedule command
commandsToSchedule.add(c);
LOG.trace("===> commandsToSchedule(reschedule)={}", commandsToSchedule.size());
}
} else if (status.equals(HostRoleStatus.PENDING)) {
// in case of DEPENDENCY_ORDERED stage command can be scheduled only if all of it's dependencies are
// already finished
if (CommandExecutionType.STAGE == s.getCommandExecutionType() ||
(CommandExecutionType.DEPENDENCY_ORDERED == s.getCommandExecutionType() &&
CommandExecutionType.DEPENDENCY_ORDERED == configuration.getStageExecutionType() &&
areCommandDependenciesFinished(c, s, rolesCommandsInProgress))) {
//Need to schedule first time
commandsToSchedule.add(c);
LOG.trace("===>commandsToSchedule(first_time)={}", commandsToSchedule.size());
}
}
updateRoleStats(status, roleStats.get(roleStr));
if (status == HostRoleStatus.FAILED) {
LOG.info("Role {} on host {} was failed", roleStr, host);
}
}
}
LOG.debug("Collected {} commands to schedule in this wakeup.", commandsToSchedule.size());
return roleStats;
}