in helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java [484:607]
private boolean canCompleteSwap(String clusterName, String swapOutInstanceName,
String swapInInstanceName) {
BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
// 1. Check that swap-in instance is live and enabled.
LiveInstance swapOutLiveInstance =
accessor.getProperty(keyBuilder.liveInstance(swapOutInstanceName));
LiveInstance swapInLiveInstance =
accessor.getProperty(keyBuilder.liveInstance(swapInInstanceName));
InstanceConfig swapOutInstanceConfig = getInstanceConfig(clusterName, swapOutInstanceName);
InstanceConfig swapInInstanceConfig = getInstanceConfig(clusterName, swapInInstanceName);
if (swapInLiveInstance == null) {
logger.warn(
"SwapOutInstance {} is {} + {} and SwapInInstance {} is OFFLINE + {} for cluster {}. Swap will"
+ " not complete unless SwapInInstance instance is ONLINE.",
swapOutInstanceName, swapOutLiveInstance != null ? "ONLINE" : "OFFLINE",
swapOutInstanceConfig.getInstanceOperation().getOperation(), swapInInstanceName,
swapInInstanceConfig.getInstanceOperation().getOperation(), clusterName);
return false;
}
// 2. Check that both instances only have one session and are not carrying any over.
// count number of sessions under CurrentState folder. If it is carrying over from prv session,
// then there are > 1 session ZNodes.
List<String> swapOutSessions = baseAccessor.getChildNames(
PropertyPathBuilder.instanceCurrentState(clusterName, swapOutInstanceName), 0);
List<String> swapInSessions = baseAccessor.getChildNames(
PropertyPathBuilder.instanceCurrentState(clusterName, swapInInstanceName), 0);
if (swapOutSessions.size() > 1 || swapInSessions.size() > 1) {
logger.warn(
"SwapOutInstance {} is carrying over from prev session and SwapInInstance {} is carrying over from prev session for cluster {}."
+ " Swap will not complete unless both instances have only one session.",
swapOutInstanceName, swapInInstanceName, clusterName);
return false;
}
// 3. Check that the swapOutInstance has no pending messages.
List<Message> swapOutMessages =
accessor.getChildValues(keyBuilder.messages(swapOutInstanceName), true);
int swapOutPendingMessageCount = swapOutMessages != null ? swapOutMessages.size() : 0;
List<Message> swapInMessages =
accessor.getChildValues(keyBuilder.messages(swapInInstanceName), true);
int swapInPendingMessageCount = swapInMessages != null ? swapInMessages.size() : 0;
if ((swapOutLiveInstance != null && swapOutPendingMessageCount > 0)
|| swapInPendingMessageCount > 0) {
logger.warn(
"SwapOutInstance {} has {} pending messages and SwapInInstance {} has {} pending messages for cluster {}."
+ " Swap will not complete unless both SwapOutInstance(only when live)"
+ " and SwapInInstance have no pending messages unless.",
swapOutInstanceName, swapOutPendingMessageCount, swapInInstanceName,
swapInPendingMessageCount, clusterName);
return false;
}
// 4. If the swap-out instance is not alive or is disabled, we return true without checking
// the current states on the swap-in instance.
if (swapOutLiveInstance == null || swapOutInstanceConfig.getInstanceOperation().getOperation()
.equals(InstanceConstants.InstanceOperation.DISABLE)) {
return true;
}
// 5. Collect a list of all partitions that have a current state on swapOutInstance
String swapOutLastActiveSession = swapOutLiveInstance.getEphemeralOwner();
String swapInActiveSession = swapInLiveInstance.getEphemeralOwner();
// Iterate over all resources with current states on the swapOutInstance
List<String> swapOutResources = baseAccessor.getChildNames(
PropertyPathBuilder.instanceCurrentState(clusterName, swapOutInstanceName,
swapOutLastActiveSession), 0);
for (String swapOutResource : swapOutResources) {
// Get the topState and secondTopStates for the stateModelDef used by the resource.
IdealState idealState = accessor.getProperty(keyBuilder.idealStates(swapOutResource));
StateModelDefinition stateModelDefinition =
accessor.getProperty(keyBuilder.stateModelDef(idealState.getStateModelDefRef()));
String topState = stateModelDefinition.getTopState();
Set<String> secondTopStates = stateModelDefinition.getSecondTopStates();
CurrentState swapOutResourceCurrentState = accessor.getProperty(
keyBuilder.currentState(swapOutInstanceName, swapOutLastActiveSession, swapOutResource));
CurrentState swapInResourceCurrentState = accessor.getProperty(
keyBuilder.currentState(swapInInstanceName, swapInActiveSession, swapOutResource));
// Check to make sure swapInInstance has a current state for the resource
if (swapInResourceCurrentState == null) {
logger.warn(
"SwapOutInstance {} has current state for resource {} but SwapInInstance {} does not for cluster {}."
+ " Swap will not complete unless both instances have current states for all resources.",
swapOutInstanceName, swapOutResource, swapInInstanceName, clusterName);
return false;
}
// Iterate over all partitions in the swapOutInstance's current state for the resource
// and ensure that the swapInInstance has the correct state for the partition.
for (String partitionName : swapOutResourceCurrentState.getPartitionStateMap().keySet()) {
String swapOutPartitionState = swapOutResourceCurrentState.getState(partitionName);
String swapInPartitionState = swapInResourceCurrentState.getState(partitionName);
// SwapInInstance should have the correct state for the partition.
// All states should match except for the case where the topState is not ALL_REPLICAS or ALL_CANDIDATE_NODES
// or the swap-out partition is ERROR state.
// When the topState is not ALL_REPLICAS or ALL_CANDIDATE_NODES, the swap-in partition should be in a secondTopStates.
if (!(swapOutPartitionState.equals(HelixDefinedState.ERROR.name()) || (
topState.equals(swapOutPartitionState) && (
swapOutPartitionState.equals(swapInPartitionState) ||
!ImmutableSet.of(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS,
StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES).contains(
stateModelDefinition.getNumInstancesPerState(
stateModelDefinition.getTopState())) && secondTopStates.contains(
swapInPartitionState))) || swapOutPartitionState.equals(
swapInPartitionState))) {
logger.warn(
"SwapOutInstance {} has partition {} in {} but SwapInInstance {} has partition {} in state {} for cluster {}."
+ " Swap will not complete unless SwapInInstance has partition in correct states.",
swapOutInstanceName, partitionName, swapOutPartitionState, swapInInstanceName,
partitionName, swapInPartitionState, clusterName);
return false;
}
}
}
return true;
}