in helix-core/src/main/java/org/apache/helix/util/HelixUtil.java [255:362]
private static Map<String, ResourceAssignment> getAssignmentForWagedFullAutoImpl(
ZkBucketDataAccessor zkBucketDataAccessor, BaseDataAccessor<ZNRecord> baseDataAccessor,
ClusterConfig clusterConfig, List<InstanceConfig> instanceConfigs, List<String> liveInstances,
List<IdealState> idealStates, List<ResourceConfig> resourceConfigs, boolean usePrefLists) {
// Copy the cluster config and make globalRebalance happen synchronously
// Otherwise, globalRebalance may not complete and this util might end up returning
// an empty assignment.
ClusterConfig globalSyncClusterConfig = new ClusterConfig(clusterConfig.getRecord());
globalSyncClusterConfig.setGlobalRebalanceAsyncMode(false);
// Prepare a data accessor for a dataProvider (cache) refresh
HelixDataAccessor helixDataAccessor =
new ZKHelixDataAccessor(globalSyncClusterConfig.getClusterName(), baseDataAccessor);
// Create an instance of read-only WAGED rebalancer
ReadOnlyWagedRebalancer readOnlyWagedRebalancer =
new ReadOnlyWagedRebalancer(zkBucketDataAccessor, globalSyncClusterConfig.getClusterName(),
globalSyncClusterConfig.getGlobalRebalancePreference());
// Use a dummy event to run the required stages for BestPossibleState calculation
// Attributes RESOURCES and RESOURCES_TO_REBALANCE are populated in ResourceComputationStage
ClusterEvent event =
new ClusterEvent(globalSyncClusterConfig.getClusterName(), ClusterEventType.Unknown);
try {
// First, prepare waged rebalancer with a snapshot, so that it can react on the difference
// between the current snapshot and the provided parameters which act as the new snapshot
ResourceControllerDataProvider dataProvider =
new ResourceControllerDataProvider(globalSyncClusterConfig.getClusterName());
dataProvider.requireFullRefresh();
dataProvider.refresh(helixDataAccessor);
readOnlyWagedRebalancer.updateChangeDetectorSnapshots(dataProvider);
// Refresh dataProvider completely to populate _refreshedChangeTypes
dataProvider.requireFullRefresh();
dataProvider.refresh(helixDataAccessor);
dataProvider.setClusterConfig(globalSyncClusterConfig);
dataProvider.setInstanceConfigMap(instanceConfigs.stream()
.collect(Collectors.toMap(InstanceConfig::getInstanceName, Function.identity())));
// For LiveInstances, we must preserve the existing session IDs
// So read LiveInstance objects from the cluster and do a "retainAll" on them
// assignableLiveInstanceMap is an unmodifiableMap instances, so we filter using a stream
Map<String, LiveInstance> assignableLiveInstanceMap = dataProvider.getAssignableLiveInstances();
List<LiveInstance> filteredLiveInstances = assignableLiveInstanceMap.entrySet().stream()
.filter(entry -> liveInstances.contains(entry.getKey())).map(Map.Entry::getValue)
.collect(Collectors.toList());
// Synthetically create LiveInstance objects that are passed in as the parameter
// First, determine which new LiveInstance objects need to be created
List<String> liveInstanceList = new ArrayList<>(liveInstances);
liveInstanceList.removeAll(filteredLiveInstances.stream().map(LiveInstance::getInstanceName)
.collect(Collectors.toList()));
liveInstanceList.forEach(liveInstanceName -> {
// Create a new LiveInstance object and give it a random UUID as a session ID
LiveInstance newLiveInstanceObj = new LiveInstance(liveInstanceName);
newLiveInstanceObj.getRecord()
.setSimpleField(LiveInstance.LiveInstanceProperty.SESSION_ID.name(),
UUID.randomUUID().toString().replace("-", ""));
filteredLiveInstances.add(newLiveInstanceObj);
});
dataProvider.setLiveInstances(new ArrayList<>(filteredLiveInstances));
dataProvider.setIdealStates(idealStates);
dataProvider.setResourceConfigMap(resourceConfigs.stream()
.collect(Collectors.toMap(ResourceConfig::getResourceName, Function.identity())));
event.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider);
event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), readOnlyWagedRebalancer);
// Run the required stages to obtain the BestPossibleOutput
RebalanceUtil.runStage(event, new ResourceComputationStage());
RebalanceUtil.runStage(event, new CurrentStateComputationStage());
RebalanceUtil.runStage(event, new BestPossibleStateCalcStage());
} catch (Exception e) {
LOG.error("getIdealAssignmentForWagedFullAuto(): Failed to compute ResourceAssignments!", e);
} finally {
// Close all ZK connections
readOnlyWagedRebalancer.close();
}
// Convert the resulting BestPossibleStateOutput to Map<String, ResourceAssignment>
Map<String, ResourceAssignment> result = new HashMap<>();
BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
if (output == null || (output.getPreferenceLists() == null && output.getResourceStatesMap()
.isEmpty())) {
throw new HelixException(
"getIdealAssignmentForWagedFullAuto(): Calculation failed: Failed to compute BestPossibleState!");
}
for (IdealState idealState : idealStates) {
String resourceName = idealState.getResourceName();
StateModelDefinition stateModelDefinition =
BuiltInStateModelDefinitions.valueOf(idealState.getStateModelDefRef())
.getStateModelDefinition();
PartitionStateMap partitionStateMap = output.getPartitionStateMap(resourceName);
ResourceAssignment resourceAssignment = new ResourceAssignment(resourceName);
for (String partitionName : idealState.getPartitionSet()) {
Partition partition = new Partition(partitionName);
if (usePrefLists) {
resourceAssignment.addReplicaMap(partition,
computeIdealMapping(output.getPreferenceList(resourceName, partitionName),
stateModelDefinition, new HashSet<>(liveInstances)));
} else {
resourceAssignment.addReplicaMap(partition, partitionStateMap.getPartitionMap(partition));
}
}
result.put(resourceName, resourceAssignment);
}
return result;
}