private static List roundRobin()

in master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java [191:262]


  private static List<Integer> roundRobin(
      Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots,
      List<Integer> partitionIds,
      List<WorkerInfo> workers,
      Map<WorkerInfo, List<UsableDiskInfo>> restrictions,
      boolean shouldReplicate,
      boolean shouldRackAware) {
    // workerInfo -> (diskIndexForPrimary, diskIndexForReplica)
    Map<WorkerInfo, Integer> workerDiskIndexForPrimary = new HashMap<>();
    Map<WorkerInfo, Integer> workerDiskIndexForReplica = new HashMap<>();
    List<Integer> partitionIdList = new ArrayList<>(partitionIds);
    int primaryIndex = rand.nextInt(workers.size());
    Iterator<Integer> iter = partitionIdList.iterator();
    outer:
    while (iter.hasNext()) {
      int nextPrimayInd = primaryIndex;

      int partitionId = iter.next();
      StorageInfo storageInfo = new StorageInfo();
      if (restrictions != null) {
        while (!haveUsableSlots(restrictions, workers, nextPrimayInd)) {
          nextPrimayInd = (nextPrimayInd + 1) % workers.size();
          if (nextPrimayInd == primaryIndex) {
            break outer;
          }
        }
        storageInfo =
            getStorageInfo(workers, nextPrimayInd, restrictions, workerDiskIndexForPrimary);
      }
      PartitionLocation primaryPartition =
          createLocation(partitionId, workers.get(nextPrimayInd), null, storageInfo, true);

      if (shouldReplicate) {
        int nextReplicaInd = (nextPrimayInd + 1) % workers.size();
        if (restrictions != null) {
          while (!haveUsableSlots(restrictions, workers, nextReplicaInd)
              || !satisfyRackAware(shouldRackAware, workers, nextPrimayInd, nextReplicaInd)) {
            nextReplicaInd = (nextReplicaInd + 1) % workers.size();
            if (nextReplicaInd == nextPrimayInd) {
              break outer;
            }
          }
          storageInfo =
              getStorageInfo(workers, nextReplicaInd, restrictions, workerDiskIndexForReplica);
        } else if (shouldRackAware) {
          while (!satisfyRackAware(true, workers, nextPrimayInd, nextReplicaInd)) {
            nextReplicaInd = (nextReplicaInd + 1) % workers.size();
            if (nextReplicaInd == nextPrimayInd) {
              break outer;
            }
          }
        }
        PartitionLocation replicaPartition =
            createLocation(
                partitionId, workers.get(nextReplicaInd), primaryPartition, storageInfo, false);
        primaryPartition.setPeer(replicaPartition);
        Tuple2<List<PartitionLocation>, List<PartitionLocation>> locations =
            slots.computeIfAbsent(
                workers.get(nextReplicaInd),
                v -> new Tuple2<>(new ArrayList<>(), new ArrayList<>()));
        locations._2.add(replicaPartition);
      }

      Tuple2<List<PartitionLocation>, List<PartitionLocation>> locations =
          slots.computeIfAbsent(
              workers.get(nextPrimayInd), v -> new Tuple2<>(new ArrayList<>(), new ArrayList<>()));
      locations._1.add(primaryPartition);
      primaryIndex = (nextPrimayInd + 1) % workers.size();
      iter.remove();
    }
    return partitionIdList;
  }