in src/kudu/tserver/heartbeater.cc [431:604]
// Performs a single heartbeat exchange with the current master.
//
// Builds a TSHeartbeatRequestPB containing:
//  - the common fields;
//  - registration info, if the previous response asked for re-registration;
//  - a CSR, if the TLS context has a certificate that needs signing;
//  - the latest known TSK sequence number;
//  - a full or incremental tablet report;
//  - the live-tablet counters.
// It sends the request over 'proxy_', connecting first if no proxy exists yet.
// It then applies the master's response: trusted CA certificates, a newly
// signed certificate, and token signing public keys.
//
// Must be called from the heartbeater thread itself (CHECK-enforced).
//
// 'error' receives the master's application-level error when the response
// carries one. 'error_status' receives the RPC-level error response when the
// RPC fails and the controller has one. Both are dereferenced without a null
// check on those paths.
//
// Returns OK only if all of the following hold:
//  - the RPC succeeded;
//  - the master reported no error;
//  - every certificate and TSK in the response was applied.
// Only in that case is the tablet report that was sent marked as acknowledged.
Status Heartbeater::Thread::DoHeartbeat(MasterErrorPB* error,
                                        ErrorStatusPB* error_status) {
  // Update the tablet statistics if necessary.
  server_->tablet_manager()->UpdateTabletStatsIfNecessary();

  if (PREDICT_FALSE(server_->fail_heartbeats_for_tests())) {
    return Status::IOError("failing all heartbeats for tests");
  }

  CHECK(IsCurrentThread());

  // Inject latency for testing purposes.
  if (PREDICT_FALSE(FLAGS_heartbeat_inject_latency_before_heartbeat_ms > 0)) {
    TRACE("Injecting $0ms of latency due to --heartbeat_inject_latency_before_heartbeat_ms",
          FLAGS_heartbeat_inject_latency_before_heartbeat_ms);
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_heartbeat_inject_latency_before_heartbeat_ms));
  }

  if (!proxy_) {
    VLOG(1) << "No valid master proxy. Connecting...";
    RETURN_NOT_OK(ConnectToMaster());
    DCHECK(proxy_);
  }

  RpcController rpc;
  rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_heartbeat_rpc_timeout_ms));

  master::TSHeartbeatRequestPB req;
  SetupCommonField(req.mutable_common());

  if (last_hb_response_.needs_reregister()) {
    LOG(INFO) << "Registering TS with master...";
    RETURN_NOT_OK_PREPEND(SetupRegistration(req.mutable_registration()),
                          "Unable to set up registration");

    // If registering, let the catalog manager know what replica replacement
    // scheme the tablet server is running with.
    auto* info = req.mutable_replica_management_info();
    info->set_replacement_scheme(FLAGS_raft_prepare_replacement_before_eviction
                                     ? ReplicaManagementInfoPB::PREPARE_REPLACEMENT_BEFORE_EVICTION
                                     : ReplicaManagementInfoPB::EVICT_FIRST);
    if (info->replacement_scheme() != ReplicaManagementInfoPB::EVICT_FIRST) {
      // The tablet server and the masters must run with the same replica
      // replacement scheme. Otherwise the system cannot re-create tablet
      // replicas consistently.
      //
      // Newer masters detect a mismatch via 'replica_management_info' and
      // respond with the INCOMPATIBLE_REPLICA_MANAGEMENT error code. A master
      // of an older version doesn't know about that field, so it might not
      // recognise a mismatch with the newer PREPARE_REPLACEMENT_BEFORE_EVICTION
      // (a.k.a. 3-4-3) scheme. To cover that case, tablet servers rely on a
      // dedicated feature flag to enforce consistent replica management
      // schemes.
      rpc.RequireServerFeature(MasterFeatures::REPLICA_MANAGEMENT);
    }
    if (PREDICT_FALSE(FLAGS_heartbeat_inject_required_feature_flag != 0)) {
      rpc.RequireServerFeature(FLAGS_heartbeat_inject_required_feature_flag);
    }
  }

  // Check with the TS cert manager if it has a cert that needs signing.
  // If so, send the CSR in the heartbeat for the master to sign.
  if (auto csr = server_->mutable_tls_context()->GetCsrIfNecessary(); csr) {
    RETURN_NOT_OK(csr->ToString(req.mutable_csr_der(), security::DataFormat::DER));
    VLOG(1) << "Sending a CSR to the master in the next heartbeat";
  }

  // Send the most recently known TSK sequence number so that the master can
  // send us new ones if they exist.
  req.set_latest_tsk_seq_num(server_->token_verifier().GetMaxKnownKeySequenceNumber());

  if (send_full_tablet_report_) {
    LOG(INFO) << Substitute(
        "Master $0 was elected leader, sending a full tablet report...",
        master_address_.ToString());
    GenerateFullTabletReport(req.mutable_tablet_report());
    // Should the heartbeat fail, we'd want the next heartbeat to resend this
    // full tablet report. So send_full_tablet_report_ is only updated after
    // the RPC succeeded and the master's response carried no error.
  } else if (last_hb_response_.needs_full_tablet_report()) {
    LOG(INFO) << Substitute(
        "Master $0 requested a full tablet report, sending...",
        master_address_.ToString());
    GenerateFullTabletReport(req.mutable_tablet_report());
  } else {
    VLOG(2) << Substitute("Sending an incremental tablet report to master $0...",
                          master_address_.ToString());
    // Check if it is time to send a report with tombstoned replicas.
    // A negative interval disables reporting them.
    const auto now = MonoTime::Now();
    const auto tombstoned_report_interval =
        FLAGS_tserver_send_tombstoned_tablets_report_inteval_secs;
    const auto include_tombstoned = tombstoned_report_interval >= 0 &&
        (now - last_tombstoned_report_time_).ToSeconds() >= tombstoned_report_interval;
    GenerateIncrementalTabletReport(req.mutable_tablet_report(), include_tombstoned);
    // Update the timestamp of last report on tombstoned tablet replicas.
    if (include_tombstoned) {
      last_tombstoned_report_time_ = now;
    }
  }

  req.set_num_live_tablets(server_->tablet_manager()->GetNumLiveTablets());
  auto num_live_tablets_by_dimension = server_->tablet_manager()->GetNumLiveTabletsByDimension();
  req.mutable_num_live_tablets_by_dimension()->insert(num_live_tablets_by_dimension.begin(),
                                                      num_live_tablets_by_dimension.end());

  // Populate the per-table, per-range live tablet counts. Each table's entry
  // is constructed directly inside the request's map. This avoids building a
  // temporary message and copying it in. The table keys come from a map
  // iteration, so each key is visited once.
  auto num_live_tablets_by_range_and_table =
      server_->tablet_manager()->GetNumLiveTabletsByRangePerTable();
  auto* by_range_per_table = req.mutable_num_live_tablets_by_range_per_table();
  for (const auto& [table, ranges] : num_live_tablets_by_range_and_table) {
    master::TabletsByRangePerTablePB& table_pb = (*by_range_per_table)[table];
    table_pb.mutable_num_live_tablets_by_range()->Reserve(ranges.size());
    for (const auto& [range_start_key, num_tablets] : ranges) {
      master::TabletsByRangePB* range_pb = table_pb.add_num_live_tablets_by_range();
      range_pb->set_range_start_key(range_start_key);
      range_pb->set_tablets(num_tablets);
    }
  }

  VLOG(2) << "Sending heartbeat:\n" << SecureDebugString(req);
  master::TSHeartbeatResponsePB resp;
  const Status s = proxy_->TSHeartbeat(req, &resp, &rpc);
  if (!s.ok()) {
    // Surface the RPC-level error details, if any, to the caller.
    if (rpc.error_response()) {
      error_status->CopyFrom(*rpc.error_response());
    }
    RETURN_NOT_OK_PREPEND(s, "Failed to send heartbeat to master");
  }
  if (resp.has_error()) {
    error->Swap(resp.mutable_error());
    return StatusFromPB(error->status());
  }

  VLOG(2) << Substitute("Received heartbeat response from $0:\n$1",
                        master_address_.ToString(), SecureDebugString(resp));

  // If we've detected that our master was just elected leader, send a full
  // tablet report in the next heartbeat. Otherwise clear the flag: any full
  // report pending for this heartbeat has just been delivered successfully.
  send_full_tablet_report_ = !last_hb_response_.leader_master() && resp.leader_master();
  last_hb_response_.Swap(&resp);

  // Trust every CA certificate the master sent.
  for (const auto& ca_cert_der : last_hb_response_.ca_cert_der()) {
    security::Cert ca_cert;
    RETURN_NOT_OK_PREPEND(
        ca_cert.FromString(ca_cert_der, security::DataFormat::DER),
        "failed to parse CA certificate from master");
    RETURN_NOT_OK_PREPEND(
        server_->mutable_tls_context()->AddTrustedCertificate(ca_cert),
        "failed to trust master CA cert");
  }

  // If we have a new signed certificate from the master, adopt it.
  if (last_hb_response_.has_signed_cert_der()) {
    security::Cert cert;
    RETURN_NOT_OK_PREPEND(
        cert.FromString(last_hb_response_.signed_cert_der(), security::DataFormat::DER),
        "failed to parse signed certificate from master");
    RETURN_NOT_OK_PREPEND(
        server_->mutable_tls_context()->AdoptSignedCert(cert),
        "failed to adopt master-signed X509 cert");
  }

  // Import TSKs.
  if (!last_hb_response_.tsks().empty()) {
    vector<security::TokenSigningPublicKeyPB> tsks(last_hb_response_.tsks().begin(),
                                                   last_hb_response_.tsks().end());
    RETURN_NOT_OK_PREPEND(
        server_->mutable_token_verifier()->ImportKeys(tsks),
        "failed to import token signing public keys from master heartbeat");
  }

  MarkTabletReportAcknowledged(req.tablet_report());
  return Status::OK();
}