protected void readEventsFromSource()

in tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java [216:321]


  /**
   * Reads every JSON record from {@code source} and assembles the history of DAG {@code dagId}.
   *
   * <p>Records are dispatched on their entity type:
   * <ul>
   *   <li>DAG records: the first record whose entity equals {@code dagId} becomes the base DAG
   *   json. Later ones have their events merged into it and supply the DAG plan if the base has
   *   none yet. Records for other DAG ids are skipped with a warning.</li>
   *   <li>Vertex, task and task-attempt records: stored in the matching map keyed by entity id.
   *   Repeated records for the same id have their events merged. Records whose id belongs to a
   *   different DAG are skipped with a warning.</li>
   *   <li>Any other entity type is ignored.</li>
   * </ul>
   * The user name is taken from the first DAG record whose related entities contain a user entry.
   * {@code source} is closed once reading ends, including when an exception is thrown.
   *
   * @param dagId          id of the DAG to extract
   * @param source         records to read; always closed by this method
   * @param vertexJsonMap  output: vertex id to merged vertex json
   * @param taskJsonMap    output: task id to merged task json
   * @param attemptJsonMap output: task attempt id to merged attempt json
   * @throws JSONException if a record is missing a required field (e.g. entity / entitytype)
   * @throws TezException  if no DAG record for {@code dagId} was seen (e.g. a partial file)
   * @throws IOException   presumably from reading or closing the source — confirm with callers
   */
  protected void readEventsFromSource(String dagId, JSONObjectSource source,
      Map<String, JSONObject> vertexJsonMap, Map<String, JSONObject> taskJsonMap,
      Map<String, JSONObject> attemptJsonMap) throws JSONException, TezException, IOException {
    JSONObject dagJson = null;
    TezDAGID tezDAGID = TezDAGID.fromString(dagId);
    String userName = null;

    try {
      while (source.hasNext()) {
        JSONObject jsonObject = source.next();

        String entity = jsonObject.getString(Constants.ENTITY);
        String entityType = jsonObject.getString(Constants.ENTITY_TYPE);
        switch (entityType) {
        case Constants.TEZ_DAG_ID:
          if (!dagId.equals(entity)) {
            LOG.warn("{} is not matching with {}", dagId, entity);
            break;
          }
          // Club all DAG related information together (DAG_INIT, DAG_FINISH etc). Each of them
          // would have a set of entities in otherinfo (e.g vertex mapping, dagPlan, start/finish
          // time etc).
          if (dagJson == null) {
            dagJson = jsonObject;
          } else {
            copyDagPlanIfMissing(jsonObject, dagJson);
            mergeSubJSONArray(jsonObject, dagJson, Constants.EVENTS);
          }
          // Related entities are not merged into dagJson, so inspect each DAG record until a
          // user entry is found instead of only the first one.
          if (userName == null) {
            userName = findUserName(jsonObject);
          }
          populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO),
              dagJson.getJSONObject(Constants.OTHER_INFO));
          break;
        case Constants.TEZ_VERTEX_ID:
          mergeEntityJson(tezDAGID, TezVertexID.fromString(entity).getDAGID(), entity,
              jsonObject, vertexJsonMap);
          break;
        case Constants.TEZ_TASK_ID:
          mergeEntityJson(tezDAGID, TezTaskID.fromString(entity).getDAGID(), entity,
              jsonObject, taskJsonMap);
          break;
        case Constants.TEZ_TASK_ATTEMPT_ID:
          mergeEntityJson(tezDAGID, TezTaskAttemptID.fromString(entity).getDAGID(), entity,
              jsonObject, attemptJsonMap);
          break;
        default:
          break;
        }
      }
    } finally {
      // Close even when parsing fails midway so the underlying source is not leaked.
      source.close();
    }

    if (dagJson == null) {
      LOG.error("Dag is not yet parsed. Looks like partial file.");
      throw new TezException(
          "Please provide a valid/complete history log file containing " + dagId);
    }
    this.dagInfo = DagInfo.create(dagJson);
    setUserName(userName);
  }

  /**
   * Stores or merges one vertex/task/attempt record into {@code jsonMap}, keyed by its entity id.
   * Records whose owning DAG differs from {@code expectedDagId} are logged and dropped.
   */
  private void mergeEntityJson(TezDAGID expectedDagId, TezDAGID entityDagId, String entityName,
      JSONObject jsonObject, Map<String, JSONObject> jsonMap)
      throws JSONException, TezException, IOException {
    if (!expectedDagId.equals(entityDagId)) {
      LOG.warn("{} does not belong to {} ('{}' != '{}')", entityName, expectedDagId,
          expectedDagId, entityDagId);
      return;
    }
    if (!jsonMap.containsKey(entityName)) {
      jsonMap.put(entityName, jsonObject);
    } else {
      mergeSubJSONArray(jsonObject, jsonMap.get(entityName), Constants.EVENTS);
    }
    populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), entityName, jsonMap);
  }

  /**
   * Copies the DAG plan from {@code sourceJson}'s otherinfo into {@code targetJson}'s otherinfo,
   * only when the target has no plan yet and the source actually carries one.
   */
  private static void copyDagPlanIfMissing(JSONObject sourceJson, JSONObject targetJson)
      throws JSONException {
    JSONObject targetOtherInfo = targetJson.optJSONObject(ATSConstants.OTHER_INFO);
    if (targetOtherInfo == null
        || targetOtherInfo.optJSONObject(ATSConstants.DAG_PLAN) != null) {
      return;
    }
    JSONObject sourceOtherInfo = sourceJson.optJSONObject(ATSConstants.OTHER_INFO);
    JSONObject dagPlan =
        sourceOtherInfo == null ? null : sourceOtherInfo.optJSONObject(ATSConstants.DAG_PLAN);
    if (dagPlan != null) {
      targetOtherInfo.put(ATSConstants.DAG_PLAN, dagPlan);
    }
  }

  /**
   * Returns the user name from an entity's related entities, or {@code null} if absent.
   * The user appears as {"entity":"userXYZ","entitytype":"user"}.
   */
  private static String findUserName(JSONObject entityJson) throws JSONException {
    JSONArray relatedEntities = entityJson.optJSONArray(Constants.RELATED_ENTITIES);
    if (relatedEntities == null) {
      return null;
    }
    for (int i = 0; i < relatedEntities.length(); i++) {
      JSONObject subEntity = relatedEntities.getJSONObject(i);
      if (Constants.USER.equals(subEntity.optString(Constants.ENTITY_TYPE))) {
        return subEntity.getString(Constants.ENTITY);
      }
    }
    return null;
  }