in tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java [218:364]
/**
 * Converts a {@link DAGPlan} into a nested, insertion-ordered map.
 *
 * <p>The top-level map is populated in this order: the DAG name, the DAG info
 * (only when {@code dagPlan.hasDagInfo()}), the caller context as produced by
 * {@code createDagInfoMap} (only when {@code dagPlan.hasCallerContext()}), the
 * format version ({@code 2}), and then the vertices, edges and vertex groups
 * lists. The three lists are stored through {@code putInto}, which is defined
 * elsewhere in this class; how it treats an empty list is not visible here.
 *
 * @param dagPlan the plan to convert
 * @param inflater handed unchanged to
 *     {@code DagTypeConverters.getHistoryTextFromProto} whenever a descriptor
 *     reports that it has history text
 * @return a {@link LinkedHashMap} describing the plan
 */
private static Map<String, Object> convertDAGPlanToATSMap(DAGPlan dagPlan,
    final Inflater inflater) {
  final String VERSION_KEY = "version";
  final int version = 2;

  Map<String, Object> dagMap = new LinkedHashMap<String, Object>();
  dagMap.put(DAG_NAME_KEY, dagPlan.getName());
  if (dagPlan.hasDagInfo()) {
    dagMap.put(DAG_INFO_KEY, dagPlan.getDagInfo());
  }
  if (dagPlan.hasCallerContext()) {
    dagMap.put(DAG_CONTEXT_KEY, createDagInfoMap(dagPlan));
  }
  dagMap.put(VERSION_KEY, version);

  ArrayList<Object> verticesList = new ArrayList<Object>();
  for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {
    verticesList.add(vertexPlanToATSMap(vertexPlan, inflater));
  }
  putInto(dagMap, VERTICES_KEY, verticesList);

  ArrayList<Object> edgesList = new ArrayList<Object>();
  for (DAGProtos.EdgePlan edgePlan : dagPlan.getEdgeList()) {
    edgesList.add(edgePlanToATSMap(edgePlan, inflater));
  }
  putInto(dagMap, EDGES_KEY, edgesList);

  ArrayList<Object> vertexGroupsList = new ArrayList<Object>();
  for (DAGProtos.PlanVertexGroupInfo vertexGroupInfo :
      dagPlan.getVertexGroupsList()) {
    vertexGroupsList.add(vertexGroupToATSMap(vertexGroupInfo, inflater));
  }
  putInto(dagMap, VERTEX_GROUPS_KEY, vertexGroupsList);

  return dagMap;
}

/**
 * Builds the map for a single vertex: name, processor class and history text
 * (when a processor descriptor is present), in/out edge ids, additional
 * inputs/outputs, and the vertex manager plugin class (when present).
 */
private static Map<String, Object> vertexPlanToATSMap(
    DAGProtos.VertexPlan vertexPlan, Inflater inflater) {
  Map<String, Object> vertexMap = new LinkedHashMap<String, Object>();
  vertexMap.put(VERTEX_NAME_KEY, vertexPlan.getName());

  if (vertexPlan.hasProcessorDescriptor()) {
    vertexMap.put(PROCESSOR_CLASS_KEY,
        vertexPlan.getProcessorDescriptor().getClassName());
    if (vertexPlan.getProcessorDescriptor().hasHistoryText()) {
      vertexMap.put(USER_PAYLOAD_AS_TEXT,
          DagTypeConverters.getHistoryTextFromProto(
              vertexPlan.getProcessorDescriptor(), inflater));
    }
  }

  // Copy the edge-id lists so the map holds plain ArrayLists rather than the
  // list instances returned by the plan.
  putInto(vertexMap, IN_EDGE_IDS_KEY,
      new ArrayList<Object>(vertexPlan.getInEdgeIdList()));
  putInto(vertexMap, OUT_EDGE_IDS_KEY,
      new ArrayList<Object>(vertexPlan.getOutEdgeIdList()));

  putInto(vertexMap, ADDITIONAL_INPUTS_KEY,
      rootInputLeafOutputsToATSList(vertexPlan.getInputsList(), inflater));
  putInto(vertexMap, ADDITIONAL_OUTPUTS_KEY,
      rootInputLeafOutputsToATSList(vertexPlan.getOutputsList(), inflater));

  if (vertexPlan.hasVertexManagerPlugin()) {
    vertexMap.put(VERTEX_MANAGER_PLUGIN_CLASS_KEY,
        vertexPlan.getVertexManagerPlugin().getClassName());
  }
  return vertexMap;
}

/**
 * Builds one map per root input / leaf output: name, IO class, initializer
 * class (when a controller descriptor is present) and history text (when the
 * IO descriptor has it). Shared by the vertex inputs and outputs, which are
 * serialized identically.
 */
private static ArrayList<Object> rootInputLeafOutputsToATSList(
    Iterable<DAGProtos.RootInputLeafOutputProto> ioProtos, Inflater inflater) {
  ArrayList<Object> ioList = new ArrayList<Object>();
  for (DAGProtos.RootInputLeafOutputProto ioProto : ioProtos) {
    Map<String, Object> ioMap = new LinkedHashMap<String, Object>();
    ioMap.put(NAME_KEY, ioProto.getName());
    ioMap.put(CLASS_KEY, ioProto.getIODescriptor().getClassName());
    if (ioProto.hasControllerDescriptor()) {
      ioMap.put(INITIALIZER_KEY,
          ioProto.getControllerDescriptor().getClassName());
    }
    if (ioProto.getIODescriptor().hasHistoryText()) {
      ioMap.put(USER_PAYLOAD_AS_TEXT,
          DagTypeConverters.getHistoryTextFromProto(
              ioProto.getIODescriptor(), inflater));
    }
    ioList.add(ioMap);
  }
  return ioList;
}

/**
 * Builds the map for a single edge: id, endpoint vertex names, the names of
 * the data-movement / data-source / scheduling enum values, the edge source
 * and destination classes, and their history text when present.
 */
private static Map<String, Object> edgePlanToATSMap(
    DAGProtos.EdgePlan edgePlan, Inflater inflater) {
  Map<String, Object> edgeMap = new LinkedHashMap<String, Object>();
  edgeMap.put(EDGE_ID_KEY, edgePlan.getId());
  edgeMap.put(INPUT_VERTEX_NAME_KEY, edgePlan.getInputVertexName());
  edgeMap.put(OUTPUT_VERTEX_NAME_KEY, edgePlan.getOutputVertexName());
  edgeMap.put(DATA_MOVEMENT_TYPE_KEY,
      edgePlan.getDataMovementType().name());
  edgeMap.put(DATA_SOURCE_TYPE_KEY, edgePlan.getDataSourceType().name());
  edgeMap.put(SCHEDULING_TYPE_KEY, edgePlan.getSchedulingType().name());
  edgeMap.put(EDGE_SOURCE_CLASS_KEY,
      edgePlan.getEdgeSource().getClassName());
  edgeMap.put(EDGE_DESTINATION_CLASS_KEY,
      edgePlan.getEdgeDestination().getClassName());
  // The edge source's history text is stored under the OUTPUT payload key and
  // the edge destination's under the INPUT payload key.
  if (edgePlan.getEdgeSource().hasHistoryText()) {
    edgeMap.put(OUTPUT_USER_PAYLOAD_AS_TEXT,
        DagTypeConverters.getHistoryTextFromProto(
            edgePlan.getEdgeSource(), inflater));
  }
  if (edgePlan.getEdgeDestination().hasHistoryText()) {
    edgeMap.put(INPUT_USER_PAYLOAD_AS_TEXT,
        DagTypeConverters.getHistoryTextFromProto(
            edgePlan.getEdgeDestination(), inflater));
  }
  // TEZ-2286 this is missing edgemanager descriptor for custom edge
  return edgeMap;
}

/**
 * Builds the map for a single vertex group: group name, then members, outputs
 * and edge merged inputs, each only when the group has at least one of them.
 */
private static Map<String, Object> vertexGroupToATSMap(
    DAGProtos.PlanVertexGroupInfo vertexGroupInfo, Inflater inflater) {
  Map<String, Object> groupMap = new LinkedHashMap<String, Object>();
  groupMap.put(VERTEX_GROUP_NAME_KEY, vertexGroupInfo.getGroupName());
  if (vertexGroupInfo.getGroupMembersCount() > 0) {
    groupMap.put(VERTEX_GROUP_MEMBERS_KEY,
        vertexGroupInfo.getGroupMembersList());
  }
  if (vertexGroupInfo.getOutputsCount() > 0) {
    groupMap.put(VERTEX_GROUP_OUTPUTS_KEY, vertexGroupInfo.getOutputsList());
  }
  if (vertexGroupInfo.getEdgeMergedInputsCount() > 0) {
    ArrayList<Object> edgeMergedInputs = new ArrayList<Object>();
    for (PlanGroupInputEdgeInfo edgeMergedInputInfo :
        vertexGroupInfo.getEdgeMergedInputsList()) {
      edgeMergedInputs.add(
          edgeMergedInputToATSMap(edgeMergedInputInfo, inflater));
    }
    // Stored with a direct put (not putInto), as the count guard above already
    // guarantees the list is non-empty.
    groupMap.put(VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY, edgeMergedInputs);
  }
  return groupMap;
}

/**
 * Builds the map for one edge merged input of a vertex group: destination
 * vertex name, plus the merged input's class and history text when present.
 */
private static Map<String, Object> edgeMergedInputToATSMap(
    PlanGroupInputEdgeInfo edgeMergedInputInfo, Inflater inflater) {
  Map<String, Object> edgeMergedInput = new LinkedHashMap<String, Object>();
  edgeMergedInput.put(VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY,
      edgeMergedInputInfo.getDestVertexName());
  if (edgeMergedInputInfo.hasMergedInput()
      && edgeMergedInputInfo.getMergedInput().hasClassName()) {
    // The merged input's class is recorded under PROCESSOR_CLASS_KEY.
    edgeMergedInput.put(PROCESSOR_CLASS_KEY,
        edgeMergedInputInfo.getMergedInput().getClassName());
    if (edgeMergedInputInfo.getMergedInput().hasHistoryText()) {
      edgeMergedInput.put(USER_PAYLOAD_AS_TEXT,
          DagTypeConverters.getHistoryTextFromProto(
              edgeMergedInputInfo.getMergedInput(), inflater));
    }
  }
  return edgeMergedInput;
}