public synchronized DAGPlan createDag()

in tez-api/src/main/java/org/apache/tez/dag/api/DAG.java [843:1122]


  /**
   * Builds the {@link DAGPlan} protobuf describing this DAG.
   *
   * <p>The DAG is verified first. The plan is then populated with:
   * <ul>
   *   <li>the DAG name, plus the caller context and DAG info when set;</li>
   *   <li>the default execution context;</li>
   *   <li>vertex groups and DAG-level local resources;</li>
   *   <li>one {@code VertexPlan} per vertex, added in the order popped from the stack returned by
   *       {@code verify(true)};</li>
   *   <li>one {@code EdgePlan} per edge;</li>
   *   <li>ACLs, the DAG configuration and the accumulated credentials.</li>
   * </ul>
   *
   * @param tezConf configuration used to verify local resources and to supply defaults. It provides
   *     the task memory/vcores for vertices without an explicit task resource, the default task
   *     launch options, the default task environment, and the history log level when the DAG conf
   *     does not set one
   * @param extraCredentials optional credentials merged into the DAG credentials before the DAG's
   *     own credentials; may be {@code null}
   * @param tezJarResources optional local resources added to every vertex; may be {@code null}
   * @param binaryConfig optional resource registered on every vertex under
   *     {@code TezConstants.TEZ_PB_BINARY_CONF_NAME}; may be {@code null}
   * @param tezLrsAsArchive forwarded to {@code TezYARNUtils.setupDefaultEnv} when building each
   *     vertex's task environment
   * @param servicePluginsDescriptor used to verify the default and per-vertex execution contexts
   * @param javaOptsChecker forwarded to {@code TezClientUtils.addDefaultsToTaskLaunchCmdOpts}
   *     when computing each vertex's task java opts
   * @return the fully populated DAG plan
   * @throws IllegalArgumentException if the configured history log level is invalid, or if the
   *     topologically sorted vertex stack does not contain exactly as many entries as there are
   *     vertices
   * @throws IllegalStateException if a vertex location hint is given without a vertex parallelism,
   *     or its number of task location hints differs from the parallelism
   * @throws TezUncheckedException if a vertex's task launch options are rejected, or a task
   *     location hint specifies task affinity
   */
  public synchronized DAGPlan createDag(Configuration tezConf, Credentials extraCredentials,
      Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
      boolean tezLrsAsArchive, ServicePluginsDescriptor servicePluginsDescriptor,
      JavaOptsChecker javaOptsChecker) {
    Deque<String> topologicalVertexStack = verify(true);
    verifyLocalResources(tezConf);

    DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
    dagBuilder.setName(this.name);

    if (this.callerContext != null) {
      dagBuilder.setCallerContext(DagTypeConverters.convertCallerContextToProto(callerContext));
    }
    if (this.dagInfo != null && !this.dagInfo.isEmpty()) {
      dagBuilder.setDagInfo(this.dagInfo);
    }

    // Setup default execution context.
    VertexExecutionContext defaultContext = getDefaultExecutionContext();
    verifyExecutionContext(defaultContext, servicePluginsDescriptor, "DAGDefault");
    if (defaultContext != null) {
      DAGProtos.VertexExecutionContextProto contextProto = DagTypeConverters.convertToProto(
          defaultContext);
      dagBuilder.setDefaultExecutionContext(contextProto);
    }

    for (VertexGroup av : vertexGroups) {
      GroupInfo groupInfo = av.getGroupInfo();
      PlanVertexGroupInfo.Builder groupBuilder = PlanVertexGroupInfo.newBuilder();
      groupBuilder.setGroupName(groupInfo.getGroupName());
      for (Vertex v : groupInfo.getMembers()) {
        groupBuilder.addGroupMembers(v.getName());
      }
      groupBuilder.addAllOutputs(groupInfo.outputs);
      for (Map.Entry<String, InputDescriptor> entry :
           groupInfo.edgeMergedInputs.entrySet()) {
        groupBuilder.addEdgeMergedInputs(
            PlanGroupInputEdgeInfo.newBuilder()
                .setDestVertexName(entry.getKey())
                .setMergedInput(DagTypeConverters.convertToDAGPlan(entry.getValue())));
      }
      dagBuilder.addVertexGroups(groupBuilder);
    }

    // extraCredentials are merged first, then the DAG's own credentials.
    Credentials dagCredentials = new Credentials();
    if (extraCredentials != null) {
      dagCredentials.mergeAll(extraCredentials);
    }
    dagCredentials.mergeAll(credentials);
    if (!commonTaskLocalFiles.isEmpty()) {
      dagBuilder.addAllLocalResource(DagTypeConverters.convertToDAGPlan(commonTaskLocalFiles));
    }

    Preconditions.checkArgument(topologicalVertexStack.size() == vertices.size(),
        "size of topologicalVertexStack is:" + topologicalVertexStack.size() +
        " while size of vertices is:" + vertices.size() +
        ", make sure they are the same in order to sort the vertices");
    while (!topologicalVertexStack.isEmpty()) {
      Vertex vertex = vertices.get(topologicalVertexStack.pop());
      // infer credentials, resources and parallelism from data source
      Resource vertexTaskResource = vertex.getTaskResource();
      if (vertexTaskResource == null) {
        vertexTaskResource = Resource.newInstance(tezConf.getInt(
            TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
            TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT), tezConf.getInt(
            TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
            TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT));
      }
      Map<String, LocalResource> vertexLRs = Maps.newHashMap();
      vertexLRs.putAll(vertex.getTaskLocalFiles());
      List<DataSourceDescriptor> dataSources = vertex.getDataSources();
      for (DataSourceDescriptor dataSource : dataSources) {
        if (dataSource.getCredentials() != null) {
          dagCredentials.addAll(dataSource.getCredentials());
        }
        if (dataSource.getAdditionalLocalFiles() != null) {
          TezCommonUtils
              .addAdditionalLocalResources(dataSource.getAdditionalLocalFiles(), vertexLRs,
                  "Vertex " + vertex.getName());
        }
      }
      if (tezJarResources != null) {
        TezCommonUtils
            .addAdditionalLocalResources(tezJarResources, vertexLRs, "Vertex " + vertex.getName());
      }
      if (binaryConfig != null) {
        vertexLRs.put(TezConstants.TEZ_PB_BINARY_CONF_NAME, binaryConfig);
      }
      int vertexParallelism = vertex.getParallelism();
      VertexLocationHint vertexLocationHint = vertex.getLocationHint();
      // Parallelism and location hints are only inferred when there is exactly one data source.
      if (dataSources.size() == 1) {
        DataSourceDescriptor dataSource = dataSources.get(0);
        if (vertexParallelism == -1 && dataSource.getNumberOfShards() > -1) {
          vertexParallelism = dataSource.getNumberOfShards();
        }
        if (vertexLocationHint == null && dataSource.getLocationHint() != null) {
          vertexLocationHint = dataSource.getLocationHint();
        }
      }
      if (vertexParallelism == -1) {
        Preconditions.checkState(vertexLocationHint == null,
            "Cannot specify vertex location hint without specifying vertex parallelism. Vertex: "
                + vertex.getName());
      } else if (vertexLocationHint != null
          && vertexLocationHint.getTaskLocationHints() != null) {
        // A hint whose task list is null contributes no task hints to the plan (see the
        // location-hint loop further down), so there is nothing to compare here. The null guard
        // avoids an NPE in that case.
        Preconditions.checkState(
            vertexParallelism == vertexLocationHint.getTaskLocationHints().size(),
            "vertex task location hint must equal vertex parallelism. Vertex: " + vertex.getName());
      }
      for (DataSinkDescriptor dataSink : vertex.getDataSinks()) {
        if (dataSink.getCredentials() != null) {
          dagCredentials.addAll(dataSink.getCredentials());
        }
      }

      VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
      vertexBuilder.setName(vertex.getName());
      vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until TEZ-46.
      vertexBuilder.setProcessorDescriptor(DagTypeConverters
          .convertToDAGPlan(vertex.getProcessorDescriptor()));

      // Vertex ExecutionContext setup
      VertexExecutionContext execContext = vertex.getVertexExecutionContext();
      verifyExecutionContext(execContext, servicePluginsDescriptor, vertex.getName());
      if (execContext != null) {
        DAGProtos.VertexExecutionContextProto contextProto =
            DagTypeConverters.convertToProto(execContext);
        vertexBuilder.setExecutionContext(contextProto);
      }
      // End of VertexExecutionContext setup.

      for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : vertex.getInputs()) {
        vertexBuilder.addInputs(DagTypeConverters.convertToDAGPlan(input));
      }
      for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output : vertex.getOutputs()) {
        vertexBuilder.addOutputs(DagTypeConverters.convertToDAGPlan(output));
      }

      if (vertex.getConf() != null && vertex.getConf().size() > 0) {
        ConfigurationProto.Builder confBuilder = ConfigurationProto.newBuilder();
        TezUtils.populateConfProtoFromEntries(vertex.getConf().entrySet(), confBuilder);
        vertexBuilder.setVertexConf(confBuilder);
      }

      //task config
      PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder();
      taskConfigBuilder.setNumTasks(vertexParallelism);
      taskConfigBuilder.setMemoryMb(vertexTaskResource.getMemory());
      taskConfigBuilder.setVirtualCores(vertexTaskResource.getVirtualCores());

      try {
        taskConfigBuilder.setJavaOpts(
            TezClientUtils.addDefaultsToTaskLaunchCmdOpts(vertex.getTaskLaunchCmdOpts(), tezConf,
                javaOptsChecker));
      } catch (TezException e) {
        throw new TezUncheckedException("Invalid TaskLaunchCmdOpts defined for Vertex "
            + vertex.getName() + " : " + e.getMessage(), e);
      }

      taskConfigBuilder.setTaskModule(vertex.getName());
      if (!vertexLRs.isEmpty()) {
        taskConfigBuilder.addAllLocalResource(DagTypeConverters.convertToDAGPlan(vertexLRs));
      }

      Map<String, String> taskEnv = Maps.newHashMap(vertex.getTaskEnvironment());
      TezYARNUtils.setupDefaultEnv(taskEnv, tezConf,
          TezConfiguration.TEZ_TASK_LAUNCH_ENV,
          TezConfiguration.TEZ_TASK_LAUNCH_ENV_DEFAULT,
          TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_ENV,
          TezConfiguration.TEZ_TASK_LAUNCH_CLUSTER_DEFAULT_ENV_DEFAULT,
          tezLrsAsArchive);
      for (Map.Entry<String, String> entry : taskEnv.entrySet()) {
        PlanKeyValuePair.Builder envSettingBuilder = PlanKeyValuePair.newBuilder();
        envSettingBuilder.setKey(entry.getKey());
        envSettingBuilder.setValue(entry.getValue());
        taskConfigBuilder.addEnvironmentSetting(envSettingBuilder);
      }

      if (vertexLocationHint != null && vertexLocationHint.getTaskLocationHints() != null) {
        for (TaskLocationHint hint : vertexLocationHint.getTaskLocationHints()) {
          PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder();
          // we can allow this later on if needed
          if (hint.getAffinitizedTask() != null) {
            throw new TezUncheckedException(
                "Task based affinity may not be specified via the DAG API");
          }

          if (hint.getHosts() != null) {
            taskLocationHintBuilder.addAllHost(hint.getHosts());
          }
          if (hint.getRacks() != null) {
            taskLocationHintBuilder.addAllRack(hint.getRacks());
          }

          vertexBuilder.addTaskLocationHint(taskLocationHintBuilder);
        }
      }

      if (vertex.getVertexManagerPlugin() != null) {
        vertexBuilder.setVertexManagerPlugin(DagTypeConverters
            .convertToDAGPlan(vertex.getVertexManagerPlugin()));
      }

      for (Edge inEdge : vertex.getInputEdges()) {
        vertexBuilder.addInEdgeId(inEdge.getId());
      }

      for (Edge outEdge : vertex.getOutputEdges()) {
        vertexBuilder.addOutEdgeId(outEdge.getId());
      }

      vertexBuilder.setTaskConfig(taskConfigBuilder);
      dagBuilder.addVertex(vertexBuilder);
    }

    for (Edge edge : edges) {
      EdgePlan.Builder edgeBuilder = EdgePlan.newBuilder();
      edgeBuilder.setId(edge.getId());
      edgeBuilder.setInputVertexName(edge.getInputVertex().getName());
      edgeBuilder.setOutputVertexName(edge.getOutputVertex().getName());
      edgeBuilder.setDataMovementType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataMovementType()));
      edgeBuilder.setDataSourceType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataSourceType()));
      edgeBuilder.setSchedulingType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getSchedulingType()));
      edgeBuilder.setEdgeSource(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeSource()));
      edgeBuilder.setEdgeDestination(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeDestination()));
      if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM) {
        if (edge.getEdgeProperty().getEdgeManagerDescriptor() != null) {
          edgeBuilder.setEdgeManager(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeManagerDescriptor()));
        } // else the AM will deal with this.
      }
      dagBuilder.addEdge(edgeBuilder);
    }

    if (dagAccessControls != null) {
      dagBuilder.setAclInfo(DagTypeConverters.convertDAGAccessControlsToProto(dagAccessControls));
    }

    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
    if (!this.dagConf.isEmpty()) {
      TezUtils.populateConfProtoFromEntries(this.dagConf.entrySet(), confProtoBuilder);
    }
    // The history log level from dagConf wins; it has already been copied into the proto above.
    // Only when dagConf does not set it is the tezConf value used and copied into the proto
    // explicitly. Either way the effective value is validated once.
    String logLevel = this.dagConf.get(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL);
    boolean logLevelFromTezConf = false;
    if (logLevel == null) {
      logLevel = tezConf.get(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL);
      logLevelFromTezConf = logLevel != null;
    }
    if (logLevel != null) {
      if (!HistoryLogLevel.validateLogLevel(logLevel)) {
        throw new IllegalArgumentException(
            "Config: " + TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL +
            " is set to invalid value: " + logLevel);
      }
      if (logLevelFromTezConf) {
        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
        kvp.setKey(TezConfiguration.TEZ_HISTORY_LOGGING_LOGLEVEL);
        kvp.setValue(logLevel);
        confProtoBuilder.addConfKeyValues(kvp);
      }
    }
    dagBuilder.setDagConf(confProtoBuilder);

    // dagCredentials is created unconditionally above, so it is always serialized.
    dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(dagCredentials));
    TezCommonUtils.logCredentials(LOG, dagCredentials, "dag");

    return dagBuilder.build();
  }