in helix-core/src/main/java/org/apache/helix/util/InstanceValidationUtil.java [417:477]
/**
 * Checks whether the partitions hosted on {@code instanceName} would still have enough healthy
 * replicas on sibling instances.
 *
 * <p>Resources are skipped when their ideal state is missing, disabled, invalid, or uses the task
 * state model, and when their external view reports a min active replica value of -1. For every
 * remaining resource, each partition that has a replica on {@code instanceName} in a state not
 * considered unhealthy is inspected: replicas on other instances are counted when the instance is
 * not in {@code toBeStoppedInstances} and the replica state is not unhealthy. Unhealthy states are
 * those in {@code UNHEALTHY_STATES} plus the initial state of the resource's state model
 * definition, when that definition can be read.
 *
 * @param dataAccessor accessor used to read ideal states, external views and state model
 *     definitions
 * @param instanceName the instance whose sibling replicas are evaluated
 * @param toBeStoppedInstances instances whose replicas must not be counted as healthy siblings;
 *     may be {@code null}, in which case no sibling is excluded on this basis
 * @return {@code false} as soon as one inspected partition has fewer healthy siblings than the
 *     resource's min active replica value; {@code true} otherwise
 * @throws HelixException if a resource that passes the ideal state filter has no external view
 */
public static boolean siblingNodesActiveReplicaCheck(HelixDataAccessor dataAccessor,
    String instanceName, Set<String> toBeStoppedInstances) {
  PropertyKey.Builder keys = dataAccessor.keyBuilder();
  List<String> resourceNames = dataAccessor.getChildNames(keys.idealStates());

  for (String resourceName : resourceNames) {
    IdealState idealState = dataAccessor.getProperty(keys.idealStates(resourceName));
    boolean eligible = idealState != null && idealState.isEnabled() && idealState.isValid()
        && !TaskConstants.STATE_MODEL_NAME.equals(idealState.getStateModelDefRef());
    if (!eligible) {
      continue;
    }

    ExternalView externalView = dataAccessor.getProperty(keys.externalView(resourceName));
    if (externalView == null) {
      throw new HelixException(
          String.format("Resource %s does not have external view!", resourceName));
    }

    // The external view carries the minActiveReplicas constraint; -1 means it is not set.
    int requiredActiveReplicas = externalView.getMinActiveReplicas();
    if (requiredActiveReplicas == -1) {
      _logger.warn("Resource {} is missing minActiveReplica field. Skip the sibling check",
          resourceName);
      continue;
    }

    // Start from the shared unhealthy states and add this state model's initial state, if known.
    StateModelDefinition stateModel =
        dataAccessor.getProperty(keys.stateModelDef(externalView.getStateModelDefRef()));
    Set<String> unhealthyStates = new HashSet<>(UNHEALTHY_STATES);
    if (stateModel != null) {
      unhealthyStates.add(stateModel.getInitialState());
    }

    for (String partition : externalView.getPartitionSet()) {
      Map<String, String> replicaStates = externalView.getStateMap(partition);

      // Only partitions with a replica on the given instance are relevant.
      if (!replicaStates.containsKey(instanceName)) {
        continue;
      }
      // An already-unhealthy replica on this instance does not contribute to availability, so
      // removing it cannot make things worse; skip the sibling check for this partition.
      if (unhealthyStates.contains(replicaStates.get(instanceName))) {
        continue;
      }

      int healthySiblingCount = 0;
      for (Map.Entry<String, String> replica : replicaStates.entrySet()) {
        String sibling = replica.getKey();
        if (sibling.equals(instanceName)) {
          continue;
        }
        if (toBeStoppedInstances != null && toBeStoppedInstances.contains(sibling)) {
          continue;
        }
        if (unhealthyStates.contains(replica.getValue())) {
          continue;
        }
        healthySiblingCount++;
      }

      if (healthySiblingCount < requiredActiveReplicas) {
        _logger.info(
            "Partition {} doesn't have enough active replicas in sibling nodes. NumHealthySiblings: {}, minActiveReplicas: {}",
            partition, healthySiblingCount, requiredActiveReplicas);
        return false;
      }
    }
  }
  return true;
}