in helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java [119:291]
/**
 * Generates the state-transition and cancellation messages for every partition of one resource
 * and adds the valid ones to {@code output}, ordered by the state model's state priority list.
 *
 * <p>For each partition the desired instance-to-state map is extended with instances that only
 * appear in the pending-state map (marked {@code NO_DESIRED_STATE}) or only in the current-state
 * map (marked {@code DROPPED}). Each instance is then examined in turn: pending and stale
 * messages that qualify are handed to {@code logAndAddToCleanUp}, and at most one new message
 * (a state transition or a cancellation) is generated for it.
 *
 * <p>If the resource's state model definition cannot be found, an error is logged and nothing is
 * generated.
 *
 * @param resource the resource whose partitions are processed
 * @param cache data provider used for the state model definition, stale messages, cluster
 *     config, and (only for the invalid-message log) the instance lists
 * @param resourcesStateMap source of the desired instance-to-state map for each partition
 * @param currentStateOutput source of current states, pending states/messages, cancellation
 *     messages and end times
 * @param manager supplies the instance name and session id passed to the message factories
 * @param sessionIdMap instance name to session id; used as the target session of new messages
 * @param eventType forwarded to {@code addGeneratedMessageToMap}
 * @param output receives every generated message that passes {@code Message#isValid()}
 * @param messagesToCleanUp forwarded to {@code logAndAddToCleanUp} for pending/stale messages
 *     selected for clean-up
 */
private void generateMessage(final Resource resource, final BaseControllerDataProvider cache,
    final ResourcesStateMap resourcesStateMap, final CurrentStateOutput currentStateOutput,
    final HelixManager manager, final Map<String, String> sessionIdMap,
    final ClusterEventType eventType, MessageOutput output,
    Map<String, Map<String, Message>> messagesToCleanUp) {
  String resourceName = resource.getResourceName();

  StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef());
  if (stateModelDef == null) {
    LogUtil.logError(logger, _eventId,
        "State Model Definition null, skip generating messages for resource: " + resourceName);
    return;
  }

  for (Partition partition : resource.getPartitions()) {
    // Work on a copy so the additions below do not modify the map held by resourcesStateMap.
    Map<String, String> instanceStateMap =
        new HashMap<>(resourcesStateMap.getInstanceStateMap(resourceName, partition));
    Map<String, String> pendingStateMap =
        currentStateOutput.getPendingStateMap(resourceName, partition);
    Map<String, String> currentStateMap =
        currentStateOutput.getCurrentStateMap(resourceName, partition);

    // The operation is combining pending state with best possible state. Since some replicas
    // have been moved from one instance to another, the instance will exist in pending state
    // but not best possible. Thus Helix need to check the pending state and cancel it.
    for (String instance : pendingStateMap.keySet()) {
      if (!instanceStateMap.containsKey(instance)) {
        instanceStateMap.put(instance, NO_DESIRED_STATE);
      }
    }

    // Look through the current state map and add DROPPED message if the instance is not in the
    // resourceStateMap. This instance may not have had been dropped by the rebalance strategy.
    // This check is required to ensure that the instances removed from the ideal state stateMap
    // are properly dropped.
    for (String instance : currentStateMap.keySet()) {
      if (!instanceStateMap.containsKey(instance)) {
        instanceStateMap.put(instance, HelixDefinedState.DROPPED.name());
      }
    }

    // we should generate message based on the desired-state priority
    // so keep generated messages in a temp map keyed by state
    // desired-state->list of generated-messages
    Map<String, List<Message>> messageMap = new HashMap<>();

    for (String instanceName : instanceStateMap.keySet()) {
      Set<Message> staleMessages = cache.getStaleMessagesByInstance(instanceName);

      String desiredState = instanceStateMap.get(instanceName);
      String currentState =
          currentStateOutput.getCurrentState(resourceName, partition, instanceName);
      Message pendingMessage =
          currentStateOutput.getPendingMessage(resourceName, partition, instanceName);
      boolean isCancellationEnabled = cache.getClusterConfig().isStateTransitionCancelEnabled();
      Message cancellationMessage =
          currentStateOutput.getCancellationMessage(resourceName, partition, instanceName);

      String nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);

      Message message = null;

      if (currentState == null) {
        // No reported state for this replica: treat it as being in the model's initial state
        // and recompute the next hop from there.
        currentState = stateModelDef.getInitialState();
        nextState = stateModelDef.getNextStateForTransition(currentState, desiredState);

        if (desiredState.equals(HelixDefinedState.DROPPED.name())) {
          // Guarded like the other debug log in this method so the message is only formatted
          // when debug logging is enabled.
          if (logger.isDebugEnabled()) {
            LogUtil.logDebug(logger, _eventId, String
                .format("No current state for partition %s in resource %s, skip the drop message",
                    partition.getPartitionName(), resourceName));
          }

          // No drop message is created here; only the pending message (if any) is considered
          // for cancellation.
          message =
              generateCancellationMessageForPendingMessage(desiredState, currentState, nextState,
                  pendingMessage, manager, resource, partition, sessionIdMap, instanceName,
                  stateModelDef, cancellationMessage, isCancellationEnabled);
          addGeneratedMessageToMap(message, messageMap, eventType, cache, desiredState,
              resourceName, partition, currentState, nextState);

          // TODO: separate logic of resource/task message generation
          if (cache instanceof ResourceControllerDataProvider) {
            ((ResourceControllerDataProvider) cache)
                .invalidateCachedIdealStateMapping(resourceName);
          }
          continue;
        }
      }

      if (shouldCleanUpPendingMessage(pendingMessage, sessionIdMap, instanceName, currentState,
          currentStateOutput.getEndTime(resourceName, partition, instanceName))) {
        logAndAddToCleanUp(messagesToCleanUp, pendingMessage, instanceName, resourceName,
            partition, currentState, PENDING_MESSAGE);
      }

      for (Message staleMessage : staleMessages) {
        // staleMessage can be simple or batch mode
        // A stale message is cleaned up only if the purge delay has elapsed since the end time,
        // it belongs to this resource, the instance has a session, and it targets this
        // partition (directly, or through its batch partition list).
        if ((System.currentTimeMillis() - currentStateOutput
            .getEndTime(resourceName, partition, instanceName) > DEFAULT_OBSELETE_MSG_PURGE_DELAY)
            && staleMessage.getResourceName().equals(resourceName) && sessionIdMap
            .containsKey(instanceName) && (
            staleMessage.getPartitionName().equals(partition.getPartitionName()) || (
                staleMessage.getBatchMessageMode() && staleMessage.getPartitionNames()
                    .contains(partition.getPartitionName())))) {
          logAndAddToCleanUp(messagesToCleanUp, staleMessage, instanceName, resourceName,
              partition, currentState, STALE_MESSAGE);
        }
      }

      if (desiredState.equals(NO_DESIRED_STATE) || desiredState.equalsIgnoreCase(currentState)) {
        // Nothing to transition to (no desired state, or already there). The only message that
        // may be produced is a cancellation of the pending transition.
        if (shouldCreateSTCancellation(pendingMessage, desiredState,
            stateModelDef.getInitialState())) {
          message = MessageUtil
              .createStateTransitionCancellationMessage(manager.getInstanceName(),
                  manager.getSessionId(), resource, partition.getPartitionName(), instanceName,
                  sessionIdMap.get(instanceName), stateModelDef.getId(),
                  pendingMessage.getFromState(), pendingMessage.getToState(), null,
                  cancellationMessage, isCancellationEnabled, currentState);
        }
      } else {
        if (nextState == null) {
          // Report which state model definition was consulted (its id) rather than the Java
          // class of the definition object.
          LogUtil.logError(logger, _eventId,
              "Unable to find a next state for resource: " + resource.getResourceName()
                  + " partition: " + partition.getPartitionName() + " from stateModelDefinition: "
                  + stateModelDef.getId() + " from:" + currentState + " to:" + desiredState);
          continue;
        }

        if (pendingMessage != null) {
          // A transition is already in flight for this replica; let the helper decide whether
          // it has to be cancelled.
          message =
              generateCancellationMessageForPendingMessage(desiredState, currentState, nextState,
                  pendingMessage, manager, resource, partition, sessionIdMap, instanceName,
                  stateModelDef, cancellationMessage, isCancellationEnabled);
        } else {
          // Create new state transition message
          message = MessageUtil
              .createStateTransitionMessage(manager.getInstanceName(), manager.getSessionId(),
                  resource, partition.getPartitionName(), instanceName, currentState, nextState,
                  sessionIdMap.get(instanceName), stateModelDef.getId());

          if (logger.isDebugEnabled()) {
            LogUtil.logDebug(logger, _eventId, String.format(
                "Resource %s partition %s for instance %s with currentState %s and nextState %s",
                resource.getResourceName(), partition.getPartitionName(), instanceName,
                currentState, nextState));
          }
        }
      }

      // message may still be null at this point; the helper is invoked regardless.
      addGeneratedMessageToMap(message, messageMap, eventType, cache, desiredState, resourceName,
          partition, currentState, nextState);
    }

    // add generated messages to output according to state priority
    // NOTE(review): only map keys that appear in the priority list are emitted by this loop;
    // confirm that every key addGeneratedMessageToMap can use is covered by that list.
    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
    for (String state : statesPriorityList) {
      if (messageMap.containsKey(state)) {
        for (Message message : messageMap.get(state)) {
          // This is for a bug where a message's target session id is null
          if (!message.isValid()) {
            LogUtil.logError(logger, _eventId, String.format(
                "An invalid message was generated! Discarding this message. sessionIdMap: %s, CurrentStateMap: %s, InstanceStateMap: %s, AllInstances: %s, LiveInstances: %s, Message: %s",
                sessionIdMap, currentStateOutput.getCurrentStateMap(resourceName, partition),
                instanceStateMap, cache.getAllInstances(),
                cache.getLiveInstances().keySet(),
                message));
            continue; // Do not add this message
          }
          output.addMessage(resourceName, partition, message);
        }
      }
    }
  } // end of for-each-partition
}