void checkAndInferOneToOneParallelism()

in tez-api/src/main/java/org/apache/tez/dag/api/DAG.java [428:530]


  void checkAndInferOneToOneParallelism() {
    // Overview:
    //   Phase 1 - propagate parallelism along ONE_TO_ONE edges, starting from every vertex
    //             whose parallelism is already set (> -1).
    //   Phase 2 - verify that no ONE_TO_ONE edge ends at a vertex whose parallelism is set
    //             but differs from the source's; throws TezUncheckedException otherwise.
    //   Phase 3 - verify that every vertex still at -1 has some way to get its parallelism
    //             later; throws IllegalStateException otherwise.

    // ---- Phase 1: inference ------------------------------------------------------------
    // The frontier holds vertices whose parallelism is known and whose outgoing
    // ONE_TO_ONE edges have not been walked yet.
    Set<Vertex> frontier = Sets.newHashSet();
    for (Vertex candidate : vertices.values()) {
      if (candidate.getParallelism() > -1) {
        frontier.add(candidate);
      }
    }

    // Each pass of the outer loop handles one level of inference. A vertex only enters the
    // frontier at the moment its parallelism changes from -1, so it is expanded at most once.
    while (!frontier.isEmpty()) {
      // Snapshot the current level so the frontier can collect the next one.
      Set<Vertex> currentLevel = Sets.newHashSet(frontier);
      frontier.clear();
      for (Vertex source : currentLevel) {
        for (Edge outEdge : source.getOutputEdges()) {
          if (outEdge.getEdgeProperty().getDataMovementType() != DataMovementType.ONE_TO_ONE) {
            continue;
          }
          Vertex target = outEdge.getOutputVertex();
          if (target.getParallelism() != -1) {
            // Already set; any mismatch with the source is reported in phase 2.
            continue;
          }
          int inherited = source.getParallelism();
          LOG.info("Inferring parallelism for vertex: "
              + target.getName() + " to be " + inherited
              + " from 1-1 connection with vertex " + source.getName());
          target.setParallelism(inherited);
          frontier.add(target);
        }
      }
    }

    // ---- Phase 2: consistency of ONE_TO_ONE edges --------------------------------------
    for (Edge edge : edges) {
      if (edge.getEdgeProperty().getDataMovementType() != DataMovementType.ONE_TO_ONE) {
        continue;
      }
      Vertex source = edge.getInputVertex();
      Vertex destination = edge.getOutputVertex();
      int destinationParallelism = destination.getParallelism();
      // A destination still at -1 is tolerated here; only a set value that disagrees
      // with the source is an error.
      if (destinationParallelism != -1 && source.getParallelism() != destinationParallelism) {
        throw new TezUncheckedException(
            "1-1 Edge. Destination vertex parallelism must match source vertex. "
            + "Vertex: " + source.getName() + " does not match vertex: "
            + destination.getName());
      }
    }

    // ---- Phase 3: every vertex left at -1 must be resolvable later ---------------------
    // A vertex may keep -1 parallelism only if at least one of the following holds:
    //   a. one of its data sources has an input initializer
    //   b. it has exactly one data source and that source reports a shard count > -1
    //   c. one of its input vertices is itself still at -1
    //   d. it has a vertex manager plugin
    // The checks only read state, so the vertex iteration order does not matter.
    for (Vertex unresolved : vertices.values()) {
      if (unresolved.getParallelism() != -1) {
        continue;
      }

      // (a) look for any data source carrying an input initializer
      boolean initializerPresent = false;
      if (unresolved.getDataSources() != null) {
        for (DataSourceDescriptor source : unresolved.getDataSources()) {
          if (source.getInputInitializerDescriptor() != null) {
            initializerPresent = true;
            break;
          }
        }
      }

      // (b) a lone data source with a known shard count (e.g. splits computed on the
      // client rather than in the AM); no initializer is needed in that case
      boolean shardCountKnown = !initializerPresent
          && unresolved.getDataSources() != null
          && unresolved.getDataSources().size() == 1
          && unresolved.getDataSources().get(0).getNumberOfShards() > -1;

      if (initializerPresent || shardCountKnown) {
        continue;
      }

      // (c) any upstream vertex that is itself still unresolved
      // NOTE(review): this scans every input vertex without checking the edge type, although
      // the error message below speaks of "1-1 uninited sources" - confirm this is intended.
      boolean upstreamUnresolved = false;
      if (unresolved.getInputVertices() != null) {
        for (Vertex upstream : unresolved.getInputVertices()) {
          if (upstream.getParallelism() == -1) {
            upstreamUnresolved = true;
            break;
          }
        }
      }

      // (d) the vertex manager plugin is consulted only when (c) did not already apply
      if (upstreamUnresolved || unresolved.getVertexManagerPlugin() != null) {
        continue;
      }

      throw new IllegalStateException(unresolved.getName() +
          " has -1 tasks but does not have input initializers, " +
          "1-1 uninited sources or custom vertex manager to set it at runtime");
    }
  }