protected Flow executeFromState()

in hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java [234:392]


  protected Flow executeFromState(MasterProcedureEnv env,
    PeerSyncReplicationStateTransitionState state) throws ProcedureSuspendedException {
    // Executes the single step selected by {@code state}.
    //
    // Shape of the method: every step that has a successor registers it (directly through
    // setNextState or through one of the setNextStateAfterRefresh* helpers) and then breaks out
    // of the switch to the shared "return Flow.HAS_MORE_STATE" at the bottom. Only the give-up
    // paths of the pre step and the final post step return Flow.NO_MORE_STATE.
    //
    // Error handling falls into three groups:
    //  - pre step: an IOException marks the procedure as failed and stops it;
    //  - intermediate steps: the failure is logged from the suspend(...) callback and the
    //    resulting ProcedureSuspendedException is thrown so the step is retried later;
    //  - post step: an IOException is only logged.
    switch (state) {
      case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
        try {
          // Refuse to run while any queue migration procedure has not finished yet.
          boolean migrationPending = env.getMasterServices().getProcedures().stream()
            .anyMatch(proc -> proc instanceof MigrateReplicationQueueFromZkToTableProcedure
              && !proc.isFinished());
          if (migrationPending) {
            String pendingProcName =
              MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName();
            LOG.info("There is a pending {}, give up execution of {}", pendingProcName,
              getClass().getSimpleName());
            setFailure("master-transit-peer-sync-replication-state",
              new DoNotRetryIOException("There is a pending " + pendingProcName));
            return Flow.NO_MORE_STATE;
          }
          checkPeerModificationEnabled(env);
          preTransit(env);
        } catch (IOException preFailure) {
          LOG.warn("Failed to call pre CP hook or the pre check is failed for peer {} "
            + "when transiting sync replication peer state to {}, "
            + "mark the procedure as failure and give up", peerId, toState, preFailure);
          setFailure("master-transit-peer-sync-replication-state", preFailure);
          return Flow.NO_MORE_STATE;
        }
        setNextState(PeerSyncReplicationStateTransitionState.SET_PEER_NEW_SYNC_REPLICATION_STATE);
        break;

      case SET_PEER_NEW_SYNC_REPLICATION_STATE:
        try {
          setPeerNewSyncReplicationState(env);
        } catch (ReplicationException storageFailure) {
          throw suspend(env.getMasterConfiguration(),
            delayMs -> LOG.warn(
              "Failed to update peer storage for peer {} when starting transiting sync "
                + "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, delayMs / 1000, storageFailure));
        }
        // The step succeeded, so clear the retry state before moving on.
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN);
        break;

      case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN:
        // One child RefreshPeerProcedure per server currently reported online; the trailing
        // constructor argument is 0 here and 1 in the matching *_ON_RS_END step.
        RefreshPeerProcedure[] beginRefreshers =
          env.getMasterServices().getServerManager().getOnlineServersList().stream()
            .map(server -> new RefreshPeerProcedure(peerId, getPeerOperationType(), server, 0))
            .toArray(RefreshPeerProcedure[]::new);
        addChildProcedure(beginRefreshers);
        setNextStateAfterRefreshBegin();
        break;

      case REOPEN_ALL_REGIONS_IN_PEER:
        reopenRegions(env);
        if (!fromState.equals(SyncReplicationState.STANDBY)) {
          setNextState(
            PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        } else {
          // Leaving STANDBY is only expected to reach this step for a serial peer.
          assert serial;
          setNextState(
            PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER);
        }
        break;

      case SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER:
        try {
          setLastPushedSequenceId(env, env.getReplicationPeerManager().getPeerConfig(peerId).get());
        } catch (Exception seqIdFailure) {
          // Deliberately broad: any failure in this step, checked or not, leads to a retry.
          throw suspend(env.getMasterConfiguration(),
            delayMs -> LOG.warn(
              "Failed to update last pushed sequence id for peer {} when transiting sync "
                + "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, delayMs / 1000, seqIdFailure));
        }
        resetRetry();
        setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
        break;

      case REPLAY_REMOTE_WAL_IN_PEER:
        replayRemoteWAL(env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial());
        setNextState(
          PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        break;

      case REMOVE_ALL_REPLICATION_QUEUES_IN_PEER:
        try {
          removeAllReplicationQueues(env);
        } catch (ReplicationException queueFailure) {
          throw suspend(env.getMasterConfiguration(),
            delayMs -> LOG.warn(
              "Failed to remove all replication queues peer {} when starting transiting"
                + " sync replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, delayMs / 1000, queueFailure));
        }
        resetRetry();
        // Regions are reopened only when the transition starts from ACTIVE.
        if (fromState.equals(SyncReplicationState.ACTIVE)) {
          setNextState(PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER);
        } else {
          setNextState(
            PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
        }
        break;

      case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
        try {
          transitPeerSyncReplicationState(env);
        } catch (ReplicationException storageFailure) {
          throw suspend(env.getMasterConfiguration(),
            delayMs -> LOG.warn(
              "Failed to update peer storage for peer {} when ending transiting sync "
                + "replication peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, delayMs / 1000, storageFailure));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END);
        break;

      case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END:
        // Same fan-out as the *_ON_RS_BEGIN step, but with 1 as the trailing argument.
        RefreshPeerProcedure[] endRefreshers =
          env.getMasterServices().getServerManager().getOnlineServersList().stream()
            .map(server -> new RefreshPeerProcedure(peerId, getPeerOperationType(), server, 1))
            .toArray(RefreshPeerProcedure[]::new);
        addChildProcedure(endRefreshers);
        setNextStateAfterRefreshEnd();
        break;

      case SYNC_REPLICATION_SET_PEER_ENABLED:
        try {
          enablePeer(env);
        } catch (ReplicationException enableFailure) {
          throw suspend(env.getMasterConfiguration(),
            delayMs -> LOG.warn(
              "Failed to set peer enabled for peer {} when transiting sync replication peer "
                + "state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, delayMs / 1000, enableFailure));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS);
        break;

      case SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS:
        refreshPeer(env, PeerOperationType.ENABLE);
        setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
        break;

      case CREATE_DIR_FOR_REMOTE_WAL:
        try {
          createDirForRemoteWAL(env);
        } catch (IOException dirFailure) {
          throw suspend(env.getMasterConfiguration(),
            delayMs -> LOG.warn(
              "Failed to create remote wal dir for peer {} when transiting sync replication "
                + "peer state from {} to {}, sleep {} secs and retry",
              peerId, fromState, toState, delayMs / 1000, dirFailure));
        }
        resetRetry();
        setNextState(
          PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
        break;

      case POST_PEER_SYNC_REPLICATION_STATE_TRANSITION:
        try {
          postTransit(env);
        } catch (IOException postFailure) {
          // A failing post hook is logged only; the procedure still ends here.
          LOG.warn(
            "Failed to call post CP hook for peer {} when transiting sync replication "
              + "peer state from {} to {}, ignore since the procedure has already done",
            peerId, fromState, toState, postFailure);
        }
        return Flow.NO_MORE_STATE;

      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
    // Reached by every case that registered a follow-up state and broke out of the switch.
    return Flow.HAS_MORE_STATE;
  }