in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [2558:2668]
private VertexState handleInitEvent(VertexImpl vertex, VertexEvent event) {
VertexState state = vertex.setupVertex();
if (state.equals(VertexState.FAILED)) {
return state;
}
// TODO move before to handle NEW state
if (vertex.targetVertices != null) {
for (Edge e : vertex.targetVertices.values()) {
if (e.getEdgeManager() == null) {
Preconditions
.checkState(
e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM,
"Null edge manager allowed only for custom edge. " + vertex.logIdentifier);
vertex.uninitializedEdges.add(e);
}
}
}
if (vertex.sourceVertices != null) {
for (Edge e : vertex.sourceVertices.values()) {
if (e.getEdgeManager() == null) {
Preconditions
.checkState(
e.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM,
"Null edge manager allowed only for custom edge. " + vertex.logIdentifier);
vertex.uninitializedEdges.add(e);
}
}
}
// Create tasks based on initial configuration, but don't start them yet.
if (vertex.numTasks == -1) {
LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split"
+ " to set #tasks for the vertex " + vertex.getVertexId());
if (vertex.inputsWithInitializers != null) {
// Use DAGScheduler to arbitrate resources among vertices later
vertex.rootInputInitializerManager = vertex.createRootInputInitializerManager(
vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
vertex.eventHandler, -1,
vertex.appContext.getTaskScheduler().getNumClusterNodes(),
vertex.getTaskResource(),
vertex.appContext.getTaskScheduler().getTotalResources());
List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
.newArrayListWithCapacity(vertex.inputsWithInitializers.size());
for (String inputName : vertex.inputsWithInitializers) {
inputList.add(vertex.rootInputDescriptors.get(inputName));
}
LOG.info("Vertex will initialize via inputInitializers "
+ vertex.logIdentifier + ". Starting root input initializers: "
+ vertex.inputsWithInitializers.size());
vertex.rootInputInitializerManager.runInputInitializers(inputList);
return VertexState.INITIALIZING;
} else {
boolean hasOneToOneUninitedSource = false;
for (Map.Entry<Vertex, Edge> entry : vertex.sourceVertices.entrySet()) {
if (entry.getValue().getEdgeProperty().getDataMovementType() ==
DataMovementType.ONE_TO_ONE) {
if (entry.getKey().getTotalTasks() == -1) {
hasOneToOneUninitedSource = true;
break;
}
}
}
if (hasOneToOneUninitedSource) {
LOG.info("Vertex will initialize from 1-1 sources. " + vertex.logIdentifier);
return VertexState.INITIALIZING;
}
if (vertex.vertexPlan.hasVertexManagerPlugin()) {
LOG.info("Vertex will initialize via custom vertex manager. " + vertex.logIdentifier);
return VertexState.INITIALIZING;
}
throw new TezUncheckedException(vertex.getVertexId() +
" has -1 tasks but does not have input initializers, " +
"1-1 uninited sources or custom vertex manager to set it at runtime");
}
} else {
LOG.info("Creating " + vertex.numTasks + " for vertex: " + vertex.logIdentifier);
vertex.createTasks();
if (vertex.inputsWithInitializers != null) {
vertex.rootInputInitializerManager = vertex.createRootInputInitializerManager(
vertex.getDAG().getName(), vertex.getName(), vertex.getVertexId(),
vertex.eventHandler, vertex.getTotalTasks(),
vertex.appContext.getTaskScheduler().getNumClusterNodes(),
vertex.getTaskResource(),
vertex.appContext.getTaskScheduler().getTotalResources());
List<RootInputLeafOutputDescriptor<InputDescriptor>> inputList = Lists
.newArrayListWithCapacity(vertex.inputsWithInitializers.size());
for (String inputName : vertex.inputsWithInitializers) {
inputList.add(vertex.rootInputDescriptors.get(inputName));
}
LOG.info("Starting root input initializers: "
+ vertex.inputsWithInitializers.size());
// special case when numTasks>0 and still we want to stay in initializing
// state. This is handled in RootInputInitializedTransition specially.
vertex.initWaitsForRootInitializers = true;
vertex.rootInputInitializerManager.runInputInitializers(inputList);
return VertexState.INITIALIZING;
}
if (!vertex.uninitializedEdges.isEmpty()) {
LOG.info("Vertex has uninitialized edges. " + vertex.logIdentifier);
return VertexState.INITIALIZING;
}
LOG.info("Directly initializing vertex: " + vertex.logIdentifier);
boolean isInitialized = vertex.initializeVertex();
if (isInitialized) {
return VertexState.INITED;
} else {
return VertexState.FAILED;
}
}
}