in helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/InstancesAccessor.java [223:386]
/**
 * Computes which of the requested instances can be stopped, based on the options carried in the
 * JSON request payload, and returns the selection as a JSON response.
 *
 * <p>Payload fields read from {@code node} (names come from {@code InstancesProperties}):
 * <ul>
 *   <li>{@code instances} - required list of candidate instance names;</li>
 *   <li>{@code selection_base} - optional, defaults to {@code non_zone_based};</li>
 *   <li>{@code zone_order} - optional list; when non-empty it cannot be combined with
 *       {@code random} or with a {@code non_zone_based} selection base;</li>
 *   <li>{@code to_be_stopped_instances} - optional list that must not intersect
 *       {@code instances};</li>
 *   <li>{@code skip_stoppable_check_list} - optional list of {@code HealthCheck} names; when
 *       unset no check is skipped;</li>
 *   <li>{@code customized_values} - optional, forwarded to the selector as raw JSON text;</li>
 *   <li>{@code skip_custom_check_if_instance_not_alive} - optional boolean, defaults to
 *       {@code false}.</li>
 * </ul>
 *
 * @param clusterId name of the cluster the instances belong to
 * @param node parsed JSON request content
 * @param skipZKRead forwarded to the maintenance service builder
 * @param continueOnFailures when {@code true}, all health checks are registered as non-blocking
 * @param skipHealthCheckCategories health check categories forwarded to the maintenance service
 * @param random forwarded to the zone-order calculation; mutually exclusive with a non-empty
 *     {@code zone_order}
 * @return the selector result as a JSON response, or a bad-request response when the payload
 *     fails validation
 * @throws IOException if a list-valued payload field cannot be deserialized
 * @throws HelixHealthException if a {@code HelixException} is raised while processing
 */
private Response batchGetStoppableInstances(String clusterId, JsonNode node, boolean skipZKRead,
    boolean continueOnFailures, Set<StoppableCheck.Category> skipHealthCheckCategories,
    boolean random) throws IOException {
  try {
    // TODO: Process input data from the content
    // TODO: Implement the logic to automatically detect the selection base. https://github.com/apache/helix/issues/2968#issue-2691677799
    InstanceHealthSelectionBase selectionBase = InstanceHealthSelectionBase.non_zone_based;
    JsonNode selectionBaseNode = node.get(InstancesProperties.selection_base.name());
    if (selectionBaseNode != null) {
      selectionBase = parseSelectionBaseOrNull(selectionBaseNode);
      if (selectionBase == null) {
        // Reject unknown values explicitly instead of letting Enum.valueOf fail the request.
        String message = "'selection_base' has invalid value: " + selectionBaseNode
            + ". Supported values: "
            + java.util.Arrays.toString(InstanceHealthSelectionBase.values());
        _logger.error(message);
        return badRequest(message);
      }
    }

    // 'instances' is mandatory: without this guard a missing field surfaces as a bare
    // NullPointerException with no hint about what is wrong with the payload.
    JsonNode instancesNode = node.get(InstancesProperties.instances.name());
    if (instancesNode == null || instancesNode.isNull()) {
      String message = "'instances' is required but is missing from the request content.";
      _logger.error(message);
      return badRequest(message);
    }
    List<String> instances =
        readStringListField(node, InstancesProperties.instances.name());

    ClusterService clusterService =
        new ClusterServiceImpl(getDataAccssor(clusterId), getConfigAccessor());
    List<String> orderOfZone = null;
    String customizedInput = null;
    List<String> toBeStoppedInstances = Collections.emptyList();
    // By default, if skip_stoppable_check_list is unset, all checks are performed to maintain
    // backward compatibility with existing clients.
    List<HealthCheck> skipStoppableCheckList = Collections.emptyList();

    if (node.get(InstancesProperties.customized_values.name()) != null) {
      customizedInput = node.get(InstancesProperties.customized_values.name()).toString();
    }

    if (node.get(InstancesProperties.zone_order.name()) != null) {
      orderOfZone = readStringListField(node, InstancesProperties.zone_order.name());
      if (!orderOfZone.isEmpty() && random) {
        String message =
            "Both 'zone_order' and 'random' parameters are set. Please specify only one option.";
        _logger.error(message);
        return badRequest(message);
      }
      if (!orderOfZone.isEmpty() && selectionBase == InstanceHealthSelectionBase.non_zone_based) {
        String message =
            "'zone_order' is set but 'selection_base' is 'non_zone_based'. Please set 'selection_base' to 'zone_based' or 'cross_zone_based'.";
        _logger.error(message);
        return badRequest(message);
      }
    }

    if (node.get(InstancesProperties.to_be_stopped_instances.name()) != null) {
      toBeStoppedInstances =
          readStringListField(node, InstancesProperties.to_be_stopped_instances.name());
      // The two lists must be disjoint; report the overlapping names back to the caller.
      Set<String> instanceSet = new HashSet<>(instances);
      instanceSet.retainAll(toBeStoppedInstances);
      if (!instanceSet.isEmpty()) {
        String message =
            "'to_be_stopped_instances' and 'instances' have intersection: " + instanceSet
                + ". Please make them mutually exclusive.";
        _logger.error(message);
        return badRequest(message);
      }
    }

    if (node.get(InstancesProperties.skip_stoppable_check_list.name()) != null) {
      List<String> list =
          readStringListField(node, InstancesProperties.skip_stoppable_check_list.name());
      try {
        skipStoppableCheckList =
            list.stream().map(HealthCheck::valueOf).collect(Collectors.toList());
      } catch (IllegalArgumentException e) {
        String message =
            "'skip_stoppable_check_list' has invalid check names: " + list
                + ". Supported checks: " + HealthCheck.STOPPABLE_CHECK_LIST;
        _logger.error(message, e);
        return badRequest(message);
      }
    }

    boolean skipCustomChecksIfNoLiveness = false;
    if (node.get(InstancesProperties.skip_custom_check_if_instance_not_alive.name()) != null) {
      skipCustomChecksIfNoLiveness =
          node.get(InstancesProperties.skip_custom_check_if_instance_not_alive.name())
              .asBoolean();
    }

    ClusterTopology clusterTopology = clusterService.getClusterTopology(clusterId);
    if (selectionBase != InstanceHealthSelectionBase.non_zone_based) {
      if (!clusterService.isClusterTopologyAware(clusterId)) {
        String message = "Cluster " + clusterId
            + " is not topology aware. Please enable the topology in cluster config or set "
            + "'selection_base' to 'non_zone_based'.";
        _logger.error(message);
        return badRequest(message);
      }

      // Find instances that lack topology information
      Set<String> instancesWithTopology =
          clusterTopology.toZoneMapping().entrySet().stream()
              .flatMap(entry -> entry.getValue().stream())
              .collect(Collectors.toSet());
      Set<String> allInstances = clusterTopology.getAllInstances();
      // Only requested instances that the topology knows about, but that are absent from every
      // zone, are reported; requested names unknown to the topology are not flagged here.
      Set<String> topologyUnawareInstances = new HashSet<>(instances).stream().filter(
          instance -> !instancesWithTopology.contains(instance) && allInstances.contains(instance))
          .collect(Collectors.toSet());
      if (!topologyUnawareInstances.isEmpty()) {
        String message = "Instances " + topologyUnawareInstances
            + " do not have topology information. Please set topology information in instance config or"
            + " set 'selection_base' to 'non_zone_based'.";
        _logger.error(message);
        return badRequest(message);
      }
    }

    String namespace = getNamespace();
    MaintenanceManagementService maintenanceService =
        new MaintenanceManagementService.MaintenanceManagementServiceBuilder()
            .setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId))
            .setConfigAccessor(getConfigAccessor())
            .setSkipZKRead(skipZKRead)
            .setNonBlockingHealthChecks(
                continueOnFailures ? Collections.singleton(ALL_HEALTH_CHECK_NONBLOCK) : null)
            .setCustomRestClient(CustomRestClientFactory.get())
            .setSkipHealthCheckCategories(skipHealthCheckCategories)
            .setNamespace(namespace)
            .setSkipStoppableHealthCheckList(skipStoppableCheckList)
            .setSkipCustomChecksIfNoLiveness(skipCustomChecksIfNoLiveness)
            .build();
    StoppableInstancesSelector stoppableInstancesSelector =
        new StoppableInstancesSelector.StoppableInstancesSelectorBuilder()
            .setClusterId(clusterId)
            .setOrderOfZone(orderOfZone)
            .setCustomizedInput(customizedInput)
            .setMaintenanceService(maintenanceService)
            .setClusterTopology(clusterTopology)
            .setDataAccessor((ZKHelixDataAccessor) getDataAccssor(clusterId))
            .build();

    ObjectNode result;
    switch (selectionBase) {
      case zone_based:
        stoppableInstancesSelector.calculateOrderOfZone(instances, random);
        result = stoppableInstancesSelector.getStoppableInstancesInSingleZone(instances,
            toBeStoppedInstances);
        break;
      case cross_zone_based:
        stoppableInstancesSelector.calculateOrderOfZone(instances, random);
        result = stoppableInstancesSelector.getStoppableInstancesCrossZones(instances,
            toBeStoppedInstances);
        break;
      case non_zone_based:
        result = stoppableInstancesSelector.getStoppableInstancesNonZoneBased(instances,
            toBeStoppedInstances);
        break;
      default:
        throw new UnsupportedOperationException("instance_based selection is not supported yet!");
    }
    return JSONRepresentation(result);
  } catch (HelixException e) {
    _logger
        .error(String.format("Current cluster %s has issue with health checks!", clusterId), e);
    throw new HelixHealthException(e);
  } catch (Exception e) {
    // HelixException is handled by the clause above, so anything reaching this point is some
    // other failure; the log message must not claim otherwise.
    _logger.error(String.format(
        "Failed to get parallel stoppable instances for cluster %s with an unexpected exception!",
        clusterId), e);
    throw e;
  }
}

/**
 * Deserializes the JSON array stored under {@code fieldName} into a list of strings.
 * The caller is responsible for checking that the field is present.
 */
private List<String> readStringListField(JsonNode node, String fieldName) throws IOException {
  return OBJECT_MAPPER.readValue(node.get(fieldName).toString(),
      OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, String.class));
}

/**
 * Resolves the selection base named by the given JSON node, returning {@code null} when the
 * node is not textual or does not name a known {@code InstanceHealthSelectionBase} constant.
 */
private InstanceHealthSelectionBase parseSelectionBaseOrNull(JsonNode selectionBaseNode) {
  String requestedName = selectionBaseNode.textValue();
  if (requestedName == null) {
    return null;
  }
  try {
    return InstanceHealthSelectionBase.valueOf(requestedName);
  } catch (IllegalArgumentException e) {
    return null;
  }
}