private static Map convertDAGPlanToATSMap()

in tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java [218:364]


  private static Map<String, Object> convertDAGPlanToATSMap(DAGPlan dagPlan,
      final Inflater inflater) {
    final String VERSION_KEY = "version";
    final int version = 2;
    Map<String,Object> dagMap = new LinkedHashMap<String, Object>();
    dagMap.put(DAG_NAME_KEY, dagPlan.getName());
    if (dagPlan.hasDagInfo()) {
      dagMap.put(DAG_INFO_KEY, dagPlan.getDagInfo());
    }
    if (dagPlan.hasCallerContext()) {
      dagMap.put(DAG_CONTEXT_KEY, createDagInfoMap(dagPlan));
    }
    dagMap.put(VERSION_KEY, version);
    ArrayList<Object> verticesList = new ArrayList<Object>();
    for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {
      Map<String,Object> vertexMap = new LinkedHashMap<String, Object>();
      vertexMap.put(VERTEX_NAME_KEY, vertexPlan.getName());
      if (vertexPlan.hasProcessorDescriptor()) {
        vertexMap.put(PROCESSOR_CLASS_KEY,
            vertexPlan.getProcessorDescriptor().getClassName());
        if (vertexPlan.getProcessorDescriptor().hasHistoryText()) {
          vertexMap.put(USER_PAYLOAD_AS_TEXT,
              DagTypeConverters.getHistoryTextFromProto(
                  vertexPlan.getProcessorDescriptor(), inflater));
        }
      }

      ArrayList<Object> inEdgeIdList = new ArrayList<Object>();
      inEdgeIdList.addAll(vertexPlan.getInEdgeIdList());
      putInto(vertexMap, IN_EDGE_IDS_KEY, inEdgeIdList);

      ArrayList<Object> outEdgeIdList = new ArrayList<Object>();
      outEdgeIdList.addAll(vertexPlan.getOutEdgeIdList());
      putInto(vertexMap, OUT_EDGE_IDS_KEY, outEdgeIdList);

      ArrayList<Object> inputsList = new ArrayList<Object>();
      for (DAGProtos.RootInputLeafOutputProto input :
          vertexPlan.getInputsList()) {
        Map<String,Object> inputMap = new LinkedHashMap<String, Object>();
        inputMap.put(NAME_KEY, input.getName());
        inputMap.put(CLASS_KEY, input.getIODescriptor().getClassName());
        if (input.hasControllerDescriptor()) {
          inputMap.put(INITIALIZER_KEY, input.getControllerDescriptor().getClassName());
        }
        if (input.getIODescriptor().hasHistoryText()) {
          inputMap.put(USER_PAYLOAD_AS_TEXT,
              DagTypeConverters.getHistoryTextFromProto(
                  input.getIODescriptor(), inflater));
        }
        inputsList.add(inputMap);
      }
      putInto(vertexMap, ADDITIONAL_INPUTS_KEY, inputsList);

      ArrayList<Object> outputsList = new ArrayList<Object>();
      for (DAGProtos.RootInputLeafOutputProto output :
          vertexPlan.getOutputsList()) {
        Map<String,Object> outputMap = new LinkedHashMap<String, Object>();
        outputMap.put(NAME_KEY, output.getName());
        outputMap.put(CLASS_KEY, output.getIODescriptor().getClassName());
        if (output.hasControllerDescriptor()) {
          outputMap.put(INITIALIZER_KEY, output.getControllerDescriptor().getClassName());
        }
        if (output.getIODescriptor().hasHistoryText()) {
          outputMap.put(USER_PAYLOAD_AS_TEXT,
              DagTypeConverters.getHistoryTextFromProto(
                  output.getIODescriptor(), inflater));
        }
        outputsList.add(outputMap);
      }
      putInto(vertexMap, ADDITIONAL_OUTPUTS_KEY, outputsList);

      if (vertexPlan.hasVertexManagerPlugin()) {
        vertexMap.put(VERTEX_MANAGER_PLUGIN_CLASS_KEY,
            vertexPlan.getVertexManagerPlugin().getClassName());
      }

      verticesList.add(vertexMap);
    }
    putInto(dagMap, VERTICES_KEY, verticesList);

    ArrayList<Object> edgesList = new ArrayList<Object>();
    for (DAGProtos.EdgePlan edgePlan : dagPlan.getEdgeList()) {
      Map<String,Object> edgeMap = new LinkedHashMap<String, Object>();
      edgeMap.put(EDGE_ID_KEY, edgePlan.getId());
      edgeMap.put(INPUT_VERTEX_NAME_KEY, edgePlan.getInputVertexName());
      edgeMap.put(OUTPUT_VERTEX_NAME_KEY, edgePlan.getOutputVertexName());
      edgeMap.put(DATA_MOVEMENT_TYPE_KEY,
          edgePlan.getDataMovementType().name());
      edgeMap.put(DATA_SOURCE_TYPE_KEY, edgePlan.getDataSourceType().name());
      edgeMap.put(SCHEDULING_TYPE_KEY, edgePlan.getSchedulingType().name());
      edgeMap.put(EDGE_SOURCE_CLASS_KEY,
          edgePlan.getEdgeSource().getClassName());
      edgeMap.put(EDGE_DESTINATION_CLASS_KEY,
          edgePlan.getEdgeDestination().getClassName());
      if (edgePlan.getEdgeSource().hasHistoryText()) {
        edgeMap.put(OUTPUT_USER_PAYLOAD_AS_TEXT,
            DagTypeConverters.getHistoryTextFromProto(
                edgePlan.getEdgeSource(), inflater));
      }
      if (edgePlan.getEdgeDestination().hasHistoryText()) {
        edgeMap.put(INPUT_USER_PAYLOAD_AS_TEXT,
            DagTypeConverters.getHistoryTextFromProto(
                edgePlan.getEdgeDestination(), inflater));
      } // TEZ-2286 this is missing edgemanager descriptor for custom edge
      edgesList.add(edgeMap);
    }
    putInto(dagMap, EDGES_KEY, edgesList);

    ArrayList<Object> vertexGroupsList = new ArrayList<Object>();
    for (DAGProtos.PlanVertexGroupInfo vertexGroupInfo :
        dagPlan.getVertexGroupsList()) {
      Map<String,Object> groupMap = new LinkedHashMap<String, Object>();
      groupMap.put(VERTEX_GROUP_NAME_KEY, vertexGroupInfo.getGroupName());
      if (vertexGroupInfo.getGroupMembersCount() > 0 ) {
        groupMap.put(VERTEX_GROUP_MEMBERS_KEY, vertexGroupInfo.getGroupMembersList());
      }
      if (vertexGroupInfo.getOutputsCount() > 0) {
        groupMap.put(VERTEX_GROUP_OUTPUTS_KEY, vertexGroupInfo.getOutputsList());
      }

      if (vertexGroupInfo.getEdgeMergedInputsCount() > 0) {
        ArrayList<Object> edgeMergedInputs = new ArrayList<Object>();
        for (PlanGroupInputEdgeInfo edgeMergedInputInfo :
            vertexGroupInfo.getEdgeMergedInputsList()) {
          Map<String,Object> edgeMergedInput = new LinkedHashMap<String, Object>();
          edgeMergedInput.put(VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY,
              edgeMergedInputInfo.getDestVertexName());
          if (edgeMergedInputInfo.hasMergedInput()
            && edgeMergedInputInfo.getMergedInput().hasClassName()) {
            edgeMergedInput.put(PROCESSOR_CLASS_KEY,
                edgeMergedInputInfo.getMergedInput().getClassName());
            if (edgeMergedInputInfo.getMergedInput().hasHistoryText()) {
              edgeMergedInput.put(USER_PAYLOAD_AS_TEXT,
                  DagTypeConverters.getHistoryTextFromProto(
                      edgeMergedInputInfo.getMergedInput(), inflater));
            }
          }
          edgeMergedInputs.add(edgeMergedInput);
        }
        groupMap.put(VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY, edgeMergedInputs);
      }
      vertexGroupsList.add(groupMap);
    }
    putInto(dagMap, VERTEX_GROUPS_KEY, vertexGroupsList);

    return dagMap;
  }