private void sendStateTransitionMessage()

in helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java [995:1113]


  private void sendStateTransitionMessage(String clusterName, String instanceName,
      String resourceName, List<String> partitionNames, StateTransitionType stateTransitionType) {
    HelixDataAccessor accessor =
        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
    PropertyKey.Builder keyBuilder = accessor.keyBuilder();

    // check the instance is alive
    LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
    if (liveInstance == null) {
      // check if the instance exists in the cluster
      String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
      throw new HelixException(String.format(
          (_zkClient.exists(instanceConfigPath) ? SetPartitionFailureReason.INSTANCE_NOT_ALIVE
              : SetPartitionFailureReason.INSTANCE_NON_EXISTENT).getMessage(resourceName,
                  partitionNames, instanceName, instanceName, clusterName, stateTransitionType)));
    }

    // check resource exists in ideal state
    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
    if (idealState == null) {
      throw new HelixException(
          String.format(SetPartitionFailureReason.RESOURCE_NON_EXISTENT.getMessage(resourceName,
              partitionNames, instanceName, resourceName, clusterName, stateTransitionType)));
    }

    // check partition exists in resource
    Set<String> partitionsNames = new HashSet<String>(partitionNames);
    Set<String> partitions = (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED)
        ? idealState.getRecord().getMapFields().keySet()
        : idealState.getRecord().getListFields().keySet();
    if (!partitions.containsAll(partitionsNames)) {
      throw new HelixException(
          String.format(SetPartitionFailureReason.PARTITION_NON_EXISTENT.getMessage(resourceName,
              partitionNames, instanceName, partitionNames.toString(), clusterName, stateTransitionType)));
    }

    // check partition is in ERROR state if reset is set to True
    String sessionId = liveInstance.getEphemeralOwner();
    CurrentState curState =
        accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName));
    if (stateTransitionType.equals(StateTransitionType.RESET)) {
      for (String partitionName : partitionNames) {
        if (!curState.getState(partitionName).equals(HelixDefinedState.ERROR.toString())) {
          throw new HelixException(String.format(
              SetPartitionFailureReason.PARTITION_NOT_ERROR.getMessage(resourceName, partitionNames,
                  instanceName, partitionNames.toString(), clusterName, stateTransitionType)));
        }
      }
    }

    // check stateModelDef exists
    String stateModelDef = idealState.getStateModelDefRef();
    StateModelDefinition stateModel = accessor.getProperty(keyBuilder.stateModelDef(stateModelDef));
    if (stateModel == null) {
      throw new HelixException(
          String.format(SetPartitionFailureReason.STATE_MODEL_NON_EXISTENT.getMessage(resourceName,
              partitionNames, instanceName, stateModelDef, clusterName, stateTransitionType)));
    }

    // check there is no pending messages for the partitions exist
    List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName), true);
    for (Message message : messages) {
      if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
          || !sessionId.equals(message.getTgtSessionId())
          || !resourceName.equals(message.getResourceName())
          || !partitionsNames.contains(message.getPartitionName())) {
        continue;
      }

      throw new HelixException(String.format(
          "Can't %s state for %s.%s on %s, because a pending message %s exists for resource %s",
          stateTransitionType.name(), resourceName, partitionNames, instanceName, message,
          message.getResourceName()));
    }

    String adminName = null;
    try {
      adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
    } catch (UnknownHostException e) {
      logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
      adminName = "UNKNOWN";
    }

    List<Message> stateTransitionMessages = new ArrayList<Message>();
    List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
    for (String partitionName : partitionNames) {
      String msgId = UUID.randomUUID().toString();
      Message message = new Message(MessageType.STATE_TRANSITION, msgId);
      message.setSrcName(adminName);
      message.setTgtName(instanceName);
      message.setMsgState(MessageState.NEW);
      message.setPartitionName(partitionName);
      message.setResourceName(resourceName);
      message.setTgtSessionId(sessionId);
      message.setStateModelDef(stateModelDef);
      message.setStateModelFactoryName(idealState.getStateModelFactoryName());
      // if reset == TRUE, send ERROR to initialState message
      // else, send * to ERROR state message
      if (stateTransitionType.equals(StateTransitionType.RESET)) {
        message.setFromState(HelixDefinedState.ERROR.toString());
        message.setToState(stateModel.getInitialState());
      }
      if (stateTransitionType.equals(StateTransitionType.SET_TO_ERROR)) {
        message.setFromState("*");
        message.setToState(HelixDefinedState.ERROR.toString());
      }
      if (idealState.getResourceGroupName() != null) {
        message.setResourceGroupName(idealState.getResourceGroupName());
      }
      if (idealState.getInstanceGroupTag() != null) {
        message.setResourceTag(idealState.getInstanceGroupTag());
      }

      stateTransitionMessages.add(message);
      messageKeys.add(keyBuilder.message(instanceName, message.getId()));
    }

    accessor.setChildren(messageKeys, stateTransitionMessages);
  }