private Response batchGetStoppableInstances()

in helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java [223:386]


  /**
   * Selects which of the requested instances can be stopped, driven by the JSON request body.
   *
   * <p>Fields read from {@code node}:
   * <ul>
   *   <li>{@code instances} (required): the instance names to evaluate.</li>
   *   <li>{@code selection_base} (optional): defaults to {@code non_zone_based}.</li>
   *   <li>Also optional: {@code zone_order}, {@code customized_values},
   *       {@code to_be_stopped_instances}, {@code skip_stoppable_check_list} and
   *       {@code skip_custom_check_if_instance_not_alive}.</li>
   * </ul>
   * Missing, invalid or conflicting request fields produce a bad-request response rather than a
   * server error. An optional list field explicitly set to JSON {@code null} is treated as absent.
   *
   * @param clusterId name of the cluster to evaluate
   * @param node parsed JSON request body
   * @param skipZKRead forwarded to the {@link MaintenanceManagementService} builder
   * @param continueOnFailures when true, {@code ALL_HEALTH_CHECK_NONBLOCK} is passed as the
   *     non-blocking health-check set; otherwise no non-blocking set is configured
   * @param skipHealthCheckCategories stoppable-check categories to skip
   * @param random forwarded to zone-order calculation for zone-based selection; must not be
   *     combined with a non-empty {@code zone_order}
   * @return the selector result rendered as JSON, or a bad-request response for invalid input
   * @throws IOException if a request field cannot be deserialized into a list of strings
   * @throws HelixHealthException wrapping any {@link HelixException} raised while evaluating
   */
  private Response batchGetStoppableInstances(String clusterId, JsonNode node, boolean skipZKRead,
      boolean continueOnFailures, Set<StoppableCheck.Category> skipHealthCheckCategories,
      boolean random) throws IOException {
    try {
      // TODO: Implement the logic to automatically detect the selection base. https://github.com/apache/helix/issues/2968#issue-2691677799
      InstanceHealthSelectionBase selectionBase = InstanceHealthSelectionBase.non_zone_based;
      JsonNode selectionBaseNode = node.get(InstancesProperties.selection_base.name());
      if (selectionBaseNode != null && !selectionBaseNode.isNull()) {
        try {
          // asText() (unlike textValue()) never returns null, so a non-textual value is reported
          // as an invalid name instead of surfacing as an NPE from Enum.valueOf.
          selectionBase = InstanceHealthSelectionBase.valueOf(selectionBaseNode.asText());
        } catch (IllegalArgumentException e) {
          String message = "'selection_base' has invalid value: " + selectionBaseNode
              + ". Supported values: "
              + java.util.Arrays.toString(InstanceHealthSelectionBase.values());
          _logger.error(message, e);
          return badRequest(message);
        }
      }

      JsonNode instancesNode = node.get(InstancesProperties.instances.name());
      if (instancesNode == null || instancesNode.isNull()) {
        String message = "'instances' is required and must be a list of instance names.";
        _logger.error(message);
        return badRequest(message);
      }
      List<String> instances = OBJECT_MAPPER.readValue(instancesNode.toString(),
          OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
      ClusterService clusterService =
          new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());

      String customizedInput = null;
      JsonNode customizedValuesNode = node.get(InstancesProperties.customized_values.name());
      if (customizedValuesNode != null) {
        customizedInput = customizedValuesNode.toString();
      }

      List<String> orderOfZone = null;
      JsonNode zoneOrderNode = node.get(InstancesProperties.zone_order.name());
      if (zoneOrderNode != null && !zoneOrderNode.isNull()) {
        orderOfZone = OBJECT_MAPPER.readValue(zoneOrderNode.toString(),
            OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
        if (!orderOfZone.isEmpty() && random) {
          String message =
              "Both 'zone_order' and 'random' parameters are set. Please specify only one option.";
          _logger.error(message);
          return badRequest(message);
        }
        if (!orderOfZone.isEmpty() && selectionBase == InstanceHealthSelectionBase.non_zone_based) {
          String message =
              "'zone_order' is set but 'selection_base' is 'non_zone_based'. Please set 'selection_base' to 'zone_based' or 'cross_zone_based'.";
          _logger.error(message);
          return badRequest(message);
        }
      }

      List<String> toBeStoppedInstances = Collections.emptyList();
      JsonNode toBeStoppedNode = node.get(InstancesProperties.to_be_stopped_instances.name());
      if (toBeStoppedNode != null && !toBeStoppedNode.isNull()) {
        toBeStoppedInstances = OBJECT_MAPPER.readValue(toBeStoppedNode.toString(),
            OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
        Set<String> instanceSet = new HashSet<>(instances);
        instanceSet.retainAll(toBeStoppedInstances);
        if (!instanceSet.isEmpty()) {
          String message =
              "'to_be_stopped_instances' and 'instances' have intersection: " + instanceSet
                  + ". Please make them mutually exclusive.";
          _logger.error(message);
          return badRequest(message);
        }
      }

      // By default, if skip_stoppable_check_list is unset, all checks are performed to maintain
      // backward compatibility with existing clients.
      List<HealthCheck> skipStoppableCheckList = Collections.emptyList();
      JsonNode skipCheckListNode = node.get(InstancesProperties.skip_stoppable_check_list.name());
      if (skipCheckListNode != null && !skipCheckListNode.isNull()) {
        List<String> list = OBJECT_MAPPER.readValue(skipCheckListNode.toString(),
            OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
        try {
          skipStoppableCheckList =
              list.stream().map(HealthCheck::valueOf).collect(Collectors.toList());
        } catch (IllegalArgumentException e) {
          String message =
              "'skip_stoppable_check_list' has invalid check names: " + list
                  + ". Supported checks: " + HealthCheck.STOPPABLE_CHECK_LIST;
          _logger.error(message, e);
          return badRequest(message);
        }
      }

      boolean skipCustomChecksIfNoLiveness = false;
      JsonNode skipCustomCheckNode =
          node.get(InstancesProperties.skip_custom_check_if_instance_not_alive.name());
      if (skipCustomCheckNode != null) {
        skipCustomChecksIfNoLiveness = skipCustomCheckNode.asBoolean();
      }

      ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
      if (selectionBase != InstanceHealthSelectionBase.non_zone_based) {
        if (!clusterService.isClusterTopologyAware(clusterId)) {
          String message = "Cluster " + clusterId
              + " is not topology aware. Please enable the topology in cluster config or set "
              + "'selection_base' to 'non_zone_based'.";
          _logger.error(message);
          return badRequest(message);
        }

        // Requested instances that are known to the cluster but absent from every zone lack
        // topology information; zone-based selection cannot place them.
        Set<String> instancesWithTopology = clusterTopology.toZoneMapping().values().stream()
            .flatMap(zoneInstances -> zoneInstances.stream())
            .collect(Collectors.toSet());
        Set<String> allInstances = clusterTopology.getAllInstances();
        Set<String> topologyUnawareInstances = instances.stream()
            .filter(instance -> !instancesWithTopology.contains(instance)
                && allInstances.contains(instance))
            .collect(Collectors.toSet());
        if (!topologyUnawareInstances.isEmpty()) {
          String message = "Instances " + topologyUnawareInstances
              + " do not have topology information. Please set topology information in instance config or"
              + " set 'selection_base' to 'non_zone_based'.";
          _logger.error(message);
          return badRequest(message);
        }
      }

      // Fetch and cast once; the same accessor backs both the maintenance service and selector.
      ZKHelixDataAccessor dataAccessor = (ZKHelixDataAccessor) getDataAccssor(clusterId);
      String namespace = getNamespace();
      MaintenanceManagementService maintenanceService =
          new MaintenanceManagementService.MaintenanceManagementServiceBuilder()
              .setDataAccessor(dataAccessor)
              .setConfigAccessor(getConfigAccessor())
              .setSkipZKRead(skipZKRead)
              .setNonBlockingHealthChecks(
                  continueOnFailures ? Collections.singleton(ALL_HEALTH_CHECK_NONBLOCK) : null)
              .setCustomRestClient(CustomRestClientFactory.get())
              .setSkipHealthCheckCategories(skipHealthCheckCategories)
              .setNamespace(namespace)
              .setSkipStoppableHealthCheckList(skipStoppableCheckList)
              .setSkipCustomChecksIfNoLiveness(skipCustomChecksIfNoLiveness)
              .build();

      StoppableInstancesSelector stoppableInstancesSelector =
          new StoppableInstancesSelector.StoppableInstancesSelectorBuilder()
              .setClusterId(clusterId)
              .setOrderOfZone(orderOfZone)
              .setCustomizedInput(customizedInput)
              .setMaintenanceService(maintenanceService)
              .setClusterTopology(clusterTopology)
              .setDataAccessor(dataAccessor)
              .build();
      ObjectNode result;

      switch (selectionBase) {
        case zone_based:
          stoppableInstancesSelector.calculateOrderOfZone(instances, random);
          result = stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances, toBeStoppedInstances);
          break;
        case cross_zone_based:
          stoppableInstancesSelector.calculateOrderOfZone(instances, random);
          result = stoppableInstancesSelector.getStoppableInstancesCrossZones(instances, toBeStoppedInstances);
          break;
        case non_zone_based:
          result = stoppableInstancesSelector.getStoppableInstancesNonZoneBased(instances, toBeStoppedInstances);
          break;
        default:
          throw new UnsupportedOperationException("instance_based selection is not supported yet!");
      }
      return JSONRepresentation(result);
    } catch (HelixException e) {
      _logger
          .error(String.format("Current cluster %s has issue with health checks!", clusterId), e);
      throw new HelixHealthException(e);
    } catch (Exception e) {
      // HelixExceptions are handled above; anything reaching here is some other failure.
      _logger.error(
          String.format("Failed to get parallel stoppable instances for cluster %s!", clusterId),
          e);
      throw e;
    }
  }