Status ImpalaServer::Start()

in be/src/service/impala-server.cc [3122:3392]


Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
    int32_t hs2_http_port, int32_t external_fe_port) {
  exec_env_->SetImpalaServer(this);

#ifdef CALLONCEHACK
  // Include this calloncehack call (which is a no-op) to make sure calloncehack
  // is required at link time when using it.
  calloncehack::InitializeCallOnceHack();
#endif

  // We must register the HTTP handlers after registering the ImpalaServer with the
  // ExecEnv. Otherwise the HTTP handlers will try to resolve the ImpalaServer through the
  // ExecEnv singleton and will receive a nullptr.
  http_handler_.reset(ImpalaHttpHandler::CreateImpaladHandler(
      this, exec_env_->admission_controller(), exec_env_->cluster_membership_mgr()));
  http_handler_->RegisterHandlers(exec_env_->webserver());
  if (exec_env_->metrics_webserver() != nullptr) {
    http_handler_->RegisterHandlers(
        exec_env_->metrics_webserver(), /* metrics_only */ true);
  }

  if (!FLAGS_is_coordinator && !FLAGS_is_executor) {
    return Status("Impala does not have a valid role configured. "
        "Either --is_coordinator or --is_executor must be set to true.");
  }

  // Subscribe with the statestore. Coordinators need to subscribe to the catalog topic
  // then wait for the initial catalog update.
  RETURN_IF_ERROR(exec_env_->StartStatestoreSubscriberService());

  kudu::Version target_schema_version;

  if (FLAGS_enable_workload_mgmt && !FLAGS_catalogd_deployed) {
    // TODO(IMPALA-13830): Enable workload management in lightweight deployments.
    return Status("Workload management needs CatalogD to be deployed.");
  }

  if (FLAGS_is_coordinator) {
    if (FLAGS_enable_workload_mgmt) {
      ABORT_IF_ERROR(workloadmgmt::ParseSchemaVersionFlag(&target_schema_version));
      ABORT_IF_ERROR(workloadmgmt::StartupChecks(target_schema_version));
    }

    exec_env_->frontend()->WaitForCatalog();
    ABORT_IF_ERROR(UpdateCatalogMetrics());
  }

  SSLProtocol ssl_version = SSLProtocol::TLSv1_0;
  if (IsExternalTlsConfigured() || IsInternalTlsConfigured()) {
    RETURN_IF_ERROR(
        SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
  }

  if (!FLAGS_is_coordinator) {
    LOG(INFO) << "Initialized executor Impala server on "
              << TNetworkAddressToString(exec_env_->configured_backend_address());
  } else {
    // Load JWKS from file if validation for signature of JWT token is enabled.
    if (FLAGS_jwt_token_auth && FLAGS_jwt_validate_signature) {
      if (!FLAGS_jwks_file_path.empty()) {
        RETURN_IF_ERROR(ExecEnv::GetInstance()->GetJWTHelperInstance()->Init(
            FLAGS_jwks_file_path));
      } else if (!FLAGS_jwks_url.empty()) {
        if (TestInfo::is_test()) sleep(1);
        RETURN_IF_ERROR(ExecEnv::GetInstance()->GetJWTHelperInstance()->Init(
            FLAGS_jwks_url, FLAGS_jwks_verify_server_certificate,
            FLAGS_jwks_ca_certificate, false));
      } else {
        LOG(ERROR) << "JWKS file is not specified when the validation of JWT signature "
                   << " is enabled.";
        return Status("JWKS file is not specified");
      }
    }

    // Load JWKS from file if validation for signature of OAuth token is enabled.
    if (FLAGS_oauth_token_auth && FLAGS_oauth_jwt_validate_signature) {
      if (!FLAGS_oauth_jwks_file_path.empty()) {
        RETURN_IF_ERROR(ExecEnv::GetInstance()->GetOAuthHelperInstance()->Init(
            FLAGS_oauth_jwks_file_path));
      } else if (!FLAGS_oauth_jwks_url.empty()) {
        if (TestInfo::is_test()) sleep(1);
        RETURN_IF_ERROR(ExecEnv::GetInstance()->GetOAuthHelperInstance()->Init(
            FLAGS_oauth_jwks_url, FLAGS_oauth_jwks_verify_server_certificate,
            FLAGS_oauth_jwks_ca_certificate, false));
      } else {
        LOG(ERROR) << "JWKS file is not specified when the validation of OAuth signature "
                   << " is enabled.";
        return Status("JWKS file for OAuth is not specified");
      }
    }

    // Initialize the client servers.
    shared_ptr<ImpalaServer> handler = shared_from_this();
    if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0)) {
      shared_ptr<TProcessor> beeswax_processor(
          new ImpalaServiceProcessor(handler));
      shared_ptr<TProcessorEventHandler> event_handler(
          new RpcEventHandler("beeswax", exec_env_->metrics()));
      beeswax_processor->setEventHandler(event_handler);
      ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);

      if (IsExternalTlsConfigured()) {
        LOG(INFO) << "Enabling SSL for Beeswax";
        builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
              .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
              .ssl_version(ssl_version)
              .cipher_list(FLAGS_ssl_cipher_list);
      }

      ThriftServer* server;
      RETURN_IF_ERROR(
          builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
          .metrics(exec_env_->metrics())
          .max_concurrent_connections(FLAGS_fe_service_threads)
          .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
          .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
          .keepalive(FLAGS_client_keepalive_probe_period_s,
              FLAGS_client_keepalive_retry_period_s,
              FLAGS_client_keepalive_retry_count)
          .Build(&server));
      beeswax_server_.reset(server);
      beeswax_server_->SetConnectionHandler(this);
    }

    if (hs2_port > 0 || (TestInfo::is_test() && hs2_port == 0)) {
      shared_ptr<TProcessor> hs2_fe_processor(
          new ImpalaHiveServer2ServiceProcessor(handler));
      shared_ptr<TProcessorEventHandler> event_handler(
          new RpcEventHandler("hs2", exec_env_->metrics()));
      hs2_fe_processor->setEventHandler(event_handler);

      ThriftServerBuilder builder(HS2_SERVER_NAME, hs2_fe_processor, hs2_port);

      if (IsExternalTlsConfigured()) {
        LOG(INFO) << "Enabling SSL for HiveServer2";
        builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
              .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
              .ssl_version(ssl_version)
              .cipher_list(FLAGS_ssl_cipher_list);
      }

      ThriftServer* server;
      RETURN_IF_ERROR(
          builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
          .metrics(exec_env_->metrics())
          .max_concurrent_connections(FLAGS_fe_service_threads)
          .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
          .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
          .keepalive(FLAGS_client_keepalive_probe_period_s,
              FLAGS_client_keepalive_retry_period_s,
              FLAGS_client_keepalive_retry_count)
          .Build(&server));
      hs2_server_.reset(server);
      hs2_server_->SetConnectionHandler(this);
    }

    if (external_fe_port > 0 || (TestInfo::is_test() && external_fe_port == 0)) {
      shared_ptr<TProcessor> external_fe_processor(
          new ImpalaHiveServer2ServiceProcessor(handler));
      shared_ptr<TProcessorEventHandler> event_handler(
          new RpcEventHandler("external_frontend", exec_env_->metrics()));
      external_fe_processor->setEventHandler(event_handler);

      ThriftServer::TransportType external_fe_port_transport =
          ThriftServer::TransportType::BINARY;
      if (FLAGS_enable_external_fe_http) {
        LOG(INFO) << "External FE endpoint is using HTTP for transport";
        external_fe_port_transport = ThriftServer::TransportType::HTTP;
      }

      ThriftServerBuilder builder(EXTERNAL_FRONTEND_SERVER_NAME, external_fe_processor,
          external_fe_port);
      ThriftServer* server;
      RETURN_IF_ERROR(
          builder
              .auth_provider(
                  AuthManager::GetInstance()->GetExternalFrontendAuthProvider())
              .transport_type(external_fe_port_transport)
              .metrics(exec_env_->metrics())
              .max_concurrent_connections(FLAGS_fe_service_threads)
              .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
              .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
              .keepalive(FLAGS_client_keepalive_probe_period_s,
                  FLAGS_client_keepalive_retry_period_s,
                  FLAGS_client_keepalive_retry_count)
              .Build(&server));
      external_fe_server_.reset(server);
      external_fe_server_->SetConnectionHandler(this);
    }

    if (hs2_http_port > 0 || (TestInfo::is_test() && hs2_http_port == 0)) {
      shared_ptr<TProcessor> hs2_http_processor(
          new ImpalaHiveServer2ServiceProcessor(handler));
      shared_ptr<TProcessorEventHandler> event_handler(
          new RpcEventHandler("hs2_http", exec_env_->metrics()));
      hs2_http_processor->setEventHandler(event_handler);

      ThriftServer* http_server;
      ThriftServerBuilder http_builder(
          HS2_HTTP_SERVER_NAME, hs2_http_processor, hs2_http_port);
      if (IsExternalTlsConfigured()) {
        LOG(INFO) << "Enabling SSL for HiveServer2 HTTP endpoint.";
        http_builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
            .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
            .ssl_version(ssl_version)
            .cipher_list(FLAGS_ssl_cipher_list);
      }

      RETURN_IF_ERROR(
          http_builder
              .auth_provider(AuthManager::GetInstance()->GetExternalHttpAuthProvider())
              .transport_type(ThriftServer::TransportType::HTTP)
              .metrics(exec_env_->metrics())
              .max_concurrent_connections(FLAGS_fe_service_threads)
              .queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
              .idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
              .keepalive(FLAGS_client_keepalive_probe_period_s,
                  FLAGS_client_keepalive_retry_period_s,
                  FLAGS_client_keepalive_retry_count)
              .Build(&http_server));
      hs2_http_server_.reset(http_server);
      hs2_http_server_->SetConnectionHandler(this);
    }

    // Initialize workload management (if enabled).
    {
      lock_guard<mutex> l(workload_mgmt_state_mu_);

      // Skip starting workload management if workload management is not enabled or the
      // coordinator shutdown has run before this code runs.
      // Workload management initial checks must have already been run and the schema
      // version from the startup flag parsed into a `kudu::Version` object and stored in
      // the `target_schema_version` variable.
      if (FLAGS_enable_workload_mgmt
          && workload_mgmt_state_ == workloadmgmt::WorkloadManagementState::NOT_STARTED) {
        ABORT_IF_ERROR(Thread::Create("impala-server", "completed-queries",
            bind<void>(&ImpalaServer::WorkloadManagementWorker, this,
            target_schema_version), &workload_management_thread_));
      }
    }
  }
  LOG(INFO) << "Initialized coordinator/executor Impala server on "
            << TNetworkAddressToString(exec_env_->configured_backend_address());

  // Start the RPC services.
  RETURN_IF_ERROR(exec_env_->StartKrpcService());
  if (hs2_server_.get()) {
    RETURN_IF_ERROR(hs2_server_->Start());
    LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_server_->port();
  }
  if (hs2_http_server_.get()) {
    RETURN_IF_ERROR(hs2_http_server_->Start());
    LOG(INFO) << "Impala HiveServer2 Service (HTTP) listening on "
              << hs2_http_server_->port();
  }
  if (external_fe_server_.get()) {
    RETURN_IF_ERROR(external_fe_server_->Start());
    LOG(INFO) << "Impala External Frontend Service listening on "
              << external_fe_server_->port();
  }
  if (beeswax_server_.get()) {
    RETURN_IF_ERROR(beeswax_server_->Start());
    LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
  }
  RETURN_IF_ERROR(DebugAction(FLAGS_debug_actions, "IMPALA_SERVER_END_OF_START"));
  services_started_ = true;
  ImpaladMetrics::IMPALA_SERVER_READY->SetValue(true);
  LOG(INFO) << "Impala has started.";

  return Status::OK();
}