private RebalanceResult doRebalance()

in pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java [206:637]


  /**
   * Rebalances the given table according to {@code rebalanceConfig} and returns the outcome.
   *
   * <p>High-level flow:
   * <ol>
   *   <li>Validate the config (pre-checks are only allowed together with dry-run) and fetch the current IdealState.
   *   <li>Compute the instance partitions, the tier instance partitions and the target segment assignment.
   *   <li>Optionally run pre-checks and always compute a summary, so callers can inspect the plan even when the
   *       table is already balanced.
   *   <li>Return early when nothing changes, in dry-run mode, or for a downtime rebalance (which replaces the
   *       IdealState with the target assignment in a single version-checked write).
   *   <li>Otherwise perform a no-downtime rebalance: repeatedly wait for the ExternalView to converge, compute the next
   *       assignment towards the target while honoring {@code minAvailableReplicas}, and write it to the IdealState
   *       conditioned on the expected IdealState version.
   * </ol>
   *
   * <p>Start, progress, success, failure and rollbacks are reported to {@code _tableRebalanceObserver}. When the
   * observer reports {@code isStopped()}, the method returns with the observer's stop status.
   *
   * @param tableConfig config of the table to rebalance; {@code getTableName()} is used as the table name with type
   * @param rebalanceConfig rebalance options (dry-run, pre-checks, downtime, min available replicas, ...)
   * @param rebalanceJobId id of this rebalance job; when {@code null}, a new one is created via
   *     {@code createUniqueRebalanceJobIdentifier()}
   * @param providedTierToSegmentsMap optional tier-to-segments mapping, forwarded to {@code getSortedTiers}
   * @return the rebalance result carrying the job id, the status and, when they were computed, the instance
   *     partitions, target assignment, pre-check results and summary
   */
  private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig rebalanceConfig,
      @Nullable String rebalanceJobId, @Nullable Map<String, Set<String>> providedTierToSegmentsMap) {
    long startTimeMs = System.currentTimeMillis();
    String tableNameWithType = tableConfig.getTableName();
    if (rebalanceJobId == null) {
      // If not passed along, create one.
      rebalanceJobId = createUniqueRebalanceJobIdentifier();
    }
    // Create the logger only after the job id is known. Then every log line of this rebalance, including those for
    // auto-generated job ids, can be correlated with the rebalanceJobId returned in the RebalanceResult.
    String loggerName = getClass().getSimpleName() + '-' + tableNameWithType + '-' + rebalanceJobId;
    Logger tableRebalanceLogger = LoggerFactory.getLogger(loggerName);
    boolean dryRun = rebalanceConfig.isDryRun();
    boolean preChecks = rebalanceConfig.isPreChecks();
    boolean reassignInstances = rebalanceConfig.isReassignInstances();
    boolean includeConsuming = rebalanceConfig.isIncludeConsuming();
    boolean bootstrap = rebalanceConfig.isBootstrap();
    boolean downtime = rebalanceConfig.isDowntime();
    // Non-negative: minimum number of replicas to keep available; negative: maximum number of unavailable replicas
    // (see the minAvailableReplicas calculation below).
    int minReplicasToKeepUpForNoDowntime = rebalanceConfig.getMinAvailableReplicas();
    boolean lowDiskMode = rebalanceConfig.isLowDiskMode();
    boolean bestEfforts = rebalanceConfig.isBestEfforts();
    long externalViewCheckIntervalInMs = rebalanceConfig.getExternalViewCheckIntervalInMs();
    long externalViewStabilizationTimeoutInMs = rebalanceConfig.getExternalViewStabilizationTimeoutInMs();
    Enablement minimizeDataMovement = rebalanceConfig.getMinimizeDataMovement();
    // Strict replica-group handling is on when the routing config's instance selector type matches
    // STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE (case-insensitive).
    boolean enableStrictReplicaGroup = tableConfig.getRoutingConfig() != null
        && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
        tableConfig.getRoutingConfig().getInstanceSelectorType());
    tableRebalanceLogger.info(
        "Start rebalancing with dryRun: {}, preChecks: {}, reassignInstances: {}, "
            + "includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, "
            + "enableStrictReplicaGroup: {}, lowDiskMode: {}, bestEfforts: {}, externalViewCheckIntervalInMs: {}, "
            + "externalViewStabilizationTimeoutInMs: {}, minimizeDataMovement: {}",
        dryRun, preChecks, reassignInstances, includeConsuming, bootstrap, downtime,
        minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, lowDiskMode, bestEfforts,
        externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs, minimizeDataMovement);

    // Dry-run must be enabled to run pre-checks
    if (preChecks && !dryRun) {
      String errorMsg = "Pre-checks can only be enabled in dry-run mode, not triggering rebalance";
      tableRebalanceLogger.error(errorMsg);
      return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null, null,
          null);
    }

    // Fetch ideal state
    PropertyKey idealStatePropertyKey = _helixDataAccessor.keyBuilder().idealStates(tableNameWithType);
    IdealState currentIdealState;
    try {
      currentIdealState = _helixDataAccessor.getProperty(idealStatePropertyKey);
    } catch (Exception e) {
      onReturnFailure("Caught exception while fetching IdealState, aborting the rebalance", e,
          tableRebalanceLogger);
      return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
          "Caught exception while fetching IdealState: " + e, null, null, null, null, null);
    }
    if (currentIdealState == null) {
      onReturnFailure("Cannot find the IdealState, aborting the rebalance", null,
          tableRebalanceLogger);
      return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Cannot find the IdealState for table",
          null, null, null, null, null);
    }
    if (!currentIdealState.isEnabled() && !downtime) {
      onReturnFailure("Cannot rebalance disabled table without downtime, aborting the rebalance", null,
          tableRebalanceLogger);
      return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
          "Cannot rebalance disabled table without downtime", null, null, null, null, null);
    }

    tableRebalanceLogger.info("Processing instance partitions");

    // Calculate instance partitions map
    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap;
    boolean instancePartitionsUnchanged;
    try {
      Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean> instancePartitionsMapAndUnchanged =
          getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, dryRun, minimizeDataMovement,
              tableRebalanceLogger);
      instancePartitionsMap = instancePartitionsMapAndUnchanged.getLeft();
      instancePartitionsUnchanged = instancePartitionsMapAndUnchanged.getRight();
    } catch (Exception e) {
      onReturnFailure("Caught exception while fetching/calculating instance partitions, aborting the rebalance", e,
          tableRebalanceLogger);
      return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
          "Caught exception while fetching/calculating instance partitions: " + e, null, null, null, null,
          null);
    }

    // Calculate instance partitions for tiers if configured
    List<Tier> sortedTiers;
    Map<String, InstancePartitions> tierToInstancePartitionsMap;
    boolean tierInstancePartitionsUnchanged;
    try {
      sortedTiers = getSortedTiers(tableConfig, providedTierToSegmentsMap);
      Pair<Map<String, InstancePartitions>, Boolean> tierToInstancePartitionsMapAndUnchanged =
          getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, dryRun,
              minimizeDataMovement, tableRebalanceLogger);
      tierToInstancePartitionsMap = tierToInstancePartitionsMapAndUnchanged.getLeft();
      tierInstancePartitionsUnchanged = tierToInstancePartitionsMapAndUnchanged.getRight();
    } catch (Exception e) {
      onReturnFailure("Caught exception while fetching/calculating tier instance partitions, aborting the rebalance", e,
          tableRebalanceLogger);
      return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
          "Caught exception while fetching/calculating tier instance partitions: " + e, null,
          null, null, null, null);
    }

    tableRebalanceLogger.info("Calculating the target assignment");
    SegmentAssignment segmentAssignment =
        SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig, _controllerMetrics);
    // Segment -> (instance -> state) map fields taken directly from the fetched IdealState record
    Map<String, Map<String, String>> currentAssignment = currentIdealState.getRecord().getMapFields();
    Map<String, Map<String, String>> targetAssignment;
    try {
      targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
          tierToInstancePartitionsMap, rebalanceConfig);
    } catch (Exception e) {
      onReturnFailure("Caught exception while calculating target assignment, aborting the rebalance", e,
          tableRebalanceLogger);
      return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
          "Caught exception while calculating target assignment: " + e, instancePartitionsMap,
          tierToInstancePartitionsMap, null, null, null);
    }

    boolean segmentAssignmentUnchanged = currentAssignment.equals(targetAssignment);
    tableRebalanceLogger.info(
        "instancePartitionsUnchanged: {}, tierInstancePartitionsUnchanged: {}, "
            + "segmentAssignmentUnchanged: {}", instancePartitionsUnchanged,
        tierInstancePartitionsUnchanged, segmentAssignmentUnchanged);

    TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails =
        fetchTableSizeDetails(tableNameWithType, tableRebalanceLogger);

    Map<String, RebalancePreCheckerResult> preChecksResult = null;
    if (preChecks) {
      if (_rebalancePreChecker == null) {
        tableRebalanceLogger.warn(
            "Pre-checks are enabled but the pre-checker is not set, skipping pre-checks");
      } else {
        RebalancePreChecker.PreCheckContext preCheckContext =
            new RebalancePreChecker.PreCheckContext(rebalanceJobId, tableNameWithType,
                tableConfig, currentAssignment, targetAssignment, tableSubTypeSizeDetails, rebalanceConfig);
        preChecksResult = _rebalancePreChecker.check(preCheckContext);
      }
    }
    // Calculate summary here itself so that even if the table is already balanced, the caller can verify whether that
    // is expected or not based on the summary results
    RebalanceSummaryResult summaryResult =
        calculateDryRunSummary(currentAssignment, targetAssignment, tableNameWithType, tableSubTypeSizeDetails,
            tableConfig, tableRebalanceLogger);

    if (segmentAssignmentUnchanged) {
      tableRebalanceLogger.info("Table is already balanced");
      if (instancePartitionsUnchanged && tierInstancePartitionsUnchanged) {
        _tableRebalanceObserver.onNoop("Instance unchanged and table is already balanced");
        return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.NO_OP, "Table is already balanced",
            instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
      } else {
        // Instance partitions changed but the segment assignment did not; there is no IdealState update to perform
        if (dryRun) {
          return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
              "Instance reassigned in dry-run mode, table is already balanced",
              instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
        } else {
          _tableRebalanceObserver.onSuccess("Instance reassigned but table is already balanced");
          return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
              "Instance reassigned, table is already balanced", instancePartitionsMap,
              tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
        }
      }
    }

    if (dryRun) {
      tableRebalanceLogger.info("Rebalancing in dry-run mode, returning the target assignment");
      return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Dry-run mode", instancePartitionsMap,
          tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
    }

    if (downtime) {
      tableRebalanceLogger.info("Rebalancing with downtime");

      // Reuse current IdealState to update the IdealState in cluster
      ZNRecord idealStateRecord = currentIdealState.getRecord();
      idealStateRecord.setMapFields(targetAssignment);
      currentIdealState.setNumPartitions(targetAssignment.size());
      // Replica count is taken from the first segment of the target assignment
      currentIdealState.setReplicas(Integer.toString(targetAssignment.values().iterator().next().size()));

      // Check version and update IdealState. The write is conditioned on the version of the record read above, and a
      // failed write (false return) is turned into an exception by checkState and reported as FAILED below.
      try {
        Preconditions.checkState(_helixDataAccessor.getBaseDataAccessor()
            .set(idealStatePropertyKey.getPath(), idealStateRecord, idealStateRecord.getVersion(),
                AccessOption.PERSISTENT), "Failed to update IdealState");
        String msg = "Finished rebalancing with downtime in " + (System.currentTimeMillis() - startTimeMs) + " ms.";
        tableRebalanceLogger.info(msg);
        _tableRebalanceObserver.onSuccess(msg);
        return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
            "Success with downtime (replaced IdealState with the target segment assignment, ExternalView might not "
                + "reach the target segment assignment yet)", instancePartitionsMap, tierToInstancePartitionsMap,
            targetAssignment, preChecksResult, summaryResult);
      } catch (Exception e) {
        onReturnFailure("Caught exception while updating IdealState, aborting the rebalance", e, tableRebalanceLogger);
        return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
            "Caught exception while updating IdealState: " + e, instancePartitionsMap, tierToInstancePartitionsMap,
            targetAssignment, preChecksResult, summaryResult);
      }
    }

    // No-downtime rebalance from here on
    List<String> segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
    Set<String> segmentsToMonitor = new HashSet<>(segmentsToMove);

    long estimatedAverageSegmentSizeInBytes = summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes();
    Set<String> allSegmentsFromIdealState = currentAssignment.keySet();
    TableRebalanceObserver.RebalanceContext rebalanceContext = new TableRebalanceObserver.RebalanceContext(
        estimatedAverageSegmentSizeInBytes, allSegmentsFromIdealState, segmentsToMonitor);

    // Record the beginning of rebalance
    _tableRebalanceObserver.onTrigger(TableRebalanceObserver.Trigger.START_TRIGGER, currentAssignment,
        targetAssignment, rebalanceContext);

    // Calculate the min available replicas for no-downtime rebalance
    // NOTE:
    // 1. The calculation is based on the number of replicas of the target assignment. In case of increasing the number
    //    of replicas for the current assignment, the current instance state map might not have enough replicas to reach
    //    the minimum available replicas requirement. In this scenario we don't want to fail the check, but keep all the
    //    current instances as this is the best we can do, and can help the table get out of this state.
    // 2. Only check the segments to be moved because we don't need to maintain available replicas for segments not
    //    being moved, including segments with all replicas OFFLINE (error segments during consumption).
    // NOTE(review): if segmentsToMove is empty, numReplicas (and numCurrentAssignmentReplicas below) stay at
    //    Integer.MAX_VALUE; confirm that getNextAssignment handles the resulting minAvailableReplicas as intended.
    int numReplicas = Integer.MAX_VALUE;
    for (String segment : segmentsToMove) {
      numReplicas = Math.min(targetAssignment.get(segment).size(), numReplicas);
    }
    int minAvailableReplicas;
    if (minReplicasToKeepUpForNoDowntime >= 0) {
      // For non-negative value, use it as min available replicas
      if (minReplicasToKeepUpForNoDowntime >= numReplicas) {
        onReturnFailure("Illegal config for minReplicasToKeepUpForNoDowntime: " + minReplicasToKeepUpForNoDowntime
                + ", must be less than number of replicas: " + numReplicas + ", aborting the rebalance", null,
            tableRebalanceLogger);
        return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
            "Illegal min available replicas config", instancePartitionsMap, tierToInstancePartitionsMap,
            targetAssignment, preChecksResult, summaryResult);
      }
      minAvailableReplicas = minReplicasToKeepUpForNoDowntime;
    } else {
      // For negative value, use it as max unavailable replicas
      minAvailableReplicas = Math.max(numReplicas + minReplicasToKeepUpForNoDowntime, 0);
    }

    // Cap minAvailableReplicas at the smallest current replica count among the segments to move (see NOTE 1 above)
    int numCurrentAssignmentReplicas = Integer.MAX_VALUE;
    for (String segment : segmentsToMove) {
      numCurrentAssignmentReplicas = Math.min(currentAssignment.get(segment).size(), numCurrentAssignmentReplicas);
    }
    if (minAvailableReplicas > numCurrentAssignmentReplicas) {
      tableRebalanceLogger.warn("minAvailableReplicas: {} larger than existing number of replicas: {}, "
              + "resetting minAvailableReplicas to {}", minAvailableReplicas, numCurrentAssignmentReplicas,
          numCurrentAssignmentReplicas);
      minAvailableReplicas = numCurrentAssignmentReplicas;
    }

    tableRebalanceLogger.info(
        "Rebalancing with minAvailableReplicas: {}, enableStrictReplicaGroup: {}, "
            + "bestEfforts: {}, externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}",
        minAvailableReplicas, enableStrictReplicaGroup, bestEfforts, externalViewCheckIntervalInMs,
        externalViewStabilizationTimeoutInMs);
    // IdealState version that the next conditional write expects; used to detect concurrent IdealState changes
    int expectedVersion = currentIdealState.getRecord().getVersion();

    // We repeat the following steps until the target assignment is reached:
    // 1. Wait for ExternalView to converge with the IdealState. Fail the rebalance if it doesn't converge within the
    //    timeout.
    // 2. When IdealState changes during step 1, re-calculate the target assignment based on the new IdealState (current
    //    assignment).
    // 3. Check if the target assignment is reached. Rebalance is done if it is reached.
    // 4. Calculate the next assignment based on the current assignment, target assignment and min available replicas.
    // 5. Update the IdealState to the next assignment. If the IdealState changes before the update, go back to step 1.
    //
    // NOTE: Monitor the segments to be moved from both the previous round and this round to ensure the moved segments
    //       in the previous round are also converged.
    while (true) {
      // Wait for ExternalView to converge before updating the next IdealState
      IdealState idealState;
      try {
        idealState = waitForExternalViewToConverge(tableNameWithType, lowDiskMode, bestEfforts, segmentsToMonitor,
            externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs, estimatedAverageSegmentSizeInBytes,
            allSegmentsFromIdealState, tableRebalanceLogger);
      } catch (Exception e) {
        String errorMsg = "Caught exception while waiting for ExternalView to converge, aborting the rebalance";
        tableRebalanceLogger.warn(errorMsg, e);
        // A stopped observer reports its own stop status instead of FAILED
        if (_tableRebalanceObserver.isStopped()) {
          return new RebalanceResult(rebalanceJobId, _tableRebalanceObserver.getStopStatus(),
              "Caught exception while waiting for ExternalView to converge: " + e, instancePartitionsMap,
              tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
        }
        _tableRebalanceObserver.onError(errorMsg);
        return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
            "Caught exception while waiting for ExternalView to converge: " + e, instancePartitionsMap,
            tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
      }

      // Re-calculate the target assignment if IdealState changed while waiting for ExternalView to converge
      ZNRecord idealStateRecord = idealState.getRecord();
      if (idealStateRecord.getVersion() != expectedVersion) {
        tableRebalanceLogger.info(
            "IdealState version changed while waiting for ExternalView to converge, re-calculating the target "
                + "assignment");
        Map<String, Map<String, String>> oldAssignment = currentAssignment;
        currentAssignment = idealStateRecord.getMapFields();
        expectedVersion = idealStateRecord.getVersion();

        // If all the segments to be moved remain unchanged (same instance state map) in the new ideal state, apply the
        // same target instance state map for these segments to the new ideal state as the target assignment
        boolean segmentsToMoveChanged = false;
        if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
          // For StrictRealtimeSegmentAssignment, we need to recompute the target assignment because the assignment for
          // new added segments is based on the existing assignment
          segmentsToMoveChanged = true;
        } else {
          for (String segment : segmentsToMove) {
            Map<String, String> oldInstanceStateMap = oldAssignment.get(segment);
            // May be null if the segment was removed from the new IdealState; equals(null) then reports a change
            Map<String, String> currentInstanceStateMap = currentAssignment.get(segment);
            // TODO: Consider allowing segment state change from CONSUMING to ONLINE
            if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
              tableRebalanceLogger.info(
                  "Segment state changed in IdealState from: {} to: {} for segment: {}, re-calculating the target "
                      + "assignment based on the new IdealState",
                  oldInstanceStateMap, currentInstanceStateMap, segment);
              segmentsToMoveChanged = true;
              break;
            }
          }
        }
        if (segmentsToMoveChanged) {
          try {
            // Re-calculate the instance partitions in case the instance configs changed during the rebalance
            instancePartitionsMap =
                getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false,
                    minimizeDataMovement, tableRebalanceLogger).getLeft();
            tierToInstancePartitionsMap =
                getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, false,
                    minimizeDataMovement, tableRebalanceLogger).getLeft();
            targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
                tierToInstancePartitionsMap, rebalanceConfig);
          } catch (Exception e) {
            onReturnFailure("Caught exception while re-calculating the target assignment, aborting the rebalance", e,
                tableRebalanceLogger);
            return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
                "Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap,
                tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
          }
        } else {
          tableRebalanceLogger.info(
              "No state change found for segments to be moved, re-calculating the target assignment based on the "
                  + "previous target assignment");
          Map<String, Map<String, String>> oldTargetAssignment = targetAssignment;
          // Other instance assignment code returns a TreeMap to keep it sorted, doing the same here
          targetAssignment = new TreeMap<>(currentAssignment);
          for (String segment : segmentsToMove) {
            targetAssignment.put(segment, oldTargetAssignment.get(segment));
          }
        }
      }

      if (currentAssignment.equals(targetAssignment)) {
        String msg =
            "Finished rebalancing with minAvailableReplicas: " + minAvailableReplicas + ", enableStrictReplicaGroup: "
                + enableStrictReplicaGroup + ", bestEfforts: " + bestEfforts + " in " + (System.currentTimeMillis()
                - startTimeMs) + " ms.";
        tableRebalanceLogger.info(msg);
        // Record completion
        _tableRebalanceObserver.onSuccess(msg);
        return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
            "Success with minAvailableReplicas: " + minAvailableReplicas
                + " (both IdealState and ExternalView should reach the target segment assignment)",
            instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
      }

      // Record change of current ideal state and the new target
      rebalanceContext = new TableRebalanceObserver.RebalanceContext(estimatedAverageSegmentSizeInBytes,
          allSegmentsFromIdealState, null);
      _tableRebalanceObserver.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER, currentAssignment,
          targetAssignment, rebalanceContext);
      if (_tableRebalanceObserver.isStopped()) {
        return new RebalanceResult(rebalanceJobId, _tableRebalanceObserver.getStopStatus(),
            "Rebalance has stopped already before updating the IdealState", instancePartitionsMap,
            tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
      }
      Map<String, Map<String, String>> nextAssignment =
          getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup,
              lowDiskMode);
      tableRebalanceLogger.info(
          "Got the next assignment with number of segments to be added/removed for each instance: {}",
          SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, nextAssignment));

      // Record change of current ideal state and the next assignment
      _tableRebalanceObserver.onTrigger(TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER,
          currentAssignment, nextAssignment, rebalanceContext);
      if (_tableRebalanceObserver.isStopped()) {
        return new RebalanceResult(rebalanceJobId, _tableRebalanceObserver.getStopStatus(),
            "Rebalance has stopped already before updating the IdealState with the next assignment",
            instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
      }

      // Reuse current IdealState to update the IdealState in cluster
      idealStateRecord.setMapFields(nextAssignment);
      idealState.setNumPartitions(nextAssignment.size());
      idealState.setReplicas(Integer.toString(nextAssignment.values().iterator().next().size()));

      // Check version and update IdealState. The write only succeeds if the IdealState is still at expectedVersion; a
      // concurrent change surfaces as ZkBadVersionException and sends us back to step 1.
      try {
        Preconditions.checkState(_helixDataAccessor.getBaseDataAccessor()
                .set(idealStatePropertyKey.getPath(), idealStateRecord, expectedVersion, AccessOption.PERSISTENT),
            "Failed to update IdealState");
        currentAssignment = nextAssignment;
        expectedVersion++;
        // IdealState update is successful. Update the segment list as the IDEAL_STATE_CHANGE_TRIGGER should have
        // captured the newly added / deleted segments
        allSegmentsFromIdealState = currentAssignment.keySet();
        tableRebalanceLogger.info("Successfully updated the IdealState");
      } catch (ZkBadVersionException e) {
        tableRebalanceLogger.info("Version changed while updating IdealState");
        // Since IdealState wasn't updated, rollback the stats changes made and continue. There is no need to update
        // segmentsToMonitor either since that hasn't changed without the IdealState update
        _tableRebalanceObserver.onRollback();
        continue;
      } catch (Exception e) {
        onReturnFailure("Caught exception while updating IdealState, aborting the rebalance", e, tableRebalanceLogger);
        return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
            "Caught exception while updating IdealState: " + e, instancePartitionsMap, tierToInstancePartitionsMap,
            targetAssignment, preChecksResult, summaryResult);
      }

      // Monitor both the segments moved in this round and those still left to move in the next round
      segmentsToMonitor = new HashSet<>(segmentsToMove);
      segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
      segmentsToMonitor.addAll(segmentsToMove);
    }
  }