private void processEventForTimelineServer()

in hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java [830:1148]


  /**
   * Translates a job history event into a timeline event attached to a
   * timeline entity and publishes that entity through {@code timelineClient}.
   *
   * <p>Job-level events and {@code AM_STARTED} are attached to an entity of
   * type {@code MAPREDUCE_JOB_ENTITY_TYPE} whose id is {@code jobId}. Task and
   * task-attempt events are attached to an entity of type
   * {@code MAPREDUCE_TASK_ENTITY_TYPE} whose id is the task id and which is
   * related to the job entity. Event types without a mapping are skipped and
   * nothing is published for them.
   *
   * <p>A {@code YarnException}, {@code IOException} or
   * {@code ProcessingException} raised while publishing is logged and not
   * rethrown; per-entity errors reported in the put response are logged too.
   *
   * @param event the history event to translate
   * @param jobId the job the event belongs to
   * @param timestamp the timestamp recorded on the timeline event
   */
  private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
          long timestamp) {
    TimelineEvent tEvent = new TimelineEvent();
    tEvent.setEventType(StringUtils.toUpperCase(event.getEventType().name()));
    tEvent.setTimestamp(timestamp);
    TimelineEntity tEntity = new TimelineEntity();

    switch (event.getEventType()) {
      case JOB_SUBMITTED:
        JobSubmittedEvent jse = (JobSubmittedEvent) event;
        tEvent.addEventInfo("SUBMIT_TIME", jse.getSubmitTime());
        // The queue name is stored under both QUEUE_NAME and JOB_QUEUE_NAME.
        tEvent.addEventInfo("QUEUE_NAME", jse.getJobQueueName());
        tEvent.addEventInfo("JOB_NAME", jse.getJobName());
        tEvent.addEventInfo("USER_NAME", jse.getUserName());
        tEvent.addEventInfo("JOB_CONF_PATH", jse.getJobConfPath());
        tEvent.addEventInfo("ACLS", jse.getJobAcls());
        tEvent.addEventInfo("JOB_QUEUE_NAME", jse.getJobQueueName());
        tEvent.addEventInfo("WORKFLOW_ID", jse.getWorkflowId());
        tEvent.addEventInfo("WORKFLOW_NAME", jse.getWorkflowName());
        // NOTE(review): this key carries the workflow *node* name and looks
        // like a typo for WORKFLOW_NODE_NAME. Kept unchanged because readers
        // of stored timeline data may depend on it - confirm before renaming.
        tEvent.addEventInfo("WORKFLOW_NAME_NAME", jse.getWorkflowNodeName());
        tEvent.addEventInfo("WORKFLOW_ADJACENCIES",
            jse.getWorkflowAdjacencies());
        tEvent.addEventInfo("WORKFLOW_TAGS", jse.getWorkflowTags());
        bindEventToJobEntity(tEntity, tEvent, jobId);
        break;
      case JOB_STATUS_CHANGED:
        JobStatusChangedEvent jsce = (JobStatusChangedEvent) event;
        tEvent.addEventInfo("STATUS", jsce.getStatus());
        bindEventToJobEntity(tEntity, tEvent, jobId);
        break;
      case JOB_INFO_CHANGED:
        JobInfoChangeEvent jice = (JobInfoChangeEvent) event;
        tEvent.addEventInfo("SUBMIT_TIME", jice.getSubmitTime());
        tEvent.addEventInfo("LAUNCH_TIME", jice.getLaunchTime());
        bindEventToJobEntity(tEntity, tEvent, jobId);
        break;
      case JOB_INITED:
        JobInitedEvent jie = (JobInitedEvent) event;
        tEvent.addEventInfo("START_TIME", jie.getLaunchTime());
        tEvent.addEventInfo("STATUS", jie.getStatus());
        tEvent.addEventInfo("TOTAL_MAPS", jie.getTotalMaps());
        tEvent.addEventInfo("TOTAL_REDUCES", jie.getTotalReduces());
        tEvent.addEventInfo("UBERIZED", jie.getUberized());
        // The launch time is also recorded as the entity's start time.
        tEntity.setStartTime(jie.getLaunchTime());
        bindEventToJobEntity(tEntity, tEvent, jobId);
        break;
      case JOB_PRIORITY_CHANGED:
        JobPriorityChangeEvent jpce = (JobPriorityChangeEvent) event;
        tEvent.addEventInfo("PRIORITY", jpce.getPriority().toString());
        bindEventToJobEntity(tEntity, tEvent, jobId);
        break;
      case JOB_QUEUE_CHANGED:
        JobQueueChangeEvent jqe = (JobQueueChangeEvent) event;
        // NOTE(review): plural key although a single queue name is stored;
        // kept unchanged for compatibility with existing readers.
        tEvent.addEventInfo("QUEUE_NAMES", jqe.getJobQueueName());
        bindEventToJobEntity(tEntity, tEvent, jobId);
        break;
      case JOB_FAILED:
      case JOB_KILLED:
      case JOB_ERROR:
        JobUnsuccessfulCompletionEvent juce =
            (JobUnsuccessfulCompletionEvent) event;
        tEvent.addEventInfo("FINISH_TIME", juce.getFinishTime());
        // Totals are the sum of succeeded, failed and killed counts.
        tEvent.addEventInfo("NUM_MAPS",
            juce.getSucceededMaps() +
            juce.getFailedMaps() +
            juce.getKilledMaps());
        tEvent.addEventInfo("NUM_REDUCES",
            juce.getSucceededReduces() +
            juce.getFailedReduces() +
            juce.getKilledReduces());
        tEvent.addEventInfo("JOB_STATUS", juce.getStatus());
        tEvent.addEventInfo("DIAGNOSTICS", juce.getDiagnostics());
        tEvent.addEventInfo("SUCCESSFUL_MAPS", juce.getSucceededMaps());
        tEvent.addEventInfo("SUCCESSFUL_REDUCES", juce.getSucceededReduces());
        tEvent.addEventInfo("FAILED_MAPS", juce.getFailedMaps());
        tEvent.addEventInfo("FAILED_REDUCES", juce.getFailedReduces());
        tEvent.addEventInfo("KILLED_MAPS", juce.getKilledMaps());
        tEvent.addEventInfo("KILLED_REDUCES", juce.getKilledReduces());
        bindEventToJobEntity(tEntity, tEvent, jobId);
        break;
      case JOB_FINISHED:
        JobFinishedEvent jfe = (JobFinishedEvent) event;
        tEvent.addEventInfo("FINISH_TIME", jfe.getFinishTime());
        // Totals are the sum of succeeded, failed and killed counts.
        tEvent.addEventInfo("NUM_MAPS",
            jfe.getSucceededMaps() +
            jfe.getFailedMaps() +
            jfe.getKilledMaps());
        tEvent.addEventInfo("NUM_REDUCES",
            jfe.getSucceededReduces() +
            jfe.getFailedReduces() +
            jfe.getKilledReduces());
        tEvent.addEventInfo("FAILED_MAPS", jfe.getFailedMaps());
        tEvent.addEventInfo("FAILED_REDUCES", jfe.getFailedReduces());
        tEvent.addEventInfo("SUCCESSFUL_MAPS", jfe.getSucceededMaps());
        tEvent.addEventInfo("SUCCESSFUL_REDUCES", jfe.getSucceededReduces());
        tEvent.addEventInfo("KILLED_MAPS", jfe.getKilledMaps());
        tEvent.addEventInfo("KILLED_REDUCES", jfe.getKilledReduces());
        tEvent.addEventInfo("MAP_COUNTERS_GROUPS",
            JobHistoryEventUtils.countersToJSON(jfe.getMapCounters()));
        tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS",
            JobHistoryEventUtils.countersToJSON(jfe.getReduceCounters()));
        tEvent.addEventInfo("TOTAL_COUNTERS_GROUPS",
            JobHistoryEventUtils.countersToJSON(jfe.getTotalCounters()));
        tEvent.addEventInfo("JOB_STATUS", JobState.SUCCEEDED.toString());
        bindEventToJobEntity(tEntity, tEvent, jobId);
        break;
      case TASK_STARTED:
        TaskStartedEvent tse = (TaskStartedEvent) event;
        tEvent.addEventInfo("TASK_TYPE", tse.getTaskType().toString());
        tEvent.addEventInfo("START_TIME", tse.getStartTime());
        tEvent.addEventInfo("SPLIT_LOCATIONS", tse.getSplitLocations());
        bindEventToTaskEntity(tEntity, tEvent, tse.getTaskId().toString(),
            jobId);
        break;
      case TASK_FAILED:
        TaskFailedEvent tfe = (TaskFailedEvent) event;
        tEvent.addEventInfo("TASK_TYPE", tfe.getTaskType().toString());
        tEvent.addEventInfo("STATUS", TaskStatus.State.FAILED.toString());
        tEvent.addEventInfo("FINISH_TIME", tfe.getFinishTime());
        tEvent.addEventInfo("ERROR", tfe.getError());
        tEvent.addEventInfo("FAILED_ATTEMPT_ID",
            toStringOrEmpty(tfe.getFailedAttemptID()));
        tEvent.addEventInfo("COUNTERS_GROUPS",
            JobHistoryEventUtils.countersToJSON(tfe.getCounters()));
        bindEventToTaskEntity(tEntity, tEvent, tfe.getTaskId().toString(),
            jobId);
        break;
      case TASK_UPDATED:
        TaskUpdatedEvent tue = (TaskUpdatedEvent) event;
        tEvent.addEventInfo("FINISH_TIME", tue.getFinishTime());
        bindEventToTaskEntity(tEntity, tEvent, tue.getTaskId().toString(),
            jobId);
        break;
      case TASK_FINISHED:
        TaskFinishedEvent tfe2 = (TaskFinishedEvent) event;
        tEvent.addEventInfo("TASK_TYPE", tfe2.getTaskType().toString());
        tEvent.addEventInfo("COUNTERS_GROUPS",
            JobHistoryEventUtils.countersToJSON(tfe2.getCounters()));
        tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime());
        tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
        tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
            toStringOrEmpty(tfe2.getSuccessfulTaskAttemptId()));
        bindEventToTaskEntity(tEntity, tEvent, tfe2.getTaskId().toString(),
            jobId);
        break;
      case MAP_ATTEMPT_STARTED:
      case CLEANUP_ATTEMPT_STARTED:
      case REDUCE_ATTEMPT_STARTED:
      case SETUP_ATTEMPT_STARTED:
        // Attempt events are recorded on the entity of the owning task.
        TaskAttemptStartedEvent tase = (TaskAttemptStartedEvent) event;
        tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString());
        tEvent.addEventInfo("TASK_ATTEMPT_ID",
            tase.getTaskAttemptId().toString());
        tEvent.addEventInfo("START_TIME", tase.getStartTime());
        tEvent.addEventInfo("HTTP_PORT", tase.getHttpPort());
        tEvent.addEventInfo("TRACKER_NAME", tase.getTrackerName());
        tEvent.addEventInfo("SHUFFLE_PORT", tase.getShufflePort());
        tEvent.addEventInfo("CONTAINER_ID",
            toStringOrEmpty(tase.getContainerId()));
        bindEventToTaskEntity(tEntity, tEvent, tase.getTaskId().toString(),
            jobId);
        break;
      case MAP_ATTEMPT_FAILED:
      case CLEANUP_ATTEMPT_FAILED:
      case REDUCE_ATTEMPT_FAILED:
      case SETUP_ATTEMPT_FAILED:
      case MAP_ATTEMPT_KILLED:
      case CLEANUP_ATTEMPT_KILLED:
      case REDUCE_ATTEMPT_KILLED:
      case SETUP_ATTEMPT_KILLED:
        TaskAttemptUnsuccessfulCompletionEvent tauce =
            (TaskAttemptUnsuccessfulCompletionEvent) event;
        tEvent.addEventInfo("TASK_TYPE", tauce.getTaskType().toString());
        tEvent.addEventInfo("TASK_ATTEMPT_ID",
            toStringOrEmpty(tauce.getTaskAttemptId()));
        tEvent.addEventInfo("FINISH_TIME", tauce.getFinishTime());
        tEvent.addEventInfo("ERROR", tauce.getError());
        tEvent.addEventInfo("STATUS", tauce.getTaskStatus());
        tEvent.addEventInfo("HOSTNAME", tauce.getHostname());
        tEvent.addEventInfo("PORT", tauce.getPort());
        tEvent.addEventInfo("RACK_NAME", tauce.getRackName());
        // NOTE(review): the three phase finish times below all carry the
        // attempt's overall finish time - confirm this is intended.
        tEvent.addEventInfo("SHUFFLE_FINISH_TIME", tauce.getFinishTime());
        tEvent.addEventInfo("SORT_FINISH_TIME", tauce.getFinishTime());
        tEvent.addEventInfo("MAP_FINISH_TIME", tauce.getFinishTime());
        tEvent.addEventInfo("COUNTERS_GROUPS",
            JobHistoryEventUtils.countersToJSON(tauce.getCounters()));
        bindEventToTaskEntity(tEntity, tEvent, tauce.getTaskId().toString(),
            jobId);
        break;
      case MAP_ATTEMPT_FINISHED:
        MapAttemptFinishedEvent mafe = (MapAttemptFinishedEvent) event;
        tEvent.addEventInfo("TASK_TYPE", mafe.getTaskType().toString());
        tEvent.addEventInfo("FINISH_TIME", mafe.getFinishTime());
        tEvent.addEventInfo("STATUS", mafe.getTaskStatus());
        tEvent.addEventInfo("STATE", mafe.getState());
        tEvent.addEventInfo("MAP_FINISH_TIME", mafe.getMapFinishTime());
        tEvent.addEventInfo("COUNTERS_GROUPS",
            JobHistoryEventUtils.countersToJSON(mafe.getCounters()));
        tEvent.addEventInfo("HOSTNAME", mafe.getHostname());
        tEvent.addEventInfo("PORT", mafe.getPort());
        tEvent.addEventInfo("RACK_NAME", mafe.getRackName());
        tEvent.addEventInfo("ATTEMPT_ID",
            toStringOrEmpty(mafe.getAttemptId()));
        bindEventToTaskEntity(tEntity, tEvent, mafe.getTaskId().toString(),
            jobId);
        break;
      case REDUCE_ATTEMPT_FINISHED:
        ReduceAttemptFinishedEvent rafe = (ReduceAttemptFinishedEvent) event;
        tEvent.addEventInfo("TASK_TYPE", rafe.getTaskType().toString());
        tEvent.addEventInfo("ATTEMPT_ID",
            toStringOrEmpty(rafe.getAttemptId()));
        tEvent.addEventInfo("FINISH_TIME", rafe.getFinishTime());
        tEvent.addEventInfo("STATUS", rafe.getTaskStatus());
        tEvent.addEventInfo("STATE", rafe.getState());
        tEvent.addEventInfo("SHUFFLE_FINISH_TIME", rafe.getShuffleFinishTime());
        tEvent.addEventInfo("SORT_FINISH_TIME", rafe.getSortFinishTime());
        tEvent.addEventInfo("COUNTERS_GROUPS",
            JobHistoryEventUtils.countersToJSON(rafe.getCounters()));
        tEvent.addEventInfo("HOSTNAME", rafe.getHostname());
        tEvent.addEventInfo("PORT", rafe.getPort());
        tEvent.addEventInfo("RACK_NAME", rafe.getRackName());
        bindEventToTaskEntity(tEntity, tEvent, rafe.getTaskId().toString(),
            jobId);
        break;
      case SETUP_ATTEMPT_FINISHED:
      case CLEANUP_ATTEMPT_FINISHED:
        TaskAttemptFinishedEvent tafe = (TaskAttemptFinishedEvent) event;
        tEvent.addEventInfo("TASK_TYPE", tafe.getTaskType().toString());
        tEvent.addEventInfo("ATTEMPT_ID",
            toStringOrEmpty(tafe.getAttemptId()));
        tEvent.addEventInfo("FINISH_TIME", tafe.getFinishTime());
        tEvent.addEventInfo("STATUS", tafe.getTaskStatus());
        tEvent.addEventInfo("STATE", tafe.getState());
        tEvent.addEventInfo("COUNTERS_GROUPS",
            JobHistoryEventUtils.countersToJSON(tafe.getCounters()));
        tEvent.addEventInfo("HOSTNAME", tafe.getHostname());
        bindEventToTaskEntity(tEntity, tEvent, tafe.getTaskId().toString(),
            jobId);
        break;
      case AM_STARTED:
        AMStartedEvent ase = (AMStartedEvent) event;
        tEvent.addEventInfo("APPLICATION_ATTEMPT_ID",
            toStringOrEmpty(ase.getAppAttemptId()));
        tEvent.addEventInfo("CONTAINER_ID",
            toStringOrEmpty(ase.getContainerId()));
        tEvent.addEventInfo("NODE_MANAGER_HOST", ase.getNodeManagerHost());
        tEvent.addEventInfo("NODE_MANAGER_PORT", ase.getNodeManagerPort());
        tEvent.addEventInfo("NODE_MANAGER_HTTP_PORT",
            ase.getNodeManagerHttpPort());
        tEvent.addEventInfo("START_TIME", ase.getStartTime());
        tEvent.addEventInfo("SUBMIT_TIME", ase.getSubmitTime());
        bindEventToJobEntity(tEntity, tEvent, jobId);
        break;
      default:
        // No mapping for this event type: the entity has no id, type or
        // event, so publishing it would only send an empty entity to the
        // timeline server. Skip the put entirely.
        LOG.debug("No timeline mapping for event type {}; nothing published",
            event.getEventType());
        return;
    }

    try {
      TimelinePutResponse response = timelineClient.putEntities(tEntity);
      List<TimelinePutResponse.TimelinePutError> errors = response.getErrors();
      if (errors.isEmpty()) {
        LOG.debug("Timeline entities are successfully put in event {}",
            event.getEventType());
      } else {
        // The put itself succeeded but the response lists rejected entities.
        for (TimelinePutResponse.TimelinePutError error : errors) {
          LOG.error("Error when publishing entity [{},{}], server side error "
              + "code: {}", error.getEntityType(), error.getEntityId(),
              error.getErrorCode());
        }
      }
    } catch (YarnException | IOException | ProcessingException ex) {
      // Publishing failures are logged and not propagated to the caller.
      LOG.error("Error putting entity {} to Timeline Server",
          tEntity.getEntityId(), ex);
    }
  }

  /**
   * Adds {@code tEvent} to {@code tEntity} and identifies the entity as the
   * MapReduce job {@code jobId}.
   */
  private void bindEventToJobEntity(TimelineEntity tEntity,
      TimelineEvent tEvent, JobId jobId) {
    tEntity.addEvent(tEvent);
    tEntity.setEntityId(jobId.toString());
    tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
  }

  /**
   * Adds {@code tEvent} to {@code tEntity}, identifies the entity as the
   * MapReduce task {@code taskId} and relates it to the job {@code jobId}.
   */
  private void bindEventToTaskEntity(TimelineEntity tEntity,
      TimelineEvent tEvent, String taskId, JobId jobId) {
    tEntity.addEvent(tEvent);
    tEntity.setEntityId(taskId);
    tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
    tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
  }

  /**
   * Returns {@code value.toString()}, or the empty string when {@code value}
   * is {@code null}.
   */
  private String toStringOrEmpty(Object value) {
    return value == null ? "" : value.toString();
  }