public Map canAllPartitionsMovedOut()

in hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java [703:831]


    public Map<String, Object> canAllPartitionsMovedOut(Metapb.Store sourceStore) throws
                                                                                  PDException {
        if (!isLeader()) {
            return null;
        }
        // Analyze whether the partition on a store can be completely checked out
        Map<String, Object> resultMap = new HashMap<>();
        // The definition object is used to hold the partition above the source store StoreId
        // ->PartitionID, ShardRole
        Map<Long, Map<Integer, Metapb.ShardRole>> sourcePartitionMap = new HashMap<>();
        sourcePartitionMap.put(sourceStore.getId(), new HashMap<>());
        // The definition object is used to hold the partition above the other active stores
        // StoreId ->PartitionID, ShardRole
        Map<Long, Map<Integer, Metapb.ShardRole>> otherPartitionMap = new HashMap<>();
        // The amount of disk space remaining for each store
        Map<Long, Long> availableDiskSpace = new HashMap<>();
        // Record the amount of data in the partition to be migrated
        Map<Integer, Long> partitionDataSize = new HashMap<>();

        storeService.getActiveStores().forEach(store -> {
            if (store.getId() != sourceStore.getId()) {
                otherPartitionMap.put(store.getId(), new HashMap<>());
                // Records the remaining disk space of other stores, in bytes
                availableDiskSpace.put(store.getId(), store.getStats().getAvailable());
            } else {
                resultMap.put("current_store_is_online", true);
            }
        });
        // Count the size of the partition to be migrated (from storeStats in KB)
        for (Metapb.GraphStats graphStats : sourceStore.getStats().getGraphStatsList()) {
            partitionDataSize.put(graphStats.getPartitionId(),
                                  partitionDataSize.getOrDefault(graphStats.getPartitionId(), 0L)
                                  + graphStats.getApproximateSize());
        }
        // Assign values to sourcePartitionMap and otherPartitionMap
        partitionService.getPartitions().forEach(partition -> {
            try {
                storeService.getShardList(partition.getId()).forEach(shard -> {
                    long storeId = shard.getStoreId();
                    if (storeId == sourceStore.getId()) {
                        sourcePartitionMap.get(storeId).put(partition.getId(), shard.getRole());
                    } else {
                        if (otherPartitionMap.containsKey(storeId)) {
                            otherPartitionMap.get(storeId).put(partition.getId(), shard.getRole());
                        }
                    }

                });
            } catch (PDException e) {
                throw new RuntimeException(e);
            }
        });
        // Count the partitions to be removed: all partitions on the source store
        Map<Integer, KVPair<Long, Long>> movedPartitions = new HashMap<>();
        for (Map.Entry<Integer, Metapb.ShardRole> entry : sourcePartitionMap.get(
                sourceStore.getId()).entrySet()) {
            movedPartitions.put(entry.getKey(), new KVPair<>(sourceStore.getId(), 0L));
        }
        // Count the number of partitions of other stores and save them with a small top heap, so
        // that stores with fewer partitions are always prioritized
        PriorityQueue<KVPair<Long, Integer>> minHeap = new PriorityQueue<>(otherPartitionMap.size(),
                                                                           (o1, o2) -> o1.getValue()
                                                                                         .compareTo(
                                                                                                 o2.getValue()));
        otherPartitionMap.forEach((storeId, shards) -> {
            minHeap.add(new KVPair(storeId, shards.size()));
        });
        // Traverse the partitions to be migrated, and prioritize the migration to the store with
        // fewer partitions
        Iterator<Map.Entry<Integer, KVPair<Long, Long>>> moveIterator =
                movedPartitions.entrySet().iterator();
        while (moveIterator.hasNext()) {
            Map.Entry<Integer, KVPair<Long, Long>> moveEntry = moveIterator.next();
            int partitionId = moveEntry.getKey();
            // Record the elements that have popped up in the priority
            List<KVPair<Long, Integer>> tmpList = new ArrayList<>();
            while (minHeap.size() > 0) {
                KVPair<Long, Integer> pair = minHeap.poll(); // The first element pops up
                long storeId = pair.getKey();
                int partitionCount = pair.getValue();
                Map<Integer, Metapb.ShardRole> shards = otherPartitionMap.get(storeId);
                final int unitRate = 1024; // Balance the feed rate of different storage units
                if ((!shards.containsKey(partitionId)) && (
                        availableDiskSpace.getOrDefault(storeId, 0L) / unitRate >=
                        partitionDataSize.getOrDefault(partitionId, 0L))) {
                    // If the partition is not included on the destination store and the
                    // remaining space of the destination store can accommodate the partition,
                    // the migration is performed
                    moveEntry.getValue().setValue(storeId); // Set the target store for the move
                    log.info("plan to move partition {} to store {}, " +
                             "available disk space {}, current partitionSize:{}",
                             partitionId,
                             storeId,
                             availableDiskSpace.getOrDefault(storeId, 0L) / unitRate,
                             partitionDataSize.getOrDefault(partitionId, 0L)
                    );
                    // Update the expected remaining space for the store
                    availableDiskSpace.put(storeId, availableDiskSpace.getOrDefault(storeId, 0L)
                                                    - partitionDataSize.getOrDefault(partitionId,
                                                                                     0L) *
                                                      unitRate);
                    // Update the number of partitions for that store in the stat variable
                    partitionCount += 1;
                    pair.setValue(partitionCount);
                    tmpList.add(pair);
                    break;
                } else {
                    tmpList.add(pair);
                }
            }
            minHeap.addAll(tmpList);
        }
        // Check that there are no partitions that don't have a target store assigned
        List<Integer> remainPartitions = new ArrayList<>();
        movedPartitions.forEach((partId, storePair) -> {
            if (storePair.getValue() == 0L) {
                remainPartitions.add(partId);
            }
        });
        if (remainPartitions.size() > 0) {
            resultMap.put("flag", false);
            resultMap.put("movedPartitions", null);
        } else {
            resultMap.put("flag", true);
            resultMap.put("movedPartitions", movedPartitions);
        }
        return resultMap;

    }