in src/kudu/server/server_base.cc [803:1004]
// Initializes the server, in this order: clock, glog/tcmalloc metrics, local
// hostname, Kerberos, file cache, startup web UI, on-disk FS layout (created
// if no instance files are found), ACLs, RPC messenger, and RPC server.
//
// Returns the first non-OK Status produced by any step (later steps are then
// skipped); otherwise returns the result of binding the RPC server.
Status ServerBase::Init() {
  // Select the clock implementation based on --use_hybrid_clock.
  if (FLAGS_use_hybrid_clock) {
    uint64_t jump_threshold_usec = 0;
    unique_ptr<InstanceMetadata> instance_md;
    RETURN_NOT_OK(WallClockJumpDetectionNeeded(&jump_threshold_usec, &instance_md));
    if (jump_threshold_usec > 0) {
      LOG(INFO) << "enabling wall clock jump detection";
    }
    clock_.reset(new clock::HybridClock(
        metric_entity_, jump_threshold_usec, std::move(instance_md)));
  } else {
    clock_.reset(new clock::LogicalClock(Timestamp::kInitialTimestamp,
                                         metric_entity_));
  }

  // The clock is initialized before anything else: this verifies that it is
  // synchronized, which makes it less likely that a clock problem leaves a
  // partially initialized state on disk during startup.
  RETURN_NOT_OK_PREPEND(clock_->Init(), "Cannot initialize clock");

  // Timers tracking the 'init' and 'read filesystem' startup phases.
  Timer* const init_timer = startup_path_handler_->init_progress();
  Timer* const fs_timer = startup_path_handler_->read_filesystem_progress();
  init_timer->Start();

  glog_metrics_.reset(new ScopedGLogMetrics(metric_entity_));
  tcmalloc::RegisterMetrics(metric_entity_);
#ifdef TCMALLOC_ENABLED
  MallocExtension::instance()->SetNumericProperty(
      "tcmalloc.max_total_thread_cache_bytes",
      FLAGS_tcmalloc_max_total_thread_cache_bytes);
  RegisterSpinLockContentionMetrics(metric_entity_);
#else
  LOG(INFO) << "Flag tcmalloc_max_total_thread_cache_bytes is not working since tcmalloc "
               "is not enabled.";
#endif
  InitSpinLockContentionProfiling();

  // Determine the FQDN of this node. A failed lookup is tolerated (with a
  // warning) as long as it still produced a non-empty name; a failure that
  // leaves the name empty aborts initialization.
  string host;
  if (auto fqdn_status = GetFQDN(&host); !fqdn_status.ok()) {
    const string failure = Substitute("could not determine host FQDN: $0",
                                      fqdn_status.ToString());
    if (!host.empty()) {
      LOG(WARNING) << failure;
    } else {
      LOG(ERROR) << failure;
      return fqdn_status;
    }
  }
  DCHECK(!host.empty());

  RETURN_NOT_OK(security::InitKerberosForServer(FLAGS_principal, FLAGS_keytab_file));
  RETURN_NOT_OK(file_cache_->Init());

  // Bring up the startup page and the web server now, so the web UI is
  // reachable while the rest of the server is still initializing, loading the
  // file system, etc.
  //
  // NOTE: the startup handler is registered apart from the other path
  // handlers because it has to be displayed before all of Kudu's subsystems
  // have finished initializing.
  if (options_.fs_opts.block_manager_type == "file") {
    startup_path_handler_->set_is_using_lbm(false);
  }
  if (web_server_) {
    auto* const web = web_server_.get();
    startup_path_handler_->RegisterStartupPathHandler(web);
    AddPreInitializedDefaultPathHandlers(web);
    web->set_footer_html(FooterHtml());
    RETURN_NOT_OK(web->Start());
  }

  // The 'init' phase ends here and the 'read filesystem' phase begins.
  fs::FsReport fs_report;
  init_timer->Stop();
  fs_timer->Start();
  Status fs_status = fs_manager_->Open(
      &fs_report,
      startup_path_handler_->read_instance_metadata_files_progress(),
      startup_path_handler_->read_data_directories_progress(),
      startup_path_handler_->containers_processed(),
      startup_path_handler_->containers_total());

  // NotFound means no instance files existed: try creating a new FS layout
  // and then open it.
  if (fs_status.IsNotFound()) {
    LOG(INFO) << "This appears to be a new deployment of Kudu; creating new FS layout";
    is_first_run_ = true;

    const auto& tenant_info = options_.tenant_key_info;
    const auto& server_info = options_.server_key_info;
    if (!tenant_info.tenant_key.empty()) {
      // A tenant key takes priority over a server key.
      fs_status = fs_manager_->CreateInitialFileSystemLayout(
          std::nullopt,
          tenant_info.tenant_name,
          tenant_info.tenant_id,
          tenant_info.tenant_key,
          tenant_info.tenant_key_iv,
          tenant_info.tenant_key_version);
    } else if (!server_info.server_key.empty()) {
      fs_status = fs_manager_->CreateInitialFileSystemLayout(
          std::nullopt,
          std::nullopt,
          std::nullopt,
          server_info.server_key,
          server_info.server_key_iv,
          server_info.server_key_version);
    } else {
      // Neither key is configured.
      fs_status = fs_manager_->CreateInitialFileSystemLayout();
    }

    if (fs_status.IsAlreadyPresent()) {
      return fs_status.CloneAndPrepend(
          "FS layout already exists; not overwriting existing layout");
    }
    RETURN_NOT_OK_PREPEND(fs_status, "Could not create new FS layout");

    // Unlike the first attempt, this call does not pass the container
    // counters.
    fs_status = fs_manager_->Open(
        &fs_report,
        startup_path_handler_->read_instance_metadata_files_progress(),
        startup_path_handler_->read_data_directories_progress());
  }
  RETURN_NOT_OK_PREPEND(fs_status, "Failed to load FS layout");
  RETURN_NOT_OK(fs_report.LogAndCheckForFatalErrors());
  fs_timer->Stop();

  RETURN_NOT_OK(InitAcls());

  vector<string> excluded_tls_protocols = strings::Split(
      FLAGS_rpc_tls_excluded_protocols, ",", strings::SkipEmpty());

  // Warn when the requested listen backlog differs from the effective one.
  // NOTE: the requested value (not the effective one) is what is handed to
  // the messenger builder below.
  const auto requested_backlog = FLAGS_rpc_acceptor_listen_backlog;
  const auto capped_backlog = GetEffectiveListenSocketBacklog(
      Env::Default(), requested_backlog);
  if (capped_backlog != requested_backlog) {
    LOG(WARNING) << Substitute(
        "--rpc_acceptor_listen_backlog setting $0 is capped at $1 by $2",
        requested_backlog, capped_backlog, kListenBacklogMax);
  }

  // Create the Messenger. The setters are invoked in the same order as
  // before; they are merely grouped into several statements.
  rpc::MessengerBuilder builder(name_);
  // Threads, metrics, and connection timing.
  builder.set_num_reactors(FLAGS_num_reactor_threads)
      .set_min_negotiation_threads(FLAGS_min_negotiation_threads)
      .set_max_negotiation_threads(FLAGS_max_negotiation_threads)
      .set_metric_entity(metric_entity())
      .set_connection_keep_alive_time(FLAGS_rpc_default_keepalive_time_ms)
      .set_rpc_negotiation_timeout_ms(FLAGS_rpc_negotiation_timeout_ms);
  // Authentication, encryption, and TLS settings.
  builder.set_rpc_authentication(FLAGS_rpc_authentication)
      .set_rpc_encryption(FLAGS_rpc_encryption)
      .set_rpc_tls_ciphers(FLAGS_rpc_tls_ciphers)
      .set_rpc_tls_ciphersuites(FLAGS_rpc_tls_ciphersuites)
      .set_rpc_tls_excluded_protocols(std::move(excluded_tls_protocols))
      .set_rpc_tls_min_protocol(FLAGS_rpc_tls_min_protocol);
  // Certificate, private key, and CA files.
  builder.set_epki_cert_key_files(FLAGS_rpc_certificate_file,
                                  FLAGS_rpc_private_key_file)
      .set_epki_certificate_authority_file(FLAGS_rpc_ca_certificate_file)
      .set_epki_private_password_key_cmd(FLAGS_rpc_private_key_password_cmd);
  // Keytab, hostname, listen backlog, and inbound TLS.
  builder.set_keytab_file(FLAGS_keytab_file)
      .set_hostname(host)
      .set_acceptor_listen_backlog(requested_backlog)
      .enable_inbound_tls();

  if (auto keytab_user = kudu::security::GetLoggedInUsernameFromKeytab();
      keytab_user.has_value()) {
    builder.set_sasl_proto_name(keytab_user.value());
  }
  if (options_.rpc_opts.rpc_reuseport) {
    builder.set_reuseport();
  }
  if (!FLAGS_keytab_file.empty()) {
    // With a keytab file configured, the SASL protocol name is set (again)
    // from the local name mapped from --principal; this call comes after the
    // keytab-username one above.
    string local_service_name;
    RETURN_NOT_OK(security::MapPrincipalToLocalName(FLAGS_principal,
                                                    &local_service_name));
    builder.set_sasl_proto_name(local_service_name);
  }

  // JWT verification: a JWKS URL is preferred over a JWKS file; with neither
  // configured, only a warning is emitted and no verifier is installed.
  if (FLAGS_enable_jwt_token_auth) {
    if (FLAGS_jwks_url.empty() && FLAGS_jwks_file_path.empty()) {
      LOG(WARNING) << Substitute("JWT authentication enabled, but neither "
                                 "'jwks_url' nor 'jwks_file_path' is set");
    } else if (!FLAGS_jwks_url.empty()) {
      builder.set_jwt_verifier(std::make_shared<KeyBasedJwtVerifier>(
          FLAGS_jwks_url,
          FLAGS_jwks_verify_server_certificate,
          FLAGS_trusted_certificate_file));
    } else {
      builder.set_jwt_verifier(std::make_shared<KeyBasedJwtVerifier>(
          FLAGS_jwks_file_path));
    }
  }
  RETURN_NOT_OK(builder.Build(&messenger_));

  // Forward the RPC server's "too busy" notifications to this object.
  rpc_server_->set_too_busy_hook([this](rpc::ServicePool* overflowed_pool) {
    ServiceQueueOverflowed(overflowed_pool);
  });
  RETURN_NOT_OK(rpc_server_->Init(messenger_));

  if (FLAGS_rpc_listen_on_unix_domain_socket) {
    VLOG(1) << "Enabling listening on unix domain socket.";
    Sockaddr uds_addr;
    // The socket name embeds the FS manager's UUID: "/tmp/kudu-<uuid>" on
    // Apple platforms, "@kudu-<uuid>" everywhere else.
#if defined(__APPLE__)
    const string socket_path = Substitute("/tmp/kudu-$0", fs_manager_->uuid());
#else
    const string socket_path = Substitute("@kudu-$0", fs_manager_->uuid());
#endif
    RETURN_NOT_OK_PREPEND(uds_addr.ParseUnixDomainPath(socket_path),
                          "unable to parse provided UNIX socket path");
    RETURN_NOT_OK_PREPEND(rpc_server_->AddBindAddress(uds_addr),
                          "unable to add configured UNIX socket path to list of bind addresses "
                          "for RPC server");
  }
  return rpc_server_->Bind();
}