Status CatalogManager::ProcessTabletReport()

in src/kudu/master/catalog_manager.cc [5339:5784]


Status CatalogManager::ProcessTabletReport(
    TSDescriptor* ts_desc,
    const TabletReportPB& full_report,
    TabletReportUpdatesPB* full_report_update,
    RpcContext* rpc) {
  int num_tablets = full_report.updated_tablets_size();
  TRACE_EVENT2("master", "ProcessTabletReport",
               "requestor", rpc->requestor_string(),
               "num_tablets", num_tablets);
  TRACE_COUNTER_INCREMENT("reported_tablets", num_tablets);

  leader_lock_.AssertAcquiredForReading();

  VLOG(2) << Substitute("Received tablet report from $0:\n$1",
                        RequestorString(rpc), SecureDebugString(full_report));

  // TODO(todd): on a full tablet report, we may want to iterate over the
  // tablets we think the server should have, compare vs the ones being
  // reported, and somehow mark any that have been "lost" (eg somehow the
  // tablet metadata got corrupted or something).

  // Maps a tablet ID to its corresponding tablet report (owned by 'full_report').
  unordered_map<string, const ReportedTabletPB*> reports;

  // Maps a tablet ID to its corresponding tablet report update (owned by
  // 'full_report_update').
  unordered_map<string, ReportedTabletUpdatesPB*> updates;

  // Maps a tablet ID to its corresponding TabletInfo.
  unordered_map<string, scoped_refptr<TabletInfo>> tablet_infos;

  // Keeps track of all RPCs that should be sent when we're done.
  vector<scoped_refptr<RetryingTSRpcTask>> rpcs;

  // Locks the referenced tables (for READ) and tablets (for WRITE).
  //
  // We must hold the tablets' locks while writing to the catalog table, and
  // since they're locked for WRITE, we have to lock them en masse in order to
  // avoid deadlocking.
  //
  // We have more freedom with the table locks: we could acquire them en masse,
  // or we could acquire, use, and release them one at a time. So why do we
  // acquire en masse? Because it reduces the overall number of lock
  // acquisitions by reusing locks for tablets belonging to the same table, and
  // although one-at-a-time acquisition would reduce table lock contention when
  // writing, table writes are very rare events.
  TableMetadataGroupLock tables_lock(LockMode::RELEASED);
  TabletMetadataGroupLock tablets_lock(LockMode::RELEASED);

  // 1. Set up local state.
  full_report_update->mutable_tablets()->Reserve(num_tablets);
  {
    // We only need to acquire lock_ for the tablet_map_ access, but since it's
    // so rarely acquired in exclusive mode, it's probably cheaper to acquire it
    // once and hold it for all tablets here than to acquire/release it for each
    // tablet.
    shared_lock l(lock_);
    for (const ReportedTabletPB& report : full_report.updated_tablets()) {
      const string& tablet_id = report.tablet_id();

      // 1a. Prepare an update entry for this tablet. Every tablet in the
      // report gets one, even if there's no change to it.
      ReportedTabletUpdatesPB* update = full_report_update->add_tablets();
      update->set_tablet_id(tablet_id);

      // 1b. Find the tablet, deleting/skipping it if it can't be found.
      scoped_refptr<TabletInfo> tablet = FindPtrOrNull(tablet_map_, tablet_id);
      if (!tablet) {
        // It'd be unsafe to ask the tserver to delete this tablet without first
        // replicating something to our followers (i.e. to guarantee that we're
        // the leader). For example, if we were a rogue master, we might be
        // deleting a tablet created by a new master accidentally. Though masters
        // don't always retain metadata for deleted tablets forever, a tablet
        // may be unknown in the event of a serious misconfiguration, such as a
        // tserver heartbeating to the wrong cluster. Therefore, it should be
        // reasonable to ignore it and wait for an operator to fix the situation.
        LOG(WARNING) << "Ignoring report from unknown tablet " << tablet_id;
        continue;
      }

      // 1c. Found the tablet, update local state. If multiple tablets with the
      // same ID are in the report, all but the last one will be ignored.
      reports[tablet_id] = &report;
      updates[tablet_id] = update;
      tablet_infos[tablet_id] = tablet;
      tables_lock.AddInfo(*tablet->table().get());
      tablets_lock.AddMutableInfo(tablet.get());
    }
  }

  // 2. Lock the affected tables and tablets.
  tables_lock.Lock(LockMode::READ);
  tablets_lock.Lock(LockMode::WRITE);

  // 3. Process each tablet. This may not be in the order that the tablets
  // appear in 'full_report', but that has no bearing on correctness.
  vector<scoped_refptr<TabletInfo>> mutated_tablets;
  unordered_set<string> mutated_table_ids;
  unordered_set<string> uuids_ignored_for_underreplication =
      master_->ts_manager()->GetUuidsToIgnoreForUnderreplication();
  for (const auto& e : tablet_infos) {
    const string& tablet_id = e.first;
    const scoped_refptr<TabletInfo>& tablet = e.second;
    const scoped_refptr<TableInfo>& table = tablet->table();
    const ReportedTabletPB& report = *FindOrDie(reports, tablet_id);
    ReportedTabletUpdatesPB* update = FindOrDie(updates, tablet_id);
    bool tablet_was_mutated = false;

    // 4. Delete the tablet if it (or its table) has been deleted.
    if (tablet->metadata().state().is_deleted() ||
        table->metadata().state().is_deleted()) {
      const string& msg = tablet->metadata().state().pb.state_msg();
      update->set_state_msg(msg);
      VLOG(1) << Substitute("Got report from deleted tablet $0 ($1)", tablet->ToString(), msg);

      // TODO(unknown): Cancel tablet creation, instead of deleting, in cases
      // where that might be possible (tablet creation timeout & replacement).
      rpcs.emplace_back(new AsyncDeleteReplica(
          master_, ts_desc->permanent_uuid(), table.get(), tablet_id,
          TABLET_DATA_DELETED, nullopt, msg));
      continue;
    }

    // 5. Tombstone a replica that is no longer part of the Raft config (and
    // not already tombstoned or deleted outright).
    //
    // If the report includes a committed raft config, we only tombstone if
    // the opid_index is strictly less than the latest reported committed
    // config. This prevents us from spuriously deleting replicas that have
    // just been added to the committed config and are in the process of copying.
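    // For example (illustrative numbers): if the master's committed config has
    // opid_index 5 and no longer lists the reporting tserver, a replica
    // reporting opid_index 3 (or no committed config at all) was evicted by a
    // later config change and can be tombstoned, whereas a replica reporting
    // opid_index 5 or higher is left alone, since it may belong to a newer
    // config (e.g. it was just added and is still copying).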
    const ConsensusStatePB& prev_cstate = tablet->metadata().state().pb.consensus_state();
    const int64_t prev_opid_index = prev_cstate.committed_config().opid_index();
    const int64_t report_opid_index = (report.has_consensus_state() &&
        report.consensus_state().committed_config().has_opid_index()) ?
            report.consensus_state().committed_config().opid_index() :
            consensus::kInvalidOpIdIndex;
    if (FLAGS_master_tombstone_evicted_tablet_replicas &&
        report.tablet_data_state() != TABLET_DATA_TOMBSTONED &&
        report.tablet_data_state() != TABLET_DATA_DELETED &&
        !IsRaftConfigMember(ts_desc->permanent_uuid(), prev_cstate.committed_config()) &&
        report_opid_index < prev_opid_index) {
      const string delete_msg = report_opid_index == consensus::kInvalidOpIdIndex ?
          "Replica has no consensus available" :
          Substitute("Replica with old config index $0", report_opid_index);
      rpcs.emplace_back(new AsyncDeleteReplica(
          master_, ts_desc->permanent_uuid(), table.get(), tablet_id,
          TABLET_DATA_TOMBSTONED, prev_opid_index,
          Substitute("$0 (current committed config index is $1)",
                     delete_msg, prev_opid_index)));
      continue;
    }

    // 6. Skip a non-deleted tablet which reports an error.
    if (report.has_error()) {
      Status s = StatusFromPB(report.error());
      DCHECK(!s.ok());
      LOG(WARNING) << Substitute("Tablet $0 has failed on TS $1: $2",
                                 tablet->ToString(), ts_desc->ToString(), s.ToString());
      continue;
    }

    const auto replication_factor = table->metadata().state().pb.num_replicas();
    bool consensus_state_updated = false;
    // 7. Process the report's consensus state. There may be one even when the
    // replica has been tombstoned.
    if (report.has_consensus_state()) {
      // 7a. The master only processes reports for replicas with committed
      // consensus configurations since it needs the committed index to only
      // cache the most up-to-date config. Since it's possible for TOMBSTONED
      // replicas with no ConsensusMetadata on disk to be reported as having no
      // committed config opid_index, we skip over those replicas.
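      // Note that this 'continue' also skips the remaining per-tablet steps
      // (schema version check, stats) for this replica; the update entry added
      // in step 1a is still returned for it, just without a state message.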
      if (!report.consensus_state().committed_config().has_opid_index()) {
        continue;
      }

      // 7b. Disregard the leader state if the reported leader is not a member
      // of the committed config.
      ConsensusStatePB cstate = report.consensus_state();
      if (cstate.leader_uuid().empty() ||
          !IsRaftConfigMember(cstate.leader_uuid(), cstate.committed_config())) {
        cstate.clear_leader_uuid();
      }

      // 7c. Mark the tablet as RUNNING if it makes sense to do so.
      //
      // We need to wait for a leader before marking a tablet as RUNNING, or
      // else we could incorrectly consider a tablet created when only a
      // minority of its replicas were successful. In that case, the tablet
      // would be stuck in this bad state forever.
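      // (Illustratively: if only one of three replicas was created
      // successfully, a leader can never be elected, so marking the tablet
      // RUNNING would declare it created even though it can never become
      // healthy.)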
      if (ShouldTransitionTabletToRunning(tablet, report, cstate)) {
        DCHECK_EQ(SysTabletsEntryPB::CREATING, tablet->metadata().state().pb.state())
            << Substitute("Tablet in unexpected state: $0: $1", tablet->ToString(),
                          SecureShortDebugString(tablet->metadata().state().pb));
        VLOG(1) << Substitute("Tablet $0 is now online", tablet->ToString());
        tablet->mutable_metadata()->mutable_dirty()->set_state(
            SysTabletsEntryPB::RUNNING, "Tablet reported with an active leader");
        tablet_was_mutated = true;
      }

      // 7d. Update the consensus state if:
      // - A config change operation was committed (reflected by a change to
      //   the committed config's opid_index).
      // - The new cstate has a leader, and either the old cstate didn't, or
      //   there was a term change.
      consensus_state_updated = (cstate.committed_config().opid_index() >
                                 prev_cstate.committed_config().opid_index()) ||
          (!cstate.leader_uuid().empty() &&
           (prev_cstate.leader_uuid().empty() ||
            cstate.current_term() > prev_cstate.current_term()));
      if (consensus_state_updated) {
        // 7d(i). Retain knowledge of the leader even if it wasn't reported in
        // the latest config.
        //
        // When a config change is reported to the master, it may not include
        // the leader because the follower doing the reporting may not know who
        // the leader is yet (it may have just started up). It is safe to reuse
        // the previous leader if the reported cstate has the same term as the
        // previous cstate, and the leader was known for that term. An additional
        // condition checks whether the report comes from a former leader replica
        // that no longer holds the leadership role. That situation can arise when
        // the replica's cmeta file has been deleted and then recreated (e.g. by
        // the "kudu local_replica cmeta unsafe_recreate" CLI tool). The code
        // below assumes that the replica effectively holds the leadership if the
        // 'leader_uuid' field is set, but that's not necessarily so (see
        // KUDU-2335).
        if (cstate.current_term() == prev_cstate.current_term()) {
          if (cstate.leader_uuid().empty() && !prev_cstate.leader_uuid().empty() &&
              ts_desc->permanent_uuid() != prev_cstate.leader_uuid()) {
            cstate.set_leader_uuid(prev_cstate.leader_uuid());
            // Sanity check to detect consensus divergence bugs.
          } else if (!cstate.leader_uuid().empty() &&
              !prev_cstate.leader_uuid().empty() &&
              cstate.leader_uuid() != prev_cstate.leader_uuid()) {
            LOG(DFATAL) << Substitute("Previously reported cstate for tablet $0 gave "
                "a different leader for term $1 than the current cstate. "
                "Previous cstate: $2. Current cstate: $3.",
                tablet->ToString(), cstate.current_term(),
                SecureShortDebugString(prev_cstate),
                SecureShortDebugString(cstate));
            continue;
          }
        }

        LOG(INFO) << Substitute("T $0 P $1 reported cstate change: $2. New cstate: $3",
                                tablet->id(), ts_desc->permanent_uuid(),
                                DiffConsensusStates(prev_cstate, cstate),
                                SecureShortDebugString(cstate));
        VLOG(2) << Substitute("Updating cstate for tablet $0 from config reported by $1 "
            "to that committed in log index $2 with leader state from term $3",
            tablet_id, ts_desc->ToString(), cstate.committed_config().opid_index(),
            cstate.current_term());


        // 7d(ii). Update the consensus state.
        // Strip the health report from the cstate before persisting it.
        auto* dirty_cstate =
            tablet->mutable_metadata()->mutable_dirty()->pb.mutable_consensus_state();
        *dirty_cstate = cstate; // Copy in the updated cstate.
        // Strip out the health reports from the persisted copy *only*.
        for (auto& peer : *dirty_cstate->mutable_committed_config()->mutable_peers()) {
          peer.clear_health_report();
        }
        tablet_was_mutated = true;

        // 7d(iii). Delete any replicas from the previous config that are not
        // in the new one.
        if (FLAGS_master_tombstone_evicted_tablet_replicas) {
          unordered_set<string> current_member_uuids;
          for (const auto& p : cstate.committed_config().peers()) {
            InsertOrDie(&current_member_uuids, p.permanent_uuid());
          }
          for (const auto& p : prev_cstate.committed_config().peers()) {
            DCHECK(!p.has_health_report()); // Health report shouldn't be persisted.
            const string& peer_uuid = p.permanent_uuid();
            if (!ContainsKey(current_member_uuids, peer_uuid)) {
              rpcs.emplace_back(new AsyncDeleteReplica(
                  master_, peer_uuid, table.get(), tablet_id,
                  TABLET_DATA_TOMBSTONED, prev_cstate.committed_config().opid_index(),
                  Substitute("TS $0 not found in new config with opid_index $1",
                             peer_uuid, cstate.committed_config().opid_index())));
            }
          }
        }
      }

      // 7e. Make tablet configuration changes depending on the mode the server
      // is running in. The choice between the two alternative modes is controlled
      // by the 'raft_prepare_replacement_before_eviction' run-time flag.
      if (!FLAGS_raft_prepare_replacement_before_eviction) {
        if (consensus_state_updated &&
            FLAGS_master_add_server_when_underreplicated &&
            CountVoters(cstate.committed_config()) < replication_factor) {
          // Add a server to the config if it is under-replicated.
          //
          // This is an idempotent operation due to a CAS enforced on the
          // committed config's opid_index.
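          // (The request carries the opid_index of the committed config it was
          // generated against; if the config has changed by the time the change
          // is applied, the request is rejected, so a duplicate or stale task
          // is harmless.)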
          rpcs.emplace_back(new AsyncAddReplicaTask(
              master_, tablet, cstate, RaftPeerPB::VOTER, &rng_));
        }

      // When --raft_prepare_replacement_before_eviction is enabled, we
      // consider whether to add or evict replicas based on the health report
      // included in the leader's tablet report. Since only the leader tracks
      // health, we ignore reports from non-leaders in this case. Also, making
      // the changes recommended by Should{Add,Evict}Replica() assumes that the
      // leader replica has already committed the configuration it's working with.
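      // (For example, with a replication factor of 3 and one failed voter, the
      // typical sequence in this mode is: ShouldAddReplica() first recommends
      // adding a NON_VOTER replacement; once that replica has caught up and
      // been promoted to a voter, ShouldEvictReplica() recommends evicting the
      // failed one.)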
      } else if (!cstate.has_pending_config() &&
                 !cstate.leader_uuid().empty() &&
                 cstate.leader_uuid() == ts_desc->permanent_uuid()) {
        const auto& config = cstate.committed_config();
        string to_evict;
        if (PREDICT_TRUE(FLAGS_catalog_manager_evict_excess_replicas) &&
            ShouldEvictReplica(config, cstate.leader_uuid(), replication_factor, &to_evict)) {
          DCHECK(!to_evict.empty());
          rpcs.emplace_back(new AsyncEvictReplicaTask(
              master_, tablet, cstate, std::move(to_evict)));
        } else if (FLAGS_master_add_server_when_underreplicated &&
                   ShouldAddReplica(config, replication_factor,
                                    uuids_ignored_for_underreplication)) {
          rpcs.emplace_back(new AsyncAddReplicaTask(
              master_, tablet, cstate, RaftPeerPB::NON_VOTER, &rng_));
        }
      }
    }

    // 8. Send an AlterSchema RPC if the tablet has an old schema version.
    uint32_t table_schema_version = table->metadata().state().pb.version();
    if (report.has_schema_version() &&
        report.schema_version() != table_schema_version) {
      if (report.schema_version() > table_schema_version) {
        LOG(ERROR) << Substitute("TS $0 has reported a schema version greater "
            "than the current one for tablet $1. Expected version $2 got $3 (corruption)",
            ts_desc->ToString(), tablet->ToString(), table_schema_version,
            report.schema_version());
      } else {
        LOG(INFO) << Substitute("TS $0 does not have the latest schema for tablet $1. "
            "Expected version $2 got $3", ts_desc->ToString(), tablet->ToString(),
            table_schema_version, report.schema_version());
      }

      // It's possible that the tablet being reported is a laggy replica, and
      // in fact the leader has already received an AlterTable RPC. That's OK,
      // though -- it'll safely ignore it if we send another.
      rpcs.emplace_back(new AsyncAlterTable(master_, tablet));
    }

    // 9. If the tablet was mutated, add it to the tablets to be re-persisted.
    //
    // Done here and not on a per-mutation basis to avoid duplicate entries.
    if (tablet_was_mutated) {
      mutated_tablets.push_back(tablet);
      mutated_table_ids.emplace(table->id());
    }

    // 10. Process the report's tablet statistics.
    //
    // The tserver only reports the LEADER replicas it owns.
    if (report.has_consensus_state() &&
        report.consensus_state().leader_uuid() == ts_desc->permanent_uuid()) {
      if (report.has_stats()) {
        // For versions >= 1.11.x, the tserver reports stats. Keep in mind,
        // though, that 'live_row_count' is not supported for legacy replicas.
        tablet->table()->UpdateStatsMetrics(tablet_id, tablet->GetStats(), report.stats());
        tablet->UpdateStats(report.stats());
      } else {
        // For versions < 1.11.x, the tserver doesn't report stats. In that
        // case the stats-based metrics should be hidden, e.g. during an
        // upgrade/downgrade or in a mixed-version environment.
        tablet->table()->InvalidateMetrics(tablet_id);
      }
    }
  }

  // 11. Unlock the tables; we no longer need to access their state.
  tables_lock.Unlock();

  // 12. Write all tablet mutations to the catalog table.
  //
  // SysCatalogTable::Write will short-circuit the case where the data has not
  // in fact changed since the previous version and avoid any unnecessary
  // mutations. The generated sequence of actions may be split into multiple
  // writes to the system catalog tablet to keep the size of each write request
  // under the specified threshold.
  {
    SysCatalogTable::Actions actions;
    actions.tablets_to_update = std::move(mutated_tablets);
    // Updating the status of replicas on the same tablet server can be safely
    // chunked. Even if some chunks of the update fail, the result is no worse
    // an inconsistency than not updating the status of a single replica on
    // that tablet server (i.e. rejecting the whole tablet report). In addition,
    // such failures are transient in nature, and the next successfully
    // processed tablet report from the tablet server is expected to fix the
    // partial update.
    const auto write_mode = FLAGS_catalog_manager_enable_chunked_tablet_reports
        ? SysCatalogTable::WriteMode::CHUNKED
        : SysCatalogTable::WriteMode::ATOMIC;
    auto s = sys_catalog_->Write(std::move(actions), write_mode);
    if (PREDICT_FALSE(!s.ok())) {
      LOG(ERROR) << Substitute(
          "Error updating tablets from $0: $1. Tablet report was: $2",
          ts_desc->permanent_uuid(), s.ToString(), SecureShortDebugString(full_report));
      return s;
    }
  }

  // Having successfully written the tablet mutations, this function cannot
  // fail from here on out.

  // 13. Publish the in-memory tablet mutations and release the locks.
  tablets_lock.Commit();

  // 14. Process all tablet schema version changes.
  //
  // This is separate from tablet state mutations because only tablet in-memory
  // state (and table on-disk state) is changed.
  for (const auto& e : tablet_infos) {
    const string& tablet_id = e.first;
    const scoped_refptr<TabletInfo>& tablet = e.second;
    const ReportedTabletPB& report = *FindOrDie(reports, tablet_id);

    if (report.has_schema_version()) {
      HandleTabletSchemaVersionReport(tablet, report.schema_version());
    }
  }

  // 15. Send all queued RPCs.
  for (auto& rpc : rpcs) {
    if (rpc->table()->ContainsTask(rpc->tablet_id(), rpc->description())) {
      // A task with the same tablet_id and task type (and, for some specific
      // tasks, permanent_uuid) is already running. Skip this RPC to avoid
      // sending a duplicate request; it may be sent again the next time the
      // tserver heartbeats.
      VLOG(1) << Substitute("Not sending duplicate request: $0", rpc->description());
      continue;
    }
    rpc->table()->AddTask(rpc->tablet_id(), rpc);
    WARN_NOT_OK(rpc->Run(), Substitute("Failed to send $0", rpc->description()));
  }

  // 16. Invalidate corresponding entries in the table locations cache.
  if (table_locations_cache_) {
    for (const auto& table_id : mutated_table_ids) {
      table_locations_cache_->Remove(table_id);
    }
  }

  return Status::OK();
}
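
Below is a minimal, self-contained sketch of two of the decision predicates above,
for illustration only: it is not the Kudu API, and every type and name in it
(SimpleConfig, SimpleCstate, ShouldTombstoneEvictedReplica, ConsensusStateUpdated)
is a made-up stand-in. It mirrors the core of the step-5 check for tombstoning an
evicted replica (ignoring the tablet data state and flag checks) and the step-7d
check for persisting a newly reported consensus state.

#include <cstdint>
#include <string>
#include <unordered_set>

namespace sketch {

constexpr int64_t kInvalidOpIdIndex = -1;  // stand-in for consensus::kInvalidOpIdIndex

struct SimpleConfig {
  int64_t opid_index = kInvalidOpIdIndex;
  std::unordered_set<std::string> member_uuids;
};

struct SimpleCstate {
  int64_t current_term = 0;
  std::string leader_uuid;  // empty when no leader is known
  SimpleConfig committed_config;
};

// Mirrors the core of step 5: tombstone the reporting replica only if it is no
// longer a member of the master's committed config AND its reported config
// index is strictly older, so a replica that was just added (and may still be
// copying) is never deleted by mistake.
bool ShouldTombstoneEvictedReplica(const std::string& reporting_uuid,
                                   const SimpleConfig& master_committed_config,
                                   int64_t reported_opid_index) {
  return master_committed_config.member_uuids.count(reporting_uuid) == 0 &&
         reported_opid_index < master_committed_config.opid_index;
}

// Mirrors step 7d: persist the reported cstate if the committed config
// advanced, or if a leader is reported where none was known before, or a
// leader is reported for a newer term.
bool ConsensusStateUpdated(const SimpleCstate& prev, const SimpleCstate& reported) {
  return reported.committed_config.opid_index > prev.committed_config.opid_index ||
         (!reported.leader_uuid.empty() &&
          (prev.leader_uuid.empty() || reported.current_term > prev.current_term));
}

}  // namespace sketch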