in be/src/scheduling/cluster-membership-mgr.cc [206:468]
// Processes the statestore delta for the cluster membership topic and, if anything
// changed, builds and publishes a new membership Snapshot.
//
// 'incoming_topic_deltas' maps topic names to the updates received from the statestore;
// only the entry for 'membership_topic_name_' is looked at. If the local backend
// descriptor needs to be (re-)registered, AddLocalBackendToStatestore() is called with
// 'subscriber_topic_updates' so the local entry is sent back to the statestore.
//
// A new snapshot is built when there is a non-empty statestore update, the local
// backend descriptor needs updating, a previously recovering membership must be
// published, or the executor blacklist needs maintenance. While the statestore
// subscriber is in its post-recovery grace period the new snapshot is only stored in
// 'recovering_membership_'; otherwise it is installed with SetState() and listeners are
// notified.
//
// Runs entirely under 'update_membership_lock_'.
void ClusterMembershipMgr::UpdateMembership(
    const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
    vector<TTopicDelta>* subscriber_topic_updates) {
  lock_guard<mutex> l(update_membership_lock_);

  // First look to see if the topic we're interested in has an update.
  StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
      incoming_topic_deltas.find(membership_topic_name_);
  // Ignore spurious messages.
  if (topic == incoming_topic_deltas.end()) return;
  const TTopicDelta& update = topic->second;

  // If the update transmitted by the statestore is an empty delta, we don't need to
  // process it.
  bool no_ss_update = update.is_delta && update.topic_entries.empty();

  // The snapshot to build on: the membership accumulated during statestore recovery if
  // there is one, otherwise the currently published membership.
  const Snapshot* base_snapshot = recovering_membership_.get();
  if (base_snapshot == nullptr) base_snapshot = current_membership_.get();
  DCHECK(base_snapshot != nullptr);

  // Check if the local backend is up and needs updating.
  BeDescSharedPtr local_be_desc = GetLocalBackendDescriptor();
  bool needs_local_be_update = NeedsLocalBackendUpdate(*base_snapshot, local_be_desc);

  // We consider the statestore service to be recovering from a connection failure or
  // fail-over until its post recovery grace period has elapsed.
  bool ss_is_recovering = statestore_subscriber_ != nullptr
      && statestore_subscriber_->IsInPostRecoveryGracePeriod();
  // If we are tracking a recovering membership but the statestore is out of recovery, we
  // will need to send the current membership to the impala server.
  bool update_local_server = recovering_membership_.get() != nullptr && !ss_is_recovering;
  // Check if there are any executors that can be removed from the blacklist.
  bool needs_blacklist_maintenance = base_snapshot->executor_blacklist.NeedsMaintenance();

  // If there's no statestore update, the local backend descriptor has no changes, we
  // don't need to update the local server, and the blacklist doesn't need to be updated,
  // then we can skip processing altogether and avoid making a copy of the state.
  if (no_ss_update && !needs_local_be_update && !update_local_server
      && !needs_blacklist_maintenance) {
    return;
  }
  if (!no_ss_update) VLOG(1) << "Processing statestore update";
  if (needs_local_be_update) VLOG(1) << "Local backend membership needs update";
  if (update_local_server) VLOG(1) << "Local impala server needs update";
  if (needs_blacklist_maintenance) VLOG(1) << "Removing executors from the blacklist";
  if (ss_is_recovering) {
    VLOG(1) << "Statestore subscriber is in post-recovery grace period";
  }

  // By now we know that we need to renew the snapshot. Construct a new state based on the
  // type of the update we received.
  std::shared_ptr<Snapshot> new_state;
  if (!update.is_delta) {
    VLOG(1) << "Received full membership update";
    // Full topic transmit, create fresh state.
    new_state = std::make_shared<Snapshot>();
    // A freshly constructed Snapshot carries a default version (presumably 0). Continue
    // from the snapshot being replaced so that versions keep increasing across full
    // updates (e.g. after a statestore reconnect) instead of restarting and possibly
    // colliding with the version of an older snapshot.
    new_state->version = base_snapshot->version;
  } else {
    VLOG(1) << "Received delta membership update";
    if (recovering_membership_.get() != nullptr) {
      // The recovering membership is never exposed to clients and therefore requires no
      // copying.
      new_state = recovering_membership_;
    } else {
      // Make a copy of the current membership. This is the only function calling SetState
      // and thus no lock is needed for read access.
      new_state = std::make_shared<Snapshot>(*current_membership_);
    }
  }
  if (local_be_desc.get() != nullptr) new_state->local_be_desc = local_be_desc;
  new_state->version += 1;

  // Process removed, new, and updated entries from the topic update and apply the changes
  // to the new backend map and executor groups.
  BackendIdMap* new_backend_map = &(new_state->current_backends);
  ExecutorGroups* new_executor_groups = &(new_state->executor_groups);
  ExecutorBlacklist* new_blacklist = &(new_state->executor_blacklist);
  for (const TTopicItem& item : update.topic_entries) {
    // Deleted item
    if (item.deleted) {
      auto deleted_it = new_backend_map->find(item.key);
      if (deleted_it != new_backend_map->end()) {
        // 'be_desc' refers to the descriptor stored in 'new_backend_map' itself, so the
        // erase of that entry must come last in this block.
        const BackendDescriptorPB& be_desc = deleted_it->second;
        bool blacklisted = new_blacklist->FindAndRemove(be_desc)
            == ExecutorBlacklist::State::BLACKLISTED;
        if (blacklisted) {
          VLOG(1) << "Removing backend " << item.key << " from blacklist (deleted)";
          DCHECK(!IsBackendInExecutorGroups(be_desc, *new_executor_groups));
        }
        // If the backend was quiescing or was previously blacklisted, it will already
        // have been removed from 'executor_groups'.
        if (be_desc.is_executor() && !be_desc.is_quiescing() && !blacklisted) {
          for (const auto& group : be_desc.executor_groups()) {
            VLOG(1) << "Removing backend " << item.key << " from group "
                    << group.DebugString() << " (deleted)";
            RemoveExecutorAndGroup(be_desc, group, new_executor_groups);
          }
        }
        // If a coordinator is not shutdown gracefully, then it will be deleted here.
        if (be_desc.is_coordinator()) {
          _removeCoordIfExists(new_state, be_desc);
        }
        new_backend_map->erase(item.key);
      }
      continue;
    }

    // New or existing item
    BackendDescriptorPB be_desc;
    bool success = be_desc.ParseFromString(item.value);
    if (!success) {
      LOG_EVERY_N(WARNING, 30) << "Error deserializing membership topic item with key: "
          << item.key;
      continue;
    }
    if (be_desc.ip_address().empty()) {
      // Each backend resolves its own IP address and transmits it inside its backend
      // descriptor as part of the statestore update. If it is empty, then either that
      // code has been changed, or someone else is sending malformed packets.
      LOG_EVERY_N(WARNING, 30) << "Ignoring subscription request with empty IP address "
          << "from subscriber: " << be_desc.address();
      continue;
    }
    if (item.key == local_backend_id_) {
      if (local_be_desc.get() == nullptr) {
        LOG_EVERY_N(WARNING, 30) << "Another host registered itself with the local "
            << "backend id (" << item.key << "), but the local backend has not started "
            << "yet. The offending address is: " << be_desc.address();
      } else if (be_desc.address() != local_be_desc->address()) {
        // Someone else has registered this subscriber ID with a different address. We
        // will try to re-register (i.e. overwrite their subscription), but there is
        // likely a configuration problem.
        LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: "
            << be_desc.address() << " (we are: " << local_be_desc->address()
            << ", backend id: " << item.key << ")";
      }
      // We will always set the local backend explicitly below, so we ignore it here.
      continue;
    }

    auto it = new_backend_map->find(item.key);
    if (it != new_backend_map->end()) {
      // Update
      BackendDescriptorPB& existing = it->second;
      // Once a backend starts quiescing, it must stay in the quiescing state until it
      // has been deleted from the cluster membership; it can never transition back to a
      // running state.
      DCHECK(!existing.is_quiescing() || be_desc.is_quiescing());
      // If the node starts quiescing
      if (be_desc.is_quiescing() && !existing.is_quiescing()) {
        if (existing.is_executor()) {
          // If the backend starts quiescing and it is present in the blacklist, remove it
          // from the blacklist. If the backend is present in the blacklist, there is no
          // need to remove it from the executor group because it has already been removed
          bool blacklisted = new_blacklist->FindAndRemove(be_desc)
              == ExecutorBlacklist::State::BLACKLISTED;
          if (blacklisted) {
            VLOG(1) << "Removing backend " << item.key << " from blacklist (quiescing)";
            DCHECK(!IsBackendInExecutorGroups(be_desc, *new_executor_groups));
          } else {
            // Executor needs to be removed from its groups
            for (const auto& group : be_desc.executor_groups()) {
              VLOG(1) << "Removing backend " << item.key << " from group "
                      << group.DebugString() << " (quiescing)";
              RemoveExecutorAndGroup(be_desc, group, new_executor_groups);
            }
          }
        }
        // A quiescing coordinator is no longer tracked in 'all_coordinators'.
        if (existing.is_coordinator()) {
          _removeCoordIfExists(new_state, be_desc);
        }
      }
      existing = be_desc;
    } else {
      // Create
      new_backend_map->insert(make_pair(item.key, be_desc));
      if (!be_desc.is_quiescing()) {
        if (be_desc.is_executor()) {
          for (const auto& group : be_desc.executor_groups()) {
            VLOG(1) << "Adding backend " << item.key << " to group " <<
                group.DebugString();
            FindOrInsertExecutorGroup(group, new_executor_groups)->AddExecutor(be_desc);
          }
        }
        if (is_active_coordinator(be_desc)) {
          new_state->all_coordinators.AddExecutor(be_desc);
        }
      }
      // Since this backend is new, it cannot already be on the blacklist or probation.
      // NOTE: FindAndRemove() is only evaluated in builds where DCHECKs are enabled; it
      // is expected to be a no-op whenever this invariant holds.
      DCHECK_EQ(new_blacklist->FindAndRemove(be_desc),
          ExecutorBlacklist::State::NOT_BLACKLISTED);
    }
    DCHECK(CheckConsistency(*new_backend_map, *new_executor_groups, *new_blacklist));
  }

  if (needs_blacklist_maintenance) {
    // Add any backends that were removed from the blacklist and put on probation back
    // into 'executor_groups'.
    std::list<BackendDescriptorPB> probation_list;
    new_blacklist->Maintenance(&probation_list);
    for (const BackendDescriptorPB& be_desc : probation_list) {
      for (const auto& group : be_desc.executor_groups()) {
        VLOG(1) << "Adding backend " << be_desc.address() << " to group "
                << group.DebugString() << " (passed blacklist timeout)";
        FindOrInsertExecutorGroup(group, new_executor_groups)->AddExecutor(be_desc);
      }
    }
    DCHECK(CheckConsistency(*new_backend_map, *new_executor_groups, *new_blacklist));
  }

  // Update the local backend descriptor if required. We need to re-check new_state here
  // in case it was reset to empty above.
  if (NeedsLocalBackendUpdate(*new_state, local_be_desc)) {
    // We need to update both the new membership state and the statestore
    (*new_backend_map)[local_backend_id_] = *local_be_desc;
    // Could be a coordinator and/or executor, but only executors are added to (or, when
    // quiescing, removed from) executor groups.
    DCHECK(local_be_desc->is_coordinator() || local_be_desc->is_executor());
    if (local_be_desc->is_executor()) {
      for (const auto& group : local_be_desc->executor_groups()) {
        if (local_be_desc->is_quiescing()) {
          VLOG(1) << "Removing local backend from group " << group.DebugString();
          RemoveExecutorAndGroup(*local_be_desc, group, new_executor_groups);
        } else {
          VLOG(1) << "Adding local backend to group " << group.DebugString();
          FindOrInsertExecutorGroup(
              group, new_executor_groups)->AddExecutor(*local_be_desc);
        }
      }
    }
    // Add ourself to the list of all coordinators, replacing any stale entry.
    // NOTE(review): presumably is_active_coordinator() is false for a quiescing
    // coordinator; if so, a quiescing local coordinator is left in 'all_coordinators'
    // here, unlike remote coordinators which are removed when they start quiescing.
    // Confirm whether that asymmetry is intended.
    if (is_active_coordinator(*local_be_desc)) {
      _removeCoordIfExists(new_state, *local_be_desc);
      new_state->all_coordinators.AddExecutor(*local_be_desc);
    }
    AddLocalBackendToStatestore(*local_be_desc, subscriber_topic_updates);
    DCHECK(CheckConsistency(*new_backend_map, *new_executor_groups, *new_blacklist));
  }

  // Don't send updates or update the current membership if the statestore is in its
  // post-recovery grace period; keep accumulating changes in 'recovering_membership_'.
  if (ss_is_recovering) {
    recovering_membership_ = new_state;
    return;
  }
  // Atomically update the respective membership snapshot and update metrics.
  SetState(new_state);
  // Send notifications to all callbacks registered to receive updates.
  NotifyListeners(new_state);
  recovering_membership_.reset();
}