in fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/statemachine/ReplicaStateMachine.java [201:332]
/**
 * Transitions the given replicas to {@code targetState}, queueing on {@code
 * coordinatorRequestBatch} any tablet-server requests the transition requires.
 *
 * <p>Every replica without a recorded state is first registered as {@link
 * ReplicaState#NonExistentReplica}. Only the replicas returned by {@code
 * checkValidReplicaStateChange} are then transitioned.
 *
 * @param replicas the replicas to transition
 * @param targetState the state to move the replicas to
 */
private void doHandleStateChanges(
        Collection<TableBucketReplica> replicas, ReplicaState targetState) {
    replicas.forEach(
            replica ->
                    coordinatorContext.putReplicaStateIfNotExists(
                            replica, ReplicaState.NonExistentReplica));
    Collection<TableBucketReplica> validReplicas =
            checkValidReplicaStateChange(replicas, targetState);
    switch (targetState) {
        case NewReplica:
            validReplicas.forEach(replica -> doStateChange(replica, targetState));
            break;
        case OnlineReplica:
            validReplicas.forEach(this::transitionReplicaToOnline);
            break;
        case OfflineReplica:
            transitionReplicasToOffline(validReplicas);
            break;
        case ReplicaDeletionStarted:
            validReplicas.forEach(
                    replica -> doStateChange(replica, ReplicaState.ReplicaDeletionStarted));
            // send stop replica request with delete = true
            validReplicas.forEach(
                    tableBucketReplica -> {
                        int replicaServer = tableBucketReplica.getReplica();
                        coordinatorRequestBatch.addStopReplicaRequestForTabletServers(
                                Collections.singleton(replicaServer),
                                tableBucketReplica.getTableBucket(),
                                true,
                                coordinatorContext.getBucketLeaderEpoch(
                                        tableBucketReplica.getTableBucket()));
                    });
            break;
        case ReplicaDeletionSuccessful:
            validReplicas.forEach(
                    replica -> doStateChange(replica, ReplicaState.ReplicaDeletionSuccessful));
            break;
        case NonExistentReplica:
            validReplicas.forEach(replica -> doStateChange(replica, null));
            break;
    }
}

/**
 * Moves a single replica to {@link ReplicaState#OnlineReplica}.
 *
 * <p>Unless the replica is currently a {@link ReplicaState#NewReplica}, a notify-leader request
 * is queued for the replica's own server first. The request carries the bucket's current leader
 * and ISR, and is only queued when the coordinator context knows them. If the bucket's partition
 * no longer exists, the failure is logged and the replica keeps its current state.
 *
 * @param replica the replica to bring online
 */
private void transitionReplicaToOnline(TableBucketReplica replica) {
    ReplicaState currentState = coordinatorContext.getReplicaState(replica);
    // Replicas coming from NewReplica skip the leader notification; all others are told about
    // the bucket's current leader and ISR before being marked online.
    if (currentState != ReplicaState.NewReplica) {
        TableBucket tableBucket = replica.getTableBucket();
        String partitionName;
        try {
            partitionName = getPartitionName(tableBucket);
        } catch (PartitionNotExistException e) {
            LOG.error(e.getMessage());
            logFailedSateChange(replica, currentState, ReplicaState.OnlineReplica);
            // the partition is gone: leave the replica in its current state
            return;
        }
        coordinatorContext
                .getBucketLeaderAndIsr(tableBucket)
                .ifPresent(
                        leaderAndIsr ->
                                // send leader request to the replica server
                                coordinatorRequestBatch.addNotifyLeaderRequestForTabletServers(
                                        Collections.singleton(replica.getReplica()),
                                        physicalTablePathOf(tableBucket, partitionName),
                                        tableBucket,
                                        coordinatorContext.getAssignment(tableBucket),
                                        leaderAndIsr));
    }
    doStateChange(replica, ReplicaState.OnlineReplica);
}

/**
 * Moves the given replicas to {@link ReplicaState#OfflineReplica}, in four steps.
 *
 * <ol>
 *   <li>Queues a stop-replica request (delete = false) for each replica's server.
 *   <li>Removes the replicas from their buckets' ISR via {@code doRemoveReplicaFromIsr}.
 *   <li>For every bucket whose leader/ISR was adjusted, and for which {@code isToBeDeleted} is
 *       false, queues a notify-leader request to the other assigned servers.
 *   <li>Records the offline state for every replica.
 * </ol>
 *
 * <p>If a bucket's partition no longer exists, its notification is skipped and logged as a
 * failed state change. The replica is still marked offline in the final step.
 *
 * @param validReplicas replicas already validated for the transition to offline
 */
private void transitionReplicasToOffline(Collection<TableBucketReplica> validReplicas) {
    // first, send stop replica request to servers
    validReplicas.forEach(
            replica ->
                    coordinatorRequestBatch.addStopReplicaRequestForTabletServers(
                            Collections.singleton(replica.getReplica()),
                            replica.getTableBucket(),
                            false,
                            coordinatorContext.getBucketLeaderEpoch(replica.getTableBucket())));
    // then, may remove the offline replica from isr
    Map<TableBucketReplica, LeaderAndIsr> adjustedLeaderAndIsr =
            doRemoveReplicaFromIsr(validReplicas);
    // notify leader and isr changes
    for (Map.Entry<TableBucketReplica, LeaderAndIsr> leaderAndIsrEntry :
            adjustedLeaderAndIsr.entrySet()) {
        TableBucketReplica tableBucketReplica = leaderAndIsrEntry.getKey();
        TableBucket tableBucket = tableBucketReplica.getTableBucket();
        LeaderAndIsr leaderAndIsr = leaderAndIsrEntry.getValue();
        if (coordinatorContext.isToBeDeleted(tableBucket)) {
            continue;
        }
        // Capture the offline server as a primitive int so the filter below compares numeric
        // values rather than Integer references.
        int offlineServer = tableBucketReplica.getReplica();
        Set<Integer> recipients =
                coordinatorContext.getAssignment(tableBucket).stream()
                        .filter(server -> server != offlineServer)
                        .collect(Collectors.toSet());
        String partitionName;
        try {
            partitionName = getPartitionName(tableBucket);
        } catch (PartitionNotExistException e) {
            LOG.error(e.getMessage());
            logFailedSateChange(
                    tableBucketReplica,
                    coordinatorContext.getReplicaState(tableBucketReplica),
                    ReplicaState.OfflineReplica);
            continue;
        }
        // send leader request to the remaining replica servers
        coordinatorRequestBatch.addNotifyLeaderRequestForTabletServers(
                recipients,
                physicalTablePathOf(tableBucket, partitionName),
                tableBucket,
                coordinatorContext.getAssignment(tableBucket),
                leaderAndIsr);
    }
    // finally, set to offline
    validReplicas.forEach(replica -> doStateChange(replica, ReplicaState.OfflineReplica));
}

/**
 * Builds the {@link PhysicalTablePath} for {@code tableBucket}.
 *
 * <p>The path combines the table path registered for the bucket's table id with {@code
 * partitionName}, as returned by {@code getPartitionName}.
 */
private PhysicalTablePath physicalTablePathOf(TableBucket tableBucket, String partitionName) {
    return PhysicalTablePath.of(
            coordinatorContext.getTablePathById(tableBucket.getTableId()), partitionName);
}