in src/kudu/master/catalog_manager.cc [5339:5784]
// Processes a tablet report received from the tablet server described by
// 'ts_desc' and fills in 'full_report_update' with one entry per reported
// tablet (an entry is added even for tablets that end up being skipped).
//
// The work is done in phases:
//   - steps 1-2: look up every reported tablet in 'tablet_map_' (under
//     'lock_'), then lock the referenced tables for READ and tablets for WRITE;
//   - steps 3-10: examine each known tablet, queueing follow-up tasks
//     (delete/tombstone replica, add/evict replica, alter schema) in 'rpcs'
//     and dirtying the tablet metadata where the reported state warrants it;
//   - steps 11-13: write the mutated tablets to the system catalog and commit
//     the in-memory changes;
//   - steps 14-16: handle reported schema versions, run the queued tasks, and
//     invalidate affected entries of the table locations cache.
//
// 'rpc' is only used to identify the requestor in traces and log messages.
//
// The caller must hold 'leader_lock_' for reading (asserted below).
//
// Returns the error produced by the system catalog write if that write fails
// (in which case no queued task is run); otherwise returns Status::OK().
Status CatalogManager::ProcessTabletReport(
    TSDescriptor* ts_desc,
    const TabletReportPB& full_report,
    TabletReportUpdatesPB* full_report_update,
    RpcContext* rpc) {
  int num_tablets = full_report.updated_tablets_size();
  TRACE_EVENT2("master", "ProcessTabletReport",
               "requestor", rpc->requestor_string(),
               "num_tablets", num_tablets);
  TRACE_COUNTER_INCREMENT("reported_tablets", num_tablets);

  leader_lock_.AssertAcquiredForReading();

  VLOG(2) << Substitute("Received tablet report from $0:\n$1",
                        RequestorString(rpc), SecureDebugString(full_report));

  // TODO(todd): on a full tablet report, we may want to iterate over the
  // tablets we think the server should have, compare vs the ones being
  // reported, and somehow mark any that have been "lost" (eg somehow the
  // tablet metadata got corrupted or something).

  // Maps a tablet ID to its corresponding tablet report (owned by 'full_report').
  unordered_map<string, const ReportedTabletPB*> reports;

  // Maps a tablet ID to its corresponding tablet report update (owned by
  // 'full_report_update').
  unordered_map<string, ReportedTabletUpdatesPB*> updates;

  // Maps a tablet ID to its corresponding TabletInfo.
  unordered_map<string, scoped_refptr<TabletInfo>> tablet_infos;

  // Keeps track of all RPCs that should be sent when we're done.
  vector<scoped_refptr<RetryingTSRpcTask>> rpcs;

  // Locks the referenced tables (for READ) and tablets (for WRITE).
  //
  // We must hold the tablets' locks while writing to the catalog table, and
  // since they're locked for WRITE, we have to lock them en masse in order to
  // avoid deadlocking.
  //
  // We have more freedom with the table locks: we could acquire them en masse,
  // or we could acquire, use, and release them one at a time. So why do we
  // acquire en masse? Because it reduces the overall number of lock
  // acquisitions by reusing locks for tablets belonging to the same table, and
  // although one-at-a-time acquisition would reduce table lock contention when
  // writing, table writes are very rare events.
  TableMetadataGroupLock tables_lock(LockMode::RELEASED);
  TabletMetadataGroupLock tablets_lock(LockMode::RELEASED);

  // 1. Set up local state.
  full_report_update->mutable_tablets()->Reserve(num_tablets);
  {
    // We only need to acquire lock_ for the tablet_map_ access, but since it's
    // acquired exclusively so rarely, it's probably cheaper to acquire and
    // hold it for all tablets here than to acquire/release it for each tablet.
    shared_lock l(lock_);
    for (const ReportedTabletPB& report : full_report.updated_tablets()) {
      const string& tablet_id = report.tablet_id();

      // 1a. Prepare an update entry for this tablet. Every tablet in the
      // report gets one, even if there's no change to it.
      ReportedTabletUpdatesPB* update = full_report_update->add_tablets();
      update->set_tablet_id(tablet_id);

      // 1b. Find the tablet, deleting/skipping it if it can't be found.
      scoped_refptr<TabletInfo> tablet = FindPtrOrNull(tablet_map_, tablet_id);
      if (!tablet) {
        // It'd be unsafe to ask the tserver to delete this tablet without first
        // replicating something to our followers (i.e. to guarantee that we're
        // the leader). For example, if we were a rogue master, we might be
        // deleting a tablet created by a new master accidentally. Though masters
        // don't always retain metadata for deleted tablets forever, a tablet
        // may be unknown in the event of a serious misconfiguration, such as a
        // tserver heartbeating to the wrong cluster. Therefore, it should be
        // reasonable to ignore it and wait for an operator to fix the situation.
        LOG(WARNING) << "Ignoring report from unknown tablet " << tablet_id;
        continue;
      }

      // 1c. Found the tablet, update local state. If multiple tablets with the
      // same ID are in the report, all but the last one will be ignored.
      reports[tablet_id] = &report;
      updates[tablet_id] = update;
      tablet_infos[tablet_id] = tablet;
      tables_lock.AddInfo(*tablet->table().get());
      tablets_lock.AddMutableInfo(tablet.get());
    }
  }

  // 2. Lock the affected tables and tablets.
  tables_lock.Lock(LockMode::READ);
  tablets_lock.Lock(LockMode::WRITE);

  // 3. Process each tablet. This may not be in the order that the tablets
  // appear in 'full_report', but that has no bearing on correctness.
  vector<scoped_refptr<TabletInfo>> mutated_tablets;
  unordered_set<string> mutated_table_ids;
  unordered_set<string> uuids_ignored_for_underreplication =
      master_->ts_manager()->GetUuidsToIgnoreForUnderreplication();
  for (const auto& e : tablet_infos) {
    const string& tablet_id = e.first;
    const scoped_refptr<TabletInfo>& tablet = e.second;
    const scoped_refptr<TableInfo>& table = tablet->table();
    const ReportedTabletPB& report = *FindOrDie(reports, tablet_id);
    ReportedTabletUpdatesPB* update = FindOrDie(updates, tablet_id);
    bool tablet_was_mutated = false;

    // 4. Delete the tablet if it (or its table) have been deleted.
    if (tablet->metadata().state().is_deleted() ||
        table->metadata().state().is_deleted()) {
      const string& msg = tablet->metadata().state().pb.state_msg();
      update->set_state_msg(msg);
      VLOG(1) << Substitute("Got report from deleted tablet $0 ($1)", tablet->ToString(), msg);

      // TODO(unknown): Cancel tablet creation, instead of deleting, in cases
      // where that might be possible (tablet creation timeout & replacement).
      rpcs.emplace_back(new AsyncDeleteReplica(
          master_, ts_desc->permanent_uuid(), table.get(), tablet_id,
          TABLET_DATA_DELETED, nullopt, msg));
      continue;
    }

    // 5. Tombstone a replica that is no longer part of the Raft config (and
    // not already tombstoned or deleted outright).
    //
    // If the report includes a committed raft config, we only tombstone if
    // the opid_index is strictly less than the latest reported committed
    // config. This prevents us from spuriously deleting replicas that have
    // just been added to the committed config and are in the process of copying.
    const ConsensusStatePB& prev_cstate = tablet->metadata().state().pb.consensus_state();
    const int64_t prev_opid_index = prev_cstate.committed_config().opid_index();
    const int64_t report_opid_index = (report.has_consensus_state() &&
        report.consensus_state().committed_config().has_opid_index()) ?
            report.consensus_state().committed_config().opid_index() :
            consensus::kInvalidOpIdIndex;
    if (FLAGS_master_tombstone_evicted_tablet_replicas &&
        report.tablet_data_state() != TABLET_DATA_TOMBSTONED &&
        report.tablet_data_state() != TABLET_DATA_DELETED &&
        !IsRaftConfigMember(ts_desc->permanent_uuid(), prev_cstate.committed_config()) &&
        report_opid_index < prev_opid_index) {
      const string delete_msg = report_opid_index == consensus::kInvalidOpIdIndex ?
          "Replica has no consensus available" :
          Substitute("Replica with old config index $0", report_opid_index);
      rpcs.emplace_back(new AsyncDeleteReplica(
          master_, ts_desc->permanent_uuid(), table.get(), tablet_id,
          TABLET_DATA_TOMBSTONED, prev_opid_index,
          Substitute("$0 (current committed config index is $1)",
                     delete_msg, prev_opid_index)));
      continue;
    }

    // 6. Skip a non-deleted tablet which reports an error.
    if (report.has_error()) {
      Status s = StatusFromPB(report.error());
      DCHECK(!s.ok());
      LOG(WARNING) << Substitute("Tablet $0 has failed on TS $1: $2",
                                 tablet->ToString(), ts_desc->ToString(), s.ToString());
      continue;
    }

    const auto replication_factor = table->metadata().state().pb.num_replicas();
    bool consensus_state_updated = false;

    // 7. Process the report's consensus state. There may be one even when the
    // replica has been tombstoned.
    if (report.has_consensus_state()) {
      // 7a. The master only processes reports for replicas with committed
      // consensus configurations since it needs the committed index to only
      // cache the most up-to-date config. Since it's possible for TOMBSTONED
      // replicas with no ConsensusMetadata on disk to be reported as having no
      // committed config opid_index, we skip over those replicas.
      if (!report.consensus_state().committed_config().has_opid_index()) {
        continue;
      }

      // 7b. Disregard the leader state if the reported leader is not a member
      // of the committed config.
      ConsensusStatePB cstate = report.consensus_state();
      if (cstate.leader_uuid().empty() ||
          !IsRaftConfigMember(cstate.leader_uuid(), cstate.committed_config())) {
        cstate.clear_leader_uuid();
      }

      // 7c. Mark the tablet as RUNNING if it makes sense to do so.
      //
      // We need to wait for a leader before marking a tablet as RUNNING, or
      // else we could incorrectly consider a tablet created when only a
      // minority of its replicas were successful. In that case, the tablet
      // would be stuck in this bad state forever.
      if (ShouldTransitionTabletToRunning(tablet, report, cstate)) {
        DCHECK_EQ(SysTabletsEntryPB::CREATING, tablet->metadata().state().pb.state())
            << Substitute("Tablet in unexpected state: $0: $1", tablet->ToString(),
                          SecureShortDebugString(tablet->metadata().state().pb));
        VLOG(1) << Substitute("Tablet $0 is now online", tablet->ToString());
        tablet->mutable_metadata()->mutable_dirty()->set_state(
            SysTabletsEntryPB::RUNNING, "Tablet reported with an active leader");
        tablet_was_mutated = true;
      }

      // 7d. Update the consensus state if:
      // - A config change operation was committed (reflected by a change to
      //   the committed config's opid_index).
      // - The new cstate has a leader, and either the old cstate didn't, or
      //   there was a term change.
      consensus_state_updated = (cstate.committed_config().opid_index() >
                                 prev_cstate.committed_config().opid_index()) ||
          (!cstate.leader_uuid().empty() &&
           (prev_cstate.leader_uuid().empty() ||
            cstate.current_term() > prev_cstate.current_term()));
      if (consensus_state_updated) {
        // 7d(i). Retain knowledge of the leader even if it wasn't reported in
        // the latest config.
        //
        // When a config change is reported to the master, it may not include
        // the leader because the follower doing the reporting may not know who
        // the leader is yet (it may have just started up). It is safe to reuse
        // the previous leader if the reported cstate has the same term as the
        // previous cstate, and the leader was known for that term. An extra
        // condition is to check whether it's a report from a former leader
        // replica which currently doesn't maintain the leadership role. Such a
        // situation is possible when the replica's cmeta file had been deleted
        // and then recreated (e.g. by the "kudu local_replica cmeta unsafe_recreate"
        // CLI tool). The code below assumes that the replica effectively has
        // the leadership if the 'leader_uuid' field is set, but that's not so
        // (see KUDU-2335).
        if (cstate.current_term() == prev_cstate.current_term()) {
          if (cstate.leader_uuid().empty() && !prev_cstate.leader_uuid().empty() &&
              ts_desc->permanent_uuid() != prev_cstate.leader_uuid()) {
            cstate.set_leader_uuid(prev_cstate.leader_uuid());
            // Sanity check to detect consensus divergence bugs.
          } else if (!cstate.leader_uuid().empty() &&
                     !prev_cstate.leader_uuid().empty() &&
                     cstate.leader_uuid() != prev_cstate.leader_uuid()) {
            LOG(DFATAL) << Substitute("Previously reported cstate for tablet $0 gave "
                                      "a different leader for term $1 than the current cstate. "
                                      "Previous cstate: $2. Current cstate: $3.",
                                      tablet->ToString(), cstate.current_term(),
                                      SecureShortDebugString(prev_cstate),
                                      SecureShortDebugString(cstate));
            // NOTE(review): this 'continue' bypasses step 9, so if step 7c
            // already dirtied this tablet, the change would not be queued in
            // 'mutated_tablets' for the catalog write. Presumably that
            // combination cannot occur in practice -- TODO confirm.
            continue;
          }
        }

        LOG(INFO) << Substitute("T $0 P $1 reported cstate change: $2. New cstate: $3",
                                tablet->id(), ts_desc->permanent_uuid(),
                                DiffConsensusStates(prev_cstate, cstate),
                                SecureShortDebugString(cstate));
        VLOG(2) << Substitute("Updating cstate for tablet $0 from config reported by $1 "
                              "to that committed in log index $2 with leader state from term $3",
                              tablet_id, ts_desc->ToString(), cstate.committed_config().opid_index(),
                              cstate.current_term());

        // 7d(ii). Update the consensus state.
        // Strip the health report from the cstate before persisting it.
        auto* dirty_cstate =
            tablet->mutable_metadata()->mutable_dirty()->pb.mutable_consensus_state();
        *dirty_cstate = cstate; // Copy in the updated cstate.
        // Strip out the health reports from the persisted copy *only*.
        for (auto& peer : *dirty_cstate->mutable_committed_config()->mutable_peers()) {
          peer.clear_health_report();
        }
        tablet_was_mutated = true;

        // 7d(iii). Delete any replicas from the previous config that are not
        // in the new one.
        if (FLAGS_master_tombstone_evicted_tablet_replicas) {
          unordered_set<string> current_member_uuids;
          for (const auto& p : cstate.committed_config().peers()) {
            InsertOrDie(&current_member_uuids, p.permanent_uuid());
          }
          for (const auto& p : prev_cstate.committed_config().peers()) {
            DCHECK(!p.has_health_report()); // Health report shouldn't be persisted.
            const string& peer_uuid = p.permanent_uuid();
            if (!ContainsKey(current_member_uuids, peer_uuid)) {
              rpcs.emplace_back(new AsyncDeleteReplica(
                  master_, peer_uuid, table.get(), tablet_id,
                  TABLET_DATA_TOMBSTONED, prev_cstate.committed_config().opid_index(),
                  Substitute("TS $0 not found in new config with opid_index $1",
                             peer_uuid, cstate.committed_config().opid_index())));
            }
          }
        }
      }

      // 7e. Make tablet configuration change depending on the mode the server
      // is running with. The choice between two alternative modes is controlled
      // by the 'raft_prepare_replacement_before_eviction' run-time flag.
      if (!FLAGS_raft_prepare_replacement_before_eviction) {
        if (consensus_state_updated &&
            FLAGS_master_add_server_when_underreplicated &&
            CountVoters(cstate.committed_config()) < replication_factor) {
          // Add a server to the config if it is under-replicated.
          //
          // This is an idempotent operation due to a CAS enforced on the
          // committed config's opid_index.
          rpcs.emplace_back(new AsyncAddReplicaTask(
              master_, tablet, cstate, RaftPeerPB::VOTER, &rng_));
        }

      // When --raft_prepare_replacement_before_eviction is enabled, we
      // consider whether to add or evict replicas based on the health report
      // included in the leader's tablet report. Since only the leader tracks
      // health, we ignore reports from non-leaders in this case. Also, making
      // the changes recommended by Should{Add,Evict}Replica() assumes that the
      // leader replica has already committed the configuration it's working with.
      } else if (!cstate.has_pending_config() &&
                 !cstate.leader_uuid().empty() &&
                 cstate.leader_uuid() == ts_desc->permanent_uuid()) {
        const auto& config = cstate.committed_config();
        string to_evict;
        if (PREDICT_TRUE(FLAGS_catalog_manager_evict_excess_replicas) &&
            ShouldEvictReplica(config, cstate.leader_uuid(), replication_factor, &to_evict)) {
          DCHECK(!to_evict.empty());
          rpcs.emplace_back(new AsyncEvictReplicaTask(
              master_, tablet, cstate, std::move(to_evict)));
        } else if (FLAGS_master_add_server_when_underreplicated &&
                   ShouldAddReplica(config, replication_factor,
                                    uuids_ignored_for_underreplication)) {
          rpcs.emplace_back(new AsyncAddReplicaTask(
              master_, tablet, cstate, RaftPeerPB::NON_VOTER, &rng_));
        }
      }
    }

    // 8. Send an AlterSchema RPC if the tablet has an old schema version.
    uint32_t table_schema_version = table->metadata().state().pb.version();
    if (report.has_schema_version() &&
        report.schema_version() != table_schema_version) {
      if (report.schema_version() > table_schema_version) {
        LOG(ERROR) << Substitute("TS $0 has reported a schema version greater "
            "than the current one for tablet $1. Expected version $2 got $3 (corruption)",
            ts_desc->ToString(), tablet->ToString(), table_schema_version,
            report.schema_version());
      } else {
        LOG(INFO) << Substitute("TS $0 does not have the latest schema for tablet $1. "
            "Expected version $2 got $3", ts_desc->ToString(), tablet->ToString(),
            table_schema_version, report.schema_version());
      }
      // It's possible that the tablet being reported is a laggy replica, and
      // in fact the leader has already received an AlterTable RPC. That's OK,
      // though -- it'll safely ignore it if we send another.
      rpcs.emplace_back(new AsyncAlterTable(master_, tablet));
    }

    // 9. If the tablet was mutated, add it to the tablets to be re-persisted.
    //
    // Done here and not on a per-mutation basis to avoid duplicate entries.
    if (tablet_was_mutated) {
      mutated_tablets.push_back(tablet);
      mutated_table_ids.emplace(table->id());
    }

    // 10. Process the report's tablet statistics.
    //
    // The tserver only reports the LEADER replicas it owns.
    if (report.has_consensus_state() &&
        report.consensus_state().leader_uuid() == ts_desc->permanent_uuid()) {
      if (report.has_stats()) {
        // For the versions >= 1.11.x, the tserver reports stats. But keep in
        // mind that 'live_row_count' is not supported for the legacy replicas.
        tablet->table()->UpdateStatsMetrics(tablet_id, tablet->GetStats(), report.stats());
        tablet->UpdateStats(report.stats());
      } else {
        // For the versions < 1.11.x, the tserver doesn't report stats. Thus,
        // the metrics from the stats should be hidden, for example, when it's
        // in the upgrade/downgrade process or in a mixed environment.
        tablet->table()->InvalidateMetrics(tablet_id);
      }
    }
  }

  // 11. Unlock the tables; we no longer need to access their state.
  tables_lock.Unlock();

  // 12. Write all tablet mutations to the catalog table.
  //
  // SysCatalogTable::Write will short-circuit the case where the data has not
  // in fact changed since the previous version and avoid any unnecessary
  // mutations. The generated sequence of actions may be split into multiple
  // writes to the system catalog tablet to keep the size of each write request
  // under the specified threshold.
  {
    SysCatalogTable::Actions actions;
    actions.tablets_to_update = std::move(mutated_tablets);
    // Updating the status of replicas on the same tablet server can be safely
    // chunked. Even if some chunks of the update fails, it should not lead to
    // bigger inconsistencies than simply not updating the status of a single
    // replica on that tablet server (i.e., rejecting the whole tablet report).
    // In addition, the nature of such failures is transient, and it's expected
    // that the next successfully processed tablet report from the tablet server
    // will fix the partial update.
    const auto write_mode = FLAGS_catalog_manager_enable_chunked_tablet_reports
        ? SysCatalogTable::WriteMode::CHUNKED
        : SysCatalogTable::WriteMode::ATOMIC;
    auto s = sys_catalog_->Write(std::move(actions), write_mode);
    if (PREDICT_FALSE(!s.ok())) {
      LOG(ERROR) << Substitute(
          "Error updating tablets from $0: $1. Tablet report was: $2",
          ts_desc->permanent_uuid(), s.ToString(), SecureShortDebugString(full_report));
      return s;
    }
  }

  // Having successfully written the tablet mutations, this function cannot
  // fail from here on out.

  // 13. Publish the in-memory tablet mutations and release the locks.
  tablets_lock.Commit();

  // 14. Process all tablet schema version changes.
  //
  // This is separate from tablet state mutations because only tablet in-memory
  // state (and table on-disk state) is changed.
  for (const auto& e : tablet_infos) {
    const string& tablet_id = e.first;
    const scoped_refptr<TabletInfo>& tablet = e.second;
    const ReportedTabletPB& report = *FindOrDie(reports, tablet_id);
    if (report.has_schema_version()) {
      HandleTabletSchemaVersionReport(tablet, report.schema_version());
    }
  }

  // 15. Send all queued RPCs.
  //
  // The loop variable is deliberately not called 'rpc' so that it does not
  // shadow the RpcContext parameter of this function.
  for (auto& task : rpcs) {
    if (task->table()->ContainsTask(task->tablet_id(), task->description())) {
      // There are some tasks with the same tablet_id, alter type (and permanent_uuid
      // for some specific tasks) already running, here we just ignore the rpc to avoid
      // sending duplicate requests, maybe it will be sent the next time the tserver heartbeats.
      VLOG(1) << Substitute("Not sending duplicate request: $0", task->description());
      continue;
    }
    task->table()->AddTask(task->tablet_id(), task);
    WARN_NOT_OK(task->Run(), Substitute("Failed to send $0", task->description()));
  }

  // 16. Invalidate corresponding entries in the table locations cache.
  if (table_locations_cache_) {
    for (const auto& table_id : mutated_table_ids) {
      table_locations_cache_->Remove(table_id);
    }
  }

  return Status::OK();
}