in coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java [63:145]
/**
 * Builds a partition assignment over the shuffle servers that match {@code requiredTags}.
 *
 * <p>While holding this object's monitor, the method:
 *
 * <ol>
 *   <li>fetches the matching servers from {@code clusterManager};
 *   <li>rebuilds {@code serverToPartitions} so it holds exactly those servers, reusing the
 *       tracked info of servers already known and resetting a tracked partition count when the
 *       server reports a newer timestamp than the one recorded;
 *   <li>sorts the servers in descending order of {@code availableMemory / (trackedPartitions +
 *       expectedNewPartitions)};
 *   <li>selects candidates via {@code getCandidateNodes} and delegates the actual layout to
 *       {@code getPartitionAssignment};
 *   <li>increments the tracked partition count once for every server occurrence in the result.
 * </ol>
 *
 * @param totalPartitionNum total number of partitions to assign
 * @param partitionNumPerRange partitions per range; only {@code 1} is accepted
 * @param replica number of replicas; also the minimum number of servers that must be available
 * @param requiredTags tags passed to {@code clusterManager.getServerList}
 * @param requiredShuffleServerNumber requested server count; used only when it is positive and
 *     below {@code clusterManager.getShuffleNodesMax()}
 * @param estimateTaskConcurrency forwarded unchanged to {@code getPartitionAssignment}
 * @return the resulting assignment wrapped in a {@link PartitionRangeAssignment}
 * @throws RssException if {@code partitionNumPerRange} is not {@code 1}, or if no server matches
 *     or fewer than {@code replica} servers match
 */
public PartitionRangeAssignment assign(
    int totalPartitionNum,
    int partitionNumPerRange,
    int replica,
    Set<String> requiredTags,
    int requiredShuffleServerNumber,
    int estimateTaskConcurrency) {
  if (partitionNumPerRange != 1) {
    throw new RssException("PartitionNumPerRange must be one");
  }
  SortedMap<PartitionRange, List<ServerNode>> assignments;
  synchronized (this) {
    List<ServerNode> nodes = clusterManager.getServerList(requiredTags);

    // Rebuild the tracking map so it only contains the servers returned above.
    Map<ServerNode, PartitionAssignmentInfo> newPartitionInfos = JavaUtils.newConcurrentMap();
    for (ServerNode node : nodes) {
      newPartitionInfos.computeIfAbsent(
          node,
          key -> {
            PartitionAssignmentInfo known = serverToPartitions.get(key);
            if (known == null) {
              return new PartitionAssignmentInfo();
            }
            // The server reported a newer timestamp than the one recorded: start counting
            // from scratch and remember the new timestamp.
            if (known.getTimestamp() < key.getTimestamp()) {
              known.resetPartitionNum();
              known.setTimestamp(key.getTimestamp());
            }
            return known;
          });
    }
    serverToPartitions = newPartitionInfos;

    // Read the limit once so the average and the server-count cap use the same value.
    final int shuffleNodesMax = clusterManager.getShuffleNodesMax();
    // Multiply in long: totalPartitionNum * replica can exceed Integer.MAX_VALUE and would
    // otherwise wrap around silently before the division.
    final long averagePartitions = (long) totalPartitionNum * replica / shuffleNodesMax;
    final long assignPartitions = Math.max(averagePartitions, 1L);

    // Descending by available memory per (tracked + expected) partition.
    nodes.sort(
        (o1, o2) -> {
          double v1 =
              o1.getAvailableMemory()
                  * 1.0
                  / (serverToPartitions.get(o1).getPartitionNum() + assignPartitions);
          double v2 =
              o2.getAvailableMemory()
                  * 1.0
                  / (serverToPartitions.get(o2).getPartitionNum() + assignPartitions);
          return Double.compare(v2, v1);
        });

    if (nodes.isEmpty() || nodes.size() < replica) {
      throw new RssException("There isn't enough shuffle servers");
    }

    int expectNum = shuffleNodesMax;
    if (requiredShuffleServerNumber < shuffleNodesMax && requiredShuffleServerNumber > 0) {
      expectNum = requiredShuffleServerNumber;
    }
    if (nodes.size() < expectNum) {
      LOG.warn("Can't get expected servers [{}] and found only [{}]", expectNum, nodes.size());
      expectNum = nodes.size();
    }

    List<ServerNode> candidatesNodes = getCandidateNodes(nodes, expectNum);
    assignments =
        getPartitionAssignment(
            totalPartitionNum,
            partitionNumPerRange,
            replica,
            candidatesNodes,
            estimateTaskConcurrency);

    // One increment per server occurrence across all assigned ranges.
    assignments.values().stream()
        .flatMap(Collection::stream)
        .forEach(server -> serverToPartitions.get(server).incrementPartitionNum());
  }
  return new PartitionRangeAssignment(assignments);
}