in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [3285:3446]
public void transition(VertexImpl vertex, VertexEvent event) {
VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event;
boolean recovered = rEvent.isRecovered();
List<TezEvent> tezEvents = rEvent.getEvents();
if (vertex.getAppContext().isRecoveryEnabled()
&& !recovered
&& !tezEvents.isEmpty()) {
List<TezEvent> dataMovementEvents =
Lists.newArrayList();
for (TezEvent tezEvent : tezEvents) {
if (!isEventFromVertex(vertex, tezEvent.getSourceInfo())) {
continue;
}
if (tezEvent.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)
|| tezEvent.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)
|| tezEvent.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
dataMovementEvents.add(tezEvent);
}
}
if (!dataMovementEvents.isEmpty()) {
VertexDataMovementEventsGeneratedEvent historyEvent =
new VertexDataMovementEventsGeneratedEvent(vertex.vertexId,
dataMovementEvents);
vertex.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(vertex.getDAGId(), historyEvent));
}
}
for(TezEvent tezEvent : tezEvents) {
if (LOG.isDebugEnabled()) {
LOG.debug("Vertex: " + vertex.getName() + " routing event: "
+ tezEvent.getEventType()
+ " Recovered:" + recovered);
}
EventMetaData sourceMeta = tezEvent.getSourceInfo();
switch(tezEvent.getEventType()) {
case INPUT_FAILED_EVENT:
case DATA_MOVEMENT_EVENT:
case COMPOSITE_DATA_MOVEMENT_EVENT:
{
if (isEventFromVertex(vertex, sourceMeta)) {
// event from this vertex. send to destination vertex
TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
} else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
} else {
((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
}
Vertex destVertex = vertex.getDAG().getVertex(sourceMeta.getEdgeVertexName());
Edge destEdge = vertex.targetVertices.get(destVertex);
if (destEdge == null) {
throw new TezUncheckedException("Bad destination vertex: " +
sourceMeta.getEdgeVertexName() + " for event vertex: " +
vertex.getVertexId());
}
vertex.eventHandler.handle(new VertexEventRouteEvent(destVertex
.getVertexId(), Collections.singletonList(tezEvent)));
} else {
// event not from this vertex. must have come from source vertex.
// send to tasks
if (vertex.tasksNotYetScheduled) {
vertex.pendingTaskEvents.add(tezEvent);
} else {
Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
sourceMeta.getTaskVertexName()));
if (srcEdge == null) {
throw new TezUncheckedException("Bad source vertex: " +
sourceMeta.getTaskVertexName() + " for destination vertex: " +
vertex.getVertexId());
}
srcEdge.sendTezEventToDestinationTasks(tezEvent);
}
}
}
break;
case ROOT_INPUT_DATA_INFORMATION_EVENT:
if (vertex.tasksNotYetScheduled) {
vertex.pendingTaskEvents.add(tezEvent);
} else {
checkEventSourceMetadata(vertex, sourceMeta);
RootInputDataInformationEvent riEvent = (RootInputDataInformationEvent) tezEvent
.getEvent();
TezTaskID targetTaskID = TezTaskID.getInstance(vertex.getVertexId(),
riEvent.getTargetIndex());
vertex.eventHandler.handle(new TaskEventAddTezEvent(targetTaskID, tezEvent));
}
break;
case VERTEX_MANAGER_EVENT:
{
VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent();
Vertex target = vertex.getDAG().getVertex(vmEvent.getTargetVertexName());
Preconditions.checkArgument(target != null,
"Event sent to unkown vertex: " + vmEvent.getTargetVertexName());
if (target == vertex) {
vertex.vertexManager.onVertexManagerEventReceived(vmEvent);
} else {
checkEventSourceMetadata(vertex, sourceMeta);
vertex.eventHandler.handle(new VertexEventRouteEvent(target
.getVertexId(), Collections.singletonList(tezEvent)));
}
}
break;
case ROOT_INPUT_INITIALIZER_EVENT:
{
RootInputInitializerEvent riEvent = (RootInputInitializerEvent) tezEvent.getEvent();
Vertex target = vertex.getDAG().getVertex(riEvent.getTargetVertexName());
Preconditions.checkArgument(target != null,
"Event sent to unkown vertex: " + riEvent.getTargetVertexName());
if (target == vertex) {
vertex.rootInputInitializerManager.handleInitializerEvent(riEvent);
} else {
checkEventSourceMetadata(vertex, sourceMeta);
vertex.eventHandler.handle(new VertexEventRouteEvent(target.getVertexId(),
Collections.singletonList(tezEvent)));
}
}
break;
case INPUT_READ_ERROR_EVENT:
{
checkEventSourceMetadata(vertex, sourceMeta);
Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
sourceMeta.getEdgeVertexName()));
srcEdge.sendTezEventToSourceTasks(tezEvent);
}
break;
case TASK_STATUS_UPDATE_EVENT:
{
checkEventSourceMetadata(vertex, sourceMeta);
TaskStatusUpdateEvent sEvent =
(TaskStatusUpdateEvent) tezEvent.getEvent();
vertex.getEventHandler().handle(
new TaskAttemptEventStatusUpdate(sourceMeta.getTaskAttemptID(),
sEvent));
}
break;
case TASK_ATTEMPT_COMPLETED_EVENT:
{
checkEventSourceMetadata(vertex, sourceMeta);
vertex.getEventHandler().handle(
new TaskAttemptEvent(sourceMeta.getTaskAttemptID(),
TaskAttemptEventType.TA_DONE));
}
break;
case TASK_ATTEMPT_FAILED_EVENT:
{
checkEventSourceMetadata(vertex, sourceMeta);
TaskAttemptFailedEvent taskFailedEvent =
(TaskAttemptFailedEvent) tezEvent.getEvent();
vertex.getEventHandler().handle(
new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
TaskAttemptEventType.TA_FAILED,
"Error: " + taskFailedEvent.getDiagnostics()));
}
break;
default:
throw new TezUncheckedException("Unhandled tez event type: "
+ tezEvent.getEventType());
}
}
}