in helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java [140:240]
private void updatePendingMessages(LiveInstance instance, BaseControllerDataProvider cache,
Collection<Message> pendingMessages, Collection<Message> pendingRelayMessages,
Set<Message> existingStaleMessages, CurrentStateOutput currentStateOutput,
Map<String, Resource> resourceMap) {
String instanceName = instance.getInstanceName();
String instanceSessionId = instance.getEphemeralOwner();
// update all pending messages
for (Message message : pendingMessages) {
// ignore existing stale messages
if (existingStaleMessages.contains(message)) {
continue;
}
if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
&& !MessageType.STATE_TRANSITION_CANCELLATION.name()
.equalsIgnoreCase(message.getMsgType())) {
continue;
}
if (!instanceSessionId.equals(message.getTgtSessionId())) {
continue;
}
String resourceName = message.getResourceName();
Resource resource = resourceMap.get(resourceName);
if (resource == null) {
LogUtil.logDebug(LOG, _eventId, String.format(
"Ignore a pending relay message %s for a non-exist resource %s and partition %s",
message.getMsgId(), resourceName, message.getPartitionName()));
continue;
}
if (!message.getBatchMessageMode()) {
String partitionName = message.getPartitionName();
Partition partition = resource.getPartition(partitionName);
if (partition != null) {
String currentState = currentStateOutput.getCurrentState(resourceName, partition,
instanceName);
if (_isTaskFrameworkPipeline || !isStaleMessage(message, currentState)) {
setMessageState(currentStateOutput, resourceName, partition, instanceName, message);
} else {
cache.addStaleMessage(instanceName, message);
}
} else {
LogUtil.logDebug(LOG, _eventId, String
.format("Ignore a pending message %s for a non-exist resource %s and partition %s",
message.getMsgId(), resourceName, message.getPartitionName()));
}
} else {
List<String> partitionNames = message.getPartitionNames();
if (!partitionNames.isEmpty()) {
for (String partitionName : partitionNames) {
Partition partition = resource.getPartition(partitionName);
if (partition != null) {
setMessageState(currentStateOutput, resourceName, partition, instanceName, message);
} else {
LogUtil.logDebug(LOG, _eventId, String.format(
"Ignore a pending message %s for a non-exist resource %s and partition %s",
message.getMsgId(), resourceName, message.getPartitionName()));
}
}
}
}
// Add the state model into the map for lookup of Task Framework pending partitions
if (resource.getStateModelDefRef() != null) {
currentStateOutput.setResourceStateModelDef(resourceName, resource.getStateModelDefRef());
}
}
// update all pending relay messages
for (Message message : pendingRelayMessages) {
if (!message.isRelayMessage()) {
LogUtil.logWarn(LOG, _eventId,
String.format("Not a relay message %s, ignored!", message.getMsgId()));
continue;
}
String resourceName = message.getResourceName();
Resource resource = resourceMap.get(resourceName);
if (resource == null) {
LogUtil.logDebug(LOG, _eventId, String.format(
"Ignore a pending relay message %s for a non-exist resource %s and partition %s",
message.getMsgId(), resourceName, message.getPartitionName()));
continue;
}
if (!message.getBatchMessageMode()) {
String partitionName = message.getPartitionName();
Partition partition = resource.getPartition(partitionName);
if (partition != null) {
currentStateOutput.setPendingRelayMessage(resourceName, partition, instanceName, message);
} else {
LogUtil.logDebug(LOG, _eventId, String.format(
"Ignore a pending relay message %s for a non-exist resource %s and partition %s",
message.getMsgId(), resourceName, message.getPartitionName()));
}
} else {
LogUtil.logWarn(LOG, _eventId,
String.format("A relay message %s should not be batched, ignored!", message.getMsgId()));
}
}
}