public PartitionRangeAssignment assign()

in coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java [63:145]


  /**
   * Builds a partition assignment from the shuffle servers that match {@code requiredTags}.
   *
   * <p>The steps, all executed while holding this object's monitor, are:
   *
   * <ol>
   *   <li>fetch the matching servers from the cluster manager;
   *   <li>replace {@code serverToPartitions} with a map containing only those servers, reusing the
   *       existing bookkeeping entry of a known server (and resetting its partition count when the
   *       server reports a newer timestamp) or creating a fresh entry otherwise;
   *   <li>sort the servers in descending order of available memory divided by (already assigned
   *       partitions + the average number of partitions this request adds per server);
   *   <li>pick the candidate servers and delegate to {@code getPartitionAssignment};
   *   <li>increment the partition count of every server occurrence in the result.
   * </ol>
   *
   * @param totalPartitionNum number of partitions in the request
   * @param partitionNumPerRange partitions per range; only {@code 1} is accepted
   * @param replica replica count; at least this many matching servers must be available
   * @param requiredTags tags forwarded to the cluster manager to select servers
   * @param requiredShuffleServerNumber requested number of servers; honoured only when it is
   *     positive and smaller than the cluster manager's maximum
   * @param estimateTaskConcurrency forwarded unchanged to {@code getPartitionAssignment}
   * @return the assignment wrapped in a {@link PartitionRangeAssignment}
   * @throws RssException if {@code partitionNumPerRange} is not {@code 1}, or if no server matches
   *     or fewer than {@code replica} servers match
   */
  public PartitionRangeAssignment assign(
      int totalPartitionNum,
      int partitionNumPerRange,
      int replica,
      Set<String> requiredTags,
      int requiredShuffleServerNumber,
      int estimateTaskConcurrency) {

    if (partitionNumPerRange != 1) {
      throw new RssException("PartitionNumPerRange must be one");
    }

    SortedMap<PartitionRange, List<ServerNode>> assignments;
    synchronized (this) {
      List<ServerNode> nodes = clusterManager.getServerList(requiredTags);

      // Rebuild the bookkeeping so it only contains the servers returned for this request.
      Map<ServerNode, PartitionAssignmentInfo> newPartitionInfos = JavaUtils.newConcurrentMap();
      for (ServerNode node : nodes) {
        newPartitionInfos.computeIfAbsent(
            node,
            key -> {
              PartitionAssignmentInfo known = serverToPartitions.get(key);
              if (known == null) {
                return new PartitionAssignmentInfo();
              }
              // The server reports a newer timestamp than the one recorded: start counting its
              // partitions from scratch.
              if (known.getTimestamp() < key.getTimestamp()) {
                known.resetPartitionNum();
                known.setTimestamp(key.getTimestamp());
              }
              return known;
            });
      }
      serverToPartitions = newPartitionInfos;

      // Fail fast: no point in sorting or sizing when the request cannot be satisfied.
      if (nodes.isEmpty() || nodes.size() < replica) {
        throw new RssException("There isn't enough shuffle servers");
      }

      // Read the limit once so the average below and the server cap use the same value.
      final int assignmentMaxNum = clusterManager.getShuffleNodesMax();

      // Average number of partitions this request adds per server (at least one). Computed in
      // long arithmetic so that totalPartitionNum * replica cannot overflow an int.
      // NOTE: this is an integer division; a maximum of 0 raises ArithmeticException.
      final long assignPartitions =
          Math.max((long) totalPartitionNum * replica / assignmentMaxNum, 1L);

      // Servers with the most available memory per (existing + expected) partition come first.
      nodes.sort(
          Comparator.comparingDouble(
                  (ServerNode candidate) ->
                      candidate.getAvailableMemory()
                          * 1.0
                          / (newPartitionInfos.get(candidate).getPartitionNum()
                              + assignPartitions))
              .reversed());

      int expectNum = assignmentMaxNum;
      if (requiredShuffleServerNumber < assignmentMaxNum && requiredShuffleServerNumber > 0) {
        expectNum = requiredShuffleServerNumber;
      }

      if (nodes.size() < expectNum) {
        LOG.warn("Can't get expected servers [{}] and found only [{}]", expectNum, nodes.size());
        expectNum = nodes.size();
      }

      List<ServerNode> candidatesNodes = getCandidateNodes(nodes, expectNum);
      assignments =
          getPartitionAssignment(
              totalPartitionNum,
              partitionNumPerRange,
              replica,
              candidatesNodes,
              estimateTaskConcurrency);

      // Record one more partition for every server occurrence in the resulting assignment.
      assignments.values().stream()
          .flatMap(Collection::stream)
          .forEach(server -> newPartitionInfos.get(server).incrementPartitionNum());
    }
    return new PartitionRangeAssignment(assignments);
  }