in be/src/runtime/exec-env.cc [345:575]
Status ExecEnv::Init() {
LOG(INFO) << "Initializing impalad with backend uuid: " << PrintId(backend_id_);
// Initialize thread pools
if (FLAGS_is_coordinator) {
RETURN_IF_ERROR(hdfs_op_thread_pool_->Init());
}
RETURN_IF_ERROR(async_rpc_pool_->Init());
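  // Determine the overall process memory limit first; the caches and the buffer pool
  // below are sized as fractions of it.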
int64_t bytes_limit;
RETURN_IF_ERROR(ChooseProcessMemLimit(&bytes_limit));
// Need to register JVM metrics first so that we can use them to compute the buffer pool
// limit.
JvmMemoryMetric::InitMetrics(metrics_.get());
if (!BitUtil::IsPowerOf2(FLAGS_min_buffer_size)) {
return Status(Substitute(
"--min_buffer_size must be a power-of-two: $0", FLAGS_min_buffer_size));
}
  // The memory limit that everything else is sized as a fraction of. The JVM max heap
  // is subtracted from it below when --mem_limit_includes_jvm is set.
admit_mem_limit_ = bytes_limit;
if (FLAGS_mem_limit_includes_jvm) {
// The JVM max heap size is static and therefore known at this point. Other categories
// of JVM memory consumption are much smaller and dynamic so it is simpler not to
// include them here.
int64_t jvm_max_heap_size = JvmMemoryMetric::HEAP_MAX_USAGE->GetValue();
admit_mem_limit_ -= jvm_max_heap_size;
if (admit_mem_limit_ <= 0) {
return Status(
Substitute("Invalid combination of --mem_limit_includes_jvm and JVM max heap "
"size $0, which must be smaller than process memory limit $1",
jvm_max_heap_size, bytes_limit));
}
}
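  // Size the codegen cache. A capacity of 0 disables it; otherwise the capacity is
  // capped at MAX_CODEGEN_CACHE_MEM_PERCENT of the admit memory limit and the reserved
  // bytes are subtracted from admit_mem_limit_.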
bool is_percent = false;
int64_t codegen_cache_capacity =
ParseUtil::ParseMemSpec(FLAGS_codegen_cache_capacity, &is_percent, 0);
if (codegen_cache_capacity > 0) {
    // A positive codegen_cache_capacity must be an absolute byte value, not a
    // percentage.
    DCHECK(!is_percent);
    int64_t codegen_cache_limit = admit_mem_limit_ * MAX_CODEGEN_CACHE_MEM_PERCENT;
    DCHECK_GT(codegen_cache_limit, 0);
if (codegen_cache_capacity > codegen_cache_limit) {
      LOG(INFO) << "CodeGen Cache capacity reduced from "
                << PrettyPrinter::Print(codegen_cache_capacity, TUnit::BYTES) << " to "
                << PrettyPrinter::Print(codegen_cache_limit, TUnit::BYTES)
                << " because the configured capacity exceeds the limit.";
codegen_cache_capacity = codegen_cache_limit;
}
codegen_cache_.reset(new CodeGenCache(metrics_.get()));
RETURN_IF_ERROR(codegen_cache_->Init(codegen_cache_capacity));
LOG(INFO) << "CodeGen Cache initialized with capacity "
<< PrettyPrinter::Print(codegen_cache_capacity, TUnit::BYTES);
    // Reserve the codegen cache's memory out of the admit memory limit.
admit_mem_limit_ -= codegen_cache_capacity;
DCHECK_GT(admit_mem_limit_, 0);
} else {
LOG(INFO) << "CodeGen Cache is disabled.";
}
// Initialize the tuple cache
RETURN_IF_ERROR(tuple_cache_mgr_->Init());
LOG(INFO) << "Admit memory limit: "
<< PrettyPrinter::Print(admit_mem_limit_, TUnit::BYTES);
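  // Size the buffer pool. A percentage is interpreted relative to the admit memory
  // limit, and the result is rounded down to a multiple of the minimum buffer size.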
int64_t buffer_pool_limit = ParseUtil::ParseMemSpec(FLAGS_buffer_pool_limit,
&is_percent, admit_mem_limit_);
if (buffer_pool_limit <= 0) {
return Status(Substitute("Invalid --buffer_pool_limit value, must be a "
"positive bytes value or percentage: $0",
FLAGS_buffer_pool_limit));
}
buffer_pool_limit = BitUtil::RoundDown(buffer_pool_limit, FLAGS_min_buffer_size);
LOG(INFO) << "Buffer pool limit: "
<< PrettyPrinter::Print(buffer_pool_limit, TUnit::BYTES);
int64_t clean_pages_limit = ParseUtil::ParseMemSpec(FLAGS_buffer_pool_clean_pages_limit,
&is_percent, buffer_pool_limit);
if (clean_pages_limit <= 0) {
return Status(Substitute("Invalid --buffer_pool_clean_pages_limit value, must be a "
"positive bytes value or percentage: $0",
FLAGS_buffer_pool_clean_pages_limit));
}
InitBufferPool(FLAGS_min_buffer_size, buffer_pool_limit, clean_pages_limit);
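  // Choose the number of admission control slots: --admission_control_slots takes
  // precedence over --max_concurrent_queries; the default is one slot per core, scaled
  // up for dedicated coordinators.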
admission_slots_ = CpuInfo::num_cores();
if (FLAGS_admission_control_slots > 0) {
if (FLAGS_max_concurrent_queries > 0) {
      LOG(WARNING) << "Ignoring --max_concurrent_queries: --admission_control_slots is "
                   << "set and takes precedence.";
}
admission_slots_ = FLAGS_admission_control_slots;
} else if (FLAGS_max_concurrent_queries > 0) {
admission_slots_ = FLAGS_max_concurrent_queries;
} else if (FLAGS_is_coordinator && !FLAGS_is_executor) {
// By default we assume that dedicated coordinators can handle more queries than
// executors.
admission_slots_ *= COORDINATOR_CONCURRENCY_MULTIPLIER;
}
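  // Initialize host-level system state tracking and register the metric endpoints on
  // the webserver(s).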
InitSystemStateInfo();
if (enable_webserver_) {
RETURN_IF_ERROR(metrics_->RegisterHttpHandlers(webserver_.get()));
}
if (FLAGS_metrics_webserver_port > 0) {
RETURN_IF_ERROR(metrics_->RegisterHttpHandlers(metrics_webserver_.get()));
RETURN_IF_ERROR(metrics_webserver_->Start());
}
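  // Register RPC metrics for the catalogd client caches (one cache for regular and one
  // for lightweight catalog RPCs).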
catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
catalogd_lightweight_req_client_cache_->InitMetrics(
metrics_.get(), "catalog.server", "for-lightweight-rpc");
RETURN_IF_ERROR(RegisterMemoryMetrics(
metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get()));
// Initialize impalad metrics
ImpaladMetrics::CreateMetrics(
exec_env_->metrics()->GetOrCreateChildGroup("impala-server"));
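  // Create the process MemTracker with the chosen process memory limit.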
InitMemTracker(bytes_limit);
  // Initialize the RpcMgr, ControlService and DataStreamService.
  // Initialization needs to happen in the following order due to dependencies:
  // RpcMgr first, then ControlService and DataStreamService, then DataStreamMgr
  // (which uses the DataStreamService's MemTracker).
RETURN_IF_ERROR(rpc_mgr_->Init(krpc_address_));
control_svc_.reset(new ControlService(rpc_metrics_));
RETURN_IF_ERROR(control_svc_->Init());
data_svc_.reset(new DataStreamService(rpc_metrics_));
RETURN_IF_ERROR(data_svc_->Init());
RETURN_IF_ERROR(stream_mgr_->Init(data_svc_->mem_tracker()));
// Bump thread cache to 1GB to reduce contention for TCMalloc central
// list's spinlock.
if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {
FLAGS_tcmalloc_max_total_thread_cache_bytes = 1 << 30;
}
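  // TCMalloc is not linked into ASAN/TSAN builds, so skip the tuning and the overhead
  // tracker in those builds.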
#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
const static char* TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES =
"tcmalloc.max_total_thread_cache_bytes";
// Change the total TCMalloc thread cache size if necessary.
if (FLAGS_tcmalloc_max_total_thread_cache_bytes > 0 &&
!MallocExtension::instance()->SetNumericProperty(
TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES,
FLAGS_tcmalloc_max_total_thread_cache_bytes)) {
    return Status(Substitute("Failed to change $0",
        TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES));
}
// Read the value back from tcmalloc to verify it matches what we set.
size_t actual_max_total_thread_cache_bytes = 0;
bool retval = MallocExtension::instance()->GetNumericProperty(
TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES,
&actual_max_total_thread_cache_bytes);
if (!retval) {
    return Status(Substitute("Could not retrieve value of $0.",
        TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES));
}
if (actual_max_total_thread_cache_bytes !=
FLAGS_tcmalloc_max_total_thread_cache_bytes) {
LOG(WARNING) << "Set " << TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES << " to "
<< FLAGS_tcmalloc_max_total_thread_cache_bytes << " bytes but actually "
<< "using " << actual_max_total_thread_cache_bytes << " bytes.";
}
  // A MemTracker for TCMalloc overhead, which is the difference between the physical
  // bytes reserved (TcmallocMetric::PHYSICAL_BYTES_RESERVED) and the bytes in use
  // (TcmallocMetric::BYTES_IN_USE). This overhead accounts for all the cached freelists
  // used by TCMalloc.
IntGauge* negated_bytes_in_use = obj_pool_->Add(new NegatedGauge(
MakeTMetricDef("negated_tcmalloc_bytes_in_use", TMetricKind::GAUGE, TUnit::BYTES),
TcmallocMetric::BYTES_IN_USE));
vector<IntGauge*> overhead_metrics;
overhead_metrics.push_back(negated_bytes_in_use);
overhead_metrics.push_back(TcmallocMetric::PHYSICAL_BYTES_RESERVED);
SumGauge* tcmalloc_overhead = obj_pool_->Add(new SumGauge(
MakeTMetricDef("tcmalloc_overhead", TMetricKind::GAUGE, TUnit::BYTES),
overhead_metrics));
obj_pool_->Add(
new MemTracker(tcmalloc_overhead, -1, "TCMalloc Overhead", mem_tracker_.get()));
#endif
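  // Expose the process MemTracker as metrics and bring up the disk I/O manager.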
mem_tracker_->RegisterMetrics(metrics_.get(), "mem-tracker.process");
RETURN_IF_ERROR(disk_io_mgr_->Init());
// Start services in order to ensure that dependencies between them are met
if (enable_webserver_) {
AddDefaultUrlCallbacks(webserver_.get(), metrics_.get(), mem_tracker_.get());
RETURN_IF_ERROR(webserver_->Start());
} else {
LOG(INFO) << "Not starting webserver";
}
RETURN_IF_ERROR(cluster_membership_mgr_->Init());
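  // On coordinators, push cluster membership updates (including the expected executor
  // group sets) to the frontend whenever the snapshot changes.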
if (FLAGS_is_coordinator && frontend_ != nullptr) {
cluster_membership_mgr_->RegisterUpdateCallbackFn(
[this](const ClusterMembershipMgr::SnapshotPtr& snapshot) {
SendClusterMembershipToFrontend(snapshot,
this->cluster_membership_mgr()->GetExpectedExecGroupSets(),
this->frontend());
});
}
RETURN_IF_ERROR(admission_controller_->Init());
RETURN_IF_ERROR(InitHadoopConfig());
  // If --ai_api_key_jceks_secret is set, extract the API key from the keystore and
  // pass it to AiFunctions::set_api_key().
if (frontend_ != nullptr && FLAGS_ai_api_key_jceks_secret != "") {
string api_key;
RETURN_IF_ERROR(
frontend_->GetSecretFromKeyStore(FLAGS_ai_api_key_jceks_secret, &api_key));
AiFunctions::set_api_key(api_key);
}
  // Validate that the default --ai_endpoint points at a supported AI platform.
if (FLAGS_ai_endpoint != "" &&
!AiFunctions::is_api_endpoint_supported(FLAGS_ai_endpoint)) {
string supported_platforms = Substitute("$0, $1",
AiFunctions::OPEN_AI_AZURE_ENDPOINT, AiFunctions::OPEN_AI_PUBLIC_ENDPOINT);
if (!FLAGS_ai_additional_platforms.empty()) {
supported_platforms += ", " + FLAGS_ai_additional_platforms;
}
return Status(Substitute("Unsupported --ai_endpoint=$0, supported platforms are: $1",
FLAGS_ai_endpoint, supported_platforms));
}
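  // Create the token validation helpers; OAuth bearer tokens are validated with the
  // same JWTHelper implementation used for JWTs.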
jwt_helper_ = new JWTHelper();
oauth_helper_ = new JWTHelper();
return Status::OK();
}