Status ServerBase::Init()

in src/kudu/server/server_base.cc [803:1004]


// Initializes the server's core subsystems, in this order: clock, process-wide
// metrics, host name, Kerberos, file cache, startup web UI, on-disk FS layout
// (creating it if none exists), ACLs, the RPC messenger, and the RPC server.
//
// Returns the first non-OK status produced by any step. On success, returns
// the result of binding the RPC server to its configured addresses.
Status ServerBase::Init() {
  // Pick the clock implementation based on --use_hybrid_clock.
  if (!FLAGS_use_hybrid_clock) {
    clock_.reset(new clock::LogicalClock(Timestamp::kInitialTimestamp,
                                         metric_entity_));
  } else {
    uint64_t threshold_usec = 0;
    unique_ptr<InstanceMetadata> im;
    RETURN_NOT_OK(WallClockJumpDetectionNeeded(&threshold_usec, &im));
    // A non-zero threshold is what signals that jump detection is enabled.
    if (threshold_usec > 0) {
      LOG(INFO) << "enabling wall clock jump detection";
    }
    clock_.reset(new clock::HybridClock(
        metric_entity_, threshold_usec, std::move(im)));
  }

  // Initialize the clock immediately. This checks that the clock is synchronized
  // so we're less likely to get into a partially initialized state on disk during startup
  // if we're having clock problems.
  RETURN_NOT_OK_PREPEND(clock_->Init(), "Cannot initialize clock");

  // Timers surfaced by the startup path handler: 'init' covers everything up
  // to opening the file system, 'read_filesystem' covers loading the FS layout.
  Timer* init = startup_path_handler_->init_progress();
  Timer* read_filesystem = startup_path_handler_->read_filesystem_progress();
  init->Start();
  glog_metrics_.reset(new ScopedGLogMetrics(metric_entity_));
  tcmalloc::RegisterMetrics(metric_entity_);

  // Spinlock contention metrics are registered regardless of whether tcmalloc
  // is enabled, matching the unconditional InitSpinLockContentionProfiling()
  // call below.
  RegisterSpinLockContentionMetrics(metric_entity_);

#ifdef TCMALLOC_ENABLED
  // SetNumericProperty() reports failure via its return value: surface it
  // instead of silently running with an unexpected thread cache limit.
  if (!MallocExtension::instance()->SetNumericProperty(
          "tcmalloc.max_total_thread_cache_bytes",
          FLAGS_tcmalloc_max_total_thread_cache_bytes)) {
    LOG(WARNING) << "could not set tcmalloc.max_total_thread_cache_bytes to "
                 << FLAGS_tcmalloc_max_total_thread_cache_bytes;
  }
#else
  LOG(INFO) << "Flag tcmalloc_max_total_thread_cache_bytes is not working since tcmalloc "
               "is not enabled.";
#endif

  InitSpinLockContentionProfiling();

  // Get the FQDN of the node where the server is running. If fetching of the
  // FQDN fails but GetFQDN() still produced a non-empty name, keep going with
  // that name and only log a warning; otherwise fail initialization.
  string hostname;
  if (auto s = GetFQDN(&hostname); !s.ok()) {
    const auto& msg = Substitute("could not determine host FQDN: $0", s.ToString());
    if (hostname.empty()) {
      LOG(ERROR) << msg;
      return s;
    }
    LOG(WARNING) << msg;
  }
  DCHECK(!hostname.empty());

  RETURN_NOT_OK(security::InitKerberosForServer(FLAGS_principal, FLAGS_keytab_file));
  RETURN_NOT_OK(file_cache_->Init());

  // Register the startup web handler and start the web server to make the web UI
  // available while the server is initializing, loading the file system, etc.
  //
  // NOTE: unlike the other path handlers, we register this path handler
  // separately, as the startup handler is meant to be displayed before all of
  // Kudu's subsystems have finished initializing.
  if (options_.fs_opts.block_manager_type == "file") {
    startup_path_handler_->set_is_using_lbm(false);
  }
  if (web_server_) {
    startup_path_handler_->RegisterStartupPathHandler(web_server_.get());
    AddPreInitializedDefaultPathHandlers(web_server_.get());
    web_server_->set_footer_html(FooterHtml());
    RETURN_NOT_OK(web_server_->Start());
  }

  // Switch from the 'init' timer to the 'read_filesystem' timer and try to
  // open an existing FS layout.
  fs::FsReport report;
  init->Stop();
  read_filesystem->Start();
  Status s = fs_manager_->Open(&report,
                               startup_path_handler_->read_instance_metadata_files_progress(),
                               startup_path_handler_->read_data_directories_progress(),
                               startup_path_handler_->containers_processed(),
                               startup_path_handler_->containers_total());
  // No instance files existed. Try creating a new FS layout.
  if (s.IsNotFound()) {
    LOG(INFO) << "This appears to be a new deployment of Kudu; creating new FS layout";
    is_first_run_ = true;

    if (options_.server_key_info.server_key.empty() &&
        options_.tenant_key_info.tenant_key.empty()) {
      // Neither a server key nor a tenant key was supplied.
      s = fs_manager_->CreateInitialFileSystemLayout();
    } else if (!options_.tenant_key_info.tenant_key.empty()) {
      // The priority of tenant key is higher than that of server key.
      s = fs_manager_->CreateInitialFileSystemLayout(std::nullopt,
                                                     options_.tenant_key_info.tenant_name,
                                                     options_.tenant_key_info.tenant_id,
                                                     options_.tenant_key_info.tenant_key,
                                                     options_.tenant_key_info.tenant_key_iv,
                                                     options_.tenant_key_info.tenant_key_version);
    } else {
      // Only a server key was supplied.
      s = fs_manager_->CreateInitialFileSystemLayout(std::nullopt,
                                                     std::nullopt,
                                                     std::nullopt,
                                                     options_.server_key_info.server_key,
                                                     options_.server_key_info.server_key_iv,
                                                     options_.server_key_info.server_key_version);
    }

    if (s.IsAlreadyPresent()) {
      return s.CloneAndPrepend("FS layout already exists; not overwriting existing layout");
    }
    RETURN_NOT_OK_PREPEND(s, "Could not create new FS layout");
    // Re-open the freshly created layout. The container counters are not
    // passed to this second call.
    s = fs_manager_->Open(&report, startup_path_handler_->read_instance_metadata_files_progress(),
                          startup_path_handler_->read_data_directories_progress());
  }
  RETURN_NOT_OK_PREPEND(s, "Failed to load FS layout");
  RETURN_NOT_OK(report.LogAndCheckForFatalErrors());
  read_filesystem->Stop();
  RETURN_NOT_OK(InitAcls());

  // --rpc_tls_excluded_protocols is a comma-separated list; empty entries are
  // dropped.
  vector<string> rpc_tls_excluded_protocols = strings::Split(
      FLAGS_rpc_tls_excluded_protocols, ",", strings::SkipEmpty());

  // Warn the operator if the requested listen backlog differs from the
  // effective one.
  // NOTE(review): the builder below is handed the requested value rather than
  // the effective one -- confirm this is intended.
  const auto listen_backlog = FLAGS_rpc_acceptor_listen_backlog;
  const auto effective_listen_backlog = GetEffectiveListenSocketBacklog(
      Env::Default(), listen_backlog);
  if (effective_listen_backlog != listen_backlog) {
    LOG(WARNING) << Substitute(
        "--rpc_acceptor_listen_backlog setting $0 is capped at $1 by $2",
        listen_backlog, effective_listen_backlog, kListenBacklogMax);
  }

  // Create the Messenger.
  rpc::MessengerBuilder builder(name_);
  builder.set_num_reactors(FLAGS_num_reactor_threads)
         .set_min_negotiation_threads(FLAGS_min_negotiation_threads)
         .set_max_negotiation_threads(FLAGS_max_negotiation_threads)
         .set_metric_entity(metric_entity())
         .set_connection_keep_alive_time(FLAGS_rpc_default_keepalive_time_ms)
         .set_rpc_negotiation_timeout_ms(FLAGS_rpc_negotiation_timeout_ms)
         .set_rpc_authentication(FLAGS_rpc_authentication)
         .set_rpc_encryption(FLAGS_rpc_encryption)
         .set_rpc_tls_ciphers(FLAGS_rpc_tls_ciphers)
         .set_rpc_tls_ciphersuites(FLAGS_rpc_tls_ciphersuites)
         .set_rpc_tls_excluded_protocols(std::move(rpc_tls_excluded_protocols))
         .set_rpc_tls_min_protocol(FLAGS_rpc_tls_min_protocol)
         .set_epki_cert_key_files(FLAGS_rpc_certificate_file, FLAGS_rpc_private_key_file)
         .set_epki_certificate_authority_file(FLAGS_rpc_ca_certificate_file)
         .set_epki_private_password_key_cmd(FLAGS_rpc_private_key_password_cmd)
         .set_keytab_file(FLAGS_keytab_file)
         .set_hostname(hostname)
         .set_acceptor_listen_backlog(listen_backlog)
         .enable_inbound_tls();

  // NOTE(review): the SASL protocol name may be set twice: here from the
  // keytab's logged-in username, and again below from --principal when a
  // keytab file is configured. Presumably the later call wins -- confirm.
  auto username = kudu::security::GetLoggedInUsernameFromKeytab();
  if (username.has_value()) {
    builder.set_sasl_proto_name(username.value());
  }

  if (options_.rpc_opts.rpc_reuseport) {
    builder.set_reuseport();
  }

  if (!FLAGS_keytab_file.empty()) {
    string service_name;
    RETURN_NOT_OK(security::MapPrincipalToLocalName(FLAGS_principal, &service_name));
    builder.set_sasl_proto_name(service_name);
  }

  // JWT verification: --jwks_url takes precedence over --jwks_file_path. If
  // neither is set, no verifier is installed and only a warning is logged.
  if (FLAGS_enable_jwt_token_auth) {
    if (!FLAGS_jwks_url.empty()) {
      builder.set_jwt_verifier(std::make_shared<KeyBasedJwtVerifier>(
          FLAGS_jwks_url,
          FLAGS_jwks_verify_server_certificate,
          FLAGS_trusted_certificate_file));
    } else if (!FLAGS_jwks_file_path.empty()) {
      builder.set_jwt_verifier(std::make_shared<KeyBasedJwtVerifier>(
          FLAGS_jwks_file_path));
    } else {
      LOG(WARNING) << Substitute("JWT authentication enabled, but neither "
                                 "'jwks_url' nor 'jwks_file_path' is set");
    }
  }

  RETURN_NOT_OK(builder.Build(&messenger_));
  // Forward the RPC server's "too busy" notifications to
  // ServiceQueueOverflowed().
  rpc_server_->set_too_busy_hook([this](rpc::ServicePool* pool) {
    this->ServiceQueueOverflowed(pool);
  });

  RETURN_NOT_OK(rpc_server_->Init(messenger_));

  if (FLAGS_rpc_listen_on_unix_domain_socket) {
    VLOG(1) << "Enabling listening on unix domain socket.";
    Sockaddr addr;
    // The socket name embeds the FS manager's UUID. Non-Apple platforms use an
    // '@'-prefixed name, while Apple platforms use a path under /tmp.
#if !defined(__APPLE__)
    RETURN_NOT_OK_PREPEND(addr.ParseUnixDomainPath(Substitute("@kudu-$0", fs_manager_->uuid())),
                          "unable to parse provided UNIX socket path");
#else
    RETURN_NOT_OK_PREPEND(addr.ParseUnixDomainPath(Substitute("/tmp/kudu-$0", fs_manager_->uuid())),
                          "unable to parse provided UNIX socket path");
#endif
    RETURN_NOT_OK_PREPEND(rpc_server_->AddBindAddress(addr),
                          "unable to add configured UNIX socket path to list of bind addresses "
                          "for RPC server");
  }

  return rpc_server_->Bind();
}