protected void processCommandReports()

in ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java [324:539]


  /**
   * Processes the command reports delivered for a single host.
   * <p>
   * For each report, in order:
   * <ol>
   * <li>an {@link ActionFinalReportReceivedEvent} is published when the report is for an
   * {@code ACTIONEXECUTE} command whose status is a completed state;</li>
   * <li>the cached {@link HostRoleCommand} is looked up by task id; reports whose cached command
   * is already {@code ABORTED} are skipped from here on, and a command moving from
   * {@code QUEUED} to {@code IN_PROGRESS} gets its start time (and, once only, its original
   * start time) set to {@code now};</li>
   * <li>keytab distribution flags are updated for completed Kerberos client custom commands;</li>
   * <li>unless the report is for an action or for a custom command other than
   * START/STOP/RESTART, the matching {@link ServiceComponentHost} is sent the event that
   * corresponds to the reported status.</li>
   * </ol>
   * After the loop all reports are handed to {@code actionManager.processTaskResponse}.
   *
   * @param reports  command reports to process
   * @param hostName name of the host the reports were received from
   * @param now      timestamp used for start times and for the state machine events
   * @throws AmbariException if a report that requires a component lookup carries a null or empty
   *                         service name, or if one of the collaborators called here throws it
   */
  protected void processCommandReports(List<CommandReport> reports, String hostName, Long now)
      throws AmbariException {

    // Cache HostRoleCommand entities because we will need them few times
    List<Long> taskIds = new ArrayList<>(reports.size());
    for (CommandReport report : reports) {
      taskIds.add(report.getTaskId());
    }
    Map<Long, HostRoleCommand> commands = actionManager.getTasksMap(taskIds);

    for (CommandReport report : reports) {

      Long clusterId = Long.parseLong(report.getClusterId());

      LOG.debug("Received command report: {}", report);

      // get this locally; don't touch the database
      Host host = clusterFsm.getHost(hostName);
      if (host == null) {
        LOG.error("Received a command report and was unable to retrieve Host for hostname = {}", hostName);
        continue;
      }

      // Send event for final command reports for actions
      if (RoleCommand.valueOf(report.getRoleCommand()) == RoleCommand.ACTIONEXECUTE &&
          HostRoleStatus.valueOf(report.getStatus()).isCompletedState()) {
        ActionFinalReportReceivedEvent event = new ActionFinalReportReceivedEvent(
            clusterId, hostName, report, false);
        ambariEventPublisher.publish(event);
      }

      String roleCommand = report.getRoleCommand();
      String customCommand = report.getCustomCommand();
      String status = report.getStatus();

      // Fetch HostRoleCommand that corresponds to a given task ID
      HostRoleCommand hostRoleCommand = commands.get(report.getTaskId());
      if (hostRoleCommand == null) {
        LOG.warn("Can't fetch HostRoleCommand with taskId = {}", report.getTaskId());
      } else {
        // Skip sending events for command reports for ABORTed commands
        if (hostRoleCommand.getStatus() == HostRoleStatus.ABORTED) {
          continue;
        }
        if (hostRoleCommand.getStatus() == HostRoleStatus.QUEUED &&
            "IN_PROGRESS".equals(status)) {
          hostRoleCommand.setStartTime(now);

          // Because the task may be retried several times, set the original start time only once.
          if (hostRoleCommand.getOriginalStartTime() == -1) {
            hostRoleCommand.setOriginalStartTime(now);
          }
        }
      }

      // If the report indicates the keytab file was successfully transferred to a host or removed
      // from a host, record this for future reference
      recordKeytabDistribution(report, host, hostName);

      boolean isCustomCommand = RoleCommand.CUSTOM_COMMAND.toString().equals(roleCommand);
      boolean isCustomStart = isCustomCommand && "START".equals(customCommand);
      boolean isCustomStop = isCustomCommand && "STOP".equals(customCommand);
      boolean isCustomRestart = isCustomCommand && "RESTART".equals(customCommand);

      //pass custom START, STOP and RESTART
      if (RoleCommand.ACTIONEXECUTE.toString().equals(roleCommand) ||
          (isCustomCommand && !(isCustomRestart || isCustomStart || isCustomStop))) {
        continue;
      }

      // The cluster id was already parsed at the top of the iteration; reuse it.
      Cluster cl = clusterFsm.getCluster(clusterId);
      String service = report.getServiceName();
      if (service == null || service.isEmpty()) {
        throw new AmbariException("Invalid command report, service: " + service);
      }
      if (actionMetadata.getActions(service.toLowerCase()).contains(report.getRole())) {
        LOG.debug("{} is an action - skip component lookup", report.getRole());
      } else {
        try {
          Service svc = cl.getService(service);
          ServiceComponent svcComp = svc.getServiceComponent(report.getRole());
          ServiceComponentHost scHost = svcComp.getServiceComponentHost(hostName);
          String schName = scHost.getServiceComponentName();

          if (HostRoleStatus.COMPLETED.toString().equals(status)) {

            // Reading component version if it is present
            if (StringUtils.isNotBlank(report.getStructuredOut())
                && !StringUtils.equals("{}", report.getStructuredOut())) {
              ComponentVersionStructuredOut structuredOutput = null;
              try {
                structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);
              } catch (JsonSyntaxException ex) {
                //Json structure for component version was incorrect
                //do nothing, pass this data further for processing
              }

              String newVersion = structuredOutput == null ? null : structuredOutput.version;
              Long repoVersionId = structuredOutput == null ? null : structuredOutput.repositoryVersionId;

              HostComponentVersionAdvertisedEvent event = new HostComponentVersionAdvertisedEvent(
                  cl, scHost, newVersion, repoVersionId);

              versionEventPublisher.publish(event);
            }

            boolean isStart = RoleCommand.START.toString().equals(roleCommand);

            if (!scHost.getState().equals(org.apache.ambari.server.state.State.UPGRADING) &&
                (isStart || isCustomStart || isCustomRestart)) {
              scHost.setRestartRequired(false);
            }

            // Necessary for resetting clients stale configs after starting service
            if ((RoleCommand.INSTALL.toString().equals(roleCommand) ||
                (isCustomCommand && "INSTALL".equals(customCommand))) && svcComp.isClientComponent()) {
              scHost.setRestartRequired(false);
            }

            if (isCustomCommand && !(isCustomStart || isCustomStop)) {
              //do not affect states for custom commands except START and STOP
              //lets status commands to be responsible for this
              continue;
            }

            if (isStart || isCustomStart) {
              scHost.handleEvent(new ServiceComponentHostStartedEvent(schName,
                  hostName, now));
              scHost.setRestartRequired(false);
            } else if (RoleCommand.STOP.toString().equals(roleCommand) || isCustomStop) {
              scHost.handleEvent(new ServiceComponentHostStoppedEvent(schName,
                  hostName, now));
            } else {
              scHost.handleEvent(new ServiceComponentHostOpSucceededEvent(schName,
                  hostName, now));
            }
          } else if ("FAILED".equals(status)) {

            if (StringUtils.isNotBlank(report.getStructuredOut())) {
              try {
                ComponentVersionStructuredOut structuredOutput = gson.fromJson(report.getStructuredOut(), ComponentVersionStructuredOut.class);

                // fromJson may yield null (e.g. for a JSON "null" literal), so guard the dereference
                if (structuredOutput != null && structuredOutput.upgradeDirection != null) {
                  scHost.setUpgradeState(UpgradeState.FAILED);
                }
              } catch (JsonSyntaxException ex) {
                LOG.warn("Structured output was found, but not parseable: {}", report.getStructuredOut());
              }
            }

            LOG.error("Operation failed - may be retried. Service component host: {}, host: {} Action id {} and taskId {}",
                schName, hostName, report.getActionId(), report.getTaskId());
            if (actionManager.isInProgressCommand(report)) {
              scHost.handleEvent(new ServiceComponentHostOpFailedEvent
                  (schName, hostName, now));
            } else {
              LOG.info("Received report for a command that is no longer active. {}", report);
            }
          } else if ("IN_PROGRESS".equals(status)) {
            scHost.handleEvent(new ServiceComponentHostOpInProgressEvent(schName,
                hostName, now));
          }
        } catch (ServiceComponentNotFoundException scnex) {
          LOG.warn("Service component not found ", scnex);
        } catch (InvalidStateTransitionException ex) {
          // The stack trace is only included when debug logging is enabled
          if (LOG.isDebugEnabled()) {
            LOG.warn("State machine exception.", ex);
          } else {
            LOG.warn("State machine exception. {}", ex.getMessage());
          }
        }
      }
    }

    //Update state machines from reports
    actionManager.processTaskResponse(hostName, reports, commands);
  }

  /**
   * Updates the "distributed" flag of keytab/principal records from a completed Kerberos client
   * custom command report. Reports that are not a COMPLETED KERBEROS / KERBEROS_CLIENT
   * CUSTOM_COMMAND are ignored.
   * <p>
   * For {@code SET_KEYTAB} every keytab path listed in the structured output is marked as
   * distributed on the host; for {@code CHECK_KEYTABS} every keytab/principal pair reported as
   * missing is marked as not distributed. Structured output that is absent or cannot be parsed
   * is ignored so that the rest of the reports are still processed.
   *
   * @param report   the command report to inspect
   * @param host     the host the report was received from; must not be null
   * @param hostName the host name, used for logging
   * @throws AmbariException declared for parity with the caller; nothing in this method throws it
   *                         directly
   */
  private void recordKeytabDistribution(CommandReport report, Host host, String hostName)
      throws AmbariException {
    if (!(Service.Type.KERBEROS.name().equalsIgnoreCase(report.getServiceName()) &&
        Role.KERBEROS_CLIENT.name().equalsIgnoreCase(report.getRole()) &&
        RoleCommand.CUSTOM_COMMAND.name().equalsIgnoreCase(report.getRoleCommand()) &&
        RequestExecution.Status.COMPLETED.name().equalsIgnoreCase(report.getStatus()))) {
      return;
    }

    String customCommand = report.getCustomCommand();

    if (SET_KEYTAB.equalsIgnoreCase(customCommand)) {
      WriteKeytabsStructuredOut writeKeytabsStructuredOut;
      try {
        writeKeytabsStructuredOut = gson.fromJson(report.getStructuredOut(), WriteKeytabsStructuredOut.class);
      } catch (JsonSyntaxException ex) {
        //Json structure was incorrect do nothing, pass this data further for processing
        writeKeytabsStructuredOut = null;
      }

      if (writeKeytabsStructuredOut == null) {
        return;
      }

      Map<String, String> keytabs = writeKeytabsStructuredOut.getKeytabs();
      if (keytabs == null) {
        return;
      }

      for (Map.Entry<String, String> entry : keytabs.entrySet()) {
        String keytabPath = entry.getValue();
        for (KerberosKeytabPrincipalEntity kkpe : kerberosKeytabPrincipalDAO.findByHostAndKeytab(host.getHostId(), keytabPath)) {
          kkpe.setDistributed(true);
          kerberosKeytabPrincipalDAO.merge(kkpe);
        }
      }
    } else if (CHECK_KEYTABS.equalsIgnoreCase(customCommand)) {
      ListKeytabsStructuredOut structuredOut;
      try {
        structuredOut = gson.fromJson(report.getStructuredOut(), ListKeytabsStructuredOut.class);
      } catch (JsonSyntaxException ex) {
        // Same policy as SET_KEYTAB: a malformed payload must not abort the whole heartbeat
        LOG.warn("Structured output was found, but not parseable: {}", report.getStructuredOut());
        structuredOut = null;
      }

      // fromJson returns null for null/empty input, and the list is null when the key is absent
      if (structuredOut == null || structuredOut.missingKeytabs == null) {
        return;
      }

      for (MissingKeytab each : structuredOut.missingKeytabs) {
        LOG.info("Missing principal: {} for keytab: {} on host: {}", each.principal, each.keytabFilePath, hostName);
        KerberosKeytabPrincipalEntity kkpe = kerberosKeytabPrincipalDAO.findByHostKeytabAndPrincipal(host.getHostId(), each.keytabFilePath, each.principal);
        if (kkpe == null) {
          // Nothing recorded for this keytab/principal on the host, so there is no flag to reset
          LOG.warn("No keytab principal record found for principal: {} and keytab: {} on host: {}",
              each.principal, each.keytabFilePath, hostName);
          continue;
        }
        kkpe.setDistributed(false);
        kerberosKeytabPrincipalDAO.merge(kkpe);
      }
    }
  }