in fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachine.java [168:243]
/**
 * Applies a single state transition for {@code tableBucket}.
 *
 * <p>The bucket is first registered in the coordinator context as {@code NonExistentBucket} if
 * it has no state yet. The transition is then validated via {@code
 * checkValidTableBucketStateChange}; when that check fails, the method returns without doing
 * anything else.
 *
 * <ul>
 *   <li>{@code NewBucket} / {@code OfflineBucket}: only the state is changed.
 *   <li>{@code OnlineBucket}: a leader is initialized (when the current state is {@code
 *       NewBucket}) or newly elected (otherwise). On success the state is changed and a
 *       notify-leader request is added to the coordinator request batch; on failure the
 *       failed state change is logged and the state is left as it was.
 *   <li>{@code NonExistentBucket}: {@code doStateChange} is invoked with a {@code null} state.
 * </ul>
 *
 * @param tableBucket the bucket whose state should change
 * @param targetState the state to move the bucket to
 */
private void doHandleStateChange(TableBucket tableBucket, BucketState targetState) {
    coordinatorContext.putBucketStateIfNotExists(tableBucket, BucketState.NonExistentBucket);
    if (!checkValidTableBucketStateChange(tableBucket, targetState)) {
        return;
    }
    switch (targetState) {
        case NewBucket:
            doStateChange(tableBucket, targetState);
            break;
        case OnlineBucket:
            {
                BucketState currentState = coordinatorContext.getBucketState(tableBucket);

                // A bucket with a partition id needs the partition name to build its
                // physical table path; without it the transition cannot proceed.
                String partitionName = null;
                if (tableBucket.getPartitionId() != null) {
                    partitionName =
                            coordinatorContext.getPartitionName(tableBucket.getPartitionId());
                    if (partitionName == null) {
                        // Report the partition id that failed to resolve (not the bucket id).
                        LOG.error(
                                "Can't find partition name for partition: {}.",
                                tableBucket.getPartitionId());
                        logFailedStateChange(tableBucket, currentState, targetState);
                        return;
                    }
                }

                Optional<ElectionResult> optionalElectionResult;
                if (currentState == BucketState.NewBucket) {
                    // init the leader for table bucket
                    List<Integer> assignedServers =
                            coordinatorContext.getAssignment(tableBucket);
                    optionalElectionResult =
                            initLeaderForTableBuckets(tableBucket, assignedServers);
                } else {
                    // current state is Online or Offline
                    // not new bucket, we then need to update leader/epoch for the bucket
                    optionalElectionResult = electNewLeaderForTableBuckets(tableBucket);
                }

                if (!optionalElectionResult.isPresent()) {
                    logFailedStateChange(tableBucket, currentState, targetState);
                    break;
                }

                // transmit state
                doStateChange(tableBucket, targetState);
                ElectionResult electionResult = optionalElectionResult.get();
                // then send request to the tablet servers
                coordinatorRequestBatch.addNotifyLeaderRequestForTabletServers(
                        new HashSet<>(electionResult.liveReplicas),
                        PhysicalTablePath.of(
                                coordinatorContext.getTablePathById(tableBucket.getTableId()),
                                partitionName),
                        tableBucket,
                        coordinatorContext.getAssignment(tableBucket),
                        electionResult.leaderAndIsr);
                break;
            }
        case OfflineBucket:
            doStateChange(tableBucket, targetState);
            break;
        case NonExistentBucket:
            // A null state is passed for a bucket that no longer exists.
            doStateChange(tableBucket, null);
            break;
    }
}