public void stopReplicas()

in fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java [550:628]


    public void stopReplicas(
            int requestCoordinatorEpoch,
            List<StopReplicaData> stopReplicaDataList,
            Consumer<List<StopReplicaResultForBucket>> responseCallback) {
        List<StopReplicaResultForBucket> result = new ArrayList<>();
        inLock(
                replicaStateChangeLock,
                () -> {
                    // check or apply coordinator epoch.
                    validateAndApplyCoordinatorEpoch(requestCoordinatorEpoch, "stopReplicas");

                    // store the deleted table id and the table dir path to delete the table dir
                    // after delete all the buckets of this table.
                    Map<Long, Path> deletedTableIds = new HashMap<>();
                    // the same to partition id and partition dir path
                    Map<Long, Path> deletedPartitionIds = new HashMap<>();

                    for (StopReplicaData data : stopReplicaDataList) {
                        TableBucket tb = data.getTableBucket();
                        HostedReplica hostedReplica = getReplica(tb);
                        if (hostedReplica instanceof NoneReplica) {
                            // do nothing fort this case.
                            result.add(new StopReplicaResultForBucket(tb));
                        } else if (hostedReplica instanceof OfflineReplica) {
                            LOG.warn(
                                    "Ignoring stopReplica request for table bucket {} as the local replica is offline",
                                    tb);
                            result.add(
                                    new StopReplicaResultForBucket(
                                            tb,
                                            Errors.LOG_STORAGE_EXCEPTION,
                                            "local replica is offline"));
                        } else if (hostedReplica instanceof OnlineReplica) {
                            Replica replica = ((OnlineReplica) hostedReplica).getReplica();
                            int requestLeaderEpoch = data.getLeaderEpoch();
                            int currentLeaderEpoch = replica.getLeaderEpoch();
                            if (requestLeaderEpoch < currentLeaderEpoch) {
                                String errorMessage =
                                        String.format(
                                                "invalid leader epoch %s in stop replica request, "
                                                        + "The latest known leader epoch is %s for table bucket %s.",
                                                requestLeaderEpoch, currentLeaderEpoch, tb);
                                LOG.warn(
                                        "Ignore the stop replica request because {}", errorMessage);
                                result.add(
                                        new StopReplicaResultForBucket(
                                                tb,
                                                Errors.FENCED_LEADER_EPOCH_EXCEPTION,
                                                errorMessage));
                            } else {
                                try {
                                    result.add(
                                            stopReplica(
                                                    tb,
                                                    data.isDelete(),
                                                    deletedTableIds,
                                                    deletedPartitionIds));
                                } catch (Exception e) {
                                    LOG.error(
                                            "Error processing stopReplica operation on hostedReplica {}",
                                            tb,
                                            e);
                                    result.add(
                                            new StopReplicaResultForBucket(
                                                    tb, ApiError.fromThrowable(e)));
                                }
                            }
                        }
                    }

                    // must delete partition dir first, then table dir
                    deletedPartitionIds.forEach(
                            (id, dir) -> dropEmptyTableOrPartitionDir(dir, id, "partition"));
                    deletedTableIds.forEach(
                            (id, dir) -> dropEmptyTableOrPartitionDir(dir, id, "table"));
                });

        responseCallback.accept(result);
    }