in fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java [550:628]
/**
 * Processes a stop-replica request covering a batch of table buckets and hands the per-bucket
 * outcomes to {@code responseCallback}.
 *
 * <p>All bucket handling runs while holding {@code replicaStateChangeLock}; the callback is
 * invoked after the locked section has returned. For each requested bucket:
 *
 * <ul>
 *   <li>no local replica ({@code NoneReplica}): a result carrying only the bucket is recorded;
 *   <li>offline local replica ({@code OfflineReplica}): the request is ignored and a {@code
 *       LOG_STORAGE_EXCEPTION} result is recorded;
 *   <li>online local replica ({@code OnlineReplica}): the request is rejected with {@code
 *       FENCED_LEADER_EPOCH_EXCEPTION} when its leader epoch is lower than the replica's current
 *       one; otherwise the replica is stopped, and any exception raised while stopping is turned
 *       into an error result for that bucket only.
 * </ul>
 *
 * <p>Once every bucket has been handled, the partition directories and then the table
 * directories collected during the loop are passed to {@code dropEmptyTableOrPartitionDir}.
 *
 * @param requestCoordinatorEpoch coordinator epoch carried by the request, checked (or applied)
 *     before any bucket is processed
 * @param stopReplicaDataList the buckets to stop, each with its leader epoch and delete flag
 * @param responseCallback receives the list of per-bucket results
 */
public void stopReplicas(
        int requestCoordinatorEpoch,
        List<StopReplicaData> stopReplicaDataList,
        Consumer<List<StopReplicaResultForBucket>> responseCallback) {
    List<StopReplicaResultForBucket> result = new ArrayList<>();
    inLock(
            replicaStateChangeLock,
            () -> {
                // Check or apply the coordinator epoch before touching any replica.
                validateAndApplyCoordinatorEpoch(requestCoordinatorEpoch, "stopReplicas");

                // Table id -> table dir and partition id -> partition dir, filled in by
                // stopReplica so the directories can be dropped after all buckets are done.
                Map<Long, Path> deletedTableIds = new HashMap<>();
                Map<Long, Path> deletedPartitionIds = new HashMap<>();

                for (StopReplicaData stopData : stopReplicaDataList) {
                    TableBucket bucket = stopData.getTableBucket();
                    HostedReplica hosted = getReplica(bucket);

                    if (hosted instanceof NoneReplica) {
                        // Nothing is hosted locally for this bucket, so there is nothing to stop.
                        result.add(new StopReplicaResultForBucket(bucket));
                        continue;
                    }

                    if (hosted instanceof OfflineReplica) {
                        LOG.warn(
                                "Ignoring stopReplica request for table bucket {} as the local replica is offline",
                                bucket);
                        result.add(
                                new StopReplicaResultForBucket(
                                        bucket,
                                        Errors.LOG_STORAGE_EXCEPTION,
                                        "local replica is offline"));
                        continue;
                    }

                    if (!(hosted instanceof OnlineReplica)) {
                        // Any other kind of hosted replica produces no result entry.
                        continue;
                    }

                    Replica onlineReplica = ((OnlineReplica) hosted).getReplica();
                    int requestLeaderEpoch = stopData.getLeaderEpoch();
                    int currentLeaderEpoch = onlineReplica.getLeaderEpoch();

                    if (requestLeaderEpoch >= currentLeaderEpoch) {
                        // The request is not stale: stop the replica, isolating failures so
                        // that one bad bucket does not abort the rest of the batch.
                        try {
                            StopReplicaResultForBucket stopped =
                                    stopReplica(
                                            bucket,
                                            stopData.isDelete(),
                                            deletedTableIds,
                                            deletedPartitionIds);
                            result.add(stopped);
                        } catch (Exception e) {
                            LOG.error(
                                    "Error processing stopReplica operation on hostedReplica {}",
                                    bucket,
                                    e);
                            result.add(
                                    new StopReplicaResultForBucket(
                                            bucket, ApiError.fromThrowable(e)));
                        }
                        continue;
                    }

                    // The request carries an older leader epoch than the local replica knows.
                    String errorMessage =
                            String.format(
                                    "invalid leader epoch %s in stop replica request, "
                                            + "The latest known leader epoch is %s for table bucket %s.",
                                    requestLeaderEpoch, currentLeaderEpoch, bucket);
                    LOG.warn("Ignore the stop replica request because {}", errorMessage);
                    result.add(
                            new StopReplicaResultForBucket(
                                    bucket, Errors.FENCED_LEADER_EPOCH_EXCEPTION, errorMessage));
                }

                // Partition dirs must be dropped first, then table dirs.
                deletedPartitionIds.forEach(
                        (partitionId, partitionDir) ->
                                dropEmptyTableOrPartitionDir(
                                        partitionDir, partitionId, "partition"));
                deletedTableIds.forEach(
                        (tableId, tableDir) ->
                                dropEmptyTableOrPartitionDir(tableDir, tableId, "table"));
            });
    responseCallback.accept(result);
}