in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [984:1059]
public VertexState restoreFromEvent(HistoryEvent historyEvent) {
switch (historyEvent.getEventType()) {
case VERTEX_INITIALIZED:
recoveryInitEventSeen = true;
recoveredState = setupVertex((VertexInitializedEvent) historyEvent);
createTasks();
if (LOG.isDebugEnabled()) {
LOG.debug("Recovered state for vertex after Init event"
+ ", vertex=" + logIdentifier
+ ", recoveredState=" + recoveredState);
}
return recoveredState;
case VERTEX_STARTED:
if (!recoveryInitEventSeen) {
throw new RuntimeException("Started Event seen but"
+ " no Init Event was encountered earlier");
}
recoveryStartEventSeen = true;
VertexStartedEvent startedEvent = (VertexStartedEvent) historyEvent;
startTimeRequested = startedEvent.getStartRequestedTime();
startedTime = startedEvent.getStartTime();
recoveredState = VertexState.RUNNING;
if (LOG.isDebugEnabled()) {
LOG.debug("Recovered state for vertex after Started event"
+ ", vertex=" + logIdentifier
+ ", recoveredState=" + recoveredState);
}
return recoveredState;
case VERTEX_PARALLELISM_UPDATED:
VertexParallelismUpdatedEvent updatedEvent =
(VertexParallelismUpdatedEvent) historyEvent;
if (updatedEvent.getVertexLocationHint() != null) {
setTaskLocationHints(updatedEvent.getVertexLocationHint());
}
numTasks = updatedEvent.getNumTasks();
handleParallelismUpdate(numTasks, updatedEvent.getSourceEdgeManagers(),
updatedEvent.getRootInputSpecUpdates());
if (LOG.isDebugEnabled()) {
LOG.debug("Recovered state for vertex after parallelism updated event"
+ ", vertex=" + logIdentifier
+ ", recoveredState=" + recoveredState);
}
return recoveredState;
case VERTEX_COMMIT_STARTED:
recoveryCommitInProgress = true;
hasCommitter = true;
return recoveredState;
case VERTEX_FINISHED:
VertexFinishedEvent finishedEvent = (VertexFinishedEvent) historyEvent;
if (finishedEvent.isFromSummary()) {
summaryCompleteSeen = true;
} else {
vertexCompleteSeen = true;
}
recoveryCommitInProgress = false;
recoveredState = finishedEvent.getState();
diagnostics.add(finishedEvent.getDiagnostics());
finishTime = finishedEvent.getFinishTime();
// TODO counters ??
if (LOG.isDebugEnabled()) {
LOG.debug("Recovered state for vertex after finished event"
+ ", vertex=" + logIdentifier
+ ", recoveredState=" + recoveredState);
}
return recoveredState;
case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
VertexDataMovementEventsGeneratedEvent vEvent =
(VertexDataMovementEventsGeneratedEvent) historyEvent;
this.recoveredEvents.addAll(vEvent.getTezEvents());
return recoveredState;
default:
throw new RuntimeException("Unexpected event received for restoring"
+ " state, eventType=" + historyEvent.getEventType());
}
}