in hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/TransitPeerSyncReplicationStateProcedure.java [234:392]
/**
 * Executes one step of the sync replication peer state transition state machine.
 * <p>
 * Step overview, in the order the states are normally visited:
 * <ul>
 * <li>{@code PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION}: checks preconditions and runs the pre
 * coprocessor hook.
 * <ul>
 * <li>Fails the procedure (no retry) if an unfinished
 * {@code MigrateReplicationQueueFromZkToTableProcedure} exists.</li>
 * <li>Any {@link IOException} from the pre-checks or the hook also fails the procedure.</li>
 * </ul>
 * </li>
 * <li>{@code SET_PEER_NEW_SYNC_REPLICATION_STATE}: persists the new sync replication state.</li>
 * <li>{@code REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN}: schedules one
 * {@code RefreshPeerProcedure} (stage 0) per online region server. The next state is chosen by
 * {@code setNextStateAfterRefreshBegin()}.</li>
 * <li>{@code REMOVE_ALL_REPLICATION_QUEUES_IN_PEER}: removes the peer's replication queues. It then
 * goes to region reopen when the peer is transiting from ACTIVE; otherwise it goes straight to the
 * final state transit.</li>
 * <li>{@code REOPEN_ALL_REGIONS_IN_PEER}: reopens the peer's regions. Coming from STANDBY (which
 * requires a serial peer), it continues with the last-pushed sequence id update; otherwise it goes
 * to the final state transit.</li>
 * <li>{@code SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER}, then
 * {@code REPLAY_REMOTE_WAL_IN_PEER}: serial-peer bookkeeping followed by remote WAL replay.</li>
 * <li>{@code TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE}, then
 * {@code REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END}:
 * <ul>
 * <li>Finishes the state transition in peer storage.</li>
 * <li>Refreshes every online region server (stage 1).</li>
 * <li>The next state is chosen by {@code setNextStateAfterRefreshEnd()}.</li>
 * </ul>
 * </li>
 * <li>{@code SYNC_REPLICATION_SET_PEER_ENABLED}, then
 * {@code SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS}: re-enables the peer and refreshes it on
 * the region servers.</li>
 * <li>{@code CREATE_DIR_FOR_REMOTE_WAL}: creates the remote WAL directory for the peer.</li>
 * <li>{@code POST_PEER_SYNC_REPLICATION_STATE_TRANSITION}: runs the post coprocessor hook. Its
 * failure is only logged, because the transition has already completed.</li>
 * </ul>
 * <p>
 * Recoverable storage/filesystem failures in the intermediate steps do not fail the procedure.
 * They are converted into a suspension with backoff via {@code suspend(...)}, and the same state
 * is retried later.
 * @param env   the master procedure environment
 * @param state the state to execute
 * @return {@link Flow#HAS_MORE_STATE} if another state was scheduled, {@link Flow#NO_MORE_STATE}
 *         when the procedure is finished (successfully or with a recorded failure)
 * @throws ProcedureSuspendedException when a retryable step failed and the procedure is suspended
 *           for a backoff period before retrying the same state
 */
protected Flow executeFromState(MasterProcedureEnv env,
  PeerSyncReplicationStateTransitionState state) throws ProcedureSuspendedException {
  switch (state) {
    case PRE_PEER_SYNC_REPLICATION_STATE_TRANSITION:
      try {
        // Refuse to run while a ZK -> table replication queue migration is still in progress.
        // This is a non-retryable failure rather than a suspension.
        if (
          env.getMasterServices().getProcedures().stream()
            .filter(p -> p instanceof MigrateReplicationQueueFromZkToTableProcedure)
            .anyMatch(p -> !p.isFinished())
        ) {
          LOG.info("There is a pending {}, give up execution of {}",
            MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName(),
            getClass().getSimpleName());
          setFailure("master-transit-peer-sync-replication-state",
            new DoNotRetryIOException("There is a pending "
              + MigrateReplicationQueueFromZkToTableProcedure.class.getSimpleName()));
          return Flow.NO_MORE_STATE;
        }
        checkPeerModificationEnabled(env);
        preTransit(env);
      } catch (IOException e) {
        // Nothing has been changed yet, so failing here (instead of retrying) is safe.
        LOG.warn("Failed to call pre CP hook or the pre check is failed for peer {} "
          + "when transiting sync replication peer state to {}, "
          + "mark the procedure as failure and give up", peerId, toState, e);
        setFailure("master-transit-peer-sync-replication-state", e);
        return Flow.NO_MORE_STATE;
      }
      setNextState(PeerSyncReplicationStateTransitionState.SET_PEER_NEW_SYNC_REPLICATION_STATE);
      return Flow.HAS_MORE_STATE;
    case SET_PEER_NEW_SYNC_REPLICATION_STATE:
      try {
        setPeerNewSyncReplicationState(env);
      } catch (ReplicationException e) {
        // NOTE(review): backoff / 1000 is logged as seconds, so backoff is presumably in ms.
        throw suspend(env.getMasterConfiguration(),
          backoff -> LOG.warn(
            "Failed to update peer storage for peer {} when starting transiting sync "
              + "replication peer state from {} to {}, sleep {} secs and retry",
            peerId, fromState, toState, backoff / 1000, e));
      }
      // Step succeeded. Presumably this clears the backoff state used by suspend() so the next
      // retryable step starts fresh.
      resetRetry();
      setNextState(
        PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN);
      return Flow.HAS_MORE_STATE;
    case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_BEGIN:
      // One refresh child per online region server, stage 0 (the "begin" half of the transit).
      addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
        .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 0))
        .toArray(RefreshPeerProcedure[]::new));
      setNextStateAfterRefreshBegin();
      return Flow.HAS_MORE_STATE;
    case REOPEN_ALL_REGIONS_IN_PEER:
      reopenRegions(env);
      if (fromState.equals(SyncReplicationState.STANDBY)) {
        // Coming from STANDBY, this path is only expected for serial peers.
        assert serial;
        setNextState(
          PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER);
      } else {
        setNextState(
          PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
      }
      return Flow.HAS_MORE_STATE;
    case SYNC_REPLICATION_UPDATE_LAST_PUSHED_SEQ_ID_FOR_SERIAL_PEER:
      try {
        setLastPushedSequenceId(env, env.getReplicationPeerManager().getPeerConfig(peerId).get());
      } catch (Exception e) {
        // Deliberately broad: any failure here is treated as retryable.
        throw suspend(env.getMasterConfiguration(),
          backoff -> LOG.warn(
            "Failed to update last pushed sequence id for peer {} when transiting sync "
              + "replication peer state from {} to {}, sleep {} secs and retry",
            peerId, fromState, toState, backoff / 1000, e));
      }
      resetRetry();
      setNextState(PeerSyncReplicationStateTransitionState.REPLAY_REMOTE_WAL_IN_PEER);
      return Flow.HAS_MORE_STATE;
    case REPLAY_REMOTE_WAL_IN_PEER:
      replayRemoteWAL(env.getReplicationPeerManager().getPeerConfig(peerId).get().isSerial());
      setNextState(
        PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
      return Flow.HAS_MORE_STATE;
    case REMOVE_ALL_REPLICATION_QUEUES_IN_PEER:
      try {
        removeAllReplicationQueues(env);
      } catch (ReplicationException e) {
        throw suspend(env.getMasterConfiguration(),
          backoff -> LOG.warn(
            "Failed to remove all replication queues for peer {} when starting transiting"
              + " sync replication peer state from {} to {}, sleep {} secs and retry",
            peerId, fromState, toState, backoff / 1000, e));
      }
      resetRetry();
      // Leaving ACTIVE requires reopening the peer's regions. Other source states go directly
      // to the final state transit.
      setNextState(fromState.equals(SyncReplicationState.ACTIVE)
        ? PeerSyncReplicationStateTransitionState.REOPEN_ALL_REGIONS_IN_PEER
        : PeerSyncReplicationStateTransitionState.TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE);
      return Flow.HAS_MORE_STATE;
    case TRANSIT_PEER_NEW_SYNC_REPLICATION_STATE:
      try {
        transitPeerSyncReplicationState(env);
      } catch (ReplicationException e) {
        throw suspend(env.getMasterConfiguration(),
          backoff -> LOG.warn(
            "Failed to update peer storage for peer {} when ending transiting sync "
              + "replication peer state from {} to {}, sleep {} secs and retry",
            peerId, fromState, toState, backoff / 1000, e));
      }
      resetRetry();
      setNextState(
        PeerSyncReplicationStateTransitionState.REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END);
      return Flow.HAS_MORE_STATE;
    case REFRESH_PEER_SYNC_REPLICATION_STATE_ON_RS_END:
      // One refresh child per online region server, stage 1 (the "end" half of the transit).
      addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
        .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn, 1))
        .toArray(RefreshPeerProcedure[]::new));
      setNextStateAfterRefreshEnd();
      return Flow.HAS_MORE_STATE;
    case SYNC_REPLICATION_SET_PEER_ENABLED:
      try {
        enablePeer(env);
      } catch (ReplicationException e) {
        throw suspend(env.getMasterConfiguration(),
          backoff -> LOG.warn(
            "Failed to set peer enabled for peer {} when transiting sync replication peer "
              + "state from {} to {}, sleep {} secs and retry",
            peerId, fromState, toState, backoff / 1000, e));
      }
      resetRetry();
      setNextState(
        PeerSyncReplicationStateTransitionState.SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS);
      return Flow.HAS_MORE_STATE;
    case SYNC_REPLICATION_ENABLE_PEER_REFRESH_PEER_ON_RS:
      refreshPeer(env, PeerOperationType.ENABLE);
      setNextState(PeerSyncReplicationStateTransitionState.CREATE_DIR_FOR_REMOTE_WAL);
      return Flow.HAS_MORE_STATE;
    case CREATE_DIR_FOR_REMOTE_WAL:
      try {
        createDirForRemoteWAL(env);
      } catch (IOException e) {
        throw suspend(env.getMasterConfiguration(),
          backoff -> LOG.warn(
            "Failed to create remote wal dir for peer {} when transiting sync replication "
              + "peer state from {} to {}, sleep {} secs and retry",
            peerId, fromState, toState, backoff / 1000, e));
      }
      resetRetry();
      setNextState(
        PeerSyncReplicationStateTransitionState.POST_PEER_SYNC_REPLICATION_STATE_TRANSITION);
      return Flow.HAS_MORE_STATE;
    case POST_PEER_SYNC_REPLICATION_STATE_TRANSITION:
      try {
        postTransit(env);
      } catch (IOException e) {
        // Best effort: the transition itself is already complete, so a post-hook failure is
        // only logged.
        LOG.warn(
          "Failed to call post CP hook for peer {} when transiting sync replication "
            + "peer state from {} to {}, ignore since the procedure has already done",
          peerId, fromState, toState, e);
      }
      return Flow.NO_MORE_STATE;
    default:
      throw new UnsupportedOperationException("unhandled state=" + state);
  }
}