private void doHandleStateChange()

in fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachine.java [168:243]


    /**
     * Handles a single state change request for one table bucket.
     *
     * <p>The bucket is first registered with {@link BucketState#NonExistentBucket} if the
     * coordinator context holds no state for it yet. The request is then dropped silently when
     * {@code checkValidTableBucketStateChange} rejects the transition.
     *
     * <p>Behavior per target state:
     *
     * <ul>
     *   <li>{@code NewBucket} / {@code OfflineBucket}: the state is changed directly.
     *   <li>{@code OnlineBucket}: for a partitioned bucket the partition name is resolved first;
     *       then a leader is initialized (when the current state is {@code NewBucket}) or
     *       re-elected (any other current state). Only when the election yields a result is the
     *       state changed and a notify-leader request added to the coordinator request batch.
     *       Otherwise the failed state change is logged and the state is left untouched.
     *   <li>{@code NonExistentBucket}: {@code doStateChange} is invoked with a {@code null} state
     *       (presumably this drops the tracked state - confirm against {@code doStateChange}).
     * </ul>
     *
     * @param tableBucket the bucket whose state should change
     * @param targetState the state the bucket should move to
     */
    private void doHandleStateChange(TableBucket tableBucket, BucketState targetState) {
        coordinatorContext.putBucketStateIfNotExists(tableBucket, BucketState.NonExistentBucket);
        if (!checkValidTableBucketStateChange(tableBucket, targetState)) {
            return;
        }
        switch (targetState) {
            case NewBucket:
                doStateChange(tableBucket, targetState);
                break;
            case OnlineBucket:
                BucketState currentState = coordinatorContext.getBucketState(tableBucket);
                // the partition name stays null for buckets without a partition id
                String partitionName = null;
                if (tableBucket.getPartitionId() != null) {
                    partitionName =
                            coordinatorContext.getPartitionName(tableBucket.getPartitionId());
                    if (partitionName == null) {
                        // report the partition id that failed to resolve, not the bucket id
                        LOG.error(
                                "Can't find partition name for partition: {}.",
                                tableBucket.getPartitionId());
                        logFailedStateChange(tableBucket, currentState, targetState);
                        return;
                    }
                }

                Optional<ElectionResult> optionalElectionResult;
                if (currentState == BucketState.NewBucket) {
                    // init the leader for table bucket
                    List<Integer> assignedServers = coordinatorContext.getAssignment(tableBucket);
                    optionalElectionResult =
                            initLeaderForTableBuckets(tableBucket, assignedServers);
                } else {
                    // current state is Online or Offline
                    // not new bucket, we then need to update leader/epoch for the bucket
                    optionalElectionResult = electNewLeaderForTableBuckets(tableBucket);
                }

                if (!optionalElectionResult.isPresent()) {
                    // no election result: keep the current state and only record the failure
                    logFailedStateChange(tableBucket, currentState, targetState);
                    break;
                }

                ElectionResult electionResult = optionalElectionResult.get();
                // transmit state
                doStateChange(tableBucket, targetState);
                // then send request to the tablet servers
                coordinatorRequestBatch.addNotifyLeaderRequestForTabletServers(
                        new HashSet<>(electionResult.liveReplicas),
                        PhysicalTablePath.of(
                                coordinatorContext.getTablePathById(tableBucket.getTableId()),
                                partitionName),
                        tableBucket,
                        coordinatorContext.getAssignment(tableBucket),
                        electionResult.leaderAndIsr);
                break;
            case OfflineBucket:
                doStateChange(tableBucket, targetState);
                break;
            case NonExistentBucket:
                doStateChange(tableBucket, null);
                break;
        }
    }