in pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java [661:871]
/**
 * Computes the summary of what a rebalance from {@code currentAssignment} to {@code targetAssignment} would do,
 * without applying any change.
 *
 * <p>The summary contains:
 * <ul>
 *   <li>per-server segment counts (added / deleted / unchanged) and the server status (ADDED, REMOVED, UNCHANGED)</li>
 *   <li>per-tag aggregates for every tag referenced by the table config (tenant tag, tag overrides, instance
 *   assignment configs and tier configs); servers carrying none of those tags are accounted under
 *   {@code RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS}</li>
 *   <li>segment totals, average segment size and the estimated amount of data to be moved</li>
 *   <li>for non-OFFLINE tables, a summary of the consuming segments that would land on a new server</li>
 * </ul>
 *
 * <p>The replication factors reported are the replica counts of the last segment iterated in each assignment, so
 * the totals derived from them are only exact when every segment of an assignment has the same number of replicas.
 *
 * @param currentAssignment segment name to (instance name to segment state) map before the rebalance
 * @param targetAssignment segment name to (instance name to segment state) map after the rebalance
 * @param tableNameWithType table name with type suffix, used to decide whether the table is OFFLINE
 * @param tableSubTypeSizeDetails size details handed to {@code calculateTableSizePerReplicaInBytes}
 * @param tableConfig table config from which the relevant server tags are collected
 * @param tableRebalanceLogger logger used for progress and warning messages
 * @return the rebalance summary
 */
private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, String>> currentAssignment,
    Map<String, Map<String, String>> targetAssignment, String tableNameWithType,
    TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, TableConfig tableConfig,
    Logger tableRebalanceLogger) {
  tableRebalanceLogger.info("Calculating rebalance summary");
  boolean isOfflineTable = TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == TableType.OFFLINE;

  // Index both assignments by server. Consuming segments are only tracked for non-OFFLINE tables.
  int existingReplicationFactor = 0;
  int newReplicationFactor = 0;
  Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
  Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
  Map<String, Set<String>> existingServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
  Map<String, Set<String>> newServersToConsumingSegmentMap = isOfflineTable ? null : new HashMap<>();
  for (Map.Entry<String, Map<String, String>> segmentEntry : currentAssignment.entrySet()) {
    String segmentName = segmentEntry.getKey();
    Map<String, String> instanceStateMap = segmentEntry.getValue();
    existingReplicationFactor = instanceStateMap.size();
    Collection<String> segmentStates = instanceStateMap.values();
    // A segment counts as consuming when no replica is ONLINE and at least one replica is CONSUMING
    boolean isSegmentConsuming = !isOfflineTable && !segmentStates.contains(SegmentStateModel.ONLINE)
        && segmentStates.contains(SegmentStateModel.CONSUMING);
    for (String instanceName : instanceStateMap.keySet()) {
      existingServersToSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
      if (isSegmentConsuming) {
        existingServersToConsumingSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
      }
    }
  }
  for (Map.Entry<String, Map<String, String>> segmentEntry : targetAssignment.entrySet()) {
    String segmentName = segmentEntry.getKey();
    Map<String, String> instanceStateMap = segmentEntry.getValue();
    newReplicationFactor = instanceStateMap.size();
    Collection<String> segmentStates = instanceStateMap.values();
    boolean isSegmentConsuming = !isOfflineTable && !segmentStates.contains(SegmentStateModel.ONLINE)
        && segmentStates.contains(SegmentStateModel.CONSUMING);
    for (String instanceName : instanceStateMap.keySet()) {
      newServersToSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
      if (isSegmentConsuming) {
        newServersToConsumingSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
      }
    }
  }
  RebalanceSummaryResult.RebalanceChangeInfo replicationFactor =
      new RebalanceSummaryResult.RebalanceChangeInfo(existingReplicationFactor, newReplicationFactor);
  RebalanceSummaryResult.RebalanceChangeInfo numServers =
      new RebalanceSummaryResult.RebalanceChangeInfo(existingServersToSegmentMap.size(),
          newServersToSegmentMap.size());

  // Tags of every instance, reported as part of the per-server change info
  List<InstanceConfig> instanceConfigs =
      _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true);
  Map<String, List<String>> instanceToTagsMap = new HashMap<>();
  for (InstanceConfig instanceConfig : instanceConfigs) {
    instanceToTagsMap.put(instanceConfig.getInstanceName(), instanceConfig.getTags());
  }

  // Collect every tag the table config refers to; only these tags get a TagInfo entry up front
  Map<String, RebalanceSummaryResult.TagInfo> tagsInfoMap = new HashMap<>();
  String serverTenantName = tableConfig.getTenantConfig().getServer();
  if (serverTenantName != null) {
    String serverTenantTag = TagNameUtils.getServerTagForTenant(serverTenantName, tableConfig.getTableType());
    tagsInfoMap.put(serverTenantTag, new RebalanceSummaryResult.TagInfo(serverTenantTag));
  }
  TagOverrideConfig tagOverrideConfig = tableConfig.getTenantConfig().getTagOverrideConfig();
  if (tagOverrideConfig != null) {
    String completedTag = tagOverrideConfig.getRealtimeCompleted();
    String consumingTag = tagOverrideConfig.getRealtimeConsuming();
    if (completedTag != null) {
      tagsInfoMap.put(completedTag, new RebalanceSummaryResult.TagInfo(completedTag));
    }
    if (consumingTag != null) {
      tagsInfoMap.put(consumingTag, new RebalanceSummaryResult.TagInfo(consumingTag));
    }
  }
  if (tableConfig.getInstanceAssignmentConfigMap() != null) {
    // for simplicity, including all segment types present in instanceAssignmentConfigMap
    tableConfig.getInstanceAssignmentConfigMap().values().forEach(instanceAssignmentConfig -> {
      String tag = instanceAssignmentConfig.getTagPoolConfig().getTag();
      tagsInfoMap.put(tag, new RebalanceSummaryResult.TagInfo(tag));
    });
  }
  if (tableConfig.getTierConfigsList() != null) {
    tableConfig.getTierConfigsList().forEach(tierConfig -> {
      String tierTag = tierConfig.getServerTag();
      tagsInfoMap.put(tierTag, new RebalanceSummaryResult.TagInfo(tierTag));
    });
  }

  // Per-server changes for every server present in the target assignment
  Set<String> serversAdded = new HashSet<>();
  Set<String> serversRemoved = new HashSet<>();
  Set<String> serversUnchanged = new HashSet<>();
  Set<String> serversGettingNewSegments = new HashSet<>();
  Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap = new HashMap<>();
  int segmentsNotMoved = 0;
  int totalSegmentsToBeDeleted = 0;
  int maxSegmentsAddedToServer = 0;
  for (Map.Entry<String, Set<String>> entry : newServersToSegmentMap.entrySet()) {
    String server = entry.getKey();
    Set<String> targetSegments = entry.getValue();
    int totalNewSegments = targetSegments.size();
    int totalExistingSegments = 0;
    int segmentsUnchanged = 0;
    RebalanceSummaryResult.ServerStatus serverStatus;
    Set<String> existingSegments = existingServersToSegmentMap.get(server);
    if (existingSegments != null) {
      totalExistingSegments = existingSegments.size();
      Set<String> retainedSegments = new HashSet<>(existingSegments);
      retainedSegments.retainAll(targetSegments);
      segmentsUnchanged = retainedSegments.size();
      segmentsNotMoved += segmentsUnchanged;
      serverStatus = RebalanceSummaryResult.ServerStatus.UNCHANGED;
      serversUnchanged.add(server);
    } else {
      serverStatus = RebalanceSummaryResult.ServerStatus.ADDED;
      serversAdded.add(server);
    }
    // Everything in the target that is not retained is new; everything existing that is not retained is deleted
    int segmentsAdded = totalNewSegments - segmentsUnchanged;
    int segmentsDeleted = totalExistingSegments - segmentsUnchanged;
    if (segmentsAdded > 0) {
      serversGettingNewSegments.add(server);
    }
    maxSegmentsAddedToServer = Math.max(maxSegmentsAddedToServer, segmentsAdded);
    totalSegmentsToBeDeleted += segmentsDeleted;
    serverSegmentChangeInfoMap.put(server,
        new RebalanceSummaryResult.ServerSegmentChangeInfo(serverStatus, totalNewSegments, totalExistingSegments,
            segmentsAdded, segmentsDeleted, segmentsUnchanged, instanceToTagsMap.get(server)));

    // NOTE(review): tags are looked up again through getServerTag even though instanceToTagsMap was built above;
    // confirm whether the two sources are interchangeable before reusing the map here.
    List<String> serverTags = getServerTag(server);
    Set<String> relevantTags = new HashSet<>(serverTags);
    relevantTags.retainAll(tagsInfoMap.keySet());
    // The segments remain unchanged or need to download will be accounted to every tag associated with this
    // server instance
    if (relevantTags.isEmpty()) {
      // this could happen when server's tags changed but reassignInstance=false in the rebalance config
      tableRebalanceLogger.warn("Server: {} was assigned but does not have any relevant tags", server);
      RebalanceSummaryResult.TagInfo outdatedTagInfo =
          tagsInfoMap.computeIfAbsent(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS,
              RebalanceSummaryResult.TagInfo::new);
      outdatedTagInfo.increaseNumSegmentsUnchanged(segmentsUnchanged);
      outdatedTagInfo.increaseNumSegmentsToDownload(segmentsAdded);
      outdatedTagInfo.increaseNumServerParticipants(1);
    } else {
      for (String tag : relevantTags) {
        RebalanceSummaryResult.TagInfo tagInfo = tagsInfoMap.get(tag);
        tagInfo.increaseNumSegmentsUnchanged(segmentsUnchanged);
        tagInfo.increaseNumSegmentsToDownload(segmentsAdded);
        tagInfo.increaseNumServerParticipants(1);
      }
    }
  }

  // Servers that only appear in the current assignment lose all of their segments
  for (Map.Entry<String, Set<String>> entry : existingServersToSegmentMap.entrySet()) {
    String server = entry.getKey();
    if (!serverSegmentChangeInfoMap.containsKey(server)) {
      int numSegmentsOnServer = entry.getValue().size();
      serversRemoved.add(server);
      serverSegmentChangeInfoMap.put(server,
          new RebalanceSummaryResult.ServerSegmentChangeInfo(RebalanceSummaryResult.ServerStatus.REMOVED, 0,
              numSegmentsOnServer, 0, numSegmentsOnServer, 0, instanceToTagsMap.get(server)));
      totalSegmentsToBeDeleted += numSegmentsOnServer;
    }
  }

  if (!isOfflineTable) {
    // turn the map into {server: added consuming segments}
    for (Map.Entry<String, Set<String>> entry : newServersToConsumingSegmentMap.entrySet()) {
      entry.getValue()
          .removeAll(existingServersToConsumingSegmentMap.getOrDefault(entry.getKey(), Collections.emptySet()));
    }
    newServersToConsumingSegmentMap.entrySet().removeIf(entry -> entry.getValue().isEmpty());
  }

  int numExistingSegments = currentAssignment.size();
  int numNewSegments = targetAssignment.size();
  RebalanceSummaryResult.RebalanceChangeInfo numSegmentsInSingleReplica =
      new RebalanceSummaryResult.RebalanceChangeInfo(numExistingSegments, numNewSegments);
  int existingNumberSegmentsTotal = existingReplicationFactor * numExistingSegments;
  int newNumberSegmentsTotal = newReplicationFactor * numNewSegments;
  RebalanceSummaryResult.RebalanceChangeInfo numSegmentsAcrossAllReplicas =
      new RebalanceSummaryResult.RebalanceChangeInfo(existingNumberSegmentsTotal, newNumberSegmentsTotal);
  int totalSegmentsToBeAdded = newNumberSegmentsTotal - segmentsNotMoved;

  // A non-positive table size is passed through unchanged to both size estimates
  // (presumably a marker for an unavailable size; verify against calculateTableSizePerReplicaInBytes).
  long tableSizePerReplicaInBytes = calculateTableSizePerReplicaInBytes(tableSubTypeSizeDetails);
  long averageSegmentSizeInBytes;
  if (tableSizePerReplicaInBytes <= 0) {
    averageSegmentSizeInBytes = tableSizePerReplicaInBytes;
  } else if (numExistingSegments == 0) {
    // No segment to average over; avoid dividing by zero when a size is reported for an empty assignment
    averageSegmentSizeInBytes = 0;
  } else {
    averageSegmentSizeInBytes = tableSizePerReplicaInBytes / numExistingSegments;
  }
  long totalEstimatedDataToBeMovedInBytes = tableSizePerReplicaInBytes <= 0 ? tableSizePerReplicaInBytes
      : ((long) totalSegmentsToBeAdded) * averageSegmentSizeInBytes;

  RebalanceSummaryResult.ServerInfo serverInfo =
      new RebalanceSummaryResult.ServerInfo(serversGettingNewSegments.size(), numServers, serversAdded,
          serversRemoved, serversUnchanged, serversGettingNewSegments, serverSegmentChangeInfoMap);
  // TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
  //       rebalance time can vary with number of segments added
  RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary =
      isOfflineTable ? null
          : getConsumingSegmentSummary(tableConfig, newServersToConsumingSegmentMap, tableRebalanceLogger);
  RebalanceSummaryResult.SegmentInfo segmentInfo =
      new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeAdded, totalSegmentsToBeDeleted,
          maxSegmentsAddedToServer, averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
          replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas,
          consumingSegmentToBeMovedSummary);
  tableRebalanceLogger.info("Calculated rebalance summary");
  return new RebalanceSummaryResult(serverInfo, segmentInfo, new ArrayList<>(tagsInfoMap.values()));
}