in src/kudu/mini-cluster/external_mini_cluster.cc [274:460]
// Brings up the whole external mini cluster.
//
// The components are started in the following order, each one only if the
// corresponding option in 'opts_' requests it:
//   1. options processing (HandleOptions()) and the DNS override flag;
//   2. OIDC/JWKS endpoints and the mini cluster's messenger;
//   3. the cluster root directory;
//   4. KDC, NTP servers (chronyd), Ranger, Ranger KMS, Hive Metastore;
//   5. masters and tablet servers, followed by a wait for all the tablet
//      servers to be counted by WaitForTabletServerCount().
//
// The cluster must not have any masters or tablet servers yet; this is
// enforced with CHECK(). Use Restart() for an already started cluster.
//
// Returns the first non-OK status encountered; components started before the
// failure are not torn down by this method.
Status ExternalMiniCluster::Start() {
  CHECK(masters_.empty()) << "Masters are not empty (size: " << masters_.size()
      << "). Maybe you meant Restart()?";
  CHECK(tablet_servers_.empty()) << "Tablet servers are not empty (size: "
      << tablet_servers_.size() << "). Maybe you meant Restart()?";
  RETURN_NOT_OK(HandleOptions());

  // The DNS override is scoped to this call: 'saver' restores the flag values
  // when it goes out of scope.
  gflags::FlagSaver saver;
  FLAGS_dns_addr_resolution_override = dns_overrides_;

  // Set up JWT verification for the messenger, if requested. The verifier
  // stays null otherwise.
  std::shared_ptr<JwtVerifier> jwt_verifier = nullptr;
  if (opts_.enable_client_jwt) {
    oidc_.reset(new MiniOidc(opts_.mini_oidc_options));
    // Placeholder URL which is used when the JWKS endpoint isn't started.
    string jwks_url = "default.url";
    if (opts_.start_jwks) {
      RETURN_NOT_OK_PREPEND(oidc_->Start(), "Failed to start OIDC endpoints");
      jwks_url = oidc_->jwks_url();
    }
    jwt_verifier =
        std::make_shared<KeyBasedJwtVerifier>(jwks_url,
                                              /* jwks_verify_server_certificate */ true,
                                              opts_.mini_oidc_options.server_certificate);
  }

  RETURN_NOT_OK_PREPEND(
      rpc::MessengerBuilder("minicluster-messenger")
          .set_num_reactors(1)
          .set_max_negotiation_threads(1)
          .set_rpc_negotiation_timeout_ms(opts_.rpc_negotiation_timeout.ToMilliseconds())
          .set_sasl_proto_name(opts_.principal)
          .set_jwt_verifier(std::move(jwt_verifier))
          .Build(&messenger_),
      "Failed to start Messenger for minicluster");

  // An already existing root directory is not an error.
  Status s = env()->CreateDir(opts_.cluster_root);
  if (!s.ok() && !s.IsAlreadyPresent()) {
    RETURN_NOT_OK_PREPEND(s, "Could not create root dir " + opts_.cluster_root);
  }

  // Start the KDC and provision the principals used by the cluster.
  if (opts_.enable_kerberos) {
    kdc_.reset(new MiniKdc(opts_.mini_kdc_options));
    RETURN_NOT_OK(kdc_->Start());
    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("test-admin"),
                          "could not create admin principal");
    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("test-user"),
                          "could not create user principal");
    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("joe-interloper"),
                          "could not create unauthorized principal");
    RETURN_NOT_OK_PREPEND(kdc_->CreateKeytabForExistingPrincipal("test-user"),
                          "could not create client keytab");
    RETURN_NOT_OK_PREPEND(kdc_->Kinit("test-admin"),
                          "could not kinit as admin");
    RETURN_NOT_OK_PREPEND(kdc_->SetKrb5Environment(),
                          "could not set krb5 client env");
  }

#if !defined(NO_CHRONY)
  // Start NTP servers, if requested.
  if (opts_.num_ntp_servers > 0) {
    // Collect and keep alive the set of sockets bound with SO_REUSEPORT option
    // until all chronyd processes are started. This helps to avoid port
    // conflicts: chronyd doesn't support binding to wildcard addresses and
    // it's necessary to make sure chronyd is able to bind to the port specified
    // in its configuration. So, the mini-cluster reserves a set of ports up
    // front, then starts the set of chronyd processes, each bound to one
    // of the reserved ports.
    vector<unique_ptr<Socket>> reserved_sockets;
    for (auto i = 0; i < opts_.num_ntp_servers; ++i) {
      unique_ptr<Socket> reserved_socket;
      RETURN_NOT_OK_PREPEND(ReserveDaemonSocket(
          DaemonType::EXTERNAL_SERVER, i, opts_.bind_mode, &reserved_socket),
          "failed to reserve chronyd socket address");
      Sockaddr addr;
      RETURN_NOT_OK(reserved_socket->GetSocketAddress(&addr));
      reserved_sockets.emplace_back(std::move(reserved_socket));
      RETURN_NOT_OK_PREPEND(AddNtpServer(addr),
                            Substitute("failed to start NTP server $0", i));
    }
  }
#endif // #if !defined(NO_CHRONY) ...

  // Start Ranger. It is started for Ranger KMS as well, since the KMS
  // instance below is constructed on top of 'ranger_'.
  if (opts_.enable_ranger || opts_.enable_ranger_kms) {
    // Reuse an already running Postgres instance, if there is one.
    if (!postgres_ || !postgres_->IsStarted()) {
      postgres_.reset(new postgres::MiniPostgres(cluster_root(), GetBindIpForExternalServer(0)));
    }
    string host = GetBindIpForExternalServer(0);
    ranger_.reset(new ranger::MiniRanger(cluster_root(), host, postgres_));
    if (opts_.enable_kerberos) {
      // The SPNs match the ones defined in mini_ranger_configs.h.
      string admin_keytab;
      RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(
          Substitute("rangeradmin/$0@KRBTEST.COM", host),
          &admin_keytab),
          "could not create rangeradmin keytab");
      string lookup_keytab;
      RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(
          Substitute("rangerlookup/$0@KRBTEST.COM", host),
          &lookup_keytab),
          "could not create rangerlookup keytab");
      string spnego_keytab;
      RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(
          Substitute("HTTP/$0@KRBTEST.COM", host),
          &spnego_keytab),
          "could not create ranger HTTP keytab");
      ranger_->EnableKerberos(kdc_->GetEnvVars()["KRB5_CONFIG"], admin_keytab,
                              lookup_keytab, spnego_keytab);
    }
    RETURN_NOT_OK_PREPEND(ranger_->Start(), "Failed to start the Ranger service");
    RETURN_NOT_OK_PREPEND(ranger_->CreateClientConfig(JoinPathSegments(cluster_root(),
                                                                       "ranger-client")),
                          "Failed to write Ranger client config");
  }

  // Start Ranger KMS and create the cluster key.
  if (opts_.enable_ranger_kms) {
    // The 'keyadmin' principal below is created and kinit'ed through the KDC
    // unconditionally, so a KDC is mandatory here. Report a misconfiguration
    // instead of dereferencing a null 'kdc_'.
    if (!kdc_) {
      return Status::InvalidArgument(
          "cannot start Ranger KMS: the KDC is not running (is Kerberos enabled?)");
    }
    string host = GetBindIpForExternalServer(0);
    ranger_kms_.reset(new rangerkms::MiniRangerKMS(cluster_root(), host, postgres_, ranger_));
    if (opts_.enable_kerberos) {
      string keytab;
      RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(
          Substitute("rangerkms/$0@KRBTEST.COM", host),
          &keytab),
          "could not create ranger kms keytab");
      string spnego_keytab;
      RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(
          Substitute("HTTP/$0@KRBTEST.COM", host),
          &spnego_keytab),
          "could not create ranger kms HTTP keytab");
      ranger_kms_->EnableKerberos(kdc_->GetEnvVars()["KRB5_CONFIG"], keytab, spnego_keytab);
    }
    // The KMS is started and the cluster key is created while kinit'ed as
    // 'keyadmin' (presumably the key is meant to be created with those
    // credentials); the 'test-admin' credentials are restored afterwards.
    RETURN_NOT_OK(kdc_->CreateUserPrincipal("keyadmin"));
    RETURN_NOT_OK(kdc_->Kinit("keyadmin"));
    RETURN_NOT_OK_PREPEND(ranger_kms_->Start(), "Failed to start the Ranger KMS service");
    RETURN_NOT_OK_PREPEND(ranger_kms_->CreateClusterKey(opts_.ranger_cluster_key,
                                                        &opts_.ranger_cluster_key_version),
                          "Failed to create cluster key");
    RETURN_NOT_OK(kdc_->Kinit("test-admin"));
  }

  // Start the HMS.
  if (opts_.hms_mode == HmsMode::DISABLE_HIVE_METASTORE ||
      opts_.hms_mode == HmsMode::ENABLE_HIVE_METASTORE ||
      opts_.hms_mode == HmsMode::ENABLE_METASTORE_INTEGRATION) {
    hms_.reset(new hms::MiniHms());
    hms_->SetDataRoot(opts_.cluster_root);

    // In DISABLE_HIVE_METASTORE mode the HMS is still started, but without
    // the Kudu plugin.
    if (opts_.hms_mode == HmsMode::DISABLE_HIVE_METASTORE) {
      hms_->EnableKuduPlugin(false);
    }

    if (opts_.enable_kerberos) {
      string spn = Substitute("hive/$0", hms_->address().host());
      string ktpath;
      RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(spn, &ktpath),
                            "could not create keytab");
      hms_->EnableKerberos(kdc_->GetEnvVars()["KRB5_CONFIG"], spn, ktpath,
                           rpc::SaslProtection::kAuthentication);
      // Set the protocol name in the environment so that the KuduMetastorePlugin
      // can communicate with Kudu when a custom name is used.
      hms_->AddEnvVar("KUDU_SASL_PROTOCOL_NAME", opts_.principal);
    }

    RETURN_NOT_OK_PREPEND(hms_->Start(),
                          "Failed to start the Hive Metastore");
  }

  RETURN_NOT_OK_PREPEND(StartMasters(), "failed to start masters");

  // Tablet servers are numbered starting from 1 in the error message.
  for (int i = 1; i <= opts_.num_tablet_servers; i++) {
    RETURN_NOT_OK_PREPEND(AddTabletServer(), Substitute("failed to start tablet server $0", i));
  }
  RETURN_NOT_OK(WaitForTabletServerCount(
      opts_.num_tablet_servers,
      MonoDelta::FromSeconds(kTabletServerRegistrationTimeoutSeconds)));

  return Status::OK();
}