private static Map getAssignmentForWagedFullAutoImpl()

in helix-core/src/main/java/org/apache/helix/util/HelixUtil.java [255:362]


  private static Map<String, ResourceAssignment> getAssignmentForWagedFullAutoImpl(
      ZkBucketDataAccessor zkBucketDataAccessor, BaseDataAccessor<ZNRecord> baseDataAccessor,
      ClusterConfig clusterConfig, List<InstanceConfig> instanceConfigs, List<String> liveInstances,
      List<IdealState> idealStates, List<ResourceConfig> resourceConfigs, boolean usePrefLists) {

    // Copy the cluster config and make globalRebalance happen synchronously
    // Otherwise, globalRebalance may not complete and this util might end up returning
    // an empty assignment.
    ClusterConfig globalSyncClusterConfig = new ClusterConfig(clusterConfig.getRecord());
    globalSyncClusterConfig.setGlobalRebalanceAsyncMode(false);

    // Prepare a data accessor for a dataProvider (cache) refresh
    HelixDataAccessor helixDataAccessor =
        new ZKHelixDataAccessor(globalSyncClusterConfig.getClusterName(), baseDataAccessor);

    // Create an instance of read-only WAGED rebalancer
    ReadOnlyWagedRebalancer readOnlyWagedRebalancer =
        new ReadOnlyWagedRebalancer(zkBucketDataAccessor, globalSyncClusterConfig.getClusterName(),
            globalSyncClusterConfig.getGlobalRebalancePreference());

    // Use a dummy event to run the required stages for BestPossibleState calculation
    // Attributes RESOURCES and RESOURCES_TO_REBALANCE are populated in ResourceComputationStage
    ClusterEvent event =
        new ClusterEvent(globalSyncClusterConfig.getClusterName(), ClusterEventType.Unknown);

    try {
      // First, prepare waged rebalancer with a snapshot, so that it can react on the difference
      // between the current snapshot and the provided parameters which act as the new snapshot
      ResourceControllerDataProvider dataProvider =
          new ResourceControllerDataProvider(globalSyncClusterConfig.getClusterName());
      dataProvider.requireFullRefresh();
      dataProvider.refresh(helixDataAccessor);
      readOnlyWagedRebalancer.updateChangeDetectorSnapshots(dataProvider);
      // Refresh dataProvider completely to populate _refreshedChangeTypes
      dataProvider.requireFullRefresh();
      dataProvider.refresh(helixDataAccessor);

      dataProvider.setClusterConfig(globalSyncClusterConfig);
      dataProvider.setInstanceConfigMap(instanceConfigs.stream()
          .collect(Collectors.toMap(InstanceConfig::getInstanceName, Function.identity())));
      // For LiveInstances, we must preserve the existing session IDs
      // So read LiveInstance objects from the cluster and do a "retainAll" on them
      // assignableLiveInstanceMap is an unmodifiableMap instances, so we filter using a stream
      Map<String, LiveInstance> assignableLiveInstanceMap = dataProvider.getAssignableLiveInstances();
      List<LiveInstance> filteredLiveInstances = assignableLiveInstanceMap.entrySet().stream()
          .filter(entry -> liveInstances.contains(entry.getKey())).map(Map.Entry::getValue)
          .collect(Collectors.toList());
      // Synthetically create LiveInstance objects that are passed in as the parameter
      // First, determine which new LiveInstance objects need to be created
      List<String> liveInstanceList = new ArrayList<>(liveInstances);
      liveInstanceList.removeAll(filteredLiveInstances.stream().map(LiveInstance::getInstanceName)
          .collect(Collectors.toList()));
      liveInstanceList.forEach(liveInstanceName -> {
        // Create a new LiveInstance object and give it a random UUID as a session ID
        LiveInstance newLiveInstanceObj = new LiveInstance(liveInstanceName);
        newLiveInstanceObj.getRecord()
            .setSimpleField(LiveInstance.LiveInstanceProperty.SESSION_ID.name(),
                UUID.randomUUID().toString().replace("-", ""));
        filteredLiveInstances.add(newLiveInstanceObj);
      });
      dataProvider.setLiveInstances(new ArrayList<>(filteredLiveInstances));
      dataProvider.setIdealStates(idealStates);
      dataProvider.setResourceConfigMap(resourceConfigs.stream()
          .collect(Collectors.toMap(ResourceConfig::getResourceName, Function.identity())));

      event.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider);
      event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(), readOnlyWagedRebalancer);

      // Run the required stages to obtain the BestPossibleOutput
      RebalanceUtil.runStage(event, new ResourceComputationStage());
      RebalanceUtil.runStage(event, new CurrentStateComputationStage());
      RebalanceUtil.runStage(event, new BestPossibleStateCalcStage());
    } catch (Exception e) {
      LOG.error("getIdealAssignmentForWagedFullAuto(): Failed to compute ResourceAssignments!", e);
    } finally {
      // Close all ZK connections
      readOnlyWagedRebalancer.close();
    }

    // Convert the resulting BestPossibleStateOutput to Map<String, ResourceAssignment>
    Map<String, ResourceAssignment> result = new HashMap<>();
    BestPossibleStateOutput output = event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
    if (output == null || (output.getPreferenceLists() == null && output.getResourceStatesMap()
        .isEmpty())) {
      throw new HelixException(
          "getIdealAssignmentForWagedFullAuto(): Calculation failed: Failed to compute BestPossibleState!");
    }
    for (IdealState idealState : idealStates) {
      String resourceName = idealState.getResourceName();
      StateModelDefinition stateModelDefinition =
          BuiltInStateModelDefinitions.valueOf(idealState.getStateModelDefRef())
              .getStateModelDefinition();
      PartitionStateMap partitionStateMap = output.getPartitionStateMap(resourceName);
      ResourceAssignment resourceAssignment = new ResourceAssignment(resourceName);
      for (String partitionName : idealState.getPartitionSet()) {
        Partition partition = new Partition(partitionName);
        if (usePrefLists) {
          resourceAssignment.addReplicaMap(partition,
              computeIdealMapping(output.getPreferenceList(resourceName, partitionName),
                  stateModelDefinition, new HashSet<>(liveInstances)));
        } else {
          resourceAssignment.addReplicaMap(partition, partitionStateMap.getPartitionMap(partition));
        }
      }
      result.put(resourceName, resourceAssignment);
    }
    return result;
  }