in pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java [206:637]
/**
 * Rebalances the segments of one table and reports the outcome as a {@link RebalanceResult}.
 *
 * <p>The method proceeds in the following order:
 * <ol>
 *   <li>Reads all options from {@code rebalanceConfig} and logs them. Pre-checks are rejected unless dry-run is
 *       also enabled.</li>
 *   <li>Fetches the table IdealState. The rebalance fails if the IdealState cannot be read, does not exist, or is
 *       disabled while {@code downtime} is false.</li>
 *   <li>Computes the instance partitions, the tier instance partitions and then the target segment assignment.</li>
 *   <li>Runs the pre-checks (when enabled and a pre-checker is set) and always computes the summary.</li>
 *   <li>Returns early when the segment assignment is unchanged ({@code NO_OP} or {@code DONE}) or when running in
 *       dry-run mode ({@code DONE}).</li>
 *   <li>With {@code downtime}, replaces the IdealState with the target assignment in a single version-checked
 *       write.</li>
 *   <li>Otherwise loops: waits for the ExternalView to converge, re-derives the target assignment if the IdealState
 *       version changed, computes the next intermediate assignment and writes it with a version check, until the
 *       current assignment equals the target assignment.</li>
 * </ol>
 *
 * <p>Exceptions caught by the try blocks below are converted into a {@code FAILED} result (or into the observer's
 * stop status when the observer reports that the rebalance has been stopped). Statements outside those try blocks
 * are not guarded.
 *
 * @param tableConfig config of the table to rebalance; {@code getTableName()} is used as the table name with type
 * @param rebalanceConfig options controlling this rebalance (dry-run, downtime, min available replicas, ...)
 * @param rebalanceJobId id used in the returned results; when {@code null} a new one is created with
 *     {@code createUniqueRebalanceJobIdentifier()}
 * @param providedTierToSegmentsMap forwarded as-is to {@code getSortedTiers}; may be {@code null}
 * @return the result of the rebalance, carrying the job id, status, description and whatever intermediate state
 *     (instance partitions, target assignment, pre-check results, summary) was available at the point of return
 */
private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig rebalanceConfig,
@Nullable String rebalanceJobId, @Nullable Map<String, Set<String>> providedTierToSegmentsMap) {
long startTimeMs = System.currentTimeMillis();
String tableNameWithType = tableConfig.getTableName();
// NOTE(review): the logger name is built before a missing job id is generated below, so a generated id never
// appears in the logger name - only a caller-supplied one does.
String loggerName =
getClass().getSimpleName() + '-' + tableNameWithType + (rebalanceJobId == null ? "" : '-' + rebalanceJobId);
Logger tableRebalanceLogger = LoggerFactory.getLogger(loggerName);
if (rebalanceJobId == null) {
// If not passed along, create one.
// TODO - Add rebalanceJobId to all log messages for easy tracking.
rebalanceJobId = createUniqueRebalanceJobIdentifier();
}
// Snapshot every option up front so the rest of the method works on plain locals
boolean dryRun = rebalanceConfig.isDryRun();
boolean preChecks = rebalanceConfig.isPreChecks();
boolean reassignInstances = rebalanceConfig.isReassignInstances();
boolean includeConsuming = rebalanceConfig.isIncludeConsuming();
boolean bootstrap = rebalanceConfig.isBootstrap();
boolean downtime = rebalanceConfig.isDowntime();
int minReplicasToKeepUpForNoDowntime = rebalanceConfig.getMinAvailableReplicas();
boolean lowDiskMode = rebalanceConfig.isLowDiskMode();
boolean bestEfforts = rebalanceConfig.isBestEfforts();
long externalViewCheckIntervalInMs = rebalanceConfig.getExternalViewCheckIntervalInMs();
long externalViewStabilizationTimeoutInMs = rebalanceConfig.getExternalViewStabilizationTimeoutInMs();
Enablement minimizeDataMovement = rebalanceConfig.getMinimizeDataMovement();
// Strict replica-group is on only when a routing config exists and its instance selector type matches
// (case-insensitively) the strict replica-group selector type
boolean enableStrictReplicaGroup = tableConfig.getRoutingConfig() != null
&& RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
tableConfig.getRoutingConfig().getInstanceSelectorType());
tableRebalanceLogger.info(
"Start rebalancing with dryRun: {}, preChecks: {}, reassignInstances: {}, "
+ "includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, "
+ "enableStrictReplicaGroup: {}, lowDiskMode: {}, bestEfforts: {}, externalViewCheckIntervalInMs: {}, "
+ "externalViewStabilizationTimeoutInMs: {}, minimizeDataMovement: {}",
dryRun, preChecks, reassignInstances, includeConsuming, bootstrap, downtime,
minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, lowDiskMode, bestEfforts,
externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs, minimizeDataMovement);
// Dry-run must be enabled to run pre-checks
// NOTE(review): unlike the other failure paths, this one logs directly and does not go through
// onReturnFailure - confirm that skipping it here is intentional.
if (preChecks && !dryRun) {
String errorMsg = "Pre-checks can only be enabled in dry-run mode, not triggering rebalance";
tableRebalanceLogger.error(errorMsg);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null, null,
null);
}
// Fetch ideal state
PropertyKey idealStatePropertyKey = _helixDataAccessor.keyBuilder().idealStates(tableNameWithType);
IdealState currentIdealState;
try {
currentIdealState = _helixDataAccessor.getProperty(idealStatePropertyKey);
} catch (Exception e) {
onReturnFailure("Caught exception while fetching IdealState, aborting the rebalance", e,
tableRebalanceLogger);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while fetching IdealState: " + e, null, null, null, null, null);
}
if (currentIdealState == null) {
onReturnFailure("Cannot find the IdealState, aborting the rebalance", null,
tableRebalanceLogger);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Cannot find the IdealState for table",
null, null, null, null, null);
}
// A disabled table may only be rebalanced with downtime
if (!currentIdealState.isEnabled() && !downtime) {
onReturnFailure("Cannot rebalance disabled table without downtime, aborting the rebalance", null,
tableRebalanceLogger);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Cannot rebalance disabled table without downtime", null, null, null, null, null);
}
tableRebalanceLogger.info("Processing instance partitions");
// Calculate instance partitions map
// The helper returns the map together with a flag telling whether the instance partitions are unchanged
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap;
boolean instancePartitionsUnchanged;
try {
Pair<Map<InstancePartitionsType, InstancePartitions>, Boolean> instancePartitionsMapAndUnchanged =
getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, dryRun, minimizeDataMovement,
tableRebalanceLogger);
instancePartitionsMap = instancePartitionsMapAndUnchanged.getLeft();
instancePartitionsUnchanged = instancePartitionsMapAndUnchanged.getRight();
} catch (Exception e) {
onReturnFailure("Caught exception while fetching/calculating instance partitions, aborting the rebalance", e,
tableRebalanceLogger);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while fetching/calculating instance partitions: " + e, null, null, null, null,
null);
}
// Calculate instance partitions for tiers if configured
List<Tier> sortedTiers;
Map<String, InstancePartitions> tierToInstancePartitionsMap;
boolean tierInstancePartitionsUnchanged;
try {
sortedTiers = getSortedTiers(tableConfig, providedTierToSegmentsMap);
Pair<Map<String, InstancePartitions>, Boolean> tierToInstancePartitionsMapAndUnchanged =
getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, dryRun,
minimizeDataMovement, tableRebalanceLogger);
tierToInstancePartitionsMap = tierToInstancePartitionsMapAndUnchanged.getLeft();
tierInstancePartitionsUnchanged = tierToInstancePartitionsMapAndUnchanged.getRight();
} catch (Exception e) {
onReturnFailure("Caught exception while fetching/calculating tier instance partitions, aborting the rebalance", e,
tableRebalanceLogger);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while fetching/calculating tier instance partitions: " + e, null,
null, null, null, null);
}
tableRebalanceLogger.info("Calculating the target assignment");
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig, _controllerMetrics);
// The current assignment is the map fields of the IdealState record: segment -> (instance -> state)
Map<String, Map<String, String>> currentAssignment = currentIdealState.getRecord().getMapFields();
Map<String, Map<String, String>> targetAssignment;
try {
targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
tierToInstancePartitionsMap, rebalanceConfig);
} catch (Exception e) {
onReturnFailure("Caught exception while calculating target assignment, aborting the rebalance", e,
tableRebalanceLogger);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while calculating target assignment: " + e, instancePartitionsMap,
tierToInstancePartitionsMap, null, null, null);
}
boolean segmentAssignmentUnchanged = currentAssignment.equals(targetAssignment);
tableRebalanceLogger.info(
"instancePartitionsUnchanged: {}, tierInstancePartitionsUnchanged: {}, "
+ "segmentAssignmentUnchanged: {}", instancePartitionsUnchanged,
tierInstancePartitionsUnchanged, segmentAssignmentUnchanged);
// Table size details feed both the pre-check context and the summary below
TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails =
fetchTableSizeDetails(tableNameWithType, tableRebalanceLogger);
// Stays null when pre-checks are disabled or no pre-checker is set
Map<String, RebalancePreCheckerResult> preChecksResult = null;
if (preChecks) {
if (_rebalancePreChecker == null) {
tableRebalanceLogger.warn(
"Pre-checks are enabled but the pre-checker is not set, skipping pre-checks");
} else {
RebalancePreChecker.PreCheckContext preCheckContext =
new RebalancePreChecker.PreCheckContext(rebalanceJobId, tableNameWithType,
tableConfig, currentAssignment, targetAssignment, tableSubTypeSizeDetails, rebalanceConfig);
preChecksResult = _rebalancePreChecker.check(preCheckContext);
}
}
// Calculate summary here itself so that even if the table is already balanced, the caller can verify whether that
// is expected or not based on the summary results
RebalanceSummaryResult summaryResult =
calculateDryRunSummary(currentAssignment, targetAssignment, tableNameWithType, tableSubTypeSizeDetails,
tableConfig, tableRebalanceLogger);
// Nothing to move: report NO_OP when the instance partitions are unchanged as well, otherwise DONE. The observer
// is notified in every case except the dry-run instance reassignment.
if (segmentAssignmentUnchanged) {
tableRebalanceLogger.info("Table is already balanced");
if (instancePartitionsUnchanged && tierInstancePartitionsUnchanged) {
_tableRebalanceObserver.onNoop("Instance unchanged and table is already balanced");
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.NO_OP, "Table is already balanced",
instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
} else {
if (dryRun) {
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
"Instance reassigned in dry-run mode, table is already balanced",
instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
} else {
_tableRebalanceObserver.onSuccess("Instance reassigned but table is already balanced");
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
"Instance reassigned, table is already balanced", instancePartitionsMap,
tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
}
}
}
// Dry-run stops here: the target assignment is returned but the IdealState is never written
if (dryRun) {
tableRebalanceLogger.info("Rebalancing in dry-run mode, returning the target assignment");
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Dry-run mode", instancePartitionsMap,
tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
}
if (downtime) {
tableRebalanceLogger.info("Rebalancing with downtime");
// Reuse current IdealState to update the IdealState in cluster
ZNRecord idealStateRecord = currentIdealState.getRecord();
idealStateRecord.setMapFields(targetAssignment);
currentIdealState.setNumPartitions(targetAssignment.size());
// The replica count is taken from the first segment of the target assignment.
// NOTE(review): iterator().next() throws if the target assignment is empty, and this line is outside the try
// block below - confirm the target assignment can never be empty here.
currentIdealState.setReplicas(Integer.toString(targetAssignment.values().iterator().next().size()));
// Check version and update IdealState
try {
Preconditions.checkState(_helixDataAccessor.getBaseDataAccessor()
.set(idealStatePropertyKey.getPath(), idealStateRecord, idealStateRecord.getVersion(),
AccessOption.PERSISTENT), "Failed to update IdealState");
String msg = "Finished rebalancing with downtime in " + (System.currentTimeMillis() - startTimeMs) + " ms.";
tableRebalanceLogger.info(msg);
_tableRebalanceObserver.onSuccess(msg);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
"Success with downtime (replaced IdealState with the target segment assignment, ExternalView might not "
+ "reach the target segment assignment yet)", instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment, preChecksResult, summaryResult);
} catch (Exception e) {
onReturnFailure("Caught exception while updating IdealState, aborting the rebalance", e, tableRebalanceLogger);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while updating IdealState: " + e, instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment, preChecksResult, summaryResult);
}
}
// From here on: no-downtime rebalance. Segments to monitor start out as exactly the segments to move.
List<String> segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
Set<String> segmentsToMonitor = new HashSet<>(segmentsToMove);
long estimatedAverageSegmentSizeInBytes = summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes();
Set<String> allSegmentsFromIdealState = currentAssignment.keySet();
TableRebalanceObserver.RebalanceContext rebalanceContext = new TableRebalanceObserver.RebalanceContext(
estimatedAverageSegmentSizeInBytes, allSegmentsFromIdealState, segmentsToMonitor);
// Record the beginning of rebalance
_tableRebalanceObserver.onTrigger(TableRebalanceObserver.Trigger.START_TRIGGER, currentAssignment,
targetAssignment, rebalanceContext);
// Calculate the min available replicas for no-downtime rebalance
// NOTE:
// 1. The calculation is based on the number of replicas of the target assignment. In case of increasing the number
// of replicas for the current assignment, the current instance state map might not have enough replicas to reach
// the minimum available replicas requirement. In this scenario we don't want to fail the check, but keep all the
// current instances as this is the best we can do, and can help the table get out of this state.
// 2. Only check the segments to be moved because we don't need to maintain available replicas for segments not
// being moved, including segments with all replicas OFFLINE (error segments during consumption).
// Smallest target replica count across the segments to move.
// NOTE(review): stays at Integer.MAX_VALUE if segmentsToMove is empty - confirm that cannot happen here.
int numReplicas = Integer.MAX_VALUE;
for (String segment : segmentsToMove) {
numReplicas = Math.min(targetAssignment.get(segment).size(), numReplicas);
}
int minAvailableReplicas;
if (minReplicasToKeepUpForNoDowntime >= 0) {
// For non-negative value, use it as min available replicas
if (minReplicasToKeepUpForNoDowntime >= numReplicas) {
onReturnFailure("Illegal config for minReplicasToKeepUpForNoDowntime: " + minReplicasToKeepUpForNoDowntime
+ ", must be less than number of replicas: " + numReplicas + ", aborting the rebalance", null,
tableRebalanceLogger);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Illegal min available replicas config", instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment, preChecksResult, summaryResult);
}
minAvailableReplicas = minReplicasToKeepUpForNoDowntime;
} else {
// For negative value, use it as max unavailable replicas
minAvailableReplicas = Math.max(numReplicas + minReplicasToKeepUpForNoDowntime, 0);
}
// Smallest current replica count across the segments to move; used to cap minAvailableReplicas (see NOTE 1 above)
int numCurrentAssignmentReplicas = Integer.MAX_VALUE;
for (String segment : segmentsToMove) {
numCurrentAssignmentReplicas = Math.min(currentAssignment.get(segment).size(), numCurrentAssignmentReplicas);
}
if (minAvailableReplicas > numCurrentAssignmentReplicas) {
tableRebalanceLogger.warn("minAvailableReplicas: {} larger than existing number of replicas: {}, "
+ "resetting minAvailableReplicas to {}", minAvailableReplicas, numCurrentAssignmentReplicas,
numCurrentAssignmentReplicas);
minAvailableReplicas = numCurrentAssignmentReplicas;
}
tableRebalanceLogger.info(
"Rebalancing with minAvailableReplicas: {}, enableStrictReplicaGroup: {}, "
+ "bestEfforts: {}, externalViewCheckIntervalInMs: {}, externalViewStabilizationTimeoutInMs: {}",
minAvailableReplicas, enableStrictReplicaGroup, bestEfforts, externalViewCheckIntervalInMs,
externalViewStabilizationTimeoutInMs);
// Version of the IdealState this method last read or wrote; used for the version-checked writes in the loop
int expectedVersion = currentIdealState.getRecord().getVersion();
// We repeat the following steps until the target assignment is reached:
// 1. Wait for ExternalView to converge with the IdealState. Fail the rebalance if it doesn't converge within the
// timeout.
// 2. When IdealState changes during step 1, re-calculate the target assignment based on the new IdealState (current
// assignment).
// 3. Check if the target assignment is reached. Rebalance is done if it is reached.
// 4. Calculate the next assignment based on the current assignment, target assignment and min available replicas.
// 5. Update the IdealState to the next assignment. If the IdealState changes before the update, go back to step 1.
//
// NOTE: Monitor the segments to be moved from both the previous round and this round to ensure the moved segments
// in the previous round are also converged.
while (true) {
// Wait for ExternalView to converge before updating the next IdealState
IdealState idealState;
try {
idealState = waitForExternalViewToConverge(tableNameWithType, lowDiskMode, bestEfforts, segmentsToMonitor,
externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs, estimatedAverageSegmentSizeInBytes,
allSegmentsFromIdealState, tableRebalanceLogger);
} catch (Exception e) {
String errorMsg = "Caught exception while waiting for ExternalView to converge, aborting the rebalance";
tableRebalanceLogger.warn(errorMsg, e);
// When the observer reports the rebalance as stopped, return its stop status and skip onError
if (_tableRebalanceObserver.isStopped()) {
return new RebalanceResult(rebalanceJobId, _tableRebalanceObserver.getStopStatus(),
"Caught exception while waiting for ExternalView to converge: " + e, instancePartitionsMap,
tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
}
_tableRebalanceObserver.onError(errorMsg);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while waiting for ExternalView to converge: " + e, instancePartitionsMap,
tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
}
// Re-calculate the target assignment if IdealState changed while waiting for ExternalView to converge
ZNRecord idealStateRecord = idealState.getRecord();
if (idealStateRecord.getVersion() != expectedVersion) {
tableRebalanceLogger.info(
"IdealState version changed while waiting for ExternalView to converge, re-calculating the target "
+ "assignment");
// Keep the assignment we were working from so it can be compared against the freshly read one
Map<String, Map<String, String>> oldAssignment = currentAssignment;
currentAssignment = idealStateRecord.getMapFields();
expectedVersion = idealStateRecord.getVersion();
// If all the segments to be moved remain unchanged (same instance state map) in the new ideal state, apply the
// same target instance state map for these segments to the new ideal state as the target assignment
boolean segmentsToMoveChanged = false;
if (segmentAssignment instanceof StrictRealtimeSegmentAssignment) {
// For StrictRealtimeSegmentAssignment, we need to recompute the target assignment because the assignment for
// new added segments is based on the existing assignment
segmentsToMoveChanged = true;
} else {
for (String segment : segmentsToMove) {
Map<String, String> oldInstanceStateMap = oldAssignment.get(segment);
Map<String, String> currentInstanceStateMap = currentAssignment.get(segment);
// TODO: Consider allowing segment state change from CONSUMING to ONLINE
// A segment missing from the new IdealState (null map) also counts as changed, since equals(null) is false
if (!oldInstanceStateMap.equals(currentInstanceStateMap)) {
tableRebalanceLogger.info(
"Segment state changed in IdealState from: {} to: {} for segment: {}, re-calculating the target "
+ "assignment based on the new IdealState",
oldInstanceStateMap, currentInstanceStateMap, segment);
segmentsToMoveChanged = true;
break;
}
}
}
if (segmentsToMoveChanged) {
try {
// Re-calculate the instance partitions in case the instance configs changed during the rebalance
// The dry-run argument is passed as false: dry-run mode has already returned before reaching this loop
instancePartitionsMap =
getInstancePartitionsMap(tableConfig, reassignInstances, bootstrap, false,
minimizeDataMovement, tableRebalanceLogger).getLeft();
tierToInstancePartitionsMap =
getTierToInstancePartitionsMap(tableConfig, sortedTiers, reassignInstances, bootstrap, false,
minimizeDataMovement, tableRebalanceLogger).getLeft();
targetAssignment = segmentAssignment.rebalanceTable(currentAssignment, instancePartitionsMap, sortedTiers,
tierToInstancePartitionsMap, rebalanceConfig);
} catch (Exception e) {
onReturnFailure("Caught exception while re-calculating the target assignment, aborting the rebalance", e,
tableRebalanceLogger);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap,
tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
}
} else {
tableRebalanceLogger.info(
"No state change found for segments to be moved, re-calculating the target assignment based on the "
+ "previous target assignment");
Map<String, Map<String, String>> oldTargetAssignment = targetAssignment;
// Other instance assignment code returns a TreeMap to keep it sorted, doing the same here
// Start from the new IdealState and overlay the previous target only for the segments being moved
targetAssignment = new TreeMap<>(currentAssignment);
for (String segment : segmentsToMove) {
targetAssignment.put(segment, oldTargetAssignment.get(segment));
}
}
}
// Step 3: the rebalance is complete once the current assignment equals the target assignment
if (currentAssignment.equals(targetAssignment)) {
String msg =
"Finished rebalancing with minAvailableReplicas: " + minAvailableReplicas + ", enableStrictReplicaGroup: "
+ enableStrictReplicaGroup + ", bestEfforts: " + bestEfforts + " in " + (System.currentTimeMillis()
- startTimeMs) + " ms.";
tableRebalanceLogger.info(msg);
// Record completion
_tableRebalanceObserver.onSuccess(msg);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
"Success with minAvailableReplicas: " + minAvailableReplicas
+ " (both IdealState and ExternalView should reach the target segment assignment)",
instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
}
// Record change of current ideal state and the new target
// NOTE(review): the context is rebuilt with null as its third argument (the initial context passed
// segmentsToMonitor there) - confirm the observer tolerates null for these triggers.
rebalanceContext = new TableRebalanceObserver.RebalanceContext(estimatedAverageSegmentSizeInBytes,
allSegmentsFromIdealState, null);
_tableRebalanceObserver.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER, currentAssignment,
targetAssignment, rebalanceContext);
// The stop flag is re-checked after each observer trigger so a stop request is honored before any write
if (_tableRebalanceObserver.isStopped()) {
return new RebalanceResult(rebalanceJobId, _tableRebalanceObserver.getStopStatus(),
"Rebalance has stopped already before updating the IdealState", instancePartitionsMap,
tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
}
// Step 4: compute the next intermediate assignment on the way from current to target
Map<String, Map<String, String>> nextAssignment =
getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup,
lowDiskMode);
tableRebalanceLogger.info(
"Got the next assignment with number of segments to be added/removed for each instance: {}",
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, nextAssignment));
// Record change of current ideal state and the next assignment
_tableRebalanceObserver.onTrigger(TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER,
currentAssignment, nextAssignment, rebalanceContext);
if (_tableRebalanceObserver.isStopped()) {
return new RebalanceResult(rebalanceJobId, _tableRebalanceObserver.getStopStatus(),
"Rebalance has stopped already before updating the IdealState with the next assignment",
instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult);
}
// Reuse current IdealState to update the IdealState in cluster
idealStateRecord.setMapFields(nextAssignment);
idealState.setNumPartitions(nextAssignment.size());
// NOTE(review): as in the downtime path, iterator().next() throws on an empty assignment and sits outside the
// try block below.
idealState.setReplicas(Integer.toString(nextAssignment.values().iterator().next().size()));
// Check version and update IdealState
try {
Preconditions.checkState(_helixDataAccessor.getBaseDataAccessor()
.set(idealStatePropertyKey.getPath(), idealStateRecord, expectedVersion, AccessOption.PERSISTENT),
"Failed to update IdealState");
currentAssignment = nextAssignment;
// NOTE(review): assumes a successful write bumps the stored version by exactly one - confirm against Helix.
// If it does not hold, the version check at the top of the loop treats the IdealState as changed.
expectedVersion++;
// IdealState update is successful. Update the segment list as the IDEAL_STATE_CHANGE_TRIGGER should have
// captured the newly added / deleted segments
allSegmentsFromIdealState = currentAssignment.keySet();
tableRebalanceLogger.info("Successfully updated the IdealState");
} catch (ZkBadVersionException e) {
tableRebalanceLogger.info("Version changed while updating IdealState");
// Since IdealState wasn't updated, rollback the stats changes made and continue. There is no need to update
// segmentsToMonitor either since that hasn't changed without the IdealState update
_tableRebalanceObserver.onRollback();
continue;
} catch (Exception e) {
onReturnFailure("Caught exception while updating IdealState, aborting the rebalance", e, tableRebalanceLogger);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while updating IdealState: " + e, instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment, preChecksResult, summaryResult);
}
// Monitor the segments moved in this round plus the ones still to be moved in the next round
segmentsToMonitor = new HashSet<>(segmentsToMove);
segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
segmentsToMonitor.addAll(segmentsToMove);
}
}