in master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java [342:453]
private static List<Integer> roundRobin(
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots,
List<Integer> partitionIds,
List<WorkerInfo> workers,
Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions,
boolean shouldReplicate,
boolean shouldRackAware,
int availableStorageTypes) {
    // workerInfo -> next disk index to use on that worker (shared between
    // primary and replica placement)
Map<WorkerInfo, Integer> workerDiskIndex = new HashMap<>();
List<Integer> partitionIdList = new LinkedList<>(partitionIds);
final int workerSize = workers.size();
final IntUnaryOperator incrementIndex = v -> (v + 1) % workerSize;
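    // Start the primary and replica scans at random worker offsets so that
    // concurrent allocations do not all start probing the same workers.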
int primaryIndex = rand.nextInt(workerSize);
int replicaIndex = rand.nextInt(workerSize);
ListIterator<Integer> iter = partitionIdList.listIterator(partitionIdList.size());
    // Iterate with a LinkedList iterator (from the end) so each processed
    // partition can be removed in O(1). This matters when many concurrent
    // applications each request a large number of partitions, where an
    // ArrayList would pay O(n) element shifts per removal.
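    // The "outer" label lets the wrap-around checks below abort the whole
    // allocation: once a scan returns to its starting index, no eligible
    // worker exists and the remaining partition ids are returned unallocated.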
outer:
while (iter.hasPrevious()) {
int nextPrimaryInd = primaryIndex;
int partitionId = iter.previous();
StorageInfo storageInfo;
if (slotsRestrictions != null && !slotsRestrictions.isEmpty()) {
        // Slot restrictions are present: pick a worker that still has usable
        // slots, then a mount point (disk) on that worker.
while (!haveUsableSlots(slotsRestrictions, workers, nextPrimaryInd)) {
nextPrimaryInd = incrementIndex.applyAsInt(nextPrimaryInd);
if (nextPrimaryInd == primaryIndex) {
break outer;
}
}
storageInfo =
getStorageInfo(
workers, nextPrimaryInd, slotsRestrictions, workerDiskIndex, availableStorageTypes);
} else {
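        // No restrictions: when local disks are in play, any worker that
        // still has a healthy disk can host the primary.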
if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
while (!workers.get(nextPrimaryInd).haveDisk()) {
nextPrimaryInd = incrementIndex.applyAsInt(nextPrimaryInd);
if (nextPrimaryInd == primaryIndex) {
break outer;
}
}
}
storageInfo =
getStorageInfo(workers, nextPrimaryInd, null, workerDiskIndex, availableStorageTypes);
}
PartitionLocation primaryPartition =
createLocation(partitionId, workers.get(nextPrimaryInd), null, storageInfo, true);
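      // Optionally place a replica on a different worker, honoring the same
      // slot restrictions and, if requested, rack awareness.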
if (shouldReplicate) {
int nextReplicaInd = replicaIndex;
        // Mirror the primary-path check: an empty restriction map means
        // unrestricted placement rather than "no usable slots anywhere".
        if (slotsRestrictions != null && !slotsRestrictions.isEmpty()) {
while (nextReplicaInd == nextPrimaryInd
|| !haveUsableSlots(slotsRestrictions, workers, nextReplicaInd)
|| !satisfyRackAware(shouldRackAware, workers, nextPrimaryInd, nextReplicaInd)) {
nextReplicaInd = incrementIndex.applyAsInt(nextReplicaInd);
if (nextReplicaInd == replicaIndex) {
break outer;
}
}
storageInfo =
getStorageInfo(
workers,
nextReplicaInd,
slotsRestrictions,
workerDiskIndex,
availableStorageTypes);
} else if (shouldRackAware) {
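          // Rack-aware without disk restrictions: the replica must sit on a
          // different worker that passes the rack check against the primary.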
while (nextReplicaInd == nextPrimaryInd
|| !satisfyRackAware(true, workers, nextPrimaryInd, nextReplicaInd)) {
nextReplicaInd = incrementIndex.applyAsInt(nextReplicaInd);
if (nextReplicaInd == replicaIndex) {
break outer;
}
}
} else {
if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
while (nextReplicaInd == nextPrimaryInd || !workers.get(nextReplicaInd).haveDisk()) {
nextReplicaInd = incrementIndex.applyAsInt(nextReplicaInd);
if (nextReplicaInd == replicaIndex) {
break outer;
}
}
}
storageInfo =
getStorageInfo(workers, nextReplicaInd, null, workerDiskIndex, availableStorageTypes);
}
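        // Create the replica, link both directions to the primary, record the
        // replica slot on its worker, and advance the replica pointer.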
PartitionLocation replicaPartition =
createLocation(
partitionId, workers.get(nextReplicaInd), primaryPartition, storageInfo, false);
primaryPartition.setPeer(replicaPartition);
Tuple2<List<PartitionLocation>, List<PartitionLocation>> locations =
slots.computeIfAbsent(
workers.get(nextReplicaInd),
v -> new Tuple2<>(new ArrayList<>(), new ArrayList<>()));
locations._2.add(replicaPartition);
replicaIndex = incrementIndex.applyAsInt(nextReplicaInd);
}
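      // Record the primary slot, advance the primary pointer, and mark this
      // partition as allocated by removing it from the list.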
Tuple2<List<PartitionLocation>, List<PartitionLocation>> locations =
slots.computeIfAbsent(
workers.get(nextPrimaryInd), v -> new Tuple2<>(new ArrayList<>(), new ArrayList<>()));
locations._1.add(primaryPartition);
primaryIndex = incrementIndex.applyAsInt(nextPrimaryInd);
iter.remove();
}
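    // Any ids left here could not be allocated (the loop exited via
    // "break outer" after a full wrap-around found no eligible worker).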
return partitionIdList;
}
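
  // A minimal, standalone sketch (not Celeborn code) of the wrap-around
  // round-robin probe used repeatedly above: test the current index, advance
  // modulo the worker count, and give up once the scan returns to its start.
  // The name probeFirstEligible and its IntPredicate parameter are
  // illustrative assumptions, not part of SlotsAllocator's API. For example,
  // probeFirstEligible(rand.nextInt(n), n, i -> workers.get(i).haveDisk())
  // would mirror the primary-selection loop when only local disks matter.
  private static int probeFirstEligible(
      int startIndex, int workerCount, java.util.function.IntPredicate eligible) {
    int ind = startIndex;
    do {
      if (eligible.test(ind)) {
        return ind; // first eligible worker, possibly startIndex itself
      }
      ind = (ind + 1) % workerCount;
    } while (ind != startIndex);
    return -1; // full wrap-around found nothing; the loops above "break outer"
  }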