public DAGPlan createDag()

in tez-api/src/main/java/org/apache/tez/dag/api/DAG.java [515:673]


  public DAGPlan createDag(Configuration dagConf) {
    verify(true);

    DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();

    dagBuilder.setName(this.name);
    
    if (!vertexGroups.isEmpty()) {
      for (VertexGroup av : vertexGroups) {
        GroupInfo groupInfo = av.getGroupInfo();
        PlanVertexGroupInfo.Builder groupBuilder = PlanVertexGroupInfo.newBuilder();
        groupBuilder.setGroupName(groupInfo.getGroupName());
        for (Vertex v : groupInfo.getMembers()) {
          groupBuilder.addGroupMembers(v.getName());
        }
        groupBuilder.addAllOutputs(groupInfo.outputs);
        for (Map.Entry<String, InputDescriptor> entry : 
             groupInfo.edgeMergedInputs.entrySet()) {
          groupBuilder.addEdgeMergedInputs(
              PlanGroupInputEdgeInfo.newBuilder().setDestVertexName(entry.getKey()).
              setMergedInput(DagTypeConverters.convertToDAGPlan(entry.getValue())));
        }
        dagBuilder.addVertexGroups(groupBuilder); 
      }
    }

    for (Vertex vertex : vertices.values()) {
      VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
      vertexBuilder.setName(vertex.getName());
      vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until  TEZ-46.
      vertexBuilder.setProcessorDescriptor(DagTypeConverters
        .convertToDAGPlan(vertex.getProcessorDescriptor()));
      if (vertex.getInputs().size() > 0) {
        for (RootInputLeafOutput<InputDescriptor> input : vertex.getInputs()) {
          vertexBuilder.addInputs(DagTypeConverters.convertToDAGPlan(input));
        }
      }
      if (vertex.getOutputs().size() > 0) {
        for (RootInputLeafOutput<OutputDescriptor> output : vertex.getOutputs()) {
          vertexBuilder.addOutputs(DagTypeConverters.convertToDAGPlan(output));
        }
      }

      //task config
      PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder();
      Resource resource = vertex.getTaskResource();
      taskConfigBuilder.setNumTasks(vertex.getParallelism());
      taskConfigBuilder.setMemoryMb(resource.getMemory());
      taskConfigBuilder.setVirtualCores(resource.getVirtualCores());
      taskConfigBuilder.setJavaOpts(vertex.getTaskLaunchCmdOpts());

      taskConfigBuilder.setTaskModule(vertex.getName());
      PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
      localResourcesBuilder.clear();
      for (Entry<String, LocalResource> entry :
             vertex.getTaskLocalFiles().entrySet()) {
        String key = entry.getKey();
        LocalResource lr = entry.getValue();
        localResourcesBuilder.setName(key);
        localResourcesBuilder.setUri(
          DagTypeConverters.convertToDAGPlan(lr.getResource()));
        localResourcesBuilder.setSize(lr.getSize());
        localResourcesBuilder.setTimeStamp(lr.getTimestamp());
        localResourcesBuilder.setType(
          DagTypeConverters.convertToDAGPlan(lr.getType()));
        localResourcesBuilder.setVisibility(
          DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
        if (lr.getType() == LocalResourceType.PATTERN) {
          if (lr.getPattern() == null || lr.getPattern().isEmpty()) {
            throw new TezUncheckedException("LocalResource type set to pattern"
              + " but pattern is null or empty");
          }
          localResourcesBuilder.setPattern(lr.getPattern());
        }
        taskConfigBuilder.addLocalResource(localResourcesBuilder);
      }
      
      for (String key : vertex.getTaskEnvironment().keySet()) {
        PlanKeyValuePair.Builder envSettingBuilder = PlanKeyValuePair.newBuilder();
        envSettingBuilder.setKey(key);
        envSettingBuilder.setValue(vertex.getTaskEnvironment().get(key));
        taskConfigBuilder.addEnvironmentSetting(envSettingBuilder);
      }

      if (vertex.getTaskLocationsHint() != null) {
        if (vertex.getTaskLocationsHint().getTaskLocationHints() != null) {
          for (TaskLocationHint hint : vertex.getTaskLocationsHint().getTaskLocationHints()) {
            PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder();

            if (hint.getAffinitizedContainer() != null) {
              throw new TezUncheckedException(
                  "Container affinity may not be specified via the DAG API");
            }
            if (hint.getDataLocalHosts() != null) {
              taskLocationHintBuilder.addAllHost(hint.getDataLocalHosts());
            }
            if (hint.getRacks() != null) {
              taskLocationHintBuilder.addAllRack(hint.getRacks());
            }

            vertexBuilder.addTaskLocationHint(taskLocationHintBuilder);
          }
        }
      }
      
      if (vertex.getVertexManagerPlugin() != null) {
        vertexBuilder.setVertexManagerPlugin(DagTypeConverters
            .convertToDAGPlan(vertex.getVertexManagerPlugin()));
      }

      for (Edge inEdge : vertex.getInputEdges()) {
        vertexBuilder.addInEdgeId(inEdge.getId());
      }

      for (Edge outEdge : vertex.getOutputEdges()) {
        vertexBuilder.addOutEdgeId(outEdge.getId());
      }

      vertexBuilder.setTaskConfig(taskConfigBuilder);
      dagBuilder.addVertex(vertexBuilder);
    }

    for (Edge edge : edges) {
      EdgePlan.Builder edgeBuilder = EdgePlan.newBuilder();
      edgeBuilder.setId(edge.getId());
      edgeBuilder.setInputVertexName(edge.getInputVertex().getName());
      edgeBuilder.setOutputVertexName(edge.getOutputVertex().getName());
      edgeBuilder.setDataMovementType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataMovementType()));
      edgeBuilder.setDataSourceType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataSourceType()));
      edgeBuilder.setSchedulingType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getSchedulingType()));
      edgeBuilder.setEdgeSource(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeSource()));
      edgeBuilder.setEdgeDestination(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeDestination()));
      if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM) {
        if (edge.getEdgeProperty().getEdgeManagerDescriptor() != null) {
          edgeBuilder.setEdgeManager(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeManagerDescriptor()));
        } // else the AM will deal with this.
      }
      dagBuilder.addEdge(edgeBuilder);
    }

    if (dagConf != null) {
      Iterator<Entry<String, String>> iter = dagConf.iterator();
      ConfigurationProto.Builder confProtoBuilder =
        ConfigurationProto.newBuilder();
      while (iter.hasNext()) {
        Entry<String, String> entry = iter.next();
        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
        kvp.setKey(entry.getKey());
        kvp.setValue(entry.getValue());
        confProtoBuilder.addConfKeyValues(kvp);
      }
      dagBuilder.setDagKeyValues(confProtoBuilder);
    }
    if (credentials != null) {
      dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(credentials));
      LogUtils.logCredentials(LOG, credentials, "dag");
    }
    return dagBuilder.build();
  }