Status Heartbeater::Thread::DoHeartbeat()

in src/kudu/tserver/heartbeater.cc [431:604]


// Performs a single heartbeat round-trip with the currently assigned master.
//
// Builds a TSHeartbeatRequestPB, sends it, and processes the response. The
// request contains:
//   - registration info, if the previous response asked for re-registration;
//   - a pending CSR, if any;
//   - the latest known TSK sequence number;
//   - a full or incremental tablet report;
//   - live-tablet counts.
// Processing the response means trusting any CA certs, adopting a newly signed
// certificate, and importing token signing public keys.
//
// 'error' receives the master's application-level error when the response
// carries one. 'error_status' receives the RPC-layer error response when the
// RPC itself failed and the controller has one.
//
// Returns a non-OK Status if the heartbeat could not be built or sent, if the
// master reported an error, or if processing the response failed.
Status Heartbeater::Thread::DoHeartbeat(MasterErrorPB* error,
                                        ErrorStatusPB* error_status) {
  // Update the tablet statistics if necessary.
  server_->tablet_manager()->UpdateTabletStatsIfNecessary();

  if (PREDICT_FALSE(server_->fail_heartbeats_for_tests())) {
    return Status::IOError("failing all heartbeats for tests");
  }

  CHECK(IsCurrentThread());

  // Inject latency for testing purposes.
  if (PREDICT_FALSE(FLAGS_heartbeat_inject_latency_before_heartbeat_ms > 0)) {
    TRACE("Injecting $0ms of latency due to --heartbeat_inject_latency_before_heartbeat_ms",
          FLAGS_heartbeat_inject_latency_before_heartbeat_ms);
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_heartbeat_inject_latency_before_heartbeat_ms));
  }

  if (!proxy_) {
    VLOG(1) << "No valid master proxy. Connecting...";
    RETURN_NOT_OK(ConnectToMaster());
    DCHECK(proxy_);
  }

  RpcController rpc;
  rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms));

  master::TSHeartbeatRequestPB req;
  SetupCommonField(req.mutable_common());
  if (last_hb_response_.needs_reregister()) {
    LOG(INFO) << "Registering TS with master...";
    RETURN_NOT_OK_PREPEND(SetupRegistration(req.mutable_registration()),
                          "Unable to set up registration");
    // If registering, let the catalog manager know what replica replacement
    // scheme the tablet server is running with.
    auto* info = req.mutable_replica_management_info();
    info->set_replacement_scheme(FLAGS_raft_prepare_replacement_before_eviction
        ? ReplicaManagementInfoPB::PREPARE_REPLACEMENT_BEFORE_EVICTION
        : ReplicaManagementInfoPB::EVICT_FIRST);
    if (info->replacement_scheme() != ReplicaManagementInfoPB::EVICT_FIRST) {
      // The tablet server and the masters must run with the same replica
      // replacement scheme; otherwise the system cannot re-create tablet
      // replicas consistently. A master of an older version may not know about
      // the 'replica_management_info' field. If this tablet server runs the
      // newer PREPARE_REPLACEMENT_BEFORE_EVICTION (a.k.a. 3-4-3) scheme, such
      // a master would not detect the mismatch; a newer master would respond
      // with the INCOMPATIBLE_REPLICA_MANAGEMENT error code. To cover older
      // masters, the tablet server requires a dedicated feature flag.
      rpc.RequireServerFeature(MasterFeatures::REPLICA_MANAGEMENT);
    }
    if (PREDICT_FALSE(FLAGS_heartbeat_inject_required_feature_flag != 0)) {
      rpc.RequireServerFeature(FLAGS_heartbeat_inject_required_feature_flag);
    }
  }

  // Check with the TS cert manager if it has a cert that needs signing.
  // If so, send the CSR in the heartbeat for the master to sign.
  if (auto csr = server_->mutable_tls_context()->GetCsrIfNecessary(); csr) {
    RETURN_NOT_OK(csr->ToString(req.mutable_csr_der(), security::DataFormat::DER));
    VLOG(1) << "Sending a CSR to the master in the next heartbeat";
  }

  // Send the most recently known TSK sequence number so that the master can
  // send us new ones if they exist.
  req.set_latest_tsk_seq_num(server_->token_verifier().GetMaxKnownKeySequenceNumber());

  // Whether this heartbeat carries a report on tombstoned replicas, and when
  // that report was generated. The time is written to
  // last_tombstoned_report_time_ only after the master accepts the heartbeat.
  // This way a failed heartbeat doesn't postpone the next tombstoned report by
  // a full interval.
  bool include_tombstoned = false;
  MonoTime tombstoned_report_time;
  if (send_full_tablet_report_) {
    LOG(INFO) << Substitute(
        "Master $0 was elected leader, sending a full tablet report...",
        master_address_.ToString());
    GenerateFullTabletReport(req.mutable_tablet_report());
    // Should the heartbeat fail, we'd want the next heartbeat to resend this
    // full tablet report. As such, send_full_tablet_report_ is reset only
    // once the master has accepted the heartbeat.
  } else if (last_hb_response_.needs_full_tablet_report()) {
    LOG(INFO) << Substitute(
        "Master $0 requested a full tablet report, sending...",
        master_address_.ToString());
    GenerateFullTabletReport(req.mutable_tablet_report());
  } else {
    VLOG(2) << Substitute("Sending an incremental tablet report to master $0...",
                          master_address_.ToString());
    // Check if it is time to send a report with tombstoned replicas.
    // A negative interval disables such reports.
    const auto now = MonoTime::Now();
    const auto tombstoned_report_interval =
        FLAGS_tserver_send_tombstoned_tablets_report_inteval_secs;
    include_tombstoned = tombstoned_report_interval >= 0 &&
        (now - last_tombstoned_report_time_).ToSeconds() >= tombstoned_report_interval;
    if (include_tombstoned) {
      tombstoned_report_time = now;
    }
    GenerateIncrementalTabletReport(req.mutable_tablet_report(), include_tombstoned);
  }

  req.set_num_live_tablets(server_->tablet_manager()->GetNumLiveTablets());
  const auto num_live_tablets_by_dimension =
      server_->tablet_manager()->GetNumLiveTabletsByDimension();
  req.mutable_num_live_tablets_by_dimension()->insert(num_live_tablets_by_dimension.begin(),
                                                      num_live_tablets_by_dimension.end());
  const auto num_live_tablets_by_range_and_table =
      server_->tablet_manager()->GetNumLiveTabletsByRangePerTable();
  auto* by_range_per_table = req.mutable_num_live_tablets_by_range_per_table();
  for (const auto& [table, ranges] : num_live_tablets_by_range_and_table) {
    // Populate the per-table entry in place inside the request's map instead
    // of building a local message and copying it in. This assumes each table
    // key appears once in the source container.
    master::TabletsByRangePerTablePB& table_pb = (*by_range_per_table)[table];
    table_pb.mutable_num_live_tablets_by_range()->Reserve(ranges.size());
    for (const auto& [range_start_key, num_tablets] : ranges) {
      master::TabletsByRangePB* range_pb = table_pb.add_num_live_tablets_by_range();
      range_pb->set_range_start_key(range_start_key);
      range_pb->set_tablets(num_tablets);
    }
  }

  VLOG(2) << "Sending heartbeat:\n" << SecureDebugString(req);
  master::TSHeartbeatResponsePB resp;
  const Status s = proxy_->TSHeartbeat(req, &resp, &rpc);
  if (!s.ok()) {
    if (rpc.error_response()) {
      error_status->CopyFrom(*rpc.error_response());
    }
    RETURN_NOT_OK_PREPEND(s, "Failed to send heartbeat to master");
  }
  if (resp.has_error()) {
    error->Swap(resp.mutable_error());
    return StatusFromPB(error->status());
  }

  VLOG(2) << Substitute("Received heartbeat response from $0:\n$1",
                        master_address_.ToString(), SecureDebugString(resp));

  // The master accepted the heartbeat. If it carried a report on tombstoned
  // replicas, restart the interval for the next such report.
  if (include_tombstoned) {
    last_tombstoned_report_time_ = tombstoned_report_time;
  }

  // If we've detected that our master was elected leader, send a full tablet
  // report in the next heartbeat; otherwise any pending full report has just
  // been delivered.
  send_full_tablet_report_ = !last_hb_response_.leader_master() && resp.leader_master();

  last_hb_response_.Swap(&resp);

  for (const auto& ca_cert_der : last_hb_response_.ca_cert_der()) {
    security::Cert ca_cert;
    RETURN_NOT_OK_PREPEND(
        ca_cert.FromString(ca_cert_der, security::DataFormat::DER),
        "failed to parse CA certificate from master");
    RETURN_NOT_OK_PREPEND(
        server_->mutable_tls_context()->AddTrustedCertificate(ca_cert),
        "failed to trust master CA cert");
  }

  // If we have a new signed certificate from the master, adopt it.
  if (last_hb_response_.has_signed_cert_der()) {
    security::Cert cert;
    RETURN_NOT_OK_PREPEND(
        cert.FromString(last_hb_response_.signed_cert_der(), security::DataFormat::DER),
        "failed to parse signed certificate from master");
    RETURN_NOT_OK_PREPEND(
        server_->mutable_tls_context()->AdoptSignedCert(cert),
        "failed to adopt master-signed X509 cert");
  }

  // Import TSKs.
  if (!last_hb_response_.tsks().empty()) {
    vector<security::TokenSigningPublicKeyPB> tsks(last_hb_response_.tsks().begin(),
                                                   last_hb_response_.tsks().end());
    RETURN_NOT_OK_PREPEND(
        server_->mutable_token_verifier()->ImportKeys(tsks),
        "failed to import token signing public keys from master heartbeat");
  }

  // Only reached when every step above succeeded; on any earlier return the
  // report is left unacknowledged.
  MarkTabletReportAcknowledged(req.tablet_report());
  return Status::OK();
}