private void doHandleStateChanges()

in fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/statemachine/ReplicaStateMachine.java [201:332]


    private void doHandleStateChanges(
            Collection<TableBucketReplica> replicas, ReplicaState targetState) {
        replicas.forEach(
                replica ->
                        coordinatorContext.putReplicaStateIfNotExists(
                                replica, ReplicaState.NonExistentReplica));
        Collection<TableBucketReplica> validReplicas =
                checkValidReplicaStateChange(replicas, targetState);
        switch (targetState) {
            case NewReplica:
                validReplicas.forEach(replica -> doStateChange(replica, targetState));
                break;
            case OnlineReplica:
                validReplicas.forEach(
                        replica -> {
                            ReplicaState currentState = coordinatorContext.getReplicaState(replica);
                            if (currentState != ReplicaState.NewReplica) {
                                TableBucket tableBucket = replica.getTableBucket();
                                String partitionName;
                                try {
                                    partitionName = getPartitionName(tableBucket);
                                } catch (PartitionNotExistException e) {
                                    LOG.error(e.getMessage());
                                    logFailedSateChange(replica, currentState, targetState);
                                    return;
                                }

                                coordinatorContext
                                        .getBucketLeaderAndIsr(tableBucket)
                                        .ifPresent(
                                                leaderAndIsr -> {
                                                    // send leader request to the replica server
                                                    coordinatorRequestBatch
                                                            .addNotifyLeaderRequestForTabletServers(
                                                                    Collections.singleton(
                                                                            replica.getReplica()),
                                                                    PhysicalTablePath.of(
                                                                            coordinatorContext
                                                                                    .getTablePathById(
                                                                                            tableBucket
                                                                                                    .getTableId()),
                                                                            partitionName),
                                                                    replica.getTableBucket(),
                                                                    coordinatorContext
                                                                            .getAssignment(
                                                                                    tableBucket),
                                                                    leaderAndIsr);
                                                });
                            }
                            doStateChange(replica, targetState);
                        });
                break;
            case OfflineReplica:
                // first, send stop replica request to servers
                validReplicas.forEach(
                        replica ->
                                coordinatorRequestBatch.addStopReplicaRequestForTabletServers(
                                        Collections.singleton(replica.getReplica()),
                                        replica.getTableBucket(),
                                        false,
                                        coordinatorContext.getBucketLeaderEpoch(
                                                replica.getTableBucket())));

                // then, may remove the offline replica from isr
                Map<TableBucketReplica, LeaderAndIsr> adjustedLeaderAndIsr =
                        doRemoveReplicaFromIsr(validReplicas);
                // notify leader and isr changes
                for (Map.Entry<TableBucketReplica, LeaderAndIsr> leaderAndIsrEntry :
                        adjustedLeaderAndIsr.entrySet()) {
                    TableBucketReplica tableBucketReplica = leaderAndIsrEntry.getKey();
                    TableBucket tableBucket = tableBucketReplica.getTableBucket();
                    LeaderAndIsr leaderAndIsr = leaderAndIsrEntry.getValue();
                    if (!coordinatorContext.isToBeDeleted(tableBucket)) {
                        Set<Integer> recipients =
                                coordinatorContext.getAssignment(tableBucket).stream()
                                        .filter(
                                                replica ->
                                                        replica != tableBucketReplica.getReplica())
                                        .collect(Collectors.toSet());
                        String partitionName;
                        try {
                            partitionName = getPartitionName(tableBucket);
                        } catch (PartitionNotExistException e) {
                            LOG.error(e.getMessage());
                            logFailedSateChange(
                                    tableBucketReplica,
                                    coordinatorContext.getReplicaState(tableBucketReplica),
                                    targetState);
                            continue;
                        }
                        // send leader request to the replica server
                        coordinatorRequestBatch.addNotifyLeaderRequestForTabletServers(
                                recipients,
                                PhysicalTablePath.of(
                                        coordinatorContext.getTablePathById(
                                                tableBucket.getTableId()),
                                        partitionName),
                                tableBucket,
                                coordinatorContext.getAssignment(tableBucket),
                                leaderAndIsr);
                    }
                }

                // finally, set to offline
                validReplicas.forEach(
                        replica -> doStateChange(replica, ReplicaState.OfflineReplica));

                break;
            case ReplicaDeletionStarted:
                validReplicas.forEach(
                        replica -> doStateChange(replica, ReplicaState.ReplicaDeletionStarted));
                // send stop replica request with delete = true
                validReplicas.forEach(
                        tableBucketReplica -> {
                            int replicaServer = tableBucketReplica.getReplica();
                            coordinatorRequestBatch.addStopReplicaRequestForTabletServers(
                                    Collections.singleton(replicaServer),
                                    tableBucketReplica.getTableBucket(),
                                    true,
                                    coordinatorContext.getBucketLeaderEpoch(
                                            tableBucketReplica.getTableBucket()));
                        });
                break;
            case ReplicaDeletionSuccessful:
                validReplicas.forEach(
                        replica -> doStateChange(replica, ReplicaState.ReplicaDeletionSuccessful));
                break;
            case NonExistentReplica:
                validReplicas.forEach(replica -> doStateChange(replica, null));
                break;
        }
    }