public VertexState restoreFromEvent()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [984:1059]


  /**
   * Applies a single recovered history event to this vertex and reports the
   * recovered state that results.
   *
   * <p>Handling per event type:
   * <ul>
   *   <li>{@code VERTEX_INITIALIZED}: marks the init event as seen, takes the
   *       recovered state from {@code setupVertex} and calls
   *       {@code createTasks}.</li>
   *   <li>{@code VERTEX_STARTED}: requires an earlier init event; records the
   *       start-requested and start times and sets the recovered state to
   *       {@code RUNNING}.</li>
   *   <li>{@code VERTEX_PARALLELISM_UPDATED}: applies the location hint when
   *       the event carries one, updates {@code numTasks} and forwards the
   *       change to {@code handleParallelismUpdate}. The recovered state is
   *       not modified.</li>
   *   <li>{@code VERTEX_COMMIT_STARTED}: flags a commit as in progress and
   *       sets {@code hasCommitter}. The recovered state is not modified.</li>
   *   <li>{@code VERTEX_FINISHED}: records whether the completion came from
   *       the summary or not, clears the commit-in-progress flag and takes
   *       state, diagnostics and finish time from the event.</li>
   *   <li>{@code VERTEX_DATA_MOVEMENT_EVENTS_GENERATED}: appends the event's
   *       Tez events to {@code recoveredEvents}. The recovered state is not
   *       modified.</li>
   * </ul>
   *
   * @param historyEvent the history event to replay; it is cast to the
   *                     concrete event class matching its event type
   * @return the value of {@code recoveredState} after the event is applied
   * @throws RuntimeException if a started event arrives before any init
   *                          event, or if the event type is not one of the
   *                          types listed above
   */
  public VertexState restoreFromEvent(HistoryEvent historyEvent) {
    switch (historyEvent.getEventType()) {
      case VERTEX_INITIALIZED: {
        recoveryInitEventSeen = true;
        recoveredState = setupVertex((VertexInitializedEvent) historyEvent);
        createTasks();
        logRecoveredState("Recovered state for vertex after Init event");
        break;
      }
      case VERTEX_STARTED: {
        // A start record is only meaningful once the vertex was initialized.
        if (!recoveryInitEventSeen) {
          throw new RuntimeException("Started Event seen but"
              + " no Init Event was encountered earlier");
        }
        recoveryStartEventSeen = true;
        VertexStartedEvent started = (VertexStartedEvent) historyEvent;
        startTimeRequested = started.getStartRequestedTime();
        startedTime = started.getStartTime();
        recoveredState = VertexState.RUNNING;
        logRecoveredState("Recovered state for vertex after Started event");
        break;
      }
      case VERTEX_PARALLELISM_UPDATED: {
        VertexParallelismUpdatedEvent parallelismUpdate =
            (VertexParallelismUpdatedEvent) historyEvent;
        // The location hint is optional on this event; apply it only if set.
        if (parallelismUpdate.getVertexLocationHint() != null) {
          setTaskLocationHints(parallelismUpdate.getVertexLocationHint());
        }
        numTasks = parallelismUpdate.getNumTasks();
        handleParallelismUpdate(
            numTasks,
            parallelismUpdate.getSourceEdgeManagers(),
            parallelismUpdate.getRootInputSpecUpdates());
        logRecoveredState(
            "Recovered state for vertex after parallelism updated event");
        break;
      }
      case VERTEX_COMMIT_STARTED: {
        recoveryCommitInProgress = true;
        hasCommitter = true;
        break;
      }
      case VERTEX_FINISHED: {
        VertexFinishedEvent finished = (VertexFinishedEvent) historyEvent;
        // Track separately whether the finish was read from the summary.
        if (!finished.isFromSummary()) {
          vertexCompleteSeen = true;
        } else {
          summaryCompleteSeen = true;
        }
        recoveryCommitInProgress = false;
        recoveredState = finished.getState();
        diagnostics.add(finished.getDiagnostics());
        finishTime = finished.getFinishTime();
        // TODO counters ??
        logRecoveredState("Recovered state for vertex after finished event");
        break;
      }
      case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: {
        VertexDataMovementEventsGeneratedEvent generated =
            (VertexDataMovementEventsGeneratedEvent) historyEvent;
        this.recoveredEvents.addAll(generated.getTezEvents());
        break;
      }
      default:
        throw new RuntimeException("Unexpected event received for restoring"
            + " state, eventType=" + historyEvent.getEventType());
    }
    return recoveredState;
  }

  /**
   * Writes a debug-level line made of the given prefix followed by the vertex
   * log identifier and the current recovered state. Does nothing when debug
   * logging is disabled.
   *
   * @param messagePrefix leading text of the log line
   */
  private void logRecoveredState(String messagePrefix) {
    if (!LOG.isDebugEnabled()) {
      return;
    }
    LOG.debug(messagePrefix
        + ", vertex=" + logIdentifier
        + ", recoveredState=" + recoveredState);
  }