in tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java [216:321]
/**
 * Reads timeline entities for a single DAG from {@code source} and groups them per entity.
 *
 * <p>All {@code TEZ_DAG_ID} entities whose id equals {@code dagId} are combined into one DAG
 * JSON object: the first one seen becomes the accumulator, later ones have their events merged
 * into it via {@code mergeSubJSONArray}, their otherinfo folded in via {@code populateOtherInfo},
 * and their DAG plan copied over if the accumulator does not have one yet. Vertex, task and
 * task-attempt entities that belong to the same DAG are collected into the supplied maps keyed
 * by entity id and merged the same way. Entities of another DAG are logged and skipped; other
 * entity types are ignored. The source is closed even if parsing fails.
 *
 * @param dagId id of the DAG to extract
 * @param source iterator over timeline entity JSON objects; closed by this method
 * @param vertexJsonMap destination for vertex entities, keyed by vertex id
 * @param taskJsonMap destination for task entities, keyed by task id
 * @param attemptJsonMap destination for task-attempt entities, keyed by attempt id
 * @throws JSONException if an entity lacks a required field (e.g. entity / entitytype)
 * @throws TezException if no DAG entity matching {@code dagId} was found in the source
 * @throws IOException as declared; presumably raised by the underlying source (TODO confirm)
 */
protected void readEventsFromSource(String dagId, JSONObjectSource source,
    Map<String, JSONObject> vertexJsonMap, Map<String, JSONObject> taskJsonMap,
    Map<String, JSONObject> attemptJsonMap) throws JSONException, TezException, IOException {
  JSONObject dagJson = null;
  TezDAGID tezDAGID = TezDAGID.fromString(dagId);
  String userName = null;
  try {
    while (source.hasNext()) {
      JSONObject jsonObject = source.next();
      String entity = jsonObject.getString(Constants.ENTITY);
      String entityType = jsonObject.getString(Constants.ENTITY_TYPE);
      switch (entityType) {
        case Constants.TEZ_DAG_ID:
          if (!dagId.equals(entity)) {
            LOG.warn("{} is not matching with {}", dagId, entity);
            continue;
          }
          // Club all DAG related information together (DAG_INIT, DAG_FINISH etc). Each of them
          // would have a set of entities in otherinfo (e.g vertex mapping, dagPlan, start/finish
          // time etc).
          if (dagJson == null) {
            dagJson = jsonObject;
          } else {
            copyDagPlanIfMissing(jsonObject, dagJson);
            mergeSubJSONArray(jsonObject, dagJson, Constants.EVENTS);
          }
          // Only relatedEntities of the incoming entity are inspected (merging touches events
          // only), so keep looking until some DAG entity names the user.
          if (userName == null) {
            userName = findUserName(jsonObject);
          }
          populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO),
              dagJson.getJSONObject(Constants.OTHER_INFO));
          break;
        case Constants.TEZ_VERTEX_ID:
          TezVertexID tezVertexID = TezVertexID.fromString(entity);
          if (belongsToDag(tezDAGID, tezVertexID.getDAGID(), entity)) {
            mergeEntityJson(entity, jsonObject, vertexJsonMap);
          }
          break;
        case Constants.TEZ_TASK_ID:
          TezTaskID tezTaskID = TezTaskID.fromString(entity);
          if (belongsToDag(tezDAGID, tezTaskID.getDAGID(), entity)) {
            mergeEntityJson(entity, jsonObject, taskJsonMap);
          }
          break;
        case Constants.TEZ_TASK_ATTEMPT_ID:
          TezTaskAttemptID tezAttemptId = TezTaskAttemptID.fromString(entity);
          if (belongsToDag(tezDAGID, tezAttemptId.getDAGID(), entity)) {
            mergeEntityJson(entity, jsonObject, attemptJsonMap);
          }
          break;
        default:
          break;
      }
    }
  } finally {
    // Close even when parsing throws, so the underlying source is never leaked.
    source.close();
  }
  if (dagJson == null) {
    LOG.error("Dag is not yet parsed. Looks like partial file.");
    throw new TezException(
        "Please provide a valid/complete history log file containing " + dagId);
  }
  this.dagInfo = DagInfo.create(dagJson);
  setUserName(userName);
}

/**
 * Copies the DAG plan from {@code source}'s otherinfo into {@code dagJson}'s otherinfo when
 * {@code dagJson} has no plan yet and {@code source} actually carries one. DAG events without a
 * plan are tolerated instead of failing the whole parse.
 */
private static void copyDagPlanIfMissing(JSONObject source, JSONObject dagJson)
    throws JSONException {
  JSONObject dagOtherInfo = dagJson.optJSONObject(ATSConstants.OTHER_INFO);
  if (dagOtherInfo == null || dagOtherInfo.optJSONObject(ATSConstants.DAG_PLAN) != null) {
    return;
  }
  JSONObject sourceOtherInfo = source.optJSONObject(ATSConstants.OTHER_INFO);
  JSONObject dagPlan =
      sourceOtherInfo == null ? null : sourceOtherInfo.optJSONObject(ATSConstants.DAG_PLAN);
  if (dagPlan != null) {
    dagOtherInfo.put(ATSConstants.DAG_PLAN, dagPlan);
  }
}

/**
 * Returns the entity name of the first related entity of type {@code Constants.USER}
 * (e.g. {"entity":"userXYZ","entitytype":"user"}), or {@code null} if there is none.
 */
private static String findUserName(JSONObject entityJson) throws JSONException {
  JSONArray relatedEntities = entityJson.optJSONArray(Constants.RELATED_ENTITIES);
  if (relatedEntities == null) {
    return null;
  }
  for (int i = 0; i < relatedEntities.length(); i++) {
    JSONObject subEntity = relatedEntities.getJSONObject(i);
    if (Constants.USER.equals(subEntity.optString(Constants.ENTITY_TYPE))) {
      return subEntity.getString(Constants.ENTITY);
    }
  }
  return null;
}

/**
 * Returns whether an entity's DAG id matches the DAG being parsed, logging a warning if not.
 */
private boolean belongsToDag(TezDAGID expectedDagId, TezDAGID entityDagId, String entityName) {
  if (expectedDagId.equals(entityDagId)) {
    return true;
  }
  LOG.warn("{} does not belong to {} ('{}' != '{}')", entityName, expectedDagId, expectedDagId,
      entityDagId);
  return false;
}

/**
 * Stores {@code jsonObject} under {@code entityName} on first sight, otherwise merges its events
 * into the stored entity; in both cases folds its otherinfo into the stored entity.
 */
private void mergeEntityJson(String entityName, JSONObject jsonObject,
    Map<String, JSONObject> jsonMap) throws JSONException, TezException, IOException {
  if (!jsonMap.containsKey(entityName)) {
    jsonMap.put(entityName, jsonObject);
  } else {
    mergeSubJSONArray(jsonObject, jsonMap.get(entityName), Constants.EVENTS);
  }
  populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), entityName, jsonMap);
}