in helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java [182:316]
/**
 * Refreshes the relay (P2P) message cache and merges the surviving relay messages into the
 * per-instance message map.
 *
 * <p>Steps performed:
 * <ol>
 *   <li>Cache every relay message embedded in the messages currently held in
 *       {@code _messageMap} (via {@code cacheRelayMessage}).</li>
 *   <li>For every cached relay message, evict it from {@code _relayMessageCache} and
 *       {@code _relayHostMessageCache} when it is invalid, already committed to its target
 *       instance as a relay message, expired after the target/relay host checks, or older than
 *       {@code _relayMessageLifetime} without ever having been assigned a relay time.</li>
 *   <li>Schedule a future pipeline run at the earliest expiry time among the survivors.</li>
 *   <li>Publish the survivors as an unmodifiable {@code _relayMessageMap} and add them to
 *       {@code _messageMap}.</li>
 * </ol>
 *
 * @param liveInstanceMap live instances, passed through to the target/relay host checks
 *     (presumably keyed by instance name — TODO confirm)
 * @param currentStateMap current states, passed through to the target/relay host checks
 *     (presumably instance -> session -> resource -> CurrentState — TODO confirm)
 */
public void updateRelayMessages(Map<String, LiveInstance> liveInstanceMap,
    Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
  // Cache all relay messages read from ZK.
  for (Map<String, Message> instanceMessages : _messageMap.values()) {
    for (Message message : instanceMessages.values()) {
      if (message.hasRelayMessages()) {
        for (Message relayMsg : message.getRelayMessages().values()) {
          cacheRelayMessage(relayMsg, message);
        }
      }
    }
  }

  long nextRebalanceTime = Long.MAX_VALUE;
  // Single "now" snapshot used for every time-based decision in this pass.
  long currentTime = System.currentTimeMillis();
  Map<String, Map<String, Message>> relayMessageMap = new HashMap<>();
  Set<String> targetInstanceToRemove = new HashSet<>();

  // Iterate all relay messages in the cache, remove invalid or expired ones.
  for (Map.Entry<String, Map<String, Message>> cacheEntry : _relayMessageCache.entrySet()) {
    String targetInstance = cacheEntry.getKey();
    Map<String, Message> relayMessages = cacheEntry.getValue();
    // Messages currently present for the target instance. _messageMap is only modified after
    // this loop, so a single lookup per target instance is enough.
    Map<String, Message> instanceMsgMap = _messageMap.get(targetInstance);
    // Relay messages for this target that survive all eviction checks below.
    Map<String, Message> validMessages = new HashMap<>();

    Iterator<Map.Entry<String, Message>> iterator = relayMessages.entrySet().iterator();
    while (iterator.hasNext()) {
      Message relayMessage = iterator.next().getValue();
      String msgId = relayMessage.getMsgId();

      if (!relayMessage.isValid()) {
        LOG.warn("Invalid relay message {}, remove it from the cache.", relayMessage.getId());
        iterator.remove();
        _relayHostMessageCache.remove(msgId);
        continue;
      }

      // Check whether the relay message has already been sent to the target host.
      Message committedMessage = instanceMsgMap == null ? null : instanceMsgMap.get(msgId);
      if (committedMessage != null) {
        if (committedMessage.isRelayMessage()) {
          LOG.info("Relay message already committed, remove relay message {} from the cache.",
              relayMessage.getId());
          iterator.remove();
          _relayHostMessageCache.remove(msgId);
          continue;
        }
        // Controller already sent the same message to the target host. The relay host may still
        // forward the P2P message later, so the relay message cannot be removed immediately;
        // just set its relay time to the current time.
        // TODO: we should remove the message immediately here once we have transaction id
        // support in CurrentState.
        LOG.info(
            "Controller already sent the message to the target host, set relay message {} to be expired.",
            relayMessage.getId());
        setMessageRelayTime(relayMessage, currentTime);
      }

      try {
        // Check the partition's state on the relay message's target host (its destination host)
        // and set the relay message to expire immediately or at a later time if necessary.
        checkTargetHost(targetInstance, relayMessage, liveInstanceMap, currentStateMap);
        // Check the partition's state on the original relay host (the host that should forward
        // the relay message) and set the expiry likewise if necessary.
        Message hostedMessage = _relayHostMessageCache.get(msgId);
        checkRelayHost(relayMessage, liveInstanceMap, currentStateMap, hostedMessage);
      } catch (Exception e) {
        // Best effort: a failed check must not abort processing of the remaining relay messages.
        // The exception is the trailing argument with no matching placeholder, so parameterized
        // loggers (SLF4J/Log4j2 convention) record it as the throwable, including its stack trace.
        LOG.warn(
            "Failed to check target and relay host and set the relay time. Relay message: {}",
            relayMessage.getId(), e);
      }

      if (relayMessage.isExpired()) {
        LOG.info("relay message {} expired, remove it from cache. relay time {}.",
            relayMessage.getId(), relayMessage.getRelayTime());
        iterator.remove();
        _relayHostMessageCache.remove(msgId);
        continue;
      }

      // Last resort: if Helix missed all other events that would evict a relay message which was
      // never assigned a relay time, delete it once it outlives _relayMessageLifetime so it
      // cannot stay stuck in the cache forever. This case should happen very rarely.
      long lifetimeDeadline = relayMessage.getCreateTimeStamp() + _relayMessageLifetime;
      if (relayMessage.getRelayTime() < 0 && lifetimeDeadline < currentTime) {
        LOG.info(
            "relay message {} has reached its lifetime, remove it from cache.", relayMessage.getId());
        iterator.remove();
        _relayHostMessageCache.remove(msgId);
        continue;
      }

      validMessages.put(msgId, relayMessage);

      // Compute the next earliest time to trigger a pipeline run: the relay-time based expiry
      // when a relay time has been set, otherwise the end of the message's lifetime.
      long expiryTime = relayMessage.getRelayTime() > 0
          ? relayMessage.getRelayTime() + relayMessage.getExpiryPeriod()
          : lifetimeDeadline;
      if (expiryTime < nextRebalanceTime) {
        nextRebalanceTime = expiryTime;
      }
    } // end while (iterator.hasNext())

    if (!validMessages.isEmpty()) {
      relayMessageMap.put(targetInstance, validMessages);
    }
    if (relayMessages.isEmpty()) {
      targetInstanceToRemove.add(targetInstance);
    }
  }
  _relayMessageCache.keySet().removeAll(targetInstanceToRemove);

  if (nextRebalanceTime < Long.MAX_VALUE) {
    scheduleFuturePipeline(nextRebalanceTime);
  }
  _relayMessageMap = Collections.unmodifiableMap(relayMessageMap);

  long relayMessageCount = 0;
  // Add valid relay messages to the instance message map.
  for (Map.Entry<String, Map<String, Message>> entry : relayMessageMap.entrySet()) {
    String instance = entry.getKey();
    Map<String, Message> relayMessages = entry.getValue();
    Map<String, Message> instanceMessages = _messageMap.get(instance);
    if (instanceMessages == null) {
      instanceMessages = new HashMap<>();
      _messageMap.put(instance, instanceMessages);
    }
    instanceMessages.putAll(relayMessages);
    relayMessageCount += relayMessages.size();
  }
  LOG.info(
      "END: updateRelayMessages(), {} of valid relay messages in cache, took {} ms. ",
      relayMessageCount, (System.currentTimeMillis() - currentTime));
}