in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [4123:4280]
private void handleRoutedTezEvents(List<TezEvent> tezEvents, boolean isPendingEvents) throws AMUserCodeException {
for(TezEvent tezEvent : tezEvents) {
if (LOG.isDebugEnabled()) {
LOG.debug("Vertex: " + getLogIdentifier() + " routing event: "
+ tezEvent.getEventType());
}
EventMetaData sourceMeta = tezEvent.getSourceInfo();
switch(tezEvent.getEventType()) {
case CUSTOM_PROCESSOR_EVENT:
{
// set version as app attempt id
((CustomProcessorEvent) tezEvent.getEvent()).setVersion(
appContext.getApplicationAttemptId().getAttemptId());
// route event to task
EventMetaData destinationMeta = tezEvent.getDestinationInfo();
Task targetTask = getTask(destinationMeta.getTaskAttemptID().getTaskID());
targetTask.registerTezEvent(tezEvent);
}
break;
case INPUT_FAILED_EVENT:
case DATA_MOVEMENT_EVENT:
case COMPOSITE_DATA_MOVEMENT_EVENT:
{
if (isEventFromVertex(this, sourceMeta)) {
// event from this vertex. send to destination vertex
TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
} else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
} else {
((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
}
Vertex destVertex = getDAG().getVertex(sourceMeta.getEdgeVertexName());
Edge destEdge = targetVertices.get(destVertex);
if (destEdge == null) {
throw new TezUncheckedException("Bad destination vertex: " +
sourceMeta.getEdgeVertexName() + " for event vertex: " +
getLogIdentifier());
}
eventHandler.handle(new VertexEventRouteEvent(destVertex
.getVertexId(), Collections.singletonList(tezEvent)));
} else {
if (tasksNotYetScheduled) {
// this is only needed to support mixed mode routing. Else for
// on demand routing events can be directly added to taskEvents
// when legacy routing is removed then pending task events can be
// removed.
pendingTaskEvents.add(tezEvent);
} else {
// event not from this vertex. must have come from source vertex.
int srcTaskIndex = sourceMeta.getTaskID().getId();
Vertex edgeVertex = getDAG().getVertex(sourceMeta.getTaskVertexName());
Edge srcEdge = sourceVertices.get(edgeVertex);
if (srcEdge == null) {
throw new TezUncheckedException("Bad source vertex: " +
sourceMeta.getTaskVertexName() + " for destination vertex: " +
getLogIdentifier());
}
if (srcEdge.hasOnDemandRouting()) {
processOnDemandEvent(tezEvent, srcEdge, srcTaskIndex);
} else {
// send to tasks
srcEdge.sendTezEventToDestinationTasks(tezEvent);
}
}
}
}
break;
case ROOT_INPUT_DATA_INFORMATION_EVENT:
{
checkEventSourceMetadata(this, sourceMeta);
if (tasksNotYetScheduled) {
// this is only needed to support mixed mode routing. Else for
// on demand routing events can be directly added to taskEvents
// when legacy routing is removed then pending task events can be
// removed.
pendingTaskEvents.add(tezEvent);
} else {
InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent.getEvent();
Task targetTask = getTask(riEvent.getTargetIndex());
targetTask.registerTezEvent(tezEvent);
}
}
break;
case VERTEX_MANAGER_EVENT:
{
// VM events on task success only can be changed as part of TEZ-1532
VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent();
Vertex target = getDAG().getVertex(vmEvent.getTargetVertexName());
Preconditions.checkArgument(target != null,
"Event sent to unkown vertex: " + vmEvent.getTargetVertexName());
TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
if (srcTaId.getVertexID().equals(vertexId)) {
// this is the producer tasks' vertex
vmEvent.setProducerAttemptIdentifier(
getTaskAttemptIdentifier(dag.getName(), getName(), srcTaId));
}
if (target == this) {
if (!vmIsInitialized.get()) {
// The VM hasn't been setup yet, defer event consumption
pendingVmEvents.add(vmEvent);
} else {
vertexManager.onVertexManagerEventReceived(vmEvent);
}
} else {
checkEventSourceMetadata(this, sourceMeta);
eventHandler.handle(new VertexEventRouteEvent(target
.getVertexId(), Collections.singletonList(tezEvent)));
}
}
break;
case ROOT_INPUT_INITIALIZER_EVENT:
{
InputInitializerEvent riEvent = (InputInitializerEvent) tezEvent.getEvent();
Vertex target = getDAG().getVertex(riEvent.getTargetVertexName());
Preconditions.checkArgument(target != null,
"Event sent to unknown vertex: " + riEvent.getTargetVertexName());
riEvent.setSourceVertexName(tezEvent.getSourceInfo().getTaskVertexName());
if (target == this) {
if (rootInputDescriptors == null ||
!rootInputDescriptors.containsKey(riEvent.getTargetInputName())) {
throw new TezUncheckedException(
"InputInitializerEvent targeted at unknown initializer on vertex " +
logIdentifier + ", Event=" + riEvent);
}
if (getState() == VertexState.NEW) {
pendingInitializerEvents.add(tezEvent);
} else if (getState() == VertexState.INITIALIZING) {
rootInputInitializerManager.handleInitializerEvents(Collections.singletonList(tezEvent));
} else {
// Currently, INITED and subsequent states means Initializer complete / failure
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping event" + tezEvent + " since state is not INITIALIZING in "
+ getLogIdentifier() + ", state=" + getState());
}
}
} else {
checkEventSourceMetadata(this, sourceMeta);
eventHandler.handle(new VertexEventRouteEvent(target.getVertexId(),
Collections.singletonList(tezEvent)));
}
}
break;
case INPUT_READ_ERROR_EVENT:
{
checkEventSourceMetadata(this, sourceMeta);
Edge srcEdge = sourceVertices.get(this.getDAG().getVertex(
sourceMeta.getEdgeVertexName()));
srcEdge.sendTezEventToSourceTasks(tezEvent);
}
break;
default:
throw new TezUncheckedException("Unhandled tez event type: "
+ tezEvent.getEventType());
}
}
}