Status ExternalMiniCluster::Start()

in src/kudu/mini-cluster/external_mini_cluster.cc [274:460]


// Brings up the components of the external mini-cluster, in this order:
//   1. option handling (HandleOptions()) and the DNS resolution override flag;
//   2. the optional OIDC/JWKS endpoint and the mini-cluster's RPC messenger;
//   3. the cluster root directory;
//   4. the optional MiniKdc along with the test principals;
//   5. the optional chronyd NTP servers (unless built with NO_CHRONY);
//   6. the optional Ranger, Ranger KMS, and Hive Metastore services;
//   7. the masters and the tablet servers.
//
// CHECK-fails if this object already has masters or tablet servers: in that
// case the caller most likely wanted Restart().
//
// Returns the first non-OK status encountered. This function does not roll
// back components that were started before the failure.
Status ExternalMiniCluster::Start() {
  CHECK(masters_.empty()) << "Masters are not empty (size: " << masters_.size()
      << "). Maybe you meant Restart()?";
  CHECK(tablet_servers_.empty()) << "Tablet servers are not empty (size: "
      << tablet_servers_.size() << "). Maybe you meant Restart()?";
  RETURN_NOT_OK(HandleOptions());

  // The FlagSaver snapshots the current gflags values and restores them when
  // it goes out of scope, so the override below is limited to this call.
  gflags::FlagSaver saver;
  FLAGS_dns_addr_resolution_override = dns_overrides_;

  // The verifier stays null unless JWT authentication is enabled for clients.
  std::shared_ptr<JwtVerifier> jwt_verifier = nullptr;
  if (opts_.enable_client_jwt) {
    oidc_.reset(new MiniOidc(opts_.mini_oidc_options));
    // Placeholder URL used when the JWKS endpoint isn't started.
    string jwks_url = "default.url";
    if (opts_.start_jwks) {
      RETURN_NOT_OK_PREPEND(oidc_->Start(), "Failed to start OIDC endpoints");
      jwks_url = oidc_->jwks_url();
    }
    jwt_verifier =
        std::make_shared<KeyBasedJwtVerifier>(jwks_url,
                                              /* jwks_verify_server_certificate */ true,
                                              opts_.mini_oidc_options.server_certificate);
  }

  RETURN_NOT_OK_PREPEND(
      rpc::MessengerBuilder("minicluster-messenger")
          .set_num_reactors(1)
          .set_max_negotiation_threads(1)
          .set_rpc_negotiation_timeout_ms(opts_.rpc_negotiation_timeout.ToMilliseconds())
          .set_sasl_proto_name(opts_.principal)
          .set_jwt_verifier(std::move(jwt_verifier))
          .Build(&messenger_),
      "Failed to start Messenger for minicluster");

  // An already existing cluster root directory is not an error.
  Status s = env()->CreateDir(opts_.cluster_root);
  if (!s.ok() && !s.IsAlreadyPresent()) {
    RETURN_NOT_OK_PREPEND(s, "Could not create root dir " + opts_.cluster_root);
  }

  if (opts_.enable_kerberos) {
    kdc_.reset(new MiniKdc(opts_.mini_kdc_options));
    RETURN_NOT_OK(kdc_->Start());
    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("test-admin"),
                          "could not create admin principal");
    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("test-user"),
                          "could not create user principal");
    RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("joe-interloper"),
                          "could not create unauthorized principal");

    RETURN_NOT_OK_PREPEND(kdc_->CreateKeytabForExistingPrincipal("test-user"),
                         "could not create client keytab");

    // From here on, the credentials in effect are those of 'test-admin'.
    RETURN_NOT_OK_PREPEND(kdc_->Kinit("test-admin"),
                          "could not kinit as admin");

    RETURN_NOT_OK_PREPEND(kdc_->SetKrb5Environment(),
                          "could not set krb5 client env");
  }

#if !defined(NO_CHRONY)
  // Start NTP servers, if requested.
  if (opts_.num_ntp_servers > 0) {
    // Collect and keep alive the set of sockets bound with SO_REUSEPORT option
    // until all chronyd processes are started. This allows to avoid port
    // conflicts: chronyd doesn't support binding to wildcard addresses and
    // it's necessary to make sure chronyd is able to bind to the port specified
    // in its configuration. So, the mini-cluster reserves a set of ports up
    // front, then starts the set of chronyd processes, each bound to one
    // of the reserved ports.
    vector<unique_ptr<Socket>> reserved_sockets;
    for (auto i = 0; i < opts_.num_ntp_servers; ++i) {
      unique_ptr<Socket> reserved_socket;
      RETURN_NOT_OK_PREPEND(ReserveDaemonSocket(
          DaemonType::EXTERNAL_SERVER, i, opts_.bind_mode, &reserved_socket),
          "failed to reserve chronyd socket address");
      Sockaddr addr;
      RETURN_NOT_OK(reserved_socket->GetSocketAddress(&addr));
      reserved_sockets.emplace_back(std::move(reserved_socket));

      RETURN_NOT_OK_PREPEND(AddNtpServer(addr),
                            Substitute("failed to start NTP server $0", i));
    }
  }
#endif // #if !defined(NO_CHRONY) ...

  // Ranger is brought up both when requested directly and when Ranger KMS
  // is requested: the MiniRangerKMS created below is handed 'ranger_'.
  if (opts_.enable_ranger || opts_.enable_ranger_kms) {
    // Reuse the Postgres instance if there is one already started.
    if (!postgres_ || !postgres_->IsStarted()) {
      postgres_.reset(new postgres::MiniPostgres(cluster_root(), GetBindIpForExternalServer(0)));
    }
    string host = GetBindIpForExternalServer(0);
    ranger_.reset(new ranger::MiniRanger(cluster_root(), host, postgres_));
    if (opts_.enable_kerberos) {
      // The SPNs match the ones defined in mini_ranger_configs.h.
      string admin_keytab;
      RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(
            Substitute("rangeradmin/$0@KRBTEST.COM", host),
            &admin_keytab),
          "could not create rangeradmin keytab");

      string lookup_keytab;
      RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(
            Substitute("rangerlookup/$0@KRBTEST.COM", host),
            &lookup_keytab),
          "could not create rangerlookup keytab");

      string spnego_keytab;
      RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(
            Substitute("HTTP/$0@KRBTEST.COM", host),
            &spnego_keytab),
          "could not create ranger HTTP keytab");

      ranger_->EnableKerberos(kdc_->GetEnvVars()["KRB5_CONFIG"], admin_keytab,
                              lookup_keytab, spnego_keytab);
    }

    RETURN_NOT_OK_PREPEND(ranger_->Start(), "Failed to start the Ranger service");
    RETURN_NOT_OK_PREPEND(ranger_->CreateClientConfig(JoinPathSegments(cluster_root(),
                                                                       "ranger-client")),
                          "Failed to write Ranger client config");
  }

  if (opts_.enable_ranger_kms) {
    // Creating the 'keyadmin' principal and the kinit calls below go through
    // the MiniKdc, which this function creates only when Kerberos is enabled.
    // Report an error instead of dereferencing a null 'kdc_'.
    if (!kdc_) {
      return Status::IllegalState(
          "Ranger KMS requires the mini KDC, but Kerberos is not enabled");
    }

    string host = GetBindIpForExternalServer(0);
    ranger_kms_.reset(new rangerkms::MiniRangerKMS(cluster_root(), host, postgres_, ranger_));
    if (opts_.enable_kerberos) {
      string keytab;
      RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(
          Substitute("rangerkms/$0@KRBTEST.COM", host),
          &keytab),
        "could not create ranger kms keytab");
      string spnego_keytab;
      RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(
          Substitute("HTTP/$0@KRBTEST.COM", host),
          &spnego_keytab),
        "could not create ranger kms HTTP keytab");

      ranger_kms_->EnableKerberos(kdc_->GetEnvVars()["KRB5_CONFIG"], keytab, spnego_keytab);
    }
    // Start the KMS and create the cluster key while kinit'ed as 'keyadmin'.
    RETURN_NOT_OK(kdc_->CreateUserPrincipal("keyadmin"));
    RETURN_NOT_OK(kdc_->Kinit("keyadmin"));
    RETURN_NOT_OK_PREPEND(ranger_kms_->Start(), "Failed to start the Ranger KMS service");
    RETURN_NOT_OK_PREPEND(ranger_kms_->CreateClusterKey(opts_.ranger_cluster_key,
                                                        &opts_.ranger_cluster_key_version),
                          "Failed to create cluster key");
    // Switch back to 'test-admin', the principal the Kerberos setup above
    // kinit'ed as.
    RETURN_NOT_OK(kdc_->Kinit("test-admin"));
  }

  // Start the HMS. It is started in all three of the modes below; in the
  // DISABLE_HIVE_METASTORE mode it runs with the Kudu plugin turned off.
  if (opts_.hms_mode == HmsMode::DISABLE_HIVE_METASTORE ||
      opts_.hms_mode == HmsMode::ENABLE_HIVE_METASTORE ||
      opts_.hms_mode == HmsMode::ENABLE_METASTORE_INTEGRATION) {
    hms_.reset(new hms::MiniHms());
    hms_->SetDataRoot(opts_.cluster_root);

    if (opts_.hms_mode == HmsMode::DISABLE_HIVE_METASTORE) {
      hms_->EnableKuduPlugin(false);
    }

    if (opts_.enable_kerberos) {
      string spn = Substitute("hive/$0", hms_->address().host());
      string ktpath;
      RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(spn, &ktpath),
                            "could not create keytab");
      hms_->EnableKerberos(kdc_->GetEnvVars()["KRB5_CONFIG"], spn, ktpath,
                           rpc::SaslProtection::kAuthentication);

      // Set the protocol name in the environment so that the KuduMetastorePlugin
      // can communicate with Kudu when a custom name is used.
      hms_->AddEnvVar("KUDU_SASL_PROTOCOL_NAME", opts_.principal);
    }

    RETURN_NOT_OK_PREPEND(hms_->Start(),
                          "Failed to start the Hive Metastore");
  }

  RETURN_NOT_OK_PREPEND(StartMasters(), "failed to start masters");

  // The index is 1-based: it is used only in the error message.
  for (int i = 1; i <= opts_.num_tablet_servers; i++) {
    RETURN_NOT_OK_PREPEND(AddTabletServer(), Substitute("failed to start tablet server $0", i));
  }
  RETURN_NOT_OK(WaitForTabletServerCount(
                  opts_.num_tablet_servers,
                  MonoDelta::FromSeconds(kTabletServerRegistrationTimeoutSeconds)));

  return Status::OK();
}