protected Flow executeFromState()

in hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java [142:302]


  protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
    throws ProcedureSuspendedException, ProcedureYieldException {
    final MasterServices services = env.getMasterServices();
    final AssignmentManager am = env.getAssignmentManager();
    updateProgress(true);
    // HBASE-14802 If we have not yet notified that we are processing a dead server, do so now.
    // This adds server to the DeadServer processing list but not to the DeadServers list.
    // Server gets removed from processing list below on procedure successful finish.
    if (!notifiedDeadServer) {
      notifiedDeadServer = true;
    }

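    // The four meta-handling states below may run before hbase:meta is online; every other state
    // must wait until meta has been loaded, otherwise the procedure suspends itself.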
    switch (state) {
      case SERVER_CRASH_START:
      case SERVER_CRASH_SPLIT_META_LOGS:
      case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR:
      case SERVER_CRASH_ASSIGN_META:
        break;
      default:
        // If hbase:meta is not assigned, yield.
        if (env.getAssignmentManager().waitMetaLoaded(this)) {
          throw new ProcedureSuspendedException();
        }
    }
    try {
      switch (state) {
        case SERVER_CRASH_START:
          LOG.info("Start " + this);
          // If carrying meta, process it first. Else, get list of regions on crashed server.
          if (this.carryingMeta) {
            setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS);
          } else {
            setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
          }
          break;
        case SERVER_CRASH_SPLIT_META_LOGS:
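          // Split the meta WALs either through the legacy zk-coordinated mechanism or through a
          // child WAL-splitting procedure; the latter needs an extra state to delete the split dir.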
          if (
            env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
              DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
          ) {
            zkCoordinatedSplitMetaLogs(env);
            setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
          } else {
            am.getRegionStates().metaLogSplitting(serverName);
            addChildProcedure(createSplittingWalProcedures(env, true));
            setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR);
          }
          break;
        case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR:
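          // If the child WAL-splitting procedures completed, mark the meta WALs as split and move
          // on to assigning meta; otherwise loop back and split the meta WALs again.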
          if (isSplittingDone(env, true)) {
            setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
            am.getRegionStates().metaLogSplit(serverName);
          } else {
            setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS);
          }
          break;
        case SERVER_CRASH_ASSIGN_META:
          assignRegions(env, Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO));
          setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
          break;
        case SERVER_CRASH_GET_REGIONS:
          this.regionsOnCrashedServer = getRegionsOnCrashedServer(env);
          // Where to go next? Depends on whether we should split logs at all or
          // if we should do distributed log splitting.
          if (regionsOnCrashedServer != null) {
            LOG.info("{} had {} regions", serverName, regionsOnCrashedServer.size());
            if (LOG.isTraceEnabled()) {
              this.regionsOnCrashedServer.stream().forEach(ri -> LOG.trace(ri.getShortNameToLog()));
            }
          }
          if (!this.shouldSplitWal) {
            setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
          } else {
            setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
          }
          break;
        case SERVER_CRASH_SPLIT_LOGS:
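          // Same WAL-splitting choice as for meta above, but for the crashed server's region WALs.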
          if (
            env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
              DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
          ) {
            zkCoordinatedSplitLogs(env);
            setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
          } else {
            am.getRegionStates().logSplitting(this.serverName);
            addChildProcedure(createSplittingWalProcedures(env, false));
            setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_WALS_DIR);
          }
          break;
        case SERVER_CRASH_DELETE_SPLIT_WALS_DIR:
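          // As with the meta WALs: if splitting is done, clean up the split dir and go assign the
          // regions; otherwise retry the split.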
          if (isSplittingDone(env, false)) {
            cleanupSplitDir(env);
            setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
            am.getRegionStates().logSplit(this.serverName);
          } else {
            setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
          }
          break;
        case SERVER_CRASH_ASSIGN:
          // If no regions to assign, skip assign and skip to the finish.
          // Filter out meta regions. Those are handled elsewhere in this procedure.
          // Filter changes this.regionsOnCrashedServer.
          if (filterDefaultMetaRegions()) {
            if (LOG.isTraceEnabled()) {
              LOG.trace("Assigning regions " + RegionInfo.getShortNameToLog(regionsOnCrashedServer)
                + ", " + this + "; cycles=" + getCycles());
            }
            assignRegions(env, regionsOnCrashedServer);
          }
          // If there is no replication peer, we do not need to enter the claim queues stage.
          // This also matters because ReplicationQueueStorage is only initialized later, so if no
          // replication peer has been added yet, the storage cannot be accessed.
          // There is no race here because:
          // 1. For adding a replication peer, if the peer storage has not been updated yet, the
          // crashed region server will not have any replication queues for this peer, so it is
          // safe to skip claiming.
          // 2. For removing a replication peer, if it has already updated the peer storage, then
          // there is no way to roll back and region servers have already started to close and
          // delete replication queues, so it is also safe to skip claiming.
          if (env.getReplicationPeerManager().listPeers(null).isEmpty()) {
            setNextState(ServerCrashState.SERVER_CRASH_FINISH);
          } else {
            setNextState(ServerCrashState.SERVER_CRASH_CLAIM_REPLICATION_QUEUES);
          }
          break;
        case SERVER_CRASH_HANDLE_RIT2:
          // Noop. Left in place because we used to call handleRIT here for a second time, but that
          // is no longer necessary since HBASE-20634.
          if (env.getReplicationPeerManager().listPeers(null).isEmpty()) {
            setNextState(ServerCrashState.SERVER_CRASH_FINISH);
          } else {
            setNextState(ServerCrashState.SERVER_CRASH_CLAIM_REPLICATION_QUEUES);
          }
          break;
        case SERVER_CRASH_CLAIM_REPLICATION_QUEUES:
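          // Claiming replication queues must not run while replication queue data is still being
          // migrated from zookeeper to table based storage, so back off until that procedure ends.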
          if (
            env.getMasterServices().getProcedures().stream()
              .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
              .anyMatch(p -> !p.isFinished())
          ) {
            LOG.info("There is a pending {}, will retry claim replication queue later",
              MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName());
            suspend(10_000, true);
            return Flow.NO_MORE_STATE;
          }
          addChildProcedure(new AssignReplicationQueuesProcedure(serverName));
          setNextState(ServerCrashState.SERVER_CRASH_FINISH);
          break;
        case SERVER_CRASH_FINISH:
          LOG.info("removed crashed server {} after splitting done", serverName);
          services.getAssignmentManager().getRegionStates().removeServer(serverName);
          updateProgress(true);
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    } catch (IOException e) {
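      // An IOException does not fail the procedure: log it and return HAS_MORE_STATE below so the
      // current state is retried on the next execution.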
      LOG.warn("Failed state=" + state + ", retry " + this + "; cycles=" + getCycles(), e);
    }
    return Flow.HAS_MORE_STATE;
  }
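
For context, the control flow above follows the generic StateMachineProcedure pattern: the framework keeps re-invoking executeFromState, Flow.HAS_MORE_STATE asks to be run again in the state chosen via setNextState, Flow.NO_MORE_STATE finishes the procedure, and a swallowed IOException leaves the state unchanged so the same state is retried. Below is a minimal, self-contained sketch of that loop; DemoState, DemoFlow and the driver in main are hypothetical simplifications for illustration only and are not part of the HBase procedure framework.

import java.io.IOException;

// Hypothetical simplification of the state-machine pattern used by ServerCrashProcedure;
// DemoState/DemoFlow stand in for ServerCrashState/Flow and are not real HBase classes.
public class StateMachineSketch {

  enum DemoState { START, SPLIT_LOGS, ASSIGN, FINISH }

  enum DemoFlow { HAS_MORE_STATE, NO_MORE_STATE }

  private DemoState state = DemoState.START;

  /** One step of the procedure: advance to the next state, or keep the state to retry on error. */
  DemoFlow executeFromState(DemoState state) {
    try {
      switch (state) {
        case START:
          this.state = DemoState.SPLIT_LOGS;
          break;
        case SPLIT_LOGS:
          splitLogs(); // may throw IOException, e.g. while talking to the filesystem
          this.state = DemoState.ASSIGN;
          break;
        case ASSIGN:
          this.state = DemoState.FINISH;
          break;
        case FINISH:
          return DemoFlow.NO_MORE_STATE; // done, the executor will not call us again
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    } catch (IOException e) {
      // Leave this.state untouched: returning HAS_MORE_STATE re-runs the same state, i.e. a retry.
      System.err.println("Failed state=" + state + ", retrying: " + e);
    }
    return DemoFlow.HAS_MORE_STATE;
  }

  private void splitLogs() throws IOException {
    // Placeholder for real work such as WAL splitting.
  }

  public static void main(String[] args) {
    StateMachineSketch proc = new StateMachineSketch();
    // Simplified stand-in for the ProcedureExecutor loop, which in HBase also persists the state.
    while (proc.executeFromState(proc.state) == DemoFlow.HAS_MORE_STATE) {
      // keep stepping until the procedure reports that it is finished
    }
  }
}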