in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [1752:1848]
/**
 * Prepares this vertex for initialization: records the init timestamps,
 * collects group input specs, detects root inputs that declare an
 * initializer, sets up the VertexManager and validates the task count.
 *
 * @param event source of previously recorded values (init times, additional
 *          inputs, task count). When {@code null}, the values come from the
 *          clock, the already-known root input descriptors and the vertex
 *          plan instead. NOTE(review): a non-null event presumably means the
 *          vertex is being recovered - confirm against the callers.
 * @return {@link VertexState#INITED} when setup succeeds; otherwise
 *         {@link VertexState#FAILED} or the state returned by
 *         {@code finished(VertexState.FAILED)}
 */
private VertexState setupVertex(VertexInitializedEvent event) {
  final boolean fromEvent = (event != null);

  if (fromEvent) {
    initTimeRequested = event.getInitRequestedTime();
    initedTime = event.getInitedTime();
  } else {
    initTimeRequested = clock.getTime();
  }

  // Build a GroupInputSpec for every vertex group that has a merged input
  // registered under this vertex's name. The field is only overwritten when
  // at least one spec was produced.
  if (dagVertexGroups != null && !dagVertexGroups.isEmpty()) {
    List<GroupInputSpec> collectedSpecs = Lists.newLinkedList();
    for (VertexGroupInfo group : dagVertexGroups.values()) {
      if (!group.edgeMergedInputs.containsKey(getName())) {
        continue;
      }
      InputDescriptor merged = group.edgeMergedInputs.get(getName());
      collectedSpecs.add(new GroupInputSpec(group.groupName,
          Lists.newLinkedList(group.groupMembers), merged));
    }
    if (!collectedSpecs.isEmpty()) {
      groupInputSpecList = collectedSpecs;
    }
  }

  // Root inputs: taken from the event when one is supplied; otherwise the
  // existing descriptors are scanned for inputs that name an initializer.
  if (fromEvent) {
    this.rootInputDescriptors = event.getAdditionalInputs();
  } else if (rootInputDescriptors != null) {
    LOG.info("Root Inputs exist for Vertex: " + getName() + " : "
        + rootInputDescriptors);
    for (RootInputLeafOutputDescriptor<InputDescriptor> rootInput : rootInputDescriptors.values()) {
      if (rootInput.getInitializerClassName() == null) {
        continue;
      }
      // The set is created lazily, so it stays null when no input has an
      // initializer; the check further down relies on that.
      if (inputsWithInitializers == null) {
        inputsWithInitializers = Sets.newHashSet();
      }
      inputsWithInitializers.add(rootInput.getEntityName());
      LOG.info("Starting root input initializer for input: "
          + rootInput.getEntityName() + ", with class: ["
          + rootInput.getInitializerClassName() + "]");
    }
  }

  // Look for at least one incoming SCATTER_GATHER edge.
  boolean scatterGatherSeen = false;
  if (sourceVertices != null) {
    for (Edge inEdge : sourceVertices.values()) {
      if (DataMovementType.SCATTER_GATHER == inEdge.getEdgeProperty().getDataMovementType()) {
        scatterGatherSeen = true;
        break;
      }
    }
  }

  // A scatter-gather input combined with an initializer-backed root input is
  // rejected.
  if (scatterGatherSeen && inputsWithInitializers != null) {
    LOG.fatal("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
    return fromEvent ? VertexState.FAILED : finished(VertexState.FAILED);
  }

  // The VertexManager is assigned and initialized before the task count is
  // resolved below.
  assignVertexManager();
  vertexManager.initialize();

  // Setup tasks early if possible. If the VertexManager is not being used
  // to set parallelism, sending events to Tasks is safe (and less confusing
  // than relying on tasks to be created after TaskEvents are generated).
  // For VertexManagers setting parallelism, the setParallelism call needs
  // to be inline.
  numTasks = fromEvent
      ? event.getNumTasks()
      : getVertexPlan().getTaskConfig().getNumTasks();

  // -1 and every non-negative count are accepted; anything below -1 is invalid.
  if (numTasks < -1) {
    addDiagnostic("Invalid task count for vertex"
        + ", numTasks=" + numTasks);
    trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
    // NOTE(review): this branch handles the event/no-event cases the opposite
    // way round from the scatter-gather failure above (there the event case
    // returns FAILED directly and the no-event case calls finished()).
    // Confirm the asymmetry is intentional.
    if (!fromEvent) {
      return VertexState.FAILED;
    }
    abortVertex(VertexStatus.State.FAILED);
    return finished(VertexState.FAILED);
  }

  checkTaskLimits();
  return VertexState.INITED;
}