private boolean canCompleteSwap()

in helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java [484:607]


  /**
   * Determines whether an in-progress swap between two instances can be completed.
   *
   * <p>The checks run in order; the first one that fails logs a warning and returns
   * {@code false}:
   * <ol>
   *   <li>the swap-in instance has a live-instance node;</li>
   *   <li>neither instance has more than one session node under its current-state path;</li>
   *   <li>the swap-in instance has no pending messages, and neither does the swap-out instance
   *       when it is live;</li>
   *   <li>if the swap-out instance is not live, or its instance operation is DISABLE, the swap
   *       is reported as completable without comparing current states;</li>
   *   <li>otherwise, for every resource and partition with a current state on the swap-out
   *       instance, the swap-in instance must hold an acceptable state (see
   *       {@link #isSwapInPartitionStateAcceptable}).</li>
   * </ol>
   *
   * <p>Data that cannot be read while comparing current states (ideal state, state model
   * definition, or a current state record) is treated conservatively as "cannot complete yet".
   *
   * @param clusterName name of the cluster both instances belong to
   * @param swapOutInstanceName name of the instance being swapped out
   * @param swapInInstanceName name of the instance being swapped in
   * @return {@code true} if every check passes, {@code false} otherwise
   */
  private boolean canCompleteSwap(String clusterName, String swapOutInstanceName,
      String swapInInstanceName) {
    BaseDataAccessor<ZNRecord> baseAccessor = _baseDataAccessor;
    HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
    PropertyKey.Builder keyBuilder = accessor.keyBuilder();

    // 1. Check that the swap-in instance is live.
    LiveInstance swapOutLiveInstance =
        accessor.getProperty(keyBuilder.liveInstance(swapOutInstanceName));
    LiveInstance swapInLiveInstance =
        accessor.getProperty(keyBuilder.liveInstance(swapInInstanceName));
    InstanceConfig swapOutInstanceConfig = getInstanceConfig(clusterName, swapOutInstanceName);
    InstanceConfig swapInInstanceConfig = getInstanceConfig(clusterName, swapInInstanceName);
    if (swapInLiveInstance == null) {
      logger.warn(
          "SwapOutInstance {} is {} + {} and SwapInInstance {} is OFFLINE + {} for cluster {}. Swap will"
              + " not complete unless SwapInInstance instance is ONLINE.",
          swapOutInstanceName, swapOutLiveInstance != null ? "ONLINE" : "OFFLINE",
          swapOutInstanceConfig.getInstanceOperation().getOperation(), swapInInstanceName,
          swapInInstanceConfig.getInstanceOperation().getOperation(), clusterName);
      return false;
    }

    // 2. Check that both instances only have one session and are not carrying any over.
    // Count the session ZNodes under the CurrentState folder: if an instance is carrying over
    // from a previous session, there is more than one. A null child list is defensively
    // treated as zero sessions instead of being dereferenced.
    List<String> swapOutSessions = baseAccessor.getChildNames(
        PropertyPathBuilder.instanceCurrentState(clusterName, swapOutInstanceName), 0);
    List<String> swapInSessions = baseAccessor.getChildNames(
        PropertyPathBuilder.instanceCurrentState(clusterName, swapInInstanceName), 0);
    int swapOutSessionCount = swapOutSessions != null ? swapOutSessions.size() : 0;
    int swapInSessionCount = swapInSessions != null ? swapInSessions.size() : 0;
    if (swapOutSessionCount > 1 || swapInSessionCount > 1) {
      logger.warn(
          "SwapOutInstance {} has {} current state session(s) and SwapInInstance {} has {} current state session(s) for cluster {}."
              + " Swap will not complete unless both instances have only one session.",
          swapOutInstanceName, swapOutSessionCount, swapInInstanceName, swapInSessionCount,
          clusterName);
      return false;
    }

    // 3. Check that the swap-in instance, and the swap-out instance when it is live, have no
    // pending messages.
    List<Message> swapOutMessages =
        accessor.getChildValues(keyBuilder.messages(swapOutInstanceName), true);
    int swapOutPendingMessageCount = swapOutMessages != null ? swapOutMessages.size() : 0;
    List<Message> swapInMessages =
        accessor.getChildValues(keyBuilder.messages(swapInInstanceName), true);
    int swapInPendingMessageCount = swapInMessages != null ? swapInMessages.size() : 0;
    if ((swapOutLiveInstance != null && swapOutPendingMessageCount > 0)
        || swapInPendingMessageCount > 0) {
      logger.warn(
          "SwapOutInstance {} has {} pending messages and SwapInInstance {} has {} pending messages for cluster {}."
              + " Swap will not complete unless both SwapOutInstance(only when live)"
              + " and SwapInInstance have no pending messages.",
          swapOutInstanceName, swapOutPendingMessageCount, swapInInstanceName,
          swapInPendingMessageCount, clusterName);
      return false;
    }

    // 4. If the swap-out instance is not alive or is disabled, we return true without checking
    // the current states on the swap-in instance.
    if (swapOutLiveInstance == null || swapOutInstanceConfig.getInstanceOperation().getOperation()
        .equals(InstanceConstants.InstanceOperation.DISABLE)) {
      return true;
    }

    // 5. Compare the current states of every resource hosted on the swap-out instance.
    String swapOutLastActiveSession = swapOutLiveInstance.getEphemeralOwner();
    String swapInActiveSession = swapInLiveInstance.getEphemeralOwner();

    List<String> swapOutResources = baseAccessor.getChildNames(
        PropertyPathBuilder.instanceCurrentState(clusterName, swapOutInstanceName,
            swapOutLastActiveSession), 0);
    if (swapOutResources == null) {
      // No resource list for the swap-out session: there is nothing to compare, which is the
      // same outcome as an empty list.
      return true;
    }

    for (String swapOutResource : swapOutResources) {
      // Get the topState and secondTopStates for the stateModelDef used by the resource.
      IdealState idealState = accessor.getProperty(keyBuilder.idealStates(swapOutResource));
      if (idealState == null) {
        logger.warn(
            "SwapOutInstance {} has current state for resource {} but no IdealState was found for cluster {}."
                + " Swap will not complete until the resource's states can be verified.",
            swapOutInstanceName, swapOutResource, clusterName);
        return false;
      }
      StateModelDefinition stateModelDefinition =
          accessor.getProperty(keyBuilder.stateModelDef(idealState.getStateModelDefRef()));
      if (stateModelDefinition == null) {
        logger.warn(
            "No StateModelDefinition {} was found for resource {} in cluster {}."
                + " Swap will not complete until the resource's states can be verified.",
            idealState.getStateModelDefRef(), swapOutResource, clusterName);
        return false;
      }
      String topState = stateModelDefinition.getTopState();
      Set<String> secondTopStates = stateModelDefinition.getSecondTopStates();
      // Evaluated once per resource: whether the top state's replica count is one of the
      // ALL_REPLICAS / ALL_CANDIDATE_NODES markers rather than anything else.
      boolean topStateCountIsAllReplicasOrNodes =
          ImmutableSet.of(StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS,
                  StateModelDefinition.STATE_REPLICA_COUNT_ALL_CANDIDATE_NODES)
              .contains(stateModelDefinition.getNumInstancesPerState(topState));

      CurrentState swapOutResourceCurrentState = accessor.getProperty(
          keyBuilder.currentState(swapOutInstanceName, swapOutLastActiveSession, swapOutResource));
      CurrentState swapInResourceCurrentState = accessor.getProperty(
          keyBuilder.currentState(swapInInstanceName, swapInActiveSession, swapOutResource));

      // Check to make sure swapInInstance has a current state for the resource
      if (swapInResourceCurrentState == null) {
        logger.warn(
            "SwapOutInstance {} has current state for resource {} but SwapInInstance {} does not for cluster {}."
                + " Swap will not complete unless both instances have current states for all resources.",
            swapOutInstanceName, swapOutResource, swapInInstanceName, clusterName);
        return false;
      }

      // The resource was listed above but its record can no longer be read; the cluster is
      // changing underneath this check, so do not report the swap as completable.
      if (swapOutResourceCurrentState == null) {
        logger.warn(
            "Current state for resource {} on SwapOutInstance {} could not be read for cluster {}."
                + " Swap will not complete until the resource's states can be verified.",
            swapOutResource, swapOutInstanceName, clusterName);
        return false;
      }

      // Iterate over all partitions in the swapOutInstance's current state for the resource
      // and ensure that the swapInInstance has the correct state for the partition.
      for (String partitionName : swapOutResourceCurrentState.getPartitionStateMap().keySet()) {
        String swapOutPartitionState = swapOutResourceCurrentState.getState(partitionName);
        String swapInPartitionState = swapInResourceCurrentState.getState(partitionName);

        if (!isSwapInPartitionStateAcceptable(swapOutPartitionState, swapInPartitionState,
            topState, topStateCountIsAllReplicasOrNodes, secondTopStates)) {
          logger.warn(
              "SwapOutInstance {} has partition {} in state {} but SwapInInstance {} has partition {} in state {} for cluster {}."
                  + " Swap will not complete unless SwapInInstance has partition in correct states.",
              swapOutInstanceName, partitionName, swapOutPartitionState, swapInInstanceName,
              partitionName, swapInPartitionState, clusterName);
          return false;
        }
      }
    }

    return true;
  }

  /**
   * Decides whether the swap-in instance's state for one partition is acceptable given the
   * swap-out instance's state for the same partition.
   *
   * <p>The state is acceptable when any of the following holds:
   * <ul>
   *   <li>the swap-out partition is in the ERROR state;</li>
   *   <li>both instances report the same state;</li>
   *   <li>the swap-out partition is in the top state, the top state's replica count is not
   *       ALL_REPLICAS or ALL_CANDIDATE_NODES, and the swap-in partition is in one of the second
   *       top states.</li>
   * </ul>
   * A missing ({@code null}) swap-out state is never acceptable.
   *
   * @param swapOutPartitionState state of the partition on the swap-out instance, may be null
   * @param swapInPartitionState state of the partition on the swap-in instance, may be null
   * @param topState top state of the resource's state model
   * @param topStateCountIsAllReplicasOrNodes whether the top state's replica count is
   *     ALL_REPLICAS or ALL_CANDIDATE_NODES
   * @param secondTopStates second top states of the resource's state model
   * @return {@code true} if the swap-in state is acceptable for completing the swap
   */
  private static boolean isSwapInPartitionStateAcceptable(String swapOutPartitionState,
      String swapInPartitionState, String topState, boolean topStateCountIsAllReplicasOrNodes,
      Set<String> secondTopStates) {
    if (swapOutPartitionState == null) {
      return false;
    }
    if (swapOutPartitionState.equals(HelixDefinedState.ERROR.name())
        || swapOutPartitionState.equals(swapInPartitionState)) {
      return true;
    }
    return swapOutPartitionState.equals(topState) && !topStateCountIsAllReplicasOrNodes
        && secondTopStates.contains(swapInPartitionState);
  }