in tez-api/src/main/java/org/apache/tez/dag/api/DAG.java [428:530]
/**
 * Propagates task counts across ONE_TO_ONE edges and then validates the parallelism of the DAG.
 *
 * <p>The work happens in three passes:
 * <ol>
 *   <li>Inference: every vertex whose parallelism is already set (greater than -1) pushes that
 *       value to each ONE_TO_ONE destination vertex that is still at -1. Newly inferred vertices
 *       act as sources for the next pass, so chains of ONE_TO_ONE edges resolve level by
 *       level.</li>
 *   <li>Consistency: for every ONE_TO_ONE edge whose destination parallelism is not -1, the
 *       destination must have exactly the source's parallelism.</li>
 *   <li>Undetermined vertices: a vertex still at -1 is accepted only if it has a data source with
 *       an input initializer, a single data source with a known number of shards, an input
 *       vertex that is itself at -1, or a vertex manager plugin.</li>
 * </ol>
 *
 * @throws TezUncheckedException if a ONE_TO_ONE destination has a parallelism other than -1 that
 *     differs from its source's parallelism
 * @throws IllegalStateException if a vertex remains at -1 parallelism without any of the accepted
 *     ways listed above
 */
void checkAndInferOneToOneParallelism() {
  // Pass 1 (inference): seed the work set with every vertex whose task count is already known.
  Set<Vertex> pending = Sets.newHashSet();
  for (Vertex candidate : vertices.values()) {
    if (candidate.getParallelism() > -1) {
      pending.add(candidate);
    }
  }
  // Each iteration handles one level of inference. A vertex only joins 'pending' when it moves
  // from -1 to a concrete value, so every vertex is expanded at most once and the loop ends.
  while (!pending.isEmpty()) {
    Set<Vertex> currentLevel = Sets.newHashSet(pending);
    pending.clear();
    for (Vertex source : currentLevel) {
      for (Edge outEdge : source.getOutputEdges()) {
        if (outEdge.getEdgeProperty().getDataMovementType() != DataMovementType.ONE_TO_ONE) {
          continue;
        }
        Vertex target = outEdge.getOutputVertex();
        if (target.getParallelism() != -1) {
          // Already determined (set explicitly or inferred earlier); pass 2 checks it.
          continue;
        }
        LOG.info("Inferring parallelism for vertex: "
            + target.getName() + " to be " + source.getParallelism()
            + " from 1-1 connection with vertex " + source.getName());
        target.setParallelism(source.getParallelism());
        pending.add(target);
      }
    }
  }

  // Pass 2 (consistency): after inference, a ONE_TO_ONE mismatch is only tolerated while the
  // destination is still undetermined (-1).
  for (Edge edge : edges) {
    Vertex src = edge.getInputVertex();
    Vertex dst = edge.getOutputVertex();
    boolean oneToOne =
        edge.getEdgeProperty().getDataMovementType() == DataMovementType.ONE_TO_ONE;
    if (oneToOne
        && src.getParallelism() != dst.getParallelism()
        && dst.getParallelism() != -1) {
      throw new TezUncheckedException(
          "1-1 Edge. Destination vertex parallelism must match source vertex. "
          + "Vertex: " + src.getName() + " does not match vertex: "
          + dst.getName());
    }
  }

  // Pass 3 (undetermined vertices): every vertex still at -1 needs some way to obtain its task
  // count at runtime. The checks are independent of each other, so vertex order does not matter.
  for (Vertex vertex : vertices.values()) {
    if (vertex.getParallelism() != -1) {
      continue;
    }
    boolean resolvableLater = hasInputInitializerDataSource(vertex)
        || hasSingleDataSourceWithKnownShards(vertex)
        || hasUninitedInputVertex(vertex)
        || vertex.getVertexManagerPlugin() != null;
    if (!resolvableLater) {
      throw new IllegalStateException(vertex.getName() +
          " has -1 tasks but does not have input initializers, " +
          "1-1 uninited sources or custom vertex manager to set it at runtime");
    }
  }
}

/** Returns true if any data source of {@code vertex} has a non-null input initializer descriptor. */
private static boolean hasInputInitializerDataSource(Vertex vertex) {
  if (vertex.getDataSources() == null) {
    return false;
  }
  for (DataSourceDescriptor ds : vertex.getDataSources()) {
    if (ds.getInputInitializerDescriptor() != null) {
      return true;
    }
  }
  return false;
}

/**
 * Returns true if {@code vertex} has exactly one data source and that source reports a number of
 * shards greater than -1 (e.g. splits calculated on the client rather than in the AM). Per the
 * original design, parallelism is then set up later from that shard count, so no initializer is
 * needed.
 */
private static boolean hasSingleDataSourceWithKnownShards(Vertex vertex) {
  return vertex.getDataSources() != null
      && vertex.getDataSources().size() == 1
      && vertex.getDataSources().get(0).getNumberOfShards() > -1;
}

/**
 * Returns true if any input vertex of {@code vertex} still has -1 parallelism.
 *
 * <p>NOTE(review): although the error message speaks of "1-1 uninited sources", this inspects
 * every input vertex regardless of the connecting edge's data movement type. Confirm whether that
 * breadth is intended before tightening it.
 */
private static boolean hasUninitedInputVertex(Vertex vertex) {
  if (vertex.getInputVertices() == null) {
    return false;
  }
  for (Vertex srcVertex : vertex.getInputVertices()) {
    if (srcVertex.getParallelism() == -1) {
      return true;
    }
  }
  return false;
}