in helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java [995:1113]
/**
 * Validates a partition state-change request and, if every check passes, writes one
 * STATE_TRANSITION message per requested partition to the target instance.
 *
 * <p>Checks performed, in order:
 * <ol>
 *   <li>the instance has a live-instance entry (otherwise the error distinguishes an instance
 *       whose config path exists from one whose config path does not);</li>
 *   <li>the resource has an ideal state;</li>
 *   <li>every requested partition is a key of the ideal state's map fields (CUSTOMIZED
 *       rebalance mode) or list fields (any other mode);</li>
 *   <li>for {@code RESET} only: every requested partition's current state is ERROR;</li>
 *   <li>the state model definition referenced by the ideal state exists;</li>
 *   <li>no STATE_TRANSITION message for the same session, resource and one of the requested
 *       partitions is already present for the instance.</li>
 * </ol>
 *
 * <p>For {@code RESET} each message goes from ERROR to the state model's initial state; for
 * {@code SET_TO_ERROR} each message goes from {@code "*"} to ERROR.
 *
 * @param clusterName name of the cluster the instance and resource belong to
 * @param instanceName instance that will receive the messages
 * @param resourceName resource owning the partitions
 * @param partitionNames partitions to transition
 * @param stateTransitionType kind of transition to request
 * @throws HelixException if any of the validation checks above fails
 */
private void sendStateTransitionMessage(String clusterName, String instanceName,
    String resourceName, List<String> partitionNames, StateTransitionType stateTransitionType) {
  HelixDataAccessor accessor =
      new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
  PropertyKey.Builder keyBuilder = accessor.keyBuilder();

  // NOTE(review): every getMessage(...) result below is passed through String.format with no
  // arguments. If getMessage already returns a fully formatted string, a '%' in a resource,
  // partition or instance name would make the outer format call fail - confirm and drop the
  // outer call if so.

  // check the instance is alive
  LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
  if (liveInstance == null) {
    // check if the instance exists in the cluster
    String instanceConfigPath = PropertyPathBuilder.instanceConfig(clusterName, instanceName);
    throw new HelixException(String.format(
        (_zkClient.exists(instanceConfigPath) ? SetPartitionFailureReason.INSTANCE_NOT_ALIVE
            : SetPartitionFailureReason.INSTANCE_NON_EXISTENT).getMessage(resourceName,
            partitionNames, instanceName, instanceName, clusterName, stateTransitionType)));
  }

  // check resource exists in ideal state
  IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName));
  if (idealState == null) {
    throw new HelixException(
        String.format(SetPartitionFailureReason.RESOURCE_NON_EXISTENT.getMessage(resourceName,
            partitionNames, instanceName, resourceName, clusterName, stateTransitionType)));
  }

  // check partition exists in resource
  Set<String> requestedPartitions = new HashSet<String>(partitionNames);
  Set<String> knownPartitions = (idealState.getRebalanceMode() == RebalanceMode.CUSTOMIZED)
      ? idealState.getRecord().getMapFields().keySet()
      : idealState.getRecord().getListFields().keySet();
  if (!knownPartitions.containsAll(requestedPartitions)) {
    throw new HelixException(
        String.format(SetPartitionFailureReason.PARTITION_NON_EXISTENT.getMessage(resourceName,
            partitionNames, instanceName, partitionNames.toString(), clusterName,
            stateTransitionType)));
  }

  // check partition is in ERROR state if this is a RESET
  String sessionId = liveInstance.getEphemeralOwner();
  if (stateTransitionType.equals(StateTransitionType.RESET)) {
    // The current state is only needed for RESET, so it is only read here.
    CurrentState curState =
        accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName));
    String errorState = HelixDefinedState.ERROR.toString();
    for (String partitionName : partitionNames) {
      // A missing current-state record, or a partition with no recorded state, is treated as
      // "not in ERROR" so the caller gets a descriptive HelixException instead of an NPE.
      String partitionState = (curState == null) ? null : curState.getState(partitionName);
      if (!errorState.equals(partitionState)) {
        throw new HelixException(String.format(
            SetPartitionFailureReason.PARTITION_NOT_ERROR.getMessage(resourceName, partitionNames,
                instanceName, partitionNames.toString(), clusterName, stateTransitionType)));
      }
    }
  }

  // check stateModelDef exists
  String stateModelDef = idealState.getStateModelDefRef();
  StateModelDefinition stateModel = accessor.getProperty(keyBuilder.stateModelDef(stateModelDef));
  if (stateModel == null) {
    throw new HelixException(
        String.format(SetPartitionFailureReason.STATE_MODEL_NON_EXISTENT.getMessage(resourceName,
            partitionNames, instanceName, stateModelDef, clusterName, stateTransitionType)));
  }

  // check that no pending state-transition message exists for any requested partition
  List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName), true);
  for (Message message : messages) {
    if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
        || !sessionId.equals(message.getTgtSessionId())
        || !resourceName.equals(message.getResourceName())
        || !requestedPartitions.contains(message.getPartitionName())) {
      continue;
    }
    throw new HelixException(String.format(
        "Can't %s state for %s.%s on %s, because a pending message %s exists for resource %s",
        stateTransitionType.name(), resourceName, partitionNames, instanceName, message,
        message.getResourceName()));
  }

  // The message source is "<local canonical host name>-ADMIN", or "UNKNOWN" if the local host
  // cannot be resolved.
  String adminName = null;
  try {
    adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
  } catch (UnknownHostException e) {
    logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
    adminName = "UNKNOWN";
  }

  // Build one message per partition and write them all with a single setChildren call.
  List<Message> stateTransitionMessages = new ArrayList<Message>();
  List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
  for (String partitionName : partitionNames) {
    String msgId = UUID.randomUUID().toString();
    Message message = new Message(MessageType.STATE_TRANSITION, msgId);
    message.setSrcName(adminName);
    message.setTgtName(instanceName);
    message.setMsgState(MessageState.NEW);
    message.setPartitionName(partitionName);
    message.setResourceName(resourceName);
    message.setTgtSessionId(sessionId);
    message.setStateModelDef(stateModelDef);
    message.setStateModelFactoryName(idealState.getStateModelFactoryName());

    // RESET: ERROR -> initial state; SET_TO_ERROR: * -> ERROR
    if (stateTransitionType.equals(StateTransitionType.RESET)) {
      message.setFromState(HelixDefinedState.ERROR.toString());
      message.setToState(stateModel.getInitialState());
    } else if (stateTransitionType.equals(StateTransitionType.SET_TO_ERROR)) {
      message.setFromState("*");
      message.setToState(HelixDefinedState.ERROR.toString());
    }

    if (idealState.getResourceGroupName() != null) {
      message.setResourceGroupName(idealState.getResourceGroupName());
    }
    if (idealState.getInstanceGroupTag() != null) {
      message.setResourceTag(idealState.getInstanceGroupTag());
    }

    stateTransitionMessages.add(message);
    messageKeys.add(keyBuilder.message(instanceName, message.getId()));
  }
  accessor.setChildren(messageKeys, stateTransitionMessages);
}