bool BaseDynamicClusterImpl::updateDynamicHostList()

in source/common/upstream/upstream_impl.cc [1000:1189]


bool BaseDynamicClusterImpl::updateDynamicHostList(const HostVector& new_hosts,
                                                   HostVector& current_priority_hosts,
                                                   HostVector& hosts_added_to_current_priority,
                                                   HostVector& hosts_removed_from_current_priority,
                                                   HostMap& updated_hosts,
                                                   const HostMap& all_hosts) {
  uint64_t max_host_weight = 1;

  // Did hosts change?
  //
  // Has the EDS health status changed the health of any endpoint? If so, we
  // rebuild the hosts vectors. We only do this if the health status of an
  // endpoint has materially changed (e.g. if previously failing active health
  // checks, we just note it's now failing EDS health status but don't rebuild).
  //
  // Likewise, if metadata for an endpoint changed we rebuild the hosts vectors.
  //
  // TODO(htuch): We can be smarter about this potentially, and not force a full
  // host set update on health status change. The way this would work is to
  // implement a HealthChecker subclass that provides thread local health
  // updates to the Cluster object. This will probably make sense to do in
  // conjunction with https://github.com/envoyproxy/envoy/issues/2874.
  bool hosts_changed = false;

  // Go through and see if the list we have is different from what we just got. If it is, we make a
  // new host list and raise a change notification. This uses an N^2 search given that this does not
  // happen very often and the list sizes should be small (see
  // https://github.com/envoyproxy/envoy/issues/2874). We also check for duplicates here. It's
  // possible for DNS to return the same address multiple times, and a bad EDS implementation could
  // do the same thing.

  // Keep track of hosts we see in new_hosts that we are able to match up with an existing host.
  std::unordered_set<std::string> existing_hosts_for_current_priority(
      current_priority_hosts.size());
  HostVector final_hosts;
  for (const HostSharedPtr& host : new_hosts) {
    if (updated_hosts.count(host->address()->asString())) {
      continue;
    }

    // A new host is matched to an existing host by comparing their addresses.
    auto existing_host = all_hosts.find(host->address()->asString());
    const bool existing_host_found = existing_host != all_hosts.end();

    // Check if the in-place host update should be skipped, i.e. when the following criteria are
    // met (currently there is only one criterion, but more may be added in the future):
    // - The cluster health checker is active and the new host matches an existing one, but the
    //   health check address is different.
    const bool skip_inplace_host_update =
        health_checker_ != nullptr && existing_host_found &&
        *existing_host->second->healthCheckAddress() != *host->healthCheckAddress();

    // When there is a match and we decide to do an in-place update, we potentially update the
    // host's health check flag and metadata. Afterwards, the host is pushed back into
    // final_hosts, i.e. the hosts that should be preserved in the current priority.
    if (existing_host_found && !skip_inplace_host_update) {
      existing_hosts_for_current_priority.emplace(existing_host->first);
      // If we find a host matched based on address, we keep it. However, we do change the weight
      // inline, so do that here.
      if (host->weight() > max_host_weight) {
        max_host_weight = host->weight();
      }

      if (existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH) !=
          host->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH)) {
        // TODO(snowp): To accommodate degraded, this bit should be checking for any changes
        // to the health flag, not just healthy vs not healthy.
        const bool previously_healthy = existing_host->second->health() == Host::Health::Healthy;
        if (host->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH)) {
          existing_host->second->healthFlagSet(Host::HealthFlag::FAILED_EDS_HEALTH);
          // If the host was previously healthy and we're now unhealthy, we need to
          // rebuild.
          hosts_changed |= previously_healthy;
        } else {
          existing_host->second->healthFlagClear(Host::HealthFlag::FAILED_EDS_HEALTH);
          // If the host was previously unhealthy and now healthy, we need to
          // rebuild.
          hosts_changed |=
              !previously_healthy && existing_host->second->health() == Host::Health::Healthy;
        }
      }

      // Did metadata change?
      const bool metadata_changed = !Protobuf::util::MessageDifferencer::Equivalent(
          *host->metadata(), *existing_host->second->metadata());
      if (metadata_changed) {
        // First, update the entire metadata for the endpoint.
        existing_host->second->metadata(*host->metadata());

        // Also, given that the canary attribute of an endpoint is derived from its metadata
        // (e.g.: from envoy.lb/canary), we do a blind update here since it's cheaper than testing
        // whether it actually changed. We must update this in addition to the metadata itself,
        // because the router filter uses it to compute upstream stats.
        existing_host->second->canary(host->canary());

        // If metadata changed, we need to rebuild. See github issue #3810.
        hosts_changed = true;
      }

      // Did the priority change?
      if (host->priority() != existing_host->second->priority()) {
        existing_host->second->priority(host->priority());
      }

      existing_host->second->weight(host->weight());
      final_hosts.push_back(existing_host->second);
      updated_hosts[existing_host->second->address()->asString()] = existing_host->second;
    } else {
      if (host->weight() > max_host_weight) {
        max_host_weight = host->weight();
      }

      // If we are depending on a health checker, we initialize to unhealthy.
      if (health_checker_ != nullptr) {
        host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
      }

      updated_hosts[host->address()->asString()] = host;
      final_hosts.push_back(host);
      hosts_added_to_current_priority.push_back(host);
    }
  }

  // Remove hosts from current_priority_hosts that were matched to an existing host in the previous
  // loop.
  for (auto itr = current_priority_hosts.begin(); itr != current_priority_hosts.end();) {
    auto existing_itr = existing_hosts_for_current_priority.find((*itr)->address()->asString());

    if (existing_itr != existing_hosts_for_current_priority.end()) {
      existing_hosts_for_current_priority.erase(existing_itr);
      itr = current_priority_hosts.erase(itr);
    } else {
      itr++;
    }
  }

  // If we saw existing hosts during this iteration from a different priority, then we've moved
  // a host from another priority into this one, so we should mark the priority as having changed.
  if (!existing_hosts_for_current_priority.empty()) {
    hosts_changed = true;
  }

  // The remaining hosts are hosts that are not referenced in the config update. We remove them from
  // the priority if any of the following is true:
  // - Active health checking is not enabled.
  // - The removed hosts are failing active health checking.
  // - We have explicitly configured the cluster to remove hosts regardless of active health status.
  const bool dont_remove_healthy_hosts =
      health_checker_ != nullptr && !info()->drainConnectionsOnHostRemoval();
  if (!current_priority_hosts.empty() && dont_remove_healthy_hosts) {
    for (auto i = current_priority_hosts.begin(); i != current_priority_hosts.end();) {
      if (!(*i)->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
        if ((*i)->weight() > max_host_weight) {
          max_host_weight = (*i)->weight();
        }

        final_hosts.push_back(*i);
        updated_hosts[(*i)->address()->asString()] = *i;
        i = current_priority_hosts.erase(i);
      } else {
        i++;
      }
    }
  }

  // At this point we've accounted for all the new hosts as well as the hosts that previously
  // existed in this priority.

  // TODO(mattklein123): This stat is used by both the RR and LR load balancer to decide at
  // runtime whether to use either the weighted or unweighted mode. If we extend weights to
  // static clusters or DNS SRV clusters we need to make sure this gets set. Better, we should
  // avoid pivoting on this entirely and probably just force a host set refresh if any weights
  // change.
  info_->stats().max_host_weight_.set(max_host_weight);

  // Whatever remains in current_priority_hosts should be removed.
  if (!hosts_added_to_current_priority.empty() || !current_priority_hosts.empty()) {
    hosts_removed_from_current_priority = std::move(current_priority_hosts);
    hosts_changed = true;
  }

  // During the update we populated final_hosts with all the hosts that should remain
  // in the current priority, so move them back into current_priority_hosts.
  current_priority_hosts = std::move(final_hosts);
  // We return false when nothing has materially changed (modulo weights): no hosts were added,
  // removed, or moved between priorities, and no EDS health status or metadata changed. Otherwise
  // we return true, causing updateHosts() to fire in the caller.
  return hosts_changed;
}
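
For illustration, below is a minimal, self-contained sketch of the address-keyed matching/diffing
pattern this function implements, using toy types instead of Envoy's Host, HostVector, and HostMap.
It deliberately omits the weight tracking, health-check flags, metadata comparison, and priority
handling shown above; names such as diffHosts and ToyHostVector are hypothetical and not part of
Envoy.

#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Toy "host": just an address string. The real Host also carries weight, metadata, and
// health flags, which drive the extra rebuild conditions in updateDynamicHostList().
using ToyHostSharedPtr = std::shared_ptr<std::string>;
using ToyHostVector = std::vector<ToyHostSharedPtr>;

// Match new hosts against existing ones by address: reuse matches, record additions, and
// treat whatever was not referenced by the update as removed.
bool diffHosts(const ToyHostVector& new_hosts, ToyHostVector& current_hosts,
               ToyHostVector& hosts_added, ToyHostVector& hosts_removed) {
  std::unordered_map<std::string, ToyHostSharedPtr> existing;
  for (const auto& host : current_hosts) {
    existing.emplace(*host, host);
  }

  ToyHostVector final_hosts;
  for (const auto& host : new_hosts) {
    auto it = existing.find(*host);
    if (it != existing.end()) {
      // Reuse the existing host object; this is where the real code does the in-place
      // update of weight, health flags, and metadata.
      final_hosts.push_back(it->second);
      existing.erase(it);
    } else {
      final_hosts.push_back(host);
      hosts_added.push_back(host);
    }
  }

  // Anything still in `existing` was not referenced in the update, so it is removed.
  for (const auto& entry : existing) {
    hosts_removed.push_back(entry.second);
  }

  current_hosts = std::move(final_hosts);
  return !hosts_added.empty() || !hosts_removed.empty();
}

int main() {
  ToyHostVector current{std::make_shared<std::string>("10.0.0.1:80"),
                        std::make_shared<std::string>("10.0.0.2:80")};
  ToyHostVector incoming{std::make_shared<std::string>("10.0.0.2:80"),
                         std::make_shared<std::string>("10.0.0.3:80")};
  ToyHostVector added, removed;
  const bool changed = diffHosts(incoming, current, added, removed);
  std::cout << "changed=" << changed << " added=" << added.size()
            << " removed=" << removed.size() << " kept=" << current.size() << "\n";
}

As in the real function, the caller only needs to rebuild the host set when the return value is
true; matched hosts are reused so that state already attached to the existing Host objects (e.g.
active health-check status) is preserved.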