in be/src/service/impala-server.cc [3122:3392]
Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
int32_t hs2_http_port, int32_t external_fe_port) {
exec_env_->SetImpalaServer(this);
#ifdef CALLONCEHACK
// Include this calloncehack call (which is a no-op) to make sure calloncehack
// is required at link time when using it.
calloncehack::InitializeCallOnceHack();
#endif
// We must register the HTTP handlers after registering the ImpalaServer with the
// ExecEnv. Otherwise the HTTP handlers will try to resolve the ImpalaServer through the
// ExecEnv singleton and will receive a nullptr.
http_handler_.reset(ImpalaHttpHandler::CreateImpaladHandler(
this, exec_env_->admission_controller(), exec_env_->cluster_membership_mgr()));
http_handler_->RegisterHandlers(exec_env_->webserver());
if (exec_env_->metrics_webserver() != nullptr) {
http_handler_->RegisterHandlers(
exec_env_->metrics_webserver(), /* metrics_only */ true);
}
if (!FLAGS_is_coordinator && !FLAGS_is_executor) {
return Status("Impala does not have a valid role configured. "
"Either --is_coordinator or --is_executor must be set to true.");
}
// Subscribe with the statestore. Coordinators need to subscribe to the catalog topic
// then wait for the initial catalog update.
RETURN_IF_ERROR(exec_env_->StartStatestoreSubscriberService());
kudu::Version target_schema_version;
if (FLAGS_enable_workload_mgmt && !FLAGS_catalogd_deployed) {
// TODO(IMPALA-13830): Enable workload management in lightweight deployments.
return Status("Workload management needs CatalogD to be deployed.");
}
if (FLAGS_is_coordinator) {
if (FLAGS_enable_workload_mgmt) {
ABORT_IF_ERROR(workloadmgmt::ParseSchemaVersionFlag(&target_schema_version));
ABORT_IF_ERROR(workloadmgmt::StartupChecks(target_schema_version));
}
exec_env_->frontend()->WaitForCatalog();
ABORT_IF_ERROR(UpdateCatalogMetrics());
}
SSLProtocol ssl_version = SSLProtocol::TLSv1_0;
if (IsExternalTlsConfigured() || IsInternalTlsConfigured()) {
RETURN_IF_ERROR(
SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
}
if (!FLAGS_is_coordinator) {
LOG(INFO) << "Initialized executor Impala server on "
<< TNetworkAddressToString(exec_env_->configured_backend_address());
} else {
// Load JWKS from file if validation for signature of JWT token is enabled.
if (FLAGS_jwt_token_auth && FLAGS_jwt_validate_signature) {
if (!FLAGS_jwks_file_path.empty()) {
RETURN_IF_ERROR(ExecEnv::GetInstance()->GetJWTHelperInstance()->Init(
FLAGS_jwks_file_path));
} else if (!FLAGS_jwks_url.empty()) {
if (TestInfo::is_test()) sleep(1);
RETURN_IF_ERROR(ExecEnv::GetInstance()->GetJWTHelperInstance()->Init(
FLAGS_jwks_url, FLAGS_jwks_verify_server_certificate,
FLAGS_jwks_ca_certificate, false));
} else {
LOG(ERROR) << "JWKS file is not specified when the validation of JWT signature "
<< " is enabled.";
return Status("JWKS file is not specified");
}
}
// Load JWKS from file if validation for signature of OAuth token is enabled.
if (FLAGS_oauth_token_auth && FLAGS_oauth_jwt_validate_signature) {
if (!FLAGS_oauth_jwks_file_path.empty()) {
RETURN_IF_ERROR(ExecEnv::GetInstance()->GetOAuthHelperInstance()->Init(
FLAGS_oauth_jwks_file_path));
} else if (!FLAGS_oauth_jwks_url.empty()) {
if (TestInfo::is_test()) sleep(1);
RETURN_IF_ERROR(ExecEnv::GetInstance()->GetOAuthHelperInstance()->Init(
FLAGS_oauth_jwks_url, FLAGS_oauth_jwks_verify_server_certificate,
FLAGS_oauth_jwks_ca_certificate, false));
} else {
LOG(ERROR) << "JWKS file is not specified when the validation of OAuth signature "
<< " is enabled.";
return Status("JWKS file for OAuth is not specified");
}
}
// Initialize the client servers.
shared_ptr<ImpalaServer> handler = shared_from_this();
if (beeswax_port > 0 || (TestInfo::is_test() && beeswax_port == 0)) {
shared_ptr<TProcessor> beeswax_processor(
new ImpalaServiceProcessor(handler));
shared_ptr<TProcessorEventHandler> event_handler(
new RpcEventHandler("beeswax", exec_env_->metrics()));
beeswax_processor->setEventHandler(event_handler);
ThriftServerBuilder builder(BEESWAX_SERVER_NAME, beeswax_processor, beeswax_port);
if (IsExternalTlsConfigured()) {
LOG(INFO) << "Enabling SSL for Beeswax";
builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
.pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
.ssl_version(ssl_version)
.cipher_list(FLAGS_ssl_cipher_list);
}
ThriftServer* server;
RETURN_IF_ERROR(
builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
.metrics(exec_env_->metrics())
.max_concurrent_connections(FLAGS_fe_service_threads)
.queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
.idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
.keepalive(FLAGS_client_keepalive_probe_period_s,
FLAGS_client_keepalive_retry_period_s,
FLAGS_client_keepalive_retry_count)
.Build(&server));
beeswax_server_.reset(server);
beeswax_server_->SetConnectionHandler(this);
}
if (hs2_port > 0 || (TestInfo::is_test() && hs2_port == 0)) {
shared_ptr<TProcessor> hs2_fe_processor(
new ImpalaHiveServer2ServiceProcessor(handler));
shared_ptr<TProcessorEventHandler> event_handler(
new RpcEventHandler("hs2", exec_env_->metrics()));
hs2_fe_processor->setEventHandler(event_handler);
ThriftServerBuilder builder(HS2_SERVER_NAME, hs2_fe_processor, hs2_port);
if (IsExternalTlsConfigured()) {
LOG(INFO) << "Enabling SSL for HiveServer2";
builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
.pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
.ssl_version(ssl_version)
.cipher_list(FLAGS_ssl_cipher_list);
}
ThriftServer* server;
RETURN_IF_ERROR(
builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
.metrics(exec_env_->metrics())
.max_concurrent_connections(FLAGS_fe_service_threads)
.queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
.idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
.keepalive(FLAGS_client_keepalive_probe_period_s,
FLAGS_client_keepalive_retry_period_s,
FLAGS_client_keepalive_retry_count)
.Build(&server));
hs2_server_.reset(server);
hs2_server_->SetConnectionHandler(this);
}
if (external_fe_port > 0 || (TestInfo::is_test() && external_fe_port == 0)) {
shared_ptr<TProcessor> external_fe_processor(
new ImpalaHiveServer2ServiceProcessor(handler));
shared_ptr<TProcessorEventHandler> event_handler(
new RpcEventHandler("external_frontend", exec_env_->metrics()));
external_fe_processor->setEventHandler(event_handler);
ThriftServer::TransportType external_fe_port_transport =
ThriftServer::TransportType::BINARY;
if (FLAGS_enable_external_fe_http) {
LOG(INFO) << "External FE endpoint is using HTTP for transport";
external_fe_port_transport = ThriftServer::TransportType::HTTP;
}
ThriftServerBuilder builder(EXTERNAL_FRONTEND_SERVER_NAME, external_fe_processor,
external_fe_port);
ThriftServer* server;
RETURN_IF_ERROR(
builder
.auth_provider(
AuthManager::GetInstance()->GetExternalFrontendAuthProvider())
.transport_type(external_fe_port_transport)
.metrics(exec_env_->metrics())
.max_concurrent_connections(FLAGS_fe_service_threads)
.queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
.idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
.keepalive(FLAGS_client_keepalive_probe_period_s,
FLAGS_client_keepalive_retry_period_s,
FLAGS_client_keepalive_retry_count)
.Build(&server));
external_fe_server_.reset(server);
external_fe_server_->SetConnectionHandler(this);
}
if (hs2_http_port > 0 || (TestInfo::is_test() && hs2_http_port == 0)) {
shared_ptr<TProcessor> hs2_http_processor(
new ImpalaHiveServer2ServiceProcessor(handler));
shared_ptr<TProcessorEventHandler> event_handler(
new RpcEventHandler("hs2_http", exec_env_->metrics()));
hs2_http_processor->setEventHandler(event_handler);
ThriftServer* http_server;
ThriftServerBuilder http_builder(
HS2_HTTP_SERVER_NAME, hs2_http_processor, hs2_http_port);
if (IsExternalTlsConfigured()) {
LOG(INFO) << "Enabling SSL for HiveServer2 HTTP endpoint.";
http_builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
.pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
.ssl_version(ssl_version)
.cipher_list(FLAGS_ssl_cipher_list);
}
RETURN_IF_ERROR(
http_builder
.auth_provider(AuthManager::GetInstance()->GetExternalHttpAuthProvider())
.transport_type(ThriftServer::TransportType::HTTP)
.metrics(exec_env_->metrics())
.max_concurrent_connections(FLAGS_fe_service_threads)
.queue_timeout_ms(FLAGS_accepted_client_cnxn_timeout)
.idle_poll_period_ms(FLAGS_idle_client_poll_period_s * MILLIS_PER_SEC)
.keepalive(FLAGS_client_keepalive_probe_period_s,
FLAGS_client_keepalive_retry_period_s,
FLAGS_client_keepalive_retry_count)
.Build(&http_server));
hs2_http_server_.reset(http_server);
hs2_http_server_->SetConnectionHandler(this);
}
// Initialize workload management (if enabled).
{
lock_guard<mutex> l(workload_mgmt_state_mu_);
// Skip starting workload management if workload management is not enabled or the
// coordinator shutdown has run before this code runs.
// Workload management initial checks must have already been run and the schema
// version from the startup flag parsed into a `kudu::Version` object and stored in
// the `target_schema_version` variable.
if (FLAGS_enable_workload_mgmt
&& workload_mgmt_state_ == workloadmgmt::WorkloadManagementState::NOT_STARTED) {
ABORT_IF_ERROR(Thread::Create("impala-server", "completed-queries",
bind<void>(&ImpalaServer::WorkloadManagementWorker, this,
target_schema_version), &workload_management_thread_));
}
}
}
LOG(INFO) << "Initialized coordinator/executor Impala server on "
<< TNetworkAddressToString(exec_env_->configured_backend_address());
// Start the RPC services.
RETURN_IF_ERROR(exec_env_->StartKrpcService());
if (hs2_server_.get()) {
RETURN_IF_ERROR(hs2_server_->Start());
LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_server_->port();
}
if (hs2_http_server_.get()) {
RETURN_IF_ERROR(hs2_http_server_->Start());
LOG(INFO) << "Impala HiveServer2 Service (HTTP) listening on "
<< hs2_http_server_->port();
}
if (external_fe_server_.get()) {
RETURN_IF_ERROR(external_fe_server_->Start());
LOG(INFO) << "Impala External Frontend Service listening on "
<< external_fe_server_->port();
}
if (beeswax_server_.get()) {
RETURN_IF_ERROR(beeswax_server_->Start());
LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_server_->port();
}
RETURN_IF_ERROR(DebugAction(FLAGS_debug_actions, "IMPALA_SERVER_END_OF_START"));
services_started_ = true;
ImpaladMetrics::IMPALA_SERVER_READY->SetValue(true);
LOG(INFO) << "Impala has started.";
return Status::OK();
}