in ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java [324:539]
/**
 * Processes the command reports delivered by one agent heartbeat.
 * <p>
 * For every report this method:
 * <ol>
 *   <li>publishes an {@link ActionFinalReportReceivedEvent} when an ACTIONEXECUTE command
 *       reaches a completed state;</li>
 *   <li>stamps the start time on the cached {@link HostRoleCommand} when it moves from
 *       QUEUED to IN_PROGRESS, and skips reports whose command is already ABORTED;</li>
 *   <li>records keytab distribution state for completed Kerberos client custom commands;</li>
 *   <li>feeds the matching {@link ServiceComponentHost} state machine with a
 *       started / stopped / succeeded / failed / in-progress event.</li>
 * </ol>
 * Finally all reports are handed to the action manager in a single call.
 *
 * @param reports  command reports received from the agent
 * @param hostName name of the host that sent the reports
 * @param now      timestamp used for start times and for the generated state machine events
 * @throws AmbariException if a report that requires a component lookup carries no service
 *                         name, or if one of the cluster/host lookups fails
 */
protected void processCommandReports(List<CommandReport> reports, String hostName, Long now)
    throws AmbariException {
  // Cache HostRoleCommand entities because we will need them few times
  List<Long> taskIds = new ArrayList<>(reports.size());
  for (CommandReport report : reports) {
    taskIds.add(report.getTaskId());
  }
  Map<Long, HostRoleCommand> commands = actionManager.getTasksMap(taskIds);

  for (CommandReport report : reports) {
    Long clusterId = Long.parseLong(report.getClusterId());
    LOG.debug("Received command report: {}", report);

    // get this locally; don't touch the database
    Host host = clusterFsm.getHost(hostName);
    if (host == null) {
      LOG.error("Received a command report and was unable to retrieve Host for hostname = {}", hostName);
      continue;
    }

    String roleCommand = report.getRoleCommand();
    String customCommand = report.getCustomCommand();
    String status = report.getStatus();

    // Send event for final command reports for actions
    if (RoleCommand.valueOf(roleCommand) == RoleCommand.ACTIONEXECUTE &&
        HostRoleStatus.valueOf(status).isCompletedState()) {
      ActionFinalReportReceivedEvent event = new ActionFinalReportReceivedEvent(
          clusterId, hostName, report, false);
      ambariEventPublisher.publish(event);
    }

    // Fetch HostRoleCommand that corresponds to a given task ID
    HostRoleCommand hostRoleCommand = commands.get(report.getTaskId());
    if (hostRoleCommand == null) {
      LOG.warn("Can't fetch HostRoleCommand with taskId = {}", report.getTaskId());
    } else {
      // Skip sending events for command reports for ABORTed commands
      if (hostRoleCommand.getStatus() == HostRoleStatus.ABORTED) {
        continue;
      }
      if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED &&
          "IN_PROGRESS".equals(status)) {
        hostRoleCommand.setStartTime(now);
        // Because the task may be retried several times, set the original start time only once.
        if (hostRoleCommand.getOriginalStartTime() == -1) {
          hostRoleCommand.setOriginalStartTime(now);
        }
      }
    }

    // If the report indicates the keytab file was successfully transferred to a host or removed
    // from a host, record this for future reference
    recordKeytabDistribution(report, host, hostName);

    // Only custom START, STOP and RESTART continue to the component state handling below;
    // action executions and every other custom command are done at this point.
    if (RoleCommand.ACTIONEXECUTE.toString().equals(roleCommand) ||
        (RoleCommand.CUSTOM_COMMAND.toString().equals(roleCommand) &&
            !("RESTART".equals(customCommand) ||
                "START".equals(customCommand) ||
                "STOP".equals(customCommand)))) {
      continue;
    }

    // Reuse the cluster id parsed at the top of the iteration instead of parsing it again.
    Cluster cl = clusterFsm.getCluster(clusterId);
    String service = report.getServiceName();
    if (service == null || service.isEmpty()) {
      throw new AmbariException("Invalid command report, service: " + service);
    }

    if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
      LOG.debug("{} is an action - skip component lookup", report.getRole());
    } else {
      try {
        Service svc = cl.getService(service);
        ServiceComponent svcComp = svc.getServiceComponent(report.getRole());
        ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostName);
        String schName = scHost.getServiceComponentName();

        if (HostRoleStatus.COMPLETED.toString().equals(status)) {
          // Reading component version if it is present
          if (StringUtils.isNotBlank(report.getStructuredOut())
              && !StringUtils.equals("{}", report.getStructuredOut())) {
            ComponentVersionStructuredOut structuredOutput = null;
            try {
              structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
            } catch (JsonSyntaxException ex) {
              //Json structure for component version was incorrect
              //do nothing, pass this data further for processing
            }
            String newVersion = structuredOutput == null ? null : structuredOutput.version;
            Long repoVersionId = structuredOutput == null ? null : structuredOutput.repositoryVersionId;
            HostComponentVersionAdvertisedEvent event = new HostComponentVersionAdvertisedEvent(
                cl, scHost, newVersion, repoVersionId);
            versionEventPublisher.publish(event);
          }

          if (!scHost.getState().equals(org.apache.ambari.server.state.State.UPGRADING) &&
              (roleCommand.equals(RoleCommand.START.toString()) ||
                  (roleCommand.equals(RoleCommand.CUSTOM_COMMAND.toString()) &&
                      ("START".equals(customCommand) ||
                          "RESTART".equals(customCommand))))) {
            scHost.setRestartRequired(false);
          }

          // Necessary for resetting clients stale configs after starting service
          if ((RoleCommand.INSTALL.toString().equals(roleCommand) ||
              (RoleCommand.CUSTOM_COMMAND.toString().equals(roleCommand) &&
                  "INSTALL".equals(customCommand))) && svcComp.isClientComponent()) {
            scHost.setRestartRequired(false);
          }

          if (RoleCommand.CUSTOM_COMMAND.toString().equals(roleCommand) &&
              !("START".equals(customCommand) ||
                  "STOP".equals(customCommand))) {
            //do not affect states for custom commands except START and STOP
            //lets status commands to be responsible for this
            continue;
          }

          if (RoleCommand.START.toString().equals(roleCommand) ||
              (RoleCommand.CUSTOM_COMMAND.toString().equals(roleCommand) &&
                  "START".equals(customCommand))) {
            scHost.handleEvent(new ServiceComponentHostStartedEvent(schName,
                hostName, now));
            scHost.setRestartRequired(false);
          } else if (RoleCommand.STOP.toString().equals(roleCommand) ||
              (RoleCommand.CUSTOM_COMMAND.toString().equals(roleCommand) &&
                  "STOP".equals(customCommand))) {
            scHost.handleEvent(new ServiceComponentHostStoppedEvent(schName,
                hostName, now));
          } else {
            scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(schName,
                hostName, now));
          }
        } else if ("FAILED".equals(status)) {
          if (StringUtils.isNotBlank(report.getStructuredOut())) {
            try {
              ComponentVersionStructuredOut structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
              // fromJson yields null for a literal "null" document, so guard before dereferencing
              if (structuredOutput != null && structuredOutput.upgradeDirection != null) {
                scHost.setUpgradeState(UpgradeState.FAILED);
              }
            } catch (JsonSyntaxException ex) {
              LOG.warn("Structured output was found, but not parseable: {}", report.getStructuredOut());
            }
          }

          LOG.error("Operation failed - may be retried. Service component host: {}, host: {} Action id {} and taskId {}",
              schName, hostName, report.getActionId(), report.getTaskId());
          if (actionManager.isInProgressCommand(report)) {
            scHost.handleEvent(new ServiceComponentHostOpFailedEvent
                (schName, hostName, now));
          } else {
            LOG.info("Received report for a command that is no longer active. {}", report);
          }
        } else if ("IN_PROGRESS".equals(status)) {
          scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(schName,
              hostName, now));
        }
      } catch (ServiceComponentNotFoundException scnex) {
        LOG.warn("Service component not found ", scnex);
      } catch (InvalidStateTransitionException ex) {
        // Full stack trace only when debug logging is on; otherwise just the message
        if (LOG.isDebugEnabled()) {
          LOG.warn("State machine exception.", ex);
        } else {
          LOG.warn("State machine exception. {}", ex.getMessage());
        }
      }
    }
  }

  //Update state machines from reports
  actionManager.processTaskResponse(hostName, reports, commands);
}

/**
 * Records keytab distribution state reported by a completed Kerberos client custom command.
 * <p>
 * A SET_KEYTAB report marks every keytab/principal record of the written keytab files as
 * distributed; a CHECK_KEYTABS report marks each reported missing keytab/principal as not
 * distributed. Reports for any other service, role, command or status are ignored.
 * Unparseable or empty structured output is tolerated and leaves the stored state untouched.
 *
 * @param report   the command report to inspect
 * @param host     the host the report came from; its id is used for the DAO lookups
 * @param hostName name of that host, used for logging only
 */
private void recordKeytabDistribution(CommandReport report, Host host, String hostName) {
  boolean completedKerberosClientCommand =
      Service.Type.KERBEROS.name().equalsIgnoreCase(report.getServiceName()) &&
          Role.KERBEROS_CLIENT.name().equalsIgnoreCase(report.getRole()) &&
          RoleCommand.CUSTOM_COMMAND.name().equalsIgnoreCase(report.getRoleCommand()) &&
          RequestExecution.Status.COMPLETED.name().equalsIgnoreCase(report.getStatus());
  if (!completedKerberosClientCommand) {
    return;
  }

  String customCommand = report.getCustomCommand();
  if (SET_KEYTAB.equalsIgnoreCase(customCommand)) {
    WriteKeytabsStructuredOut writeKeytabsStructuredOut;
    try {
      writeKeytabsStructuredOut = gson.fromJson(report.getStructuredOut(), WriteKeytabsStructuredOut.class);
    } catch (JsonSyntaxException ex) {
      //Json structure was incorrect do nothing, pass this data further for processing
      writeKeytabsStructuredOut = null;
    }

    Map<String, String> keytabs =
        writeKeytabsStructuredOut == null ? null : writeKeytabsStructuredOut.getKeytabs();
    if (keytabs != null) {
      for (String keytabPath : keytabs.values()) {
        for (KerberosKeytabPrincipalEntity kkpe : kerberosKeytabPrincipalDAO.findByHostAndKeytab(host.getHostId(), keytabPath)) {
          kkpe.setDistributed(true);
          kerberosKeytabPrincipalDAO.merge(kkpe);
        }
      }
    }
  } else if (CHECK_KEYTABS.equalsIgnoreCase(customCommand)) {
    ListKeytabsStructuredOut structuredOut;
    try {
      structuredOut = gson.fromJson(report.getStructuredOut(), ListKeytabsStructuredOut.class);
    } catch (JsonSyntaxException ex) {
      // Same tolerance as SET_KEYTAB: a malformed payload must not abort the whole heartbeat
      LOG.warn("Structured output was found, but not parseable: {}", report.getStructuredOut());
      return;
    }

    // fromJson returns null for empty input, and the list is absent when the key is missing
    if (structuredOut == null || structuredOut.missingKeytabs == null) {
      return;
    }

    for (MissingKeytab each : structuredOut.missingKeytabs) {
      LOG.info("Missing principal: {} for keytab: {} on host: {}", each.principal, each.keytabFilePath, hostName);
      KerberosKeytabPrincipalEntity kkpe = kerberosKeytabPrincipalDAO.findByHostKeytabAndPrincipal(host.getHostId(), each.keytabFilePath, each.principal);
      if (kkpe == null) {
        // Nothing stored for this keytab/principal on this host, so there is no flag to reset
        LOG.warn("No keytab principal record found for principal: {} and keytab: {} on host: {}",
            each.principal, each.keytabFilePath, hostName);
        continue;
      }
      kkpe.setDistributed(false);
      kerberosKeytabPrincipalDAO.merge(kkpe);
    }
  }
}