in hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java [703:831]
/**
 * Analyzes whether every partition currently hosted on {@code sourceStore} can be moved
 * out to the other active stores (e.g. in preparation for taking the store offline).
 * <p>
 * A destination is chosen greedily per partition: stores holding fewer partitions are
 * tried first (via a min-heap ordered by partition count), and a candidate is accepted
 * only if it does not already hold a replica of the partition and its remaining disk
 * space can accommodate the partition's data.
 * <p>
 * Only the PD leader performs this analysis; non-leaders return {@code null}.
 *
 * @param sourceStore the store whose partitions should all be checked out
 * @return a result map with keys:
 *         {@code "flag"} — {@code true} iff every partition was assigned a destination;
 *         {@code "movedPartitions"} — partitionId -> (sourceStoreId, targetStoreId),
 *         or {@code null} when the plan is incomplete;
 *         {@code "current_store_is_online"} — present ({@code true}) only when the
 *         source store is among the active stores.
 *         Returns {@code null} when this node is not the leader.
 * @throws PDException if store or partition metadata cannot be read
 */
public Map<String, Object> canAllPartitionsMovedOut(Metapb.Store sourceStore) throws
        PDException {
    if (!isLeader()) {
        return null;
    }
    Map<String, Object> resultMap = new HashMap<>();
    // Partitions hosted on the source store: StoreId -> (PartitionID -> ShardRole)
    Map<Long, Map<Integer, Metapb.ShardRole>> sourcePartitionMap = new HashMap<>();
    sourcePartitionMap.put(sourceStore.getId(), new HashMap<>());
    // Partitions hosted on the other active stores: StoreId -> (PartitionID -> ShardRole)
    Map<Long, Map<Integer, Metapb.ShardRole>> otherPartitionMap = new HashMap<>();
    // Remaining disk space of each candidate store, in bytes
    Map<Long, Long> availableDiskSpace = new HashMap<>();
    // Data size of each partition to be migrated, in KB (from graph stats)
    Map<Integer, Long> partitionDataSize = new HashMap<>();
    storeService.getActiveStores().forEach(store -> {
        if (store.getId() != sourceStore.getId()) {
            otherPartitionMap.put(store.getId(), new HashMap<>());
            availableDiskSpace.put(store.getId(), store.getStats().getAvailable());
        } else {
            resultMap.put("current_store_is_online", true);
        }
    });
    // Aggregate the size of each partition on the source store; a partition may report
    // stats for several graphs, hence the summation per partition id
    for (Metapb.GraphStats graphStats : sourceStore.getStats().getGraphStatsList()) {
        partitionDataSize.merge(graphStats.getPartitionId(),
                                graphStats.getApproximateSize(), Long::sum);
    }
    // Fill sourcePartitionMap and otherPartitionMap from the current shard placement
    partitionService.getPartitions().forEach(partition -> {
        try {
            storeService.getShardList(partition.getId()).forEach(shard -> {
                long storeId = shard.getStoreId();
                if (storeId == sourceStore.getId()) {
                    sourcePartitionMap.get(storeId).put(partition.getId(), shard.getRole());
                } else if (otherPartitionMap.containsKey(storeId)) {
                    // Only active stores other than the source are candidates
                    otherPartitionMap.get(storeId).put(partition.getId(), shard.getRole());
                }
            });
        } catch (PDException e) {
            throw new RuntimeException(e);
        }
    });
    // Every partition on the source store must be moved; the target store id starts at
    // the sentinel 0L, meaning "no destination assigned yet"
    Map<Integer, KVPair<Long, Long>> movedPartitions = new HashMap<>();
    for (Integer partitionId : sourcePartitionMap.get(sourceStore.getId()).keySet()) {
        movedPartitions.put(partitionId, new KVPair<>(sourceStore.getId(), 0L));
    }
    // Min-heap keyed by partition count so the least-loaded store is always tried first.
    // Math.max(1, ...) guards against IllegalArgumentException when there are no other
    // active stores (PriorityQueue rejects an initial capacity of 0); in that case the
    // loop below assigns nothing and the method reports flag=false.
    PriorityQueue<KVPair<Long, Integer>> minHeap =
            new PriorityQueue<>(Math.max(1, otherPartitionMap.size()),
                                (o1, o2) -> o1.getValue().compareTo(o2.getValue()));
    otherPartitionMap.forEach((storeId, shards) -> {
        minHeap.add(new KVPair<>(storeId, shards.size()));
    });
    // Disk space is tracked in bytes while partition sizes are in KB
    final int unitRate = 1024;
    for (Map.Entry<Integer, KVPair<Long, Long>> moveEntry : movedPartitions.entrySet()) {
        int partitionId = moveEntry.getKey();
        // Stores popped from the heap during this round; re-inserted afterwards so the
        // heap stays complete for the next partition
        List<KVPair<Long, Integer>> tmpList = new ArrayList<>();
        while (!minHeap.isEmpty()) {
            KVPair<Long, Integer> pair = minHeap.poll();
            long storeId = pair.getKey();
            Map<Integer, Metapb.ShardRole> shards = otherPartitionMap.get(storeId);
            boolean hasSpace = availableDiskSpace.getOrDefault(storeId, 0L) / unitRate >=
                               partitionDataSize.getOrDefault(partitionId, 0L);
            if (!shards.containsKey(partitionId) && hasSpace) {
                // Destination found: record it and charge the partition's size against
                // the store's expected free space
                moveEntry.getValue().setValue(storeId);
                log.info("plan to move partition {} to store {}, " +
                         "available disk space {}, current partitionSize:{}",
                         partitionId,
                         storeId,
                         availableDiskSpace.getOrDefault(storeId, 0L) / unitRate,
                         partitionDataSize.getOrDefault(partitionId, 0L)
                );
                availableDiskSpace.put(storeId,
                                       availableDiskSpace.getOrDefault(storeId, 0L) -
                                       partitionDataSize.getOrDefault(partitionId, 0L) *
                                       unitRate);
                // Bump the store's partition count so later partitions see the new load
                pair.setValue(pair.getValue() + 1);
                tmpList.add(pair);
                break;
            } else {
                tmpList.add(pair);
            }
        }
        minHeap.addAll(tmpList);
    }
    // Any partition whose target is still the 0L sentinel could not be placed
    List<Integer> remainPartitions = new ArrayList<>();
    movedPartitions.forEach((partId, storePair) -> {
        if (storePair.getValue() == 0L) {
            remainPartitions.add(partId);
        }
    });
    if (remainPartitions.isEmpty()) {
        resultMap.put("flag", true);
        resultMap.put("movedPartitions", movedPartitions);
    } else {
        resultMap.put("flag", false);
        resultMap.put("movedPartitions", null);
    }
    return resultMap;
}