void ClusterMembershipMgr::UpdateMembership()

in be/src/scheduling/cluster-membership-mgr.cc [206:468]


void ClusterMembershipMgr::UpdateMembership(
    const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
    vector<TTopicDelta>* subscriber_topic_updates) {
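  // Hold 'update_membership_lock_' for the entire update so that membership changes are
  // applied serially.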
  lock_guard<mutex> l(update_membership_lock_);

  // First look to see if the topic we're interested in has an update.
  StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
      incoming_topic_deltas.find(membership_topic_name_);

  // Ignore spurious messages.
  if (topic == incoming_topic_deltas.end()) return;
  const TTopicDelta& update = topic->second;

  // If the update transmitted by the statestore is an empty delta, we don't need to
  // process it.
  bool no_ss_update = update.is_delta && update.topic_entries.empty();

  // Check if the local backend is up and needs updating.
  const Snapshot* base_snapshot = recovering_membership_.get();
  if (base_snapshot == nullptr) base_snapshot = current_membership_.get();
  DCHECK(base_snapshot != nullptr);
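  // Retrieve the descriptor the local backend currently advertises; it may be null if the
  // local backend has not started yet.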
  BeDescSharedPtr local_be_desc = GetLocalBackendDescriptor();
  bool needs_local_be_update = NeedsLocalBackendUpdate(*base_snapshot, local_be_desc);

  // We consider the statestore service to be recovering from a connection failure or
  // fail-over until its post-recovery grace period has elapsed.
  bool ss_is_recovering = statestore_subscriber_ != nullptr
      && statestore_subscriber_->IsInPostRecoveryGracePeriod();

  // If we are tracking a recovering membership but the statestore is out of recovery, we
  // will need to send the current membership to the impala server.
  bool update_local_server = recovering_membership_.get() != nullptr && !ss_is_recovering;

  // Check if there are any executors that can be removed from the blacklist.
  bool needs_blacklist_maintenance = base_snapshot->executor_blacklist.NeedsMaintenance();

  // If there's no statestore update, the local backend descriptor has no changes, we
  // don't need to update the local server, and the blacklist doesn't need to be updated,
  // then we can skip processing altogether and avoid making a copy of the state.
  if (no_ss_update && !needs_local_be_update && !update_local_server
      && !needs_blacklist_maintenance) {
    return;
  }

  if (!no_ss_update) VLOG(1) << "Processing statestore update";
  if (needs_local_be_update) VLOG(1) << "Local backend membership needs update";
  if (update_local_server) VLOG(1) << "Local impala server needs update";
  if (needs_blacklist_maintenance) VLOG(1) << "Removing executors from the blacklist";
  if (ss_is_recovering) {
    VLOG(1) << "Statestore subscriber is in post-recovery grace period";
  }

  // By now we know that we need to renew the snapshot. Construct a new state based on the
  // type of the update we received.
  std::shared_ptr<Snapshot> new_state;

  if (!update.is_delta) {
    VLOG(1) << "Received full membership update";
    // Full topic transmit, create fresh state.
    new_state = std::make_shared<Snapshot>();
  } else {
    VLOG(1) << "Received delta membership update";
    if (recovering_membership_.get() != nullptr) {
      // The recovering membership is never exposed to clients and therefore requires no
      // copying.
      new_state = recovering_membership_;
    } else {
      // Make a copy of the current membership. This is the only function calling SetState
      // and thus no lock is needed for read access.
      new_state = std::make_shared<Snapshot>(*current_membership_);
    }
  }
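  // Overlay the local backend descriptor (if the local backend has started) and give the
  // new snapshot the next version number.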
  if (local_be_desc.get() != nullptr) new_state->local_be_desc = local_be_desc;
  new_state->version += 1;

  // Process removed, new, and updated entries from the topic update and apply the changes
  // to the new backend map and executor groups.
  BackendIdMap* new_backend_map = &(new_state->current_backends);
  ExecutorGroups* new_executor_groups = &(new_state->executor_groups);
  ExecutorBlacklist* new_blacklist = &(new_state->executor_blacklist);
  for (const TTopicItem& item : update.topic_entries) {
    // Deleted item
    if (item.deleted) {
      if (new_backend_map->find(item.key) != new_backend_map->end()) {
        const BackendDescriptorPB& be_desc = (*new_backend_map)[item.key];
        bool blacklisted = new_blacklist->FindAndRemove(be_desc)
            == ExecutorBlacklist::State::BLACKLISTED;
        if (blacklisted) {
          VLOG(1) << "Removing backend " << item.key << " from blacklist (deleted)";
          DCHECK(!IsBackendInExecutorGroups(be_desc, *new_executor_groups));
        }
        // If the backend was quiescing or was previously blacklisted, it will already
        // have been removed from 'executor_groups'.
        if (be_desc.is_executor() && !be_desc.is_quiescing() && !blacklisted) {
          for (const auto& group : be_desc.executor_groups()) {
            VLOG(1) << "Removing backend " << item.key << " from group "
                    << group.DebugString() << " (deleted)";
            RemoveExecutorAndGroup(be_desc, group, new_executor_groups);
          }
        }

        // If a coordinator was not shut down gracefully, it is removed from the list of
        // all coordinators here.
        if (be_desc.is_coordinator()) {
          _removeCoordIfExists(new_state, be_desc);
        }

        // Note: 'be_desc' is a reference to the map entry for 'item.key', so this erase
        // must come after the last use of 'be_desc' in this block.
        new_backend_map->erase(item.key);
      }
      continue;
    }

    // New or existing item
    BackendDescriptorPB be_desc;
    bool success = be_desc.ParseFromString(item.value);
    if (!success) {
      LOG_EVERY_N(WARNING, 30) << "Error deserializing membership topic item with key: "
          << item.key;
      continue;
    }
    if (be_desc.ip_address().empty()) {
      // Each backend resolves its own IP address and transmits it inside its backend
      // descriptor as part of the statestore update. If it is empty, then either that
      // code has been changed, or someone else is sending malformed packets.
      LOG_EVERY_N(WARNING, 30) << "Ignoring subscription request with empty IP address "
          << "from subscriber: " << be_desc.address();
      continue;
    }
    if (item.key == local_backend_id_) {
      if (local_be_desc.get() == nullptr) {
        LOG_EVERY_N(WARNING, 30) << "Another host registered itself with the local "
             << "backend id (" << item.key << "), but the local backend has not started "
             << "yet. The offending address is: " << be_desc.address();
      } else if (be_desc.address() != local_be_desc->address()) {
        // Someone else has registered this subscriber ID with a different address. We
        // will try to re-register (i.e. overwrite their subscription), but there is
        // likely a configuration problem.
        LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: "
            << be_desc.address() << " (we are: " << local_be_desc->address()
            << ", backend id: " << item.key << ")";
      }
      // We will always set the local backend explicitly below, so we ignore it here.
      continue;
    }

    auto it = new_backend_map->find(item.key);
    if (it != new_backend_map->end()) {
      // Update
      BackendDescriptorPB& existing = it->second;

      // Once a backend starts quiescing, it must stay in the quiescing state until it
      // has been deleted from the cluster membership; it can never transition back to a
      // running state.
      if (existing.is_quiescing()) DCHECK(be_desc.is_quiescing());

      // If the node has just started quiescing
      if (be_desc.is_quiescing() && !existing.is_quiescing()) {
        if (existing.is_executor()) {
          // If the quiescing backend is present in the blacklist, remove it from the
          // blacklist. In that case there is no need to remove it from its executor
          // groups, because that already happened when it was blacklisted.
          bool blacklisted = new_blacklist->FindAndRemove(be_desc)
              == ExecutorBlacklist::State::BLACKLISTED;
          if (blacklisted) {
            VLOG(1) << "Removing backend " << item.key << " from blacklist (quiescing)";
            DCHECK(!IsBackendInExecutorGroups(be_desc, *new_executor_groups));
          } else {
            // Executor needs to be removed from its groups
            for (const auto& group : be_desc.executor_groups()) {
              VLOG(1) << "Removing backend " << item.key << " from group "
                      << group.DebugString() << " (quiescing)";
              RemoveExecutorAndGroup(be_desc, group, new_executor_groups);
            }
          }
        }

        if (existing.is_coordinator()) {
          _removeCoordIfExists(new_state, be_desc);
        }
      }
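      // Overwrite the stored descriptor with the newly received one.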
      existing = be_desc;
    } else {
      // Create
      new_backend_map->insert(make_pair(item.key, be_desc));
      if (!be_desc.is_quiescing()) {
        if (be_desc.is_executor()) {
          for (const auto& group : be_desc.executor_groups()) {
            VLOG(1) << "Adding backend " << item.key << " to group " <<
                group.DebugString();
            FindOrInsertExecutorGroup(group, new_executor_groups)->AddExecutor(be_desc);
          }
        }

        if (is_active_coordinator(be_desc)) {
          new_state->all_coordinators.AddExecutor(be_desc);
        }
      }
      // Since this backend is new, it cannot already be on the blacklist or probation.
      DCHECK_EQ(new_blacklist->FindAndRemove(be_desc),
          ExecutorBlacklist::State::NOT_BLACKLISTED);
    }
    DCHECK(CheckConsistency(*new_backend_map, *new_executor_groups, *new_blacklist));
  }

  if (needs_blacklist_maintenance) {
    // Add any backends that were removed from the blacklist and put on probation back
    // into 'executor_groups'.
    std::list<BackendDescriptorPB> probation_list;
    new_blacklist->Maintenance(&probation_list);
    for (const BackendDescriptorPB& be_desc : probation_list) {
      for (const auto& group : be_desc.executor_groups()) {
        VLOG(1) << "Adding backend " << be_desc.address() << " to group "
                << group.DebugString() << " (passed blacklist timeout)";
        FindOrInsertExecutorGroup(group, new_executor_groups)->AddExecutor(be_desc);
      }
    }
    DCHECK(CheckConsistency(*new_backend_map, *new_executor_groups, *new_blacklist));
  }

  // Update the local backend descriptor if required. We need to re-check new_state here
  // in case it was reset to empty above.
  if (NeedsLocalBackendUpdate(*new_state, local_be_desc)) {
    // We need to update both the new membership state and the statestore.
    (*new_backend_map)[local_backend_id_] = *local_be_desc;
    // The local backend could be a coordinator and/or an executor, but only executors are
    // added to executor groups.
    DCHECK(local_be_desc->is_coordinator() || local_be_desc->is_executor());
    if (local_be_desc->is_executor()) {
      for (const auto& group : local_be_desc->executor_groups()) {
        if (local_be_desc->is_quiescing()) {
          VLOG(1) << "Removing local backend from group " << group.DebugString();
          RemoveExecutorAndGroup(*local_be_desc, group, new_executor_groups);
        } else {
          VLOG(1) << "Adding local backend to group " << group.DebugString();
          FindOrInsertExecutorGroup(
              group, new_executor_groups)->AddExecutor(*local_be_desc);
        }
      }
    }

    // Add the local backend to the list of all coordinators.
    if (is_active_coordinator(*local_be_desc.get())) {
      _removeCoordIfExists(new_state, *local_be_desc);
      new_state->all_coordinators.AddExecutor(*local_be_desc);
    }

    AddLocalBackendToStatestore(*local_be_desc, subscriber_topic_updates);
    DCHECK(CheckConsistency(*new_backend_map, *new_executor_groups, *new_blacklist));
  }

  // Don't send updates or update the current membership if the statestore is in its
  // post-recovery grace period.
  if (ss_is_recovering) {
    recovering_membership_ = new_state;
    return;
  }

  // Atomically swap in the new membership snapshot and update metrics.
  SetState(new_state);
  // Send notifications to all callbacks registered to receive updates.
  NotifyListeners(new_state);
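  // Any membership tracked while the statestore was recovering is now obsolete.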
  recovering_membership_.reset();
}
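
The method above follows a copy-on-write pattern: readers only ever see immutable Snapshot objects behind a shared_ptr, while the writer builds a modified copy, bumps its version, atomically publishes it via SetState(), and then notifies listeners via NotifyListeners(). Below is a minimal, self-contained sketch of that pattern under simplified assumptions; all names here (MiniSnapshot, MiniMembershipMgr, ApplyUpdate, etc.) are hypothetical illustrations and not part of the Impala code base.

#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Immutable view of the cluster that readers can hold onto without locking.
struct MiniSnapshot {
  int64_t version = 0;
  std::map<std::string, std::string> backends;  // backend id -> address
};

class MiniMembershipMgr {
 public:
  using SnapshotPtr = std::shared_ptr<const MiniSnapshot>;
  using Listener = std::function<void(const SnapshotPtr&)>;

  MiniMembershipMgr() : current_(std::make_shared<MiniSnapshot>()) {}

  // Listeners are assumed to be registered before any update is applied.
  void RegisterListener(Listener listener) { listeners_.push_back(std::move(listener)); }

  // Readers take a consistent, immutable snapshot; no lock is held while they use it.
  SnapshotPtr GetSnapshot() const {
    std::lock_guard<std::mutex> l(current_lock_);
    return current_;
  }

  // The single writer copies the current snapshot, mutates the copy, and then publishes
  // it atomically, mirroring the overall structure of UpdateMembership() above.
  void ApplyUpdate(const std::map<std::string, std::string>& added,
      const std::vector<std::string>& deleted) {
    std::lock_guard<std::mutex> l(update_lock_);  // serialize writers
    auto new_state = std::make_shared<MiniSnapshot>(*GetSnapshot());
    for (const std::string& key : deleted) new_state->backends.erase(key);
    for (const auto& entry : added) new_state->backends[entry.first] = entry.second;
    new_state->version += 1;
    {
      std::lock_guard<std::mutex> publish_lock(current_lock_);
      current_ = new_state;  // existing readers keep their old snapshot alive
    }
    for (const Listener& listener : listeners_) listener(new_state);
  }

 private:
  mutable std::mutex current_lock_;  // guards 'current_'
  std::mutex update_lock_;  // serializes ApplyUpdate(), like 'update_membership_lock_'
  SnapshotPtr current_;
  std::vector<Listener> listeners_;
};

In this sketch the long-running writer work happens only under 'update_lock_', so readers calling GetSnapshot() are never blocked for more than the brief pointer copy, and any snapshot they already hold stays valid because it is immutable and reference-counted.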