Status ExecEnv::Init()

in be/src/runtime/exec-env.cc [345:575]


Status ExecEnv::Init() {
  LOG(INFO) << "Initializing impalad with backend uuid: " << PrintId(backend_id_);
  // Initialize thread pools
  if (FLAGS_is_coordinator) {
    RETURN_IF_ERROR(hdfs_op_thread_pool_->Init());
  }
  RETURN_IF_ERROR(async_rpc_pool_->Init());

  int64_t bytes_limit;
  RETURN_IF_ERROR(ChooseProcessMemLimit(&bytes_limit));

  // Need to register JVM metrics first so that we can use them to compute the buffer pool
  // limit.
  JvmMemoryMetric::InitMetrics(metrics_.get());
  if (!BitUtil::IsPowerOf2(FLAGS_min_buffer_size)) {
    return Status(Substitute(
        "--min_buffer_size must be a power-of-two: $0", FLAGS_min_buffer_size));
  }
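  // For example, 65536 (64 KB) is a power of two and passes this check, while a value
  // such as 100000 would be rejected here.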
  // The byte limit that everything else is sized as a fraction of, excluding the
  // JVM.
  admit_mem_limit_ = bytes_limit;
  if (FLAGS_mem_limit_includes_jvm) {
    // The JVM max heap size is static and therefore known at this point. Other categories
    // of JVM memory consumption are much smaller and dynamic so it is simpler not to
    // include them here.
    int64_t jvm_max_heap_size = JvmMemoryMetric::HEAP_MAX_USAGE->GetValue();
    admit_mem_limit_ -= jvm_max_heap_size;
    if (admit_mem_limit_ <= 0) {
      return Status(
          Substitute("Invalid combination of --mem_limit_includes_jvm and JVM max heap "
                     "size $0, which must be smaller than process memory limit $1",
              jvm_max_heap_size, bytes_limit));
    }
  }
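  // Hedged worked example of the subtraction above (hypothetical numbers, not
  // defaults): with a 100 GB process memory limit and a 20 GB max JVM heap,
  // admit_mem_limit_ becomes 80 GB, and the non-JVM categories below are sized
  // against that remainder.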

  bool is_percent = false;
  int64_t codegen_cache_capacity =
      ParseUtil::ParseMemSpec(FLAGS_codegen_cache_capacity, &is_percent, 0);
  if (codegen_cache_capacity > 0) {
    // A non-zero codegen_cache_capacity must be an absolute byte value, not a
    // percentage.
    DCHECK(!is_percent);
    int64_t codegen_cache_limit = admit_mem_limit_ * MAX_CODEGEN_CACHE_MEM_PERCENT;
    DCHECK_GT(codegen_cache_limit, 0);
    if (codegen_cache_capacity > codegen_cache_limit) {
      LOG(INFO) << "CodeGen Cache capacity changed from "
                << PrettyPrinter::Print(codegen_cache_capacity, TUnit::BYTES) << " to "
                << PrettyPrinter::Print(codegen_cache_limit, TUnit::BYTES)
                << " due to reaching the limit.";
      codegen_cache_capacity = codegen_cache_limit;
    }
    codegen_cache_.reset(new CodeGenCache(metrics_.get()));
    RETURN_IF_ERROR(codegen_cache_->Init(codegen_cache_capacity));
    LOG(INFO) << "CodeGen Cache initialized with capacity "
              << PrettyPrinter::Print(codegen_cache_capacity, TUnit::BYTES);
    // Reserve memory for the codegen cache out of the admit memory limit.
    admit_mem_limit_ -= codegen_cache_capacity;
    DCHECK_GT(admit_mem_limit_, 0);
  } else {
    LOG(INFO) << "CodeGen Cache is disabled.";
  }
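  // Hedged example of the cap above (all values hypothetical): with an admit memory
  // limit of 80 GB and MAX_CODEGEN_CACHE_MEM_PERCENT of 0.1, a requested capacity of
  // 10 GB would be reduced to 8 GB, after which admit_mem_limit_ drops to 72 GB.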

  // Initialize the tuple cache
  RETURN_IF_ERROR(tuple_cache_mgr_->Init());

  LOG(INFO) << "Admit memory limit: "
            << PrettyPrinter::Print(admit_mem_limit_, TUnit::BYTES);

  int64_t buffer_pool_limit = ParseUtil::ParseMemSpec(FLAGS_buffer_pool_limit,
      &is_percent, admit_mem_limit_);
  if (buffer_pool_limit <= 0) {
    return Status(Substitute("Invalid --buffer_pool_limit value, must be a "
                             "positive bytes value or percentage: $0",
        FLAGS_buffer_pool_limit));
  }
  buffer_pool_limit = BitUtil::RoundDown(buffer_pool_limit, FLAGS_min_buffer_size);
  LOG(INFO) << "Buffer pool limit: "
            << PrettyPrinter::Print(buffer_pool_limit, TUnit::BYTES);

  int64_t clean_pages_limit = ParseUtil::ParseMemSpec(FLAGS_buffer_pool_clean_pages_limit,
      &is_percent, buffer_pool_limit);
  if (clean_pages_limit <= 0) {
    return Status(Substitute("Invalid --buffer_pool_clean_pages_limit value, must be a "
                             "positive bytes value or percentage: $0",
        FLAGS_buffer_pool_clean_pages_limit));
  }
  InitBufferPool(FLAGS_min_buffer_size, buffer_pool_limit, clean_pages_limit);
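  // Hedged sizing example (hypothetical values): with an 80 GB admit memory limit, a
  // --buffer_pool_limit of "85%" parses to 68 GB and is then rounded down to a
  // multiple of --min_buffer_size; --buffer_pool_clean_pages_limit is parsed the same
  // way, but as bytes or a percentage of the buffer pool limit.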

  admission_slots_ = CpuInfo::num_cores();
  if (FLAGS_admission_control_slots > 0) {
    if (FLAGS_max_concurrent_queries > 0) {
      LOG(WARNING) << "Ignored --max_concurrent_queries, --admission_control_slots was "
                   << "set and takes precedence.";
    }
    admission_slots_ = FLAGS_admission_control_slots;
  } else if (FLAGS_max_concurrent_queries > 0) {
    admission_slots_ = FLAGS_max_concurrent_queries;
  } else if (FLAGS_is_coordinator && !FLAGS_is_executor) {
    // By default we assume that dedicated coordinators can handle more queries than
    // executors.
    admission_slots_ *= COORDINATOR_CONCURRENCY_MULTIPLIER;
  }
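  // Example of the slot logic above: on a 16-core host with neither
  // --admission_control_slots nor --max_concurrent_queries set, an executor ends up
  // with 16 admission slots, while a dedicated coordinator (--is_coordinator without
  // --is_executor) gets 16 * COORDINATOR_CONCURRENCY_MULTIPLIER slots.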

  InitSystemStateInfo();

  if (enable_webserver_) {
    RETURN_IF_ERROR(metrics_->RegisterHttpHandlers(webserver_.get()));
  }
  if (FLAGS_metrics_webserver_port > 0) {
    RETURN_IF_ERROR(metrics_->RegisterHttpHandlers(metrics_webserver_.get()));
    RETURN_IF_ERROR(metrics_webserver_->Start());
  }
  catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
  catalogd_lightweight_req_client_cache_->InitMetrics(
      metrics_.get(), "catalog.server", "for-lightweight-rpc");
  RETURN_IF_ERROR(RegisterMemoryMetrics(
      metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get()));
  // Initialize impalad metrics
  ImpaladMetrics::CreateMetrics(
      exec_env_->metrics()->GetOrCreateChildGroup("impala-server"));

  InitMemTracker(bytes_limit);

  // Initialize the RpcMgr, ControlService, DataStreamService and stream manager.
  // Initialization must happen in this order because of dependencies: the RPC manager
  // first, then the control and data stream services, and finally the stream manager,
  // which uses the DataStreamService's MemTracker.
  RETURN_IF_ERROR(rpc_mgr_->Init(krpc_address_));
  control_svc_.reset(new ControlService(rpc_metrics_));
  RETURN_IF_ERROR(control_svc_->Init());
  data_svc_.reset(new DataStreamService(rpc_metrics_));
  RETURN_IF_ERROR(data_svc_->Init());
  RETURN_IF_ERROR(stream_mgr_->Init(data_svc_->mem_tracker()));

  // Bump the thread cache to 1GB to reduce contention on the TCMalloc central
  // free list's spinlock.
  if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {
    FLAGS_tcmalloc_max_total_thread_cache_bytes = 1 << 30;
  }

#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
  const static char* TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES =
    "tcmalloc.max_total_thread_cache_bytes";
  // Change the total TCMalloc thread cache size if necessary.
  if (FLAGS_tcmalloc_max_total_thread_cache_bytes > 0 &&
      !MallocExtension::instance()->SetNumericProperty(
          TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES,
          FLAGS_tcmalloc_max_total_thread_cache_bytes)) {
    return Status(Substitute("Failed to change {0}",
        TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES));
  }
  // Read the value back from tcmalloc to verify it matches what we set.
  size_t actual_max_total_thread_cache_bytes = 0;
  bool retval = MallocExtension::instance()->GetNumericProperty(
    TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES,
    &actual_max_total_thread_cache_bytes);
  if (!retval) {
    return Status(Substitute("Could not retrieve value of {0}.",
        TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES));
  }
  if (actual_max_total_thread_cache_bytes !=
      FLAGS_tcmalloc_max_total_thread_cache_bytes) {
    LOG(WARNING) << "Set " << TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES << " to "
                 << FLAGS_tcmalloc_max_total_thread_cache_bytes << " bytes but actually "
                 << "using " << actual_max_total_thread_cache_bytes << " bytes.";
  }

  // A MemTracker for TCMalloc overhead, which is the difference between the physical
  // bytes reserved (TcmallocMetric::PHYSICAL_BYTES_RESERVED) and the bytes in use
  // (TcmallocMetric::BYTES_IN_USE). This overhead accounts for all the cached
  // freelists used by TCMalloc.
  IntGauge* negated_bytes_in_use = obj_pool_->Add(new NegatedGauge(
      MakeTMetricDef("negated_tcmalloc_bytes_in_use", TMetricKind::GAUGE, TUnit::BYTES),
      TcmallocMetric::BYTES_IN_USE));
  vector<IntGauge*> overhead_metrics;
  overhead_metrics.push_back(negated_bytes_in_use);
  overhead_metrics.push_back(TcmallocMetric::PHYSICAL_BYTES_RESERVED);
  SumGauge* tcmalloc_overhead = obj_pool_->Add(new SumGauge(
      MakeTMetricDef("tcmalloc_overhead", TMetricKind::GAUGE, TUnit::BYTES),
      overhead_metrics));
  obj_pool_->Add(
      new MemTracker(tcmalloc_overhead, -1, "TCMalloc Overhead", mem_tracker_.get()));
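  // In effect the gauge above reports PHYSICAL_BYTES_RESERVED - BYTES_IN_USE: with,
  // say, 10 GB physically reserved and 8 GB in use (hypothetical numbers), the
  // "TCMalloc Overhead" tracker would show 2 GB of cached memory.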
#endif
  mem_tracker_->RegisterMetrics(metrics_.get(), "mem-tracker.process");

  RETURN_IF_ERROR(disk_io_mgr_->Init());

  // Start services in order to ensure that dependencies between them are met
  if (enable_webserver_) {
    AddDefaultUrlCallbacks(webserver_.get(), metrics_.get(), mem_tracker_.get());
    RETURN_IF_ERROR(webserver_->Start());
  } else {
    LOG(INFO) << "Not starting webserver";
  }

  RETURN_IF_ERROR(cluster_membership_mgr_->Init());
  if (FLAGS_is_coordinator && frontend_ != nullptr) {
    cluster_membership_mgr_->RegisterUpdateCallbackFn(
        [this](const ClusterMembershipMgr::SnapshotPtr& snapshot) {
          SendClusterMembershipToFrontend(snapshot,
              this->cluster_membership_mgr()->GetExpectedExecGroupSets(),
              this->frontend());
        });
  }

  RETURN_IF_ERROR(admission_controller_->Init());
  RETURN_IF_ERROR(InitHadoopConfig());

  // If --ai_api_key_jceks_secret is set, extract the API key and populate
  // AiFunctions::ai_api_key_.
  if (frontend_ != nullptr && FLAGS_ai_api_key_jceks_secret != "") {
    string api_key;
    RETURN_IF_ERROR(
        frontend_->GetSecretFromKeyStore(FLAGS_ai_api_key_jceks_secret, &api_key));
    AiFunctions::set_api_key(api_key);
  }
  // Validate the default --ai_endpoint.
  if (FLAGS_ai_endpoint != "" &&
      !AiFunctions::is_api_endpoint_supported(FLAGS_ai_endpoint)) {
    string supported_platforms = Substitute("$0, $1",
        AiFunctions::OPEN_AI_AZURE_ENDPOINT, AiFunctions::OPEN_AI_PUBLIC_ENDPOINT);
    if (!FLAGS_ai_additional_platforms.empty()) {
      supported_platforms += ", " + FLAGS_ai_additional_platforms;
    }
    return Status(Substitute("Unsupported --ai_endpoint=$0, supported platforms are: $1",
        FLAGS_ai_endpoint, supported_platforms));
  }

  jwt_helper_ = new JWTHelper();
  oauth_helper_ = new JWTHelper();

  return Status::OK();
}
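
For context, a minimal sketch of how Init() is typically driven at daemon startup is
shown below. The surrounding names (ImpaladMain, InitCommonRuntime, ABORT_IF_ERROR)
and the exact call sequence are assumptions for illustration, not taken from this file:

// Hypothetical startup sketch; helper names and ordering are assumptions.
int ImpaladMain(int argc, char** argv) {
  // Common runtime setup (flags, logging, JVM) is assumed to happen before ExecEnv.
  InitCommonRuntime(argc, argv, /* init_jvm */ true);
  ExecEnv exec_env;
  // Init() must succeed before any services are used; abort the daemon otherwise.
  ABORT_IF_ERROR(exec_env.Init());
  // ... construct and start the ImpalaServer, then join its service threads ...
  return 0;
}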