in master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java [191:262]
/**
 * Assigns partitions to workers in round-robin order, optionally pairing every primary location
 * with a replica location on a different worker index.
 *
 * <p>The scan starts at an index drawn from {@code rand} and, after each successful assignment,
 * resumes at the worker following the one that received the primary. Allocation stops as soon as
 * a full lap over {@code workers} finds no acceptable primary (or replica) worker; the partition
 * being processed at that point and all later ones are reported back as unallocated.
 *
 * @param slots output map, updated in place; for each worker, {@code _1} collects the primary
 *     locations and {@code _2} the replica locations assigned to it
 * @param partitionIds ids of the partitions to place; this list itself is not modified
 * @param workers candidate workers, addressed by index
 * @param restrictions per-worker usable disk information; when {@code null} no slot-availability
 *     check is performed and a default {@code StorageInfo} is used for every location
 * @param shouldReplicate whether a replica location must be created for each partition
 * @param shouldRackAware whether the replica worker must additionally pass
 *     {@code satisfyRackAware} against the primary worker
 * @return the partition ids that could not be allocated (empty when everything was placed)
 */
private static List<Integer> roundRobin(
    Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots,
    List<Integer> partitionIds,
    List<WorkerInfo> workers,
    Map<WorkerInfo, List<UsableDiskInfo>> restrictions,
    boolean shouldReplicate,
    boolean shouldRackAware) {
  // Work on a copy: successfully placed ids are removed from it, what remains is the result.
  List<Integer> partitionIdList = new ArrayList<>(partitionIds);
  if (workers.isEmpty()) {
    // Without workers nothing can be placed; drawing a start index below would throw
    // because the bound passed to nextInt would be 0.
    return partitionIdList;
  }
  // NOTE(review): with shouldReplicate and a single worker (and no rack-aware rejection) the
  // replica index equals the primary index, so both copies land on the same worker. Presumably
  // callers rule this out beforehand - confirm.

  // workerInfo -> disk index bookkeeping handed to getStorageInfo, kept separately for
  // primary and replica selection
  Map<WorkerInfo, Integer> workerDiskIndexForPrimary = new HashMap<>();
  Map<WorkerInfo, Integer> workerDiskIndexForReplica = new HashMap<>();
  int primaryIndex = rand.nextInt(workers.size());
  Iterator<Integer> iter = partitionIdList.iterator();
  outer:
  while (iter.hasNext()) {
    int nextPrimaryInd = primaryIndex;
    int partitionId = iter.next();
    StorageInfo storageInfo = new StorageInfo();
    if (restrictions != null) {
      // Advance to the first worker that still has usable slots; give up on all remaining
      // partitions once the scan wraps around to where it started.
      while (!haveUsableSlots(restrictions, workers, nextPrimaryInd)) {
        nextPrimaryInd = (nextPrimaryInd + 1) % workers.size();
        if (nextPrimaryInd == primaryIndex) {
          break outer;
        }
      }
      storageInfo =
          getStorageInfo(workers, nextPrimaryInd, restrictions, workerDiskIndexForPrimary);
    }
    PartitionLocation primaryPartition =
        createLocation(partitionId, workers.get(nextPrimaryInd), null, storageInfo, true);
    if (shouldReplicate) {
      // Replica candidates are scanned starting from the worker right after the primary.
      int nextReplicaInd = (nextPrimaryInd + 1) % workers.size();
      if (restrictions != null) {
        // NOTE(review): if getStorageInfo reserved a slot for the primary above, bailing out
        // here does not undo that reservation - confirm against getStorageInfo.
        while (!haveUsableSlots(restrictions, workers, nextReplicaInd)
            || !satisfyRackAware(shouldRackAware, workers, nextPrimaryInd, nextReplicaInd)) {
          nextReplicaInd = (nextReplicaInd + 1) % workers.size();
          if (nextReplicaInd == nextPrimaryInd) {
            break outer;
          }
        }
        storageInfo =
            getStorageInfo(workers, nextReplicaInd, restrictions, workerDiskIndexForReplica);
      } else if (shouldRackAware) {
        // No slot restrictions: only the rack-aware constraint can reject a replica worker.
        while (!satisfyRackAware(true, workers, nextPrimaryInd, nextReplicaInd)) {
          nextReplicaInd = (nextReplicaInd + 1) % workers.size();
          if (nextReplicaInd == nextPrimaryInd) {
            break outer;
          }
        }
      }
      // When restrictions is null the replica reuses the primary's StorageInfo instance.
      PartitionLocation replicaPartition =
          createLocation(
              partitionId, workers.get(nextReplicaInd), primaryPartition, storageInfo, false);
      primaryPartition.setPeer(replicaPartition);
      Tuple2<List<PartitionLocation>, List<PartitionLocation>> replicaLocations =
          slots.computeIfAbsent(
              workers.get(nextReplicaInd),
              v -> new Tuple2<>(new ArrayList<>(), new ArrayList<>()));
      replicaLocations._2.add(replicaPartition);
    }
    Tuple2<List<PartitionLocation>, List<PartitionLocation>> primaryLocations =
        slots.computeIfAbsent(
            workers.get(nextPrimaryInd), v -> new Tuple2<>(new ArrayList<>(), new ArrayList<>()));
    primaryLocations._1.add(primaryPartition);
    // The next partition starts its scan at the worker following this primary.
    primaryIndex = (nextPrimaryInd + 1) % workers.size();
    // Placed successfully: drop the id from the "still unallocated" list.
    iter.remove();
  }
  return partitionIdList;
}