public void updateRelayMessages()

in helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java [182:316]


  public void updateRelayMessages(Map<String, LiveInstance> liveInstanceMap,
      Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {

    // cache all relay messages read from ZK
    for (String instance : _messageMap.keySet()) {
      Map<String, Message> instanceMessages = _messageMap.get(instance);
      for (Message message : instanceMessages.values()) {
        if (message.hasRelayMessages()) {
          for (Message relayMsg : message.getRelayMessages().values()) {
            cacheRelayMessage(relayMsg, message);
          }
        }
      }
    }

    long nextRebalanceTime = Long.MAX_VALUE;
    long currentTime = System.currentTimeMillis();
    Map<String, Map<String, Message>> relayMessageMap = new HashMap<>();
    Set<String> targetInstanceToRemove = new HashSet<>();
    // Iterate all relay message in the cache, remove invalid or expired ones.
    for (String targetInstance : _relayMessageCache.keySet()) {
      Map<String, Message> relayMessages = _relayMessageCache.get(targetInstance);
      Iterator<Map.Entry<String, Message>> iterator = relayMessages.entrySet().iterator();

      while (iterator.hasNext()) {
        Message relayMessage = iterator.next().getValue();
        Map<String, Message> instanceMsgMap = _messageMap.get(targetInstance);

        if (!relayMessage.isValid()) {
          LOG.warn("Invalid relay message {}, remove it from the cache.", relayMessage.getId());
          iterator.remove();
          _relayHostMessageCache.remove(relayMessage.getMsgId());
          continue;
        }

        // Check whether the relay message has already been sent to the target host.
        if (instanceMsgMap != null && instanceMsgMap.containsKey(relayMessage.getMsgId())) {
          Message committedMessage = instanceMsgMap.get(relayMessage.getMsgId());
          if (committedMessage.isRelayMessage()) {
            LOG.info("Relay message already committed, remove relay message {} from the cache.",
                relayMessage.getId());
            iterator.remove();
            _relayHostMessageCache.remove(relayMessage.getMsgId());
            continue;
          } else {
            // controller already sent the same message to target host,
            // Relay host may still forward the p2p message later, so we can not remove the relay message immediately now,
            // just set the relay time as current time.
            // TODO: we should remove the message immediately here once we have transaction id support in CurrentState.
            LOG.info(
                "Controller already sent the message to the target host, set relay message {} to be expired.",
                relayMessage.getId());
            setMessageRelayTime(relayMessage, currentTime);
          }
        }

        try {
          // Check partition's state on the relay message's target host (The relay message's destination host).
          // Set the relay message to be expired immediately or to be expired a certain time in future if necessary.
          checkTargetHost(targetInstance, relayMessage, liveInstanceMap, currentStateMap);

          // Check partition's state on the original relay host (host that should forward the relay message)
          // Set the relay message to be expired immediately or to be expired a certain time in future if necessary.
          Message hostedMessage = _relayHostMessageCache.get(relayMessage.getMsgId());
          checkRelayHost(relayMessage, liveInstanceMap, currentStateMap, hostedMessage);
        } catch (Exception e) {
          LOG.warn(
              "Failed to check target and relay host and set the relay time. Relay message: {} exception: {}",
              relayMessage.getId(), e);
        }

        if (relayMessage.isExpired()) {
          LOG.info("relay message {} expired, remove it from cache. relay time {}.",
              relayMessage.getId(), relayMessage.getRelayTime());
          iterator.remove();
          _relayHostMessageCache.remove(relayMessage.getMsgId());
          continue;
        }

        // If Helix missed all of other events to evict a relay message from the cache,
        // it will delete the message anyway after a certain timeout.
        // This is the latest resort to avoid a relay message stuck in the cache forever.
        // This case should happen very rarely.
        if (relayMessage.getRelayTime() < 0
            && (relayMessage.getCreateTimeStamp() + _relayMessageLifetime) < System
            .currentTimeMillis()) {
          LOG.info(
              "relay message {} has reached its lifetime, remove it from cache.", relayMessage.getId());
          iterator.remove();
          _relayHostMessageCache.remove(relayMessage.getMsgId());
          continue;
        }

        if (!relayMessageMap.containsKey(targetInstance)) {
          relayMessageMap.put(targetInstance, Maps.<String, Message>newHashMap());
        }
        relayMessageMap.get(targetInstance).put(relayMessage.getMsgId(), relayMessage);

        // Compute the next earliest time to trigger a pipeline run.
        long expiryTime = relayMessage.getCreateTimeStamp() + _relayMessageLifetime;
        if (relayMessage.getRelayTime() > 0) {
          expiryTime = relayMessage.getRelayTime() + relayMessage.getExpiryPeriod();
        }
        if (expiryTime < nextRebalanceTime) {
          nextRebalanceTime = expiryTime;
        }
      } // end while (iterator.hasNext())

      if (relayMessages.isEmpty()) {
        targetInstanceToRemove.add(targetInstance);
      }
    }
    _relayMessageCache.keySet().removeAll(targetInstanceToRemove);

    if (nextRebalanceTime < Long.MAX_VALUE) {
      scheduleFuturePipeline(nextRebalanceTime);
    }

    _relayMessageMap = Collections.unmodifiableMap(relayMessageMap);
    long relayMessageCount = 0;

    // Add valid relay messages to the instance message map.
    for (String instance : _relayMessageMap.keySet()) {
      Map<String, Message> relayMessages = _relayMessageMap.get(instance);
      if (!_messageMap.containsKey(instance)) {
        _messageMap.put(instance, Maps.<String, Message>newHashMap());
      }
      _messageMap.get(instance).putAll(relayMessages);
      relayMessageCount += relayMessages.size();
    }

    LOG.info(
        "END: updateRelayMessages(), {} of valid relay messages in cache, took {} ms. ",
        relayMessageCount, (System.currentTimeMillis() - currentTime));
  }