private RebalanceSummaryResult calculateDryRunSummary()

in pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java [661:871]


  /**
   * Builds the dry-run {@link RebalanceSummaryResult} by diffing the current and the target segment assignments.
   *
   * <p>The summary contains:
   * <ul>
   *   <li>per-server segment changes (added / removed / unchanged servers and per-server segment counts);</li>
   *   <li>per-tag aggregates for every server tag referenced by the table config;</li>
   *   <li>segment totals and a data-movement estimate derived from the average segment size;</li>
   *   <li>for non-OFFLINE tables, a summary of the consuming segments newly assigned to each server.</li>
   * </ul>
   *
   * @param currentAssignment current assignment, segment name -> (instance name -> segment state)
   * @param targetAssignment target assignment, segment name -> (instance name -> segment state)
   * @param tableNameWithType table name with type suffix; OFFLINE tables skip consuming-segment tracking
   * @param tableSubTypeSizeDetails size details used to compute the per-replica table size
   * @param tableConfig table config supplying the server tenant, tag overrides, instance assignment and tier tags
   * @param tableRebalanceLogger logger for this rebalance job
   * @return the computed rebalance summary
   */
  private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, String>> currentAssignment,
      Map<String, Map<String, String>> targetAssignment, String tableNameWithType,
      TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, TableConfig tableConfig,
      Logger tableRebalanceLogger) {
    tableRebalanceLogger.info("Calculating rebalance summary");
    boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;
    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
    // Consuming segments are only tracked for non-OFFLINE tables; a null map disables the tracking
    Map<String, Set<String>> existingServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
    Map<String, Set<String>> newServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();

    int existingReplicationFactor = collectServerToSegmentsForSummary(currentAssignment, existingServersToSegmentMap,
        existingServersToConsumingSegmentMap);
    int newReplicationFactor = collectServerToSegmentsForSummary(targetAssignment, newServersToSegmentMap,
        newServersToConsumingSegmentMap);
    RebalanceSummaryResult.RebalanceChangeInfo replicationFactor =
        new RebalanceSummaryResult.RebalanceChangeInfo(existingReplicationFactor, newReplicationFactor);

    RebalanceSummaryResult.RebalanceChangeInfo numServers =
        new RebalanceSummaryResult.RebalanceChangeInfo(existingServersToSegmentMap.size(),
            newServersToSegmentMap.size());

    List<InstanceConfig> instanceConfigs =
        _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true);
    Map<String, List<String>> instanceToTagsMap = new HashMap<>();
    for (InstanceConfig instanceConfig : instanceConfigs) {
      instanceToTagsMap.put(instanceConfig.getInstanceName(), instanceConfig.getTags());
    }

    Set<String> serversAdded = new HashSet<>();
    Set<String> serversRemoved = new HashSet<>();
    Set<String> serversUnchanged = new HashSet<>();
    Set<String> serversGettingNewSegments = new HashSet<>();
    Map<String, RebalanceSummaryResult.TagInfo> tagsInfoMap = createSummaryTagsInfoMap(tableConfig);
    Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap = new HashMap<>();
    int segmentsNotMoved = 0;
    int totalSegmentsToBeDeleted = 0;
    int maxSegmentsAddedToServer = 0;
    for (Map.Entry<String, Set<String>> entry : newServersToSegmentMap.entrySet()) {
      String server = entry.getKey();
      Set<String> newSegments = entry.getValue();
      int totalNewSegments = newSegments.size();
      int totalExistingSegments = 0;
      int segmentsUnchanged = 0;
      RebalanceSummaryResult.ServerStatus serverStatus;
      Set<String> existingSegments = existingServersToSegmentMap.get(server);
      if (existingSegments != null) {
        totalExistingSegments = existingSegments.size();
        // Segments hosted on this server both before and after the rebalance
        segmentsUnchanged = (int) existingSegments.stream().filter(newSegments::contains).count();
        segmentsNotMoved += segmentsUnchanged;
        serverStatus = RebalanceSummaryResult.ServerStatus.UNCHANGED;
        serversUnchanged.add(server);
      } else {
        serverStatus = RebalanceSummaryResult.ServerStatus.ADDED;
        serversAdded.add(server);
      }
      // Set sizes follow from the intersection: |new - existing| = |new| - |both|, |existing - new| = |existing| - |both|
      int segmentsAdded = totalNewSegments - segmentsUnchanged;
      int segmentsDeleted = totalExistingSegments - segmentsUnchanged;
      if (segmentsAdded > 0) {
        serversGettingNewSegments.add(server);
      }
      maxSegmentsAddedToServer = Math.max(maxSegmentsAddedToServer, segmentsAdded);
      totalSegmentsToBeDeleted += segmentsDeleted;

      serverSegmentChangeInfoMap.put(server, new RebalanceSummaryResult.ServerSegmentChangeInfo(serverStatus,
          totalNewSegments, totalExistingSegments, segmentsAdded, segmentsDeleted, segmentsUnchanged,
          instanceToTagsMap.get(server)));

      // Only tags already present in tagsInfoMap (i.e. referenced by the table config) are accounted
      Set<String> relevantTags = new HashSet<>(getServerTag(server));
      relevantTags.retainAll(tagsInfoMap.keySet());
      // The unchanged segments and the segments to download are accounted to every relevant tag of this server
      List<RebalanceSummaryResult.TagInfo> tagInfosToUpdate = new ArrayList<>();
      if (relevantTags.isEmpty()) {
        // this could happen when server's tags changed but reassignInstance=false in the rebalance config
        tableRebalanceLogger.warn("Server: {} was assigned but does not have any relevant tags", server);
        tagInfosToUpdate.add(tagsInfoMap.computeIfAbsent(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS,
            RebalanceSummaryResult.TagInfo::new));
      } else {
        for (String tag : relevantTags) {
          tagInfosToUpdate.add(tagsInfoMap.get(tag));
        }
      }
      for (RebalanceSummaryResult.TagInfo tagsInfo : tagInfosToUpdate) {
        tagsInfo.increaseNumSegmentsUnchanged(segmentsUnchanged);
        tagsInfo.increaseNumSegmentsToDownload(segmentsAdded);
        tagsInfo.increaseNumServerParticipants(1);
      }
    }

    // Servers that host segments now but receive none in the target assignment are removed
    for (Map.Entry<String, Set<String>> entry : existingServersToSegmentMap.entrySet()) {
      String server = entry.getKey();
      if (!serverSegmentChangeInfoMap.containsKey(server)) {
        int numExistingSegments = entry.getValue().size();
        serversRemoved.add(server);
        serverSegmentChangeInfoMap.put(server, new RebalanceSummaryResult.ServerSegmentChangeInfo(
            RebalanceSummaryResult.ServerStatus.REMOVED, 0, numExistingSegments, 0, numExistingSegments, 0,
            instanceToTagsMap.get(server)));
        totalSegmentsToBeDeleted += numExistingSegments;
      }
    }

    if (existingServersToConsumingSegmentMap != null && newServersToConsumingSegmentMap != null) {
      // turn the map into {server: added consuming segments}
      for (Map.Entry<String, Set<String>> entry : newServersToConsumingSegmentMap.entrySet()) {
        entry.getValue()
            .removeAll(existingServersToConsumingSegmentMap.getOrDefault(entry.getKey(), Collections.emptySet()));
      }
      newServersToConsumingSegmentMap.entrySet().removeIf(entry -> entry.getValue().isEmpty());
    }

    RebalanceSummaryResult.RebalanceChangeInfo numSegmentsInSingleReplica =
        new RebalanceSummaryResult.RebalanceChangeInfo(currentAssignment.size(), targetAssignment.size());

    int existingNumberSegmentsTotal = existingReplicationFactor * currentAssignment.size();
    int newNumberSegmentsTotal = newReplicationFactor * targetAssignment.size();
    RebalanceSummaryResult.RebalanceChangeInfo numSegmentsAcrossAllReplicas =
        new RebalanceSummaryResult.RebalanceChangeInfo(existingNumberSegmentsTotal, newNumberSegmentsTotal);

    int totalSegmentsToBeAdded = newNumberSegmentsTotal - segmentsNotMoved;

    long tableSizePerReplicaInBytes = calculateTableSizePerReplicaInBytes(tableSubTypeSizeDetails);
    long averageSegmentSizeInBytes;
    long totalEstimatedDataToBeMovedInBytes;
    if (tableSizePerReplicaInBytes <= 0) {
      // Non-positive sizes are passed through unchanged (presumably a sentinel for "size unavailable" — confirm in
      // calculateTableSizePerReplicaInBytes)
      averageSegmentSizeInBytes = tableSizePerReplicaInBytes;
      totalEstimatedDataToBeMovedInBytes = tableSizePerReplicaInBytes;
    } else if (currentAssignment.isEmpty()) {
      // No existing segments to average over; guard against division by zero
      averageSegmentSizeInBytes = 0;
      totalEstimatedDataToBeMovedInBytes = 0;
    } else {
      averageSegmentSizeInBytes = tableSizePerReplicaInBytes / currentAssignment.size();
      totalEstimatedDataToBeMovedInBytes = ((long) totalSegmentsToBeAdded) * averageSegmentSizeInBytes;
    }

    // NOTE(review): the sets are passed as-is; any hiding of empty sets in the output would happen inside ServerInfo
    RebalanceSummaryResult.ServerInfo serverInfo = new RebalanceSummaryResult.ServerInfo(
        serversGettingNewSegments.size(), numServers, serversAdded, serversRemoved, serversUnchanged,
        serversGettingNewSegments, serverSegmentChangeInfoMap);
    // TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
    //       rebalance time can vary with number of segments added
    RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary =
        isOfflineTable ? null
            : getConsumingSegmentSummary(tableConfig, newServersToConsumingSegmentMap, tableRebalanceLogger);
    RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeAdded,
        totalSegmentsToBeDeleted, maxSegmentsAddedToServer, averageSegmentSizeInBytes,
        totalEstimatedDataToBeMovedInBytes, replicationFactor, numSegmentsInSingleReplica,
        numSegmentsAcrossAllReplicas, consumingSegmentToBeMovedSummary);

    tableRebalanceLogger.info("Calculated rebalance summary");
    return new RebalanceSummaryResult(serverInfo, segmentInfo, new ArrayList<>(tagsInfoMap.values()));
  }

  /**
   * Inverts a segment -> (instance -> state) assignment into instance -> segments.
   *
   * <p>When {@code serverToConsumingSegmentMap} is non-null, the method also records, per instance, the segments that
   * are consuming: no replica is ONLINE and at least one replica is CONSUMING.
   *
   * @param assignment segment name -> (instance name -> segment state)
   * @param serverToSegmentMap output map, instance name -> segments assigned to it
   * @param serverToConsumingSegmentMap output map for consuming segments, or null to skip consuming tracking
   * @return the replica count of the last segment iterated (0 for an empty assignment), used as the replication factor
   */
  private static int collectServerToSegmentsForSummary(Map<String, Map<String, String>> assignment,
      Map<String, Set<String>> serverToSegmentMap, Map<String, Set<String>> serverToConsumingSegmentMap) {
    int replicationFactor = 0;
    for (Map.Entry<String, Map<String, String>> entry : assignment.entrySet()) {
      String segmentName = entry.getKey();
      Map<String, String> instanceStateMap = entry.getValue();
      replicationFactor = instanceStateMap.size();
      boolean isSegmentConsuming =
          serverToConsumingSegmentMap != null && isConsumingSegmentForSummary(instanceStateMap.values());
      for (String instanceName : instanceStateMap.keySet()) {
        serverToSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
        if (isSegmentConsuming) {
          serverToConsumingSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
        }
      }
    }
    return replicationFactor;
  }

  /**
   * Returns true when no replica state is ONLINE and at least one replica state is CONSUMING.
   */
  private static boolean isConsumingSegmentForSummary(Collection<String> segmentStates) {
    return segmentStates.stream().noneMatch(SegmentStateModel.ONLINE::equals)
        && segmentStates.stream().anyMatch(SegmentStateModel.CONSUMING::equals);
  }

  /**
   * Creates an empty {@code TagInfo} entry for every server tag referenced by the table config.
   *
   * <p>The sources are: the server tenant tag, the realtime completed/consuming tag overrides, the instance assignment
   * tag pools (all segment types), and the tier server tags.
   */
  private static Map<String, RebalanceSummaryResult.TagInfo> createSummaryTagsInfoMap(TableConfig tableConfig) {
    Map<String, RebalanceSummaryResult.TagInfo> tagsInfoMap = new HashMap<>();
    String serverTenantName = tableConfig.getTenantConfig().getServer();
    if (serverTenantName != null) {
      String serverTenantTag = TagNameUtils.getServerTagForTenant(serverTenantName, tableConfig.getTableType());
      tagsInfoMap.put(serverTenantTag, new RebalanceSummaryResult.TagInfo(serverTenantTag));
    }
    TagOverrideConfig tagOverrideConfig = tableConfig.getTenantConfig().getTagOverrideConfig();
    if (tagOverrideConfig != null) {
      String completedTag = tagOverrideConfig.getRealtimeCompleted();
      String consumingTag = tagOverrideConfig.getRealtimeConsuming();
      if (completedTag != null) {
        tagsInfoMap.put(completedTag, new RebalanceSummaryResult.TagInfo(completedTag));
      }
      if (consumingTag != null) {
        tagsInfoMap.put(consumingTag, new RebalanceSummaryResult.TagInfo(consumingTag));
      }
    }
    if (tableConfig.getInstanceAssignmentConfigMap() != null) {
      // for simplicity, including all segment types present in instanceAssignmentConfigMap
      tableConfig.getInstanceAssignmentConfigMap().values().forEach(instanceAssignmentConfig -> {
        String tag = instanceAssignmentConfig.getTagPoolConfig().getTag();
        tagsInfoMap.put(tag, new RebalanceSummaryResult.TagInfo(tag));
      });
    }
    if (tableConfig.getTierConfigsList() != null) {
      tableConfig.getTierConfigsList().forEach(tierConfig -> {
        String tierTag = tierConfig.getServerTag();
        tagsInfoMap.put(tierTag, new RebalanceSummaryResult.TagInfo(tierTag));
      });
    }
    return tagsInfoMap;
  }