in hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java [142:302]
/**
 * Drives one step of the server-crash-recovery state machine. Each invocation handles the
 * current {@code state}, performs that state's work (WAL splitting, meta/user-region
 * assignment, replication-queue claiming, cleanup), sets the next state via
 * {@code setNextState}, and returns {@link Flow#HAS_MORE_STATE} until the terminal
 * SERVER_CRASH_FINISH state returns {@link Flow#NO_MORE_STATE}.
 *
 * Error handling: an {@link IOException} from any state is logged and swallowed; the method
 * then returns HAS_MORE_STATE without advancing, so the same state is retried on the next
 * execution.
 *
 * @param env   master procedure environment giving access to master services and the
 *              assignment manager
 * @param state the state-machine state to execute
 * @throws ProcedureSuspendedException if hbase:meta is not yet loaded and this procedure
 *                                     must wait (non-meta states only)
 * @throws ProcedureYieldException     declared by the state-machine contract
 */
protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state)
throws ProcedureSuspendedException, ProcedureYieldException {
final MasterServices services = env.getMasterServices();
final AssignmentManager am = env.getAssignmentManager();
updateProgress(true);
// HBASE-14802 If we have not yet notified that we are processing a dead server, do so now.
// This adds server to the DeadServer processing list but not to the DeadServers list.
// Server gets removed from processing list below on procedure successful finish.
// NOTE(review): only the flag is set here — no notification call is visible in this block.
// Presumably the side-effecting call was removed in a later refactor and the flag/comment
// remain; confirm against DeadServer/ServerManager before relying on the comment above.
if (!notifiedDeadServer) {
notifiedDeadServer = true;
}
// Gate: every state except the initial/meta-handling ones requires hbase:meta to be loaded.
// waitMetaLoaded() queues this procedure for wake-up and returns true when we must wait.
switch (state) {
case SERVER_CRASH_START:
case SERVER_CRASH_SPLIT_META_LOGS:
case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR:
case SERVER_CRASH_ASSIGN_META:
break;
default:
// If hbase:meta is not assigned, yield.
if (env.getAssignmentManager().waitMetaLoaded(this)) {
throw new ProcedureSuspendedException();
}
}
try {
switch (state) {
case SERVER_CRASH_START:
LOG.info("Start " + this);
// If carrying meta, process it first. Else, get list of regions on crashed server.
if (this.carryingMeta) {
setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS);
} else {
setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
}
break;
case SERVER_CRASH_SPLIT_META_LOGS:
// Two meta-WAL-splitting strategies: ZK-coordinated splitting completes synchronously
// here and goes straight to meta assignment; procedure-based splitting schedules a
// child procedure and goes to the delete-split-WALs-dir state to check completion.
if (
env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
) {
zkCoordinatedSplitMetaLogs(env);
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
} else {
am.getRegionStates().metaLogSplitting(serverName);
addChildProcedure(createSplittingWalProcedures(env, true));
setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR);
}
break;
case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR:
// If the child split procedures finished, mark meta logs split and assign meta;
// otherwise loop back and re-run the meta-log-splitting state.
if (isSplittingDone(env, true)) {
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
am.getRegionStates().metaLogSplit(serverName);
} else {
setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS);
}
break;
case SERVER_CRASH_ASSIGN_META:
assignRegions(env, Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO));
setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS);
break;
case SERVER_CRASH_GET_REGIONS:
this.regionsOnCrashedServer = getRegionsOnCrashedServer(env);
// Where to go next? Depends on whether we should split logs at all or
// if we should do distributed log splitting.
if (regionsOnCrashedServer != null) {
LOG.info("{} had {} regions", serverName, regionsOnCrashedServer.size());
if (LOG.isTraceEnabled()) {
this.regionsOnCrashedServer.stream().forEach(ri -> LOG.trace(ri.getShortNameToLog()));
}
}
if (!this.shouldSplitWal) {
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
} else {
setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
}
break;
case SERVER_CRASH_SPLIT_LOGS:
// Same two-strategy split as the meta case above, but for user-region WALs
// (second argument false to createSplittingWalProcedures / isSplittingDone).
if (
env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)
) {
zkCoordinatedSplitLogs(env);
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
} else {
am.getRegionStates().logSplitting(this.serverName);
addChildProcedure(createSplittingWalProcedures(env, false));
setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_WALS_DIR);
}
break;
case SERVER_CRASH_DELETE_SPLIT_WALS_DIR:
// Completion check for user-region WAL splitting; clean the split dir and record
// the split before moving on to region assignment, else retry splitting.
if (isSplittingDone(env, false)) {
cleanupSplitDir(env);
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
am.getRegionStates().logSplit(this.serverName);
} else {
setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
}
break;
case SERVER_CRASH_ASSIGN:
// If no regions to assign, skip assign and skip to the finish.
// Filter out meta regions. Those are handled elsewhere in this procedure.
// Filter changes this.regionsOnCrashedServer.
if (filterDefaultMetaRegions()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Assigning regions " + RegionInfo.getShortNameToLog(regionsOnCrashedServer)
+ ", " + this + "; cycles=" + getCycles());
}
assignRegions(env, regionsOnCrashedServer);
}
// If there is no replication peer, we do not need to enter the claim queues stage.
// This is also very important that now we will later initialize ReplicationQueueStorage
// so if there is no replication peer added yet, the storage can not be accessed.
// And there will be no race because:
// 1. For adding replication peer, if the peer storage has not been updated yet, the crash
// region server will not have any replication queues for this peer, so it is safe to skip
// claiming.
// 2. For removing replication peer, it it has already updated the peer storage, then
// there is no way to rollback and region servers are already started to close and delete
// replication queues, so it is also safe to skip claiming.
if (env.getReplicationPeerManager().listPeers(null).isEmpty()) {
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
} else {
setNextState(ServerCrashState.SERVER_CRASH_CLAIM_REPLICATION_QUEUES);
}
break;
case SERVER_CRASH_HANDLE_RIT2:
// Noop. Left in place because we used to call handleRIT here for a second time
// but no longer necessary since HBASE-20634.
// Routes to the same next state as SERVER_CRASH_ASSIGN so old in-flight procedures
// deserialized into this state still finish correctly.
if (env.getReplicationPeerManager().listPeers(null).isEmpty()) {
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
} else {
setNextState(ServerCrashState.SERVER_CRASH_CLAIM_REPLICATION_QUEUES);
}
break;
case SERVER_CRASH_CLAIM_REPLICATION_QUEUES:
// Wait out any in-flight replication-queue-storage migration before claiming queues,
// since claiming would race with the ZK-to-table migration.
if (
env.getMasterServices().getProcedures().stream()
.filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
.anyMatch(p -> !p.isFinished())
) {
LOG.info("There is a pending {}, will retry claim replication queue later",
MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName());
// NOTE(review): suspend(...) appears to arrange a 10s-delayed re-execution; the
// NO_MORE_STATE return then just ends this invocation rather than completing the
// procedure — confirm against Procedure.suspend semantics.
suspend(10_000, true);
return Flow.NO_MORE_STATE;
}
addChildProcedure(new AssignReplicationQueuesProcedure(serverName));
setNextState(ServerCrashState.SERVER_CRASH_FINISH);
break;
case SERVER_CRASH_FINISH:
// Terminal state: drop the dead server from RegionStates and end the procedure.
LOG.info("removed crashed server {} after splitting done", serverName);
services.getAssignmentManager().getRegionStates().removeServer(serverName);
updateProgress(true);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
} catch (IOException e) {
// Intentional swallow: log and fall through to HAS_MORE_STATE so the framework
// re-executes the current (unchanged) state, i.e. retry-forever semantics.
LOG.warn("Failed state=" + state + ", retry " + this + "; cycles=" + getCycles(), e);
}
return Flow.HAS_MORE_STATE;
}