private static List roundRobin()

in master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java [342:453]


  /**
   * Places partitions on workers by walking {@code workers} circularly, starting from randomly
   * chosen primary and replica indexes.
   *
   * <p>For every placed partition a primary location is appended to the first list of the chosen
   * worker's tuple in {@code slots}; when {@code shouldReplicate} is set, a replica location is
   * appended to the second list of the replica worker's tuple and linked to the primary as its
   * peer. Processing stops at the first partition for which a full circle over {@code workers}
   * yields no acceptable primary or replica worker; that partition and all not yet processed ones
   * are returned to the caller.
   *
   * @param slots output map from worker to (primary locations, replica locations); mutated in
   *     place
   * @param partitionIds partition ids to place; the list is copied and not modified
   * @param workers candidate workers, addressed by index
   * @param slotsRestrictions per-worker usable disk information; {@code null} or an empty map
   *     means workers are chosen without consulting usable slots
   * @param shouldReplicate whether a replica location is created for each partition
   * @param shouldRackAware whether replica selection must satisfy {@code satisfyRackAware}
   * @param availableStorageTypes storage type flags forwarded to {@code
   *     StorageInfo.localDiskAvailable} and {@code getStorageInfo}
   * @return a new list holding the partition ids that were not placed; empty when everything was
   *     placed
   */
  private static List<Integer> roundRobin(
      Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots,
      List<Integer> partitionIds,
      List<WorkerInfo> workers,
      Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions,
      boolean shouldReplicate,
      boolean shouldRackAware,
      int availableStorageTypes) {
    // workerInfo -> (diskIndexForPrimaryAndReplica)
    Map<WorkerInfo, Integer> workerDiskIndex = new HashMap<>();
    List<Integer> partitionIdList = new LinkedList<>(partitionIds);

    final int workerSize = workers.size();
    if (workerSize == 0) {
      // Without workers nothing can be placed. Return early: a zero worker count cannot be used
      // as the modulus below (and presumably not as the random bound either).
      return partitionIdList;
    }
    // Evaluated once so that primary and replica selection agree on whether restrictions apply;
    // an empty restriction map is treated the same as no restrictions.
    final boolean hasRestrictions = slotsRestrictions != null && !slotsRestrictions.isEmpty();
    final IntUnaryOperator incrementIndex = v -> (v + 1) % workerSize;
    int primaryIndex = rand.nextInt(workerSize);
    int replicaIndex = rand.nextInt(workerSize);

    ListIterator<Integer> iter = partitionIdList.listIterator(partitionIdList.size());
    // Iterate from the end to preserve O(1) removal of processed partitions.
    // This is important when we have a high number of concurrent apps that have a
    // high number of partitions.
    outer:
    while (iter.hasPrevious()) {
      int nextPrimaryInd = primaryIndex;

      int partitionId = iter.previous();
      StorageInfo storageInfo;
      if (hasRestrictions) {
        // this means that we'll select a mount point
        while (!haveUsableSlots(slotsRestrictions, workers, nextPrimaryInd)) {
          nextPrimaryInd = incrementIndex.applyAsInt(nextPrimaryInd);
          if (nextPrimaryInd == primaryIndex) {
            // Came full circle without finding a worker with usable slots.
            break outer;
          }
        }
        storageInfo =
            getStorageInfo(
                workers, nextPrimaryInd, slotsRestrictions, workerDiskIndex, availableStorageTypes);
      } else {
        if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
          while (!workers.get(nextPrimaryInd).haveDisk()) {
            nextPrimaryInd = incrementIndex.applyAsInt(nextPrimaryInd);
            if (nextPrimaryInd == primaryIndex) {
              // Came full circle without finding a worker that has a disk.
              break outer;
            }
          }
        }
        storageInfo =
            getStorageInfo(workers, nextPrimaryInd, null, workerDiskIndex, availableStorageTypes);
      }
      PartitionLocation primaryPartition =
          createLocation(partitionId, workers.get(nextPrimaryInd), null, storageInfo, true);

      if (shouldReplicate) {
        int nextReplicaInd = replicaIndex;
        if (hasRestrictions) {
          // The replica must live on a different worker that still has usable slots and, when
          // requested, satisfies the rack-aware check against the primary.
          while (nextReplicaInd == nextPrimaryInd
              || !haveUsableSlots(slotsRestrictions, workers, nextReplicaInd)
              || !satisfyRackAware(shouldRackAware, workers, nextPrimaryInd, nextReplicaInd)) {
            nextReplicaInd = incrementIndex.applyAsInt(nextReplicaInd);
            if (nextReplicaInd == replicaIndex) {
              break outer;
            }
          }
          storageInfo =
              getStorageInfo(
                  workers,
                  nextReplicaInd,
                  slotsRestrictions,
                  workerDiskIndex,
                  availableStorageTypes);
        } else if (shouldRackAware) {
          while (nextReplicaInd == nextPrimaryInd
              || !satisfyRackAware(true, workers, nextPrimaryInd, nextReplicaInd)) {
            nextReplicaInd = incrementIndex.applyAsInt(nextReplicaInd);
            if (nextReplicaInd == replicaIndex) {
              break outer;
            }
          }
          // NOTE(review): storageInfo is not recomputed in this branch, so the replica location
          // below is created with the storage info selected for the primary worker, and the
          // replica worker is not checked with haveDisk() — confirm this is intended.
        } else {
          // NOTE(review): when local disk is not available the replica index is not advanced
          // past the primary index, so both may land on the same worker — confirm intended.
          if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
            while (nextReplicaInd == nextPrimaryInd || !workers.get(nextReplicaInd).haveDisk()) {
              nextReplicaInd = incrementIndex.applyAsInt(nextReplicaInd);
              if (nextReplicaInd == replicaIndex) {
                break outer;
              }
            }
          }
          storageInfo =
              getStorageInfo(workers, nextReplicaInd, null, workerDiskIndex, availableStorageTypes);
        }
        PartitionLocation replicaPartition =
            createLocation(
                partitionId, workers.get(nextReplicaInd), primaryPartition, storageInfo, false);
        primaryPartition.setPeer(replicaPartition);
        Tuple2<List<PartitionLocation>, List<PartitionLocation>> locations =
            slots.computeIfAbsent(
                workers.get(nextReplicaInd),
                v -> new Tuple2<>(new ArrayList<>(), new ArrayList<>()));
        locations._2.add(replicaPartition);
        // The next partition starts its replica search right after the worker just used.
        replicaIndex = incrementIndex.applyAsInt(nextReplicaInd);
      }

      // The primary is recorded only after the replica (if any) was placed, so a partition that
      // fails replica selection leaves no entry in slots.
      Tuple2<List<PartitionLocation>, List<PartitionLocation>> locations =
          slots.computeIfAbsent(
              workers.get(nextPrimaryInd), v -> new Tuple2<>(new ArrayList<>(), new ArrayList<>()));
      locations._1.add(primaryPartition);
      primaryIndex = incrementIndex.applyAsInt(nextPrimaryInd);
      // Drop the placed partition so that only unplaced ids remain in the returned list.
      iter.remove();
    }
    return partitionIdList;
  }