in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [2724:2820]
private VertexState setupVertex() {
this.initTimeRequested = clock.getTime();
// VertexManager needs to be setup before attempting to Initialize any
// Inputs - since events generated by them will be routed to the
// VertexManager for handling.
if (dagVertexGroups != null && !dagVertexGroups.isEmpty()) {
List<GroupInputSpec> groupSpecList = Lists.newLinkedList();
for (VertexGroupInfo groupInfo : dagVertexGroups.values()) {
if (groupInfo.edgeMergedInputs.containsKey(getName())) {
InputDescriptor mergedInput = groupInfo.edgeMergedInputs.get(getName());
groupSpecList.add(new GroupInputSpec(groupInfo.groupName,
Lists.newLinkedList(groupInfo.groupMembers), mergedInput));
}
}
if (!groupSpecList.isEmpty()) {
groupInputSpecList = groupSpecList;
}
}
// Check if any inputs need initializers
if (rootInputDescriptors != null) {
LOG.info("Root Inputs exist for Vertex: " + getName() + " : "
+ rootInputDescriptors);
for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input
: rootInputDescriptors.values()) {
if (input.getControllerDescriptor() != null &&
input.getControllerDescriptor().getClassName() != null) {
if (!hasInputInitializers()) {
inputsWithInitializers = Sets.newHashSet();
}
inputsWithInitializers.add(input.getName());
LOG.info("Starting root input initializer for input: "
+ input.getName() + ", with class: ["
+ input.getControllerDescriptor().getClassName() + "]");
}
}
}
boolean hasBipartite = false;
if (sourceVertices != null) {
for (Edge edge : sourceVertices.values()) {
if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.SCATTER_GATHER) {
hasBipartite = true;
break;
}
}
}
if (hasBipartite && hasInputInitializers()) {
LOG.error("A vertex with an Initial Input and a Shuffle Input are not supported at the moment");
return finished(VertexState.FAILED);
}
numTasks = getVertexPlan().getTaskConfig().getNumTasks();
if (!(numTasks == -1 || numTasks >= 0)) {
addDiagnostic("Invalid task count for vertex"
+ ", numTasks=" + numTasks);
trySetTerminationCause(VertexTerminationCause.INVALID_NUM_OF_TASKS);
return VertexState.FAILED;
}
checkTaskLimits();
// set VertexManager as the last step. Because in recovery case, we may need to restore
// some info from last the AM attempt and skip the initialization step. Otherwise numTasks may be
// reset to -1 after the restore.
try {
assignVertexManager();
} catch (TezException e1) {
String msg = "Fail to create VertexManager, " + ExceptionUtils.getStackTrace(e1);
LOG.error(msg);
return finished(VertexState.FAILED, VertexTerminationCause.INIT_FAILURE, msg);
}
try {
vertexManager.initialize();
vmIsInitialized.set(true);
if (!pendingVmEvents.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing: " + pendingVmEvents.size() + " pending VMEvents for Vertex: " +
logIdentifier);
}
for (VertexManagerEvent vmEvent : pendingVmEvents) {
vertexManager.onVertexManagerEventReceived(vmEvent);
}
pendingVmEvents.clear();
}
} catch (AMUserCodeException e) {
String msg = "Exception in " + e.getSource()+ ", vertex:" + logIdentifier;
LOG.error(msg, e);
finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE,
msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause()));
return VertexState.FAILED;
}
return VertexState.INITED;
}