in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [3079:3179]
/**
 * Handles the init event for {@code vertex} and decides which state it moves to.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Runs {@code vertex.setupVertex()}; a {@code FAILED} result is returned as-is.</li>
 *   <li>Records every target/source edge that has no edge manager yet in
 *       {@code vertex.uninitializedEdges} (only allowed for {@code CUSTOM} edges).</li>
 *   <li>If {@code vertex.numTasks == -1}, the task count has to be supplied later by an input
 *       initializer, a 1-1 source vertex or a custom vertex manager, so the vertex stays in
 *       {@code INITIALIZING}.</li>
 *   <li>Otherwise tasks are created right away and the vertex is initialized directly unless
 *       input initializers still have to run or some edges are uninitialized.</li>
 * </ol>
 *
 * @param vertex the vertex receiving the init event
 * @return {@code FAILED} (or whatever {@code vertex.finished(...)} yields) when setup, input
 *     initializer manager creation or {@code initializeVertex()} fails; {@code INITIALIZING}
 *     when initialization has to wait for more information; {@code INITED} when the vertex was
 *     initialized directly
 * @throws TezUncheckedException if the vertex has -1 tasks and no input initializer, no
 *     uninitialized 1-1 source and no custom vertex manager that could set the task count
 */
private VertexState handleInitEvent(VertexImpl vertex) {
  VertexState state = vertex.setupVertex();
  if (state.equals(VertexState.FAILED)) {
    return state;
  }

  // TODO move before to handle NEW state
  if (vertex.targetVertices != null) {
    trackEdgesWithoutManager(vertex, vertex.targetVertices.values());
  }
  if (vertex.sourceVertices != null) {
    trackEdgesWithoutManager(vertex, vertex.sourceVertices.values());
  }

  // Create tasks based on initial configuration, but don't start them yet.
  if (vertex.numTasks == -1) {
    // This branch never initializes the vertex directly: it returns INITIALIZING, or the
    // failure state when the input initializer manager cannot be created.
    LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers/1-1 split"
        + " to set #tasks for the vertex " + vertex.getLogIdentifier());

    if (vertex.hasInputInitializers()) {
      if (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit()) {
        LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
        try {
          vertex.setupInputInitializerManager();
        } catch (TezException e) {
          String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(e);
          // Logged at error level, consistent with the same failure in the known-#tasks branch.
          LOG.error(msg);
          return vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
        }
      }
      // Also reached when recovery data says init can be skipped.
      return VertexState.INITIALIZING;
    }

    // A 1-1 source whose own task count is still unknown will determine ours later.
    boolean hasOneToOneUninitedSource = false;
    // sourceVertices is treated as nullable above, so guard it here as well instead of
    // failing with a NullPointerException before the diagnostic below can be raised.
    if (vertex.sourceVertices != null) {
      for (Map.Entry<Vertex, Edge> entry : vertex.sourceVertices.entrySet()) {
        if (entry.getValue().getEdgeProperty().getDataMovementType() == DataMovementType.ONE_TO_ONE
            && entry.getKey().getTotalTasks() == -1) {
          hasOneToOneUninitedSource = true;
          break;
        }
      }
    }
    if (hasOneToOneUninitedSource) {
      LOG.info("Vertex will initialize from 1-1 sources. " + vertex.logIdentifier);
      return VertexState.INITIALIZING;
    }
    if (vertex.vertexPlan.hasVertexManagerPlugin()) {
      LOG.info("Vertex will initialize via custom vertex manager. " + vertex.logIdentifier);
      return VertexState.INITIALIZING;
    }
    throw new TezUncheckedException(vertex.getLogIdentifier() +
        " has -1 tasks but does not have input initializers, " +
        "1-1 uninited sources or custom vertex manager to set it at runtime");
  }

  LOG.info("Creating " + vertex.numTasks + " tasks for vertex: " + vertex.logIdentifier);
  vertex.createTasks();

  // From here on the vertex may still have to wait in INITIALIZING.
  if (vertex.hasInputInitializers()
      && (vertex.recoveryData == null || !vertex.recoveryData.shouldSkipInit())) {
    LOG.info("Vertex will initialize from input initializer. " + vertex.logIdentifier);
    try {
      vertex.setupInputInitializerManager();
    } catch (TezException e) {
      String msg = "Fail to create InputInitializerManager, " + ExceptionUtils.getStackTrace(e);
      LOG.error(msg);
      return vertex.finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
    }
    return VertexState.INITIALIZING;
  }
  if (!vertex.uninitializedEdges.isEmpty()) {
    LOG.info("Vertex has uninitialized edges. " + vertex.logIdentifier);
    return VertexState.INITIALIZING;
  }

  LOG.info("Directly initializing vertex: " + vertex.logIdentifier);
  // vertex is completely configured. Send out notification now.
  vertex.maybeSendConfiguredEvent();
  return vertex.initializeVertex() ? VertexState.INITED : VertexState.FAILED;
}

/**
 * Adds every edge in {@code edges} that has no edge manager to
 * {@code vertex.uninitializedEdges}.
 *
 * <p>A missing edge manager is only accepted on edges whose data movement type is
 * {@code CUSTOM}; any other edge without one fails the {@code Preconditions.checkState} check.
 *
 * @param vertex the vertex whose {@code uninitializedEdges} collection is updated
 * @param edges the edges to inspect; must not be {@code null}
 */
private void trackEdgesWithoutManager(VertexImpl vertex, Iterable<? extends Edge> edges) {
  for (Edge edge : edges) {
    if (edge.getEdgeManager() == null) {
      Preconditions.checkState(
          edge.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM,
          "Null edge manager allowed only for custom edge. " + vertex.logIdentifier);
      vertex.uninitializedEdges.add(edge);
    }
  }
}