def updateLlapConfigs()

in ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/YARN/service_advisor.py [0:0]


  def updateLlapConfigs(self, configurations, services, hosts, llap_queue_name):
    """
    Entry point for updating Hive's 'LLAP app' configs namely :
      (1). num_llap_nodes (2). hive.llap.daemon.yarn.container.mb
      (3). hive.llap.daemon.num.executors (4). hive.llap.io.memory.size (5). llap_heap_size
      (6). hive.server2.tez.sessions.per.default.queue, (7). tez.am.resource.memory.mb (8). hive.tez.container.size
      (9). tez.runtime.io.sort.mb  (10). tez.runtime.unordered.output.buffer.size-mb (11). hive.llap.io.threadpool.size, and
      (12). hive.llap.io.enabled.

      The trigger point for updating LLAP configs (mentioned above) is change in values of any of the following:
      (1). 'enable_hive_interactive' set to 'true' (2). 'num_llap_nodes' (3). 'hive.server2.tez.sessions.per.default.queue'
      (4). Change in queue selection for config 'hive.llap.daemon.queue.name'.

      If change in value for 'num_llap_nodes' or 'hive.server2.tez.sessions.per.default.queue' is detected, that config
      value is not calulated, but read and use in calculation for dependent configs.

      Note: All memory calculations are in MB, unless specified otherwise.
    """
    self.logger.info("DBG: Entered updateLlapConfigs")

    # Determine if we entered here during cluster creation.
    operation = self.getUserOperationContext(services, "operation")
    is_cluster_create_opr = operation == self.CLUSTER_CREATE_OPERATION
    self.logger.info(f"Is cluster create operation? {is_cluster_create_opr}")

    putHiveInteractiveSiteProperty = self.putProperty(
      configurations, YARNRecommender.HIVE_INTERACTIVE_SITE, services
    )
    putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(
      configurations, YARNRecommender.HIVE_INTERACTIVE_SITE
    )
    putHiveInteractiveEnvProperty = self.putProperty(
      configurations, "hive-interactive-env", services
    )
    putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(
      configurations, "hive-interactive-env"
    )
    putTezInteractiveSiteProperty = self.putProperty(
      configurations, "tez-interactive-site", services
    )
    putTezInteractiveSitePropertyAttribute = self.putPropertyAttribute(
      configurations, "tez-interactive-site"
    )
    llap_daemon_selected_queue_name = None
    selected_queue_is_ambari_managed_llap = None  # Queue named 'llap' at root level is Ambari-managed.
    llap_selected_queue_am_percent = None
    DEFAULT_EXECUTOR_TO_AM_RATIO = 20
    MIN_EXECUTOR_TO_AM_RATIO = 10
    MAX_CONCURRENT_QUERIES = 32
    MAX_CONCURRENT_QUERIES_SMALL_CLUSTERS = 4  # Concurrency for clusters with < 10 executors
    leafQueueNames = None
    MB_TO_BYTES = 1048576
    hsi_site = self.getServicesSiteProperties(
      services, YARNRecommender.HIVE_INTERACTIVE_SITE
    )
    yarn_site = self.getServicesSiteProperties(services, "yarn-site")
    min_memory_required = 0

    # Update 'hive.llap.daemon.queue.name' prop combo entries
    self.setLlapDaemonQueuePropAttributes(services, configurations)

    if not services["changed-configurations"]:
      read_llap_daemon_yarn_cont_mb = int(
        self.get_yarn_min_container_size(services, configurations)
      )
      putHiveInteractiveSiteProperty(
        "hive.llap.daemon.yarn.container.mb", read_llap_daemon_yarn_cont_mb
      )
      putHiveInteractiveSitePropertyAttribute(
        "hive.llap.daemon.yarn.container.mb", "minimum", read_llap_daemon_yarn_cont_mb
      )
      putHiveInteractiveSitePropertyAttribute(
        "hive.llap.daemon.yarn.container.mb",
        "maximum",
        self.__get_min_hsi_mem(services, hosts) * 0.8,
      )

    if hsi_site and "hive.llap.daemon.queue.name" in hsi_site:
      llap_daemon_selected_queue_name = hsi_site["hive.llap.daemon.queue.name"]

    # Update visibility of the 'num_llap_nodes' config. Visible only if the selected queue is the Ambari-created 'llap' queue.
    capacity_scheduler_properties, received_as_key_value_pair = (
      self.getCapacitySchedulerProperties(services)
    )
    if capacity_scheduler_properties:
      # Get all leaf queues.
      leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
      self.logger.info(f"YARN leaf Queues = {leafQueueNames}")
      if len(leafQueueNames) == 0:
        self.logger.error("Queue(s) couldn't be retrieved from capacity-scheduler.")
        return

      # Check if this is the 1st invocation after enabling Hive Server Interactive (config: 'enable_hive_interactive').
      changed_configs_has_enable_hive_int = self.isConfigPropertiesChanged(
        services, "hive-interactive-env", ["enable_hive_interactive"], False
      )
      llap_named_queue_selected_in_curr_invocation = None
      # Check if it's: 1. the 1st invocation from the UI ('enable_hive_interactive' in changed-configurations)
      # OR 2. the 1st invocation from a Blueprint (services['changed-configurations'] will be empty in this case)
      if (
        changed_configs_has_enable_hive_int
        or 0 == len(services["changed-configurations"])
      ) and services["configurations"]["hive-interactive-env"]["properties"][
        "enable_hive_interactive"
      ]:
        if len(leafQueueNames) == 1 or (
          len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames
        ):
          llap_named_queue_selected_in_curr_invocation = True
          putHiveInteractiveSiteProperty("hive.llap.daemon.queue.name", llap_queue_name)
          putHiveInteractiveSiteProperty(
            "hive.server2.tez.default.queues", llap_queue_name
          )
        else:
          # 1st invocation: pick the 1st leaf queue and set it as selected.
          first_leaf_queue = list(leafQueueNames)[0]
          putHiveInteractiveSiteProperty(
            "hive.llap.daemon.queue.name", first_leaf_queue
          )
          putHiveInteractiveSiteProperty(
            "hive.server2.tez.default.queues", first_leaf_queue
          )
          llap_named_queue_selected_in_curr_invocation = False
      self.logger.info(
        f"DBG: llap_named_queue_selected_in_curr_invocation = {llap_named_queue_selected_in_curr_invocation}"
      )

      if (
        len(leafQueueNames) == 2
        and (
          llap_daemon_selected_queue_name
          and llap_daemon_selected_queue_name == llap_queue_name
        )
        or llap_named_queue_selected_in_curr_invocation
      ) or (
        len(leafQueueNames) == 1
        and llap_daemon_selected_queue_name == "default"
        and llap_named_queue_selected_in_curr_invocation
      ):
        self.logger.info(
          "DBG: Setting 'num_llap_nodes' config's READ ONLY attribute as 'False'."
        )
        putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "read_only", "false")
        selected_queue_is_ambari_managed_llap = True
        self.logger.info(
          "DBG: Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' "
          "YARN Service visibility to 'True'".format(
            llap_queue_name, list(leafQueueNames)
          )
        )
      else:
        self.logger.info(
          "DBG: Setting 'num_llap_nodes' config's READ ONLY attribute as 'True'."
        )
        putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "read_only", "true")
        self.logger.info(
          "Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' "
          "visibility to 'False'.".format(
            llap_daemon_selected_queue_name, list(leafQueueNames)
          )
        )
        selected_queue_is_ambari_managed_llap = False

      if not llap_named_queue_selected_in_curr_invocation:
        # We would be creating the 'llap' queue later. Thus, capacity-scheduler doesn't yet have
        # state information pertaining to the 'llap' queue.
        # Check: the state of the selected queue should not be STOPPED.
        if llap_daemon_selected_queue_name:
          llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(
            capacity_scheduler_properties, llap_daemon_selected_queue_name
          )
          if (
            llap_selected_queue_state is None or llap_selected_queue_state == "STOPPED"
          ):
            self.logger.error(
              "Selected LLAP app queue '{0}' current state is : '{1}'. Setting LLAP configs to default "
              "values.".format(
                llap_daemon_selected_queue_name, llap_selected_queue_state
              )
            )
            self.recommendDefaultLlapConfiguration(configurations, services, hosts)
            return
        else:
          self.logger.error(
            "Retrieved LLAP app queue name is : '{0}'. Setting LLAP configs to default values.".format(
              llap_daemon_selected_queue_name
            )
          )
          self.recommendDefaultLlapConfiguration(configurations, services, hosts)
          return
    else:
      self.logger.error(
        "Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive."
        " Not calculating LLAP configs."
      )
      return

    changed_configs_in_hive_int_env = None
    llap_concurrency_in_changed_configs = None
    llap_daemon_queue_in_changed_configs = None
    # Calculations are triggered only if there is a change in any one of the following props:
    # 'num_llap_nodes', 'enable_hive_interactive', 'hive.server2.tez.sessions.per.default.queue',
    # or 'hive.llap.daemon.queue.name' has a change in its value selection.
    # OR
    # services['changed-configurations'] is empty, implying that this is the Blueprint call (1st invocation).
    if "changed-configurations" in services.keys():
      config_names_to_be_checked = set(["num_llap_nodes", "enable_hive_interactive"])
      changed_configs_in_hive_int_env = self.isConfigPropertiesChanged(
        services, "hive-interactive-env", config_names_to_be_checked, False
      )

      # Determine if there is change detected in "hive-interactive-site's" configs based on which we calculate llap configs.
      llap_concurrency_in_changed_configs = self.isConfigPropertiesChanged(
        services,
        YARNRecommender.HIVE_INTERACTIVE_SITE,
        ["hive.server2.tez.sessions.per.default.queue"],
        False,
      )
      llap_daemon_queue_in_changed_configs = self.isConfigPropertiesChanged(
        services,
        YARNRecommender.HIVE_INTERACTIVE_SITE,
        ["hive.llap.daemon.queue.name"],
        False,
      )

    if (
      not changed_configs_in_hive_int_env
      and not llap_concurrency_in_changed_configs
      and not llap_daemon_queue_in_changed_configs
      and services["changed-configurations"]
    ):
      self.logger.info("DBG: LLAP parameters not modified. Not adjusting LLAP configs.")
      self.logger.info(
        f"DBG: Current 'changed-configuration' received is : {services['changed-configurations']}"
      )
      return

    self.logger.info("\nDBG: Performing LLAP config calculations ......")
    node_manager_host_list = self.getHostsForComponent(services, "YARN", "NODEMANAGER")
    node_manager_cnt = len(node_manager_host_list)
    yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
    total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb
    self.logger.info(
      "DBG: Calculated total_cluster_capacity : {0}, using following : node_manager_cnt : {1}, "
      "yarn_nm_mem_in_mb : {2}".format(
        total_cluster_capacity, node_manager_cnt, yarn_nm_mem_in_mb
      )
    )
    yarn_min_container_size = float(
      self.get_yarn_min_container_size(services, configurations)
    )
    tez_am_container_size = self.calculate_tez_am_container_size(
      services,
      int(total_cluster_capacity),
      is_cluster_create_opr,
      changed_configs_has_enable_hive_int,
    )
    normalized_tez_am_container_size = self._normalizeUp(
      tez_am_container_size, yarn_min_container_size
    )
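    # _normalizeUp rounds a size up to the next multiple of yarn_min_container_size, since YARN
    # allocates whole containers; e.g., hypothetically, 1500 MB with a 1024 MB minimum normalizes
    # up to 2048 MB.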

    if yarn_site and "yarn.nodemanager.resource.cpu-vcores" in yarn_site:
      cpu_per_nm_host = float(yarn_site["yarn.nodemanager.resource.cpu-vcores"])
    else:
      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
      return
    self.logger.info(
      "DBG Calculated normalized_tez_am_container_size : {0}, using following : tez_am_container_size : {1}, "
      "total_cluster_capacity : {2}".format(
        normalized_tez_am_container_size, tez_am_container_size, total_cluster_capacity
      )
    )

    # Calculate the available memory for LLAP app
    yarn_nm_mem_in_mb_normalized = self._normalizeDown(
      yarn_nm_mem_in_mb, yarn_min_container_size
    )
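    # _normalizeDown conversely rounds down to a multiple of yarn_min_container_size, so per-node
    # capacity is never overstated; e.g., hypothetically, 10000 MB with a 1024 MB minimum
    # normalizes down to 9216 MB.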
    mem_per_thread_for_llap = float(
      self.calculate_mem_per_thread_for_llap(
        services,
        yarn_nm_mem_in_mb_normalized,
        cpu_per_nm_host,
        is_cluster_create_opr,
        changed_configs_has_enable_hive_int,
      )
    )
    self.logger.info(
      "DBG: Calculated mem_per_thread_for_llap : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, "
      "cpu_per_nm_host : {2}".format(
        mem_per_thread_for_llap, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host
      )
    )

    if mem_per_thread_for_llap is None:
      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
      return

    # Get calculated value for YARN Service AM container Size
    yarn_service_am_container_size = self._normalizeUp(
      self.calculate_yarn_service_am_size(yarn_min_container_size),
      yarn_min_container_size,
    )
    self.logger.info(
      "DBG: Calculated 'yarn_service_am_container_size' : {0}, using following: yarn_min_container_size : "
      "{1}".format(yarn_service_am_container_size, yarn_min_container_size)
    )

    min_memory_required = (
      normalized_tez_am_container_size
      + yarn_service_am_container_size
      + self._normalizeUp(mem_per_thread_for_llap, yarn_min_container_size)
    )
    self.logger.info(
      "DBG: Calculated 'min_memory_required': {0} using following : yarn_service_am_container_size: {1}, "
      "normalized_tez_am_container_size : {2}, mem_per_thread_for_llap : {3}, yarn_min_container_size : "
      "{4}".format(
        min_memory_required,
        yarn_service_am_container_size,
        normalized_tez_am_container_size,
        mem_per_thread_for_llap,
        yarn_min_container_size,
      )
    )
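    # i.e., the cluster must fit one Tez AM, the YARN service AM, and at least one LLAP executor
    # thread. Hypothetically, with a 4096 MB Tez AM, a 1024 MB service AM and a 4096 MB executor
    # thread (all normalized), min_memory_required = 9216 MB.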

    min_nodes_required = int(ceil(min_memory_required / yarn_nm_mem_in_mb_normalized))
    self.logger.info(
      "DBG: Calculated 'min_node_required': {0}, using following : min_memory_required : {1}, yarn_nm_mem_in_mb_normalized "
      ": {2}".format(
        min_nodes_required, min_memory_required, yarn_nm_mem_in_mb_normalized
      )
    )
    if min_nodes_required > node_manager_cnt:
      self.logger.warning("ERROR: Not enough memory/nodes to run LLAP")
      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
      return

    mem_per_thread_for_llap = float(mem_per_thread_for_llap)

    self.logger.info(
      f"DBG: selected_queue_is_ambari_managed_llap = {selected_queue_is_ambari_managed_llap}"
    )
    if not selected_queue_is_ambari_managed_llap:
      llap_daemon_selected_queue_cap = self.__getSelectedQueueTotalCap(
        capacity_scheduler_properties,
        llap_daemon_selected_queue_name,
        total_cluster_capacity,
      )

      if llap_daemon_selected_queue_cap <= 0:
        self.logger.warning(
          "'{0}' queue capacity percentage retrieved = {1}. Expected > 0.".format(
            llap_daemon_selected_queue_name, llap_daemon_selected_queue_cap
          )
        )
        self.recommendDefaultLlapConfiguration(configurations, services, hosts)
        return

      total_llap_mem_normalized = self._normalizeDown(
        llap_daemon_selected_queue_cap, yarn_min_container_size
      )
      self.logger.info(
        "DBG: Calculated '{0}' queue available capacity : {1}, using following: llap_daemon_selected_queue_cap : {2}, "
        "yarn_min_container_size : {3}".format(
          llap_daemon_selected_queue_name,
          total_llap_mem_normalized,
          llap_daemon_selected_queue_cap,
          yarn_min_container_size,
        )
      )
      """Rounding up numNodes so that we run more daemons, and utilitze more CPUs. The rest of the calcaulations will take care of cutting this down if required"""
      num_llap_nodes_requested = ceil(
        total_llap_mem_normalized / yarn_nm_mem_in_mb_normalized
      )
      self.logger.info(
        "DBG: Calculated 'num_llap_nodes_requested' : {0}, using following: total_llap_mem_normalized : {1}, "
        "yarn_nm_mem_in_mb_normalized : {2}".format(
          num_llap_nodes_requested,
          total_llap_mem_normalized,
          yarn_nm_mem_in_mb_normalized,
        )
      )
      # Populate 'num_llap_nodes_requested' into the config 'num_llap_nodes', a read-only config in the non-Ambari-managed-queue case.
      putHiveInteractiveEnvProperty("num_llap_nodes", num_llap_nodes_requested)
      self.logger.info(
        f"Setting config 'num_llap_nodes' as : {num_llap_nodes_requested}"
      )
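      # The AM fraction caps how much of the queue the Tez AMs may occupy; it is presumably read
      # from the capacity-scheduler's maximum-am-resource-percent setting for the selected queue
      # (see __getQueueAmFractionFromCapacityScheduler). E.g., hypothetically, a 0.2 fraction of a
      # 46080 MB queue leaves 9216 MB for Tez AMs.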
      queue_am_fraction_perc = float(
        self.__getQueueAmFractionFromCapacityScheduler(
          capacity_scheduler_properties, llap_daemon_selected_queue_name
        )
      )
      hive_tez_am_cap_available = queue_am_fraction_perc * total_llap_mem_normalized
      self.logger.info(
        "DBG: Calculated 'hive_tez_am_cap_available' : {0}, using following: queue_am_fraction_perc : {1}, "
        "total_llap_mem_normalized : {2}".format(
          hive_tez_am_cap_available, queue_am_fraction_perc, total_llap_mem_normalized
        )
      )
    else:  # Ambari-managed 'llap' named queue at root level.
      # For the 1st invocation, set 'num_llap_nodes_requested' ourselves (it would otherwise come
      # through as 1); on later invocations, read it from config.

      # Check if it's: 1. the 1st invocation from the UI ('enable_hive_interactive' in changed-configurations)
      # OR 2. the 1st invocation from a Blueprint (services['changed-configurations'] will be empty in this case)
      if (
        changed_configs_has_enable_hive_int
        or 0 == len(services["changed-configurations"])
      ) and services["configurations"]["hive-interactive-env"]["properties"][
        "enable_hive_interactive"
      ]:
        num_llap_nodes_requested = min_nodes_required
      else:
        num_llap_nodes_requested = self.get_num_llap_nodes(
          services, configurations
        )  # Input
      total_llap_mem = num_llap_nodes_requested * yarn_nm_mem_in_mb_normalized
      self.logger.info(
        "DBG: Calculated 'total_llap_mem' : {0}, using following: num_llap_nodes_requested : {1}, "
        "yarn_nm_mem_in_mb_normalized : {2}".format(
          total_llap_mem, num_llap_nodes_requested, yarn_nm_mem_in_mb_normalized
        )
      )
      total_llap_mem_normalized = float(
        self._normalizeDown(total_llap_mem, yarn_min_container_size)
      )
      self.logger.info(
        "DBG: Calculated 'total_llap_mem_normalized' : {0}, using following: total_llap_mem : {1}, "
        "yarn_min_container_size : {2}".format(
          total_llap_mem_normalized, total_llap_mem, yarn_min_container_size
        )
      )

      # What percent of 'total_cluster_capacity' does 'total_llap_mem' occupy?
      llap_named_queue_cap_fraction = ceil(
        total_llap_mem_normalized / total_cluster_capacity * 100
      )
      self.logger.info(
        f"DBG: Calculated '{llap_queue_name}' queue capacity percent = {llap_named_queue_cap_fraction}."
      )
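      # E.g., hypothetically: 46080 MB of LLAP memory on a 102400 MB cluster gives
      # ceil(46080 / 102400 * 100) = ceil(45.0) = 45 (percent). ceil() guarantees the queue is
      # never sized below what the requested nodes need.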

      if llap_named_queue_cap_fraction > 100:
        self.logger.warning(
          f"Calculated '{llap_queue_name}' queue size = {llap_named_queue_cap_fraction}. Cannot be > 100."
        )
        self.recommendDefaultLlapConfiguration(configurations, services, hosts)
        return

      # Adjust capacity scheduler for the 'llap' named queue.
      self.checkAndManageLlapQueue(
        services, configurations, hosts, llap_queue_name, llap_named_queue_cap_fraction
      )
      hive_tez_am_cap_available = total_llap_mem_normalized
      self.logger.info(f"DBG: hive_tez_am_cap_available : {hive_tez_am_cap_available}")

    # Common calculations now, irrespective of the queue selected.

    llap_mem_for_tezAm_and_daemons = (
      total_llap_mem_normalized - yarn_service_am_container_size
    )
    self.logger.info(
      "DBG: Calculated 'llap_mem_for_tezAm_and_daemons' : {0}, using following : total_llap_mem_normalized : {1}, "
      "yarn_service_am_container_size : {2}".format(
        llap_mem_for_tezAm_and_daemons,
        total_llap_mem_normalized,
        yarn_service_am_container_size,
      )
    )

    if llap_mem_for_tezAm_and_daemons < 2 * yarn_min_container_size:
      self.logger.warning("Not enough capacity available on the cluster to run LLAP")
      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
      return

    # Calculate llap concurrency (i.e. Number of Tez AM's)
    max_executors_per_node = self.get_max_executors_per_node(
      yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap
    )
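    # max_executors_per_node is presumably bounded by both CPU and memory on a NodeManager host,
    # i.e., roughly min(cpu_per_nm_host, floor(yarn_nm_mem_in_mb_normalized / mem_per_thread_for_llap));
    # see get_max_executors_per_node for the actual formula.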

    # Read 'hive.server2.tez.sessions.per.default.queue' prop if it's in changed-configs, else calculate it.
    if not llap_concurrency_in_changed_configs:
      if max_executors_per_node <= 0:
        self.logger.warning(
          f"Calculated 'max_executors_per_node' = {max_executors_per_node}. Expected value >= 1."
        )
        self.recommendDefaultLlapConfiguration(configurations, services, hosts)
        return

      self.logger.info(
        "DBG: Calculated 'max_executors_per_node' : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, "
        "mem_per_thread_for_llap: {3}".format(
          max_executors_per_node,
          yarn_nm_mem_in_mb_normalized,
          cpu_per_nm_host,
          mem_per_thread_for_llap,
        )
      )

      # Default 1 AM for every 20 executor threads.
      # The second part of the min calculates based on mem required for DEFAULT_EXECUTOR_TO_AM_RATIO executors + 1 AM,
      # making use of total memory. However, it's possible that total memory will not be used - and the numExecutors is
      # instead limited by #CPUs. Use maxPerNode to factor this in.
      llap_concurrency_limit = min(
        floor(
          max_executors_per_node
          * num_llap_nodes_requested
          / DEFAULT_EXECUTOR_TO_AM_RATIO
        ),
        MAX_CONCURRENT_QUERIES,
      )
      self.logger.info(
        "DBG: Calculated 'llap_concurrency_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested : {2}, DEFAULT_EXECUTOR_TO_AM_RATIO "
        ": {3}, MAX_CONCURRENT_QUERIES : {4}".format(
          llap_concurrency_limit,
          max_executors_per_node,
          num_llap_nodes_requested,
          DEFAULT_EXECUTOR_TO_AM_RATIO,
          MAX_CONCURRENT_QUERIES,
        )
      )
      llap_concurrency = min(
        llap_concurrency_limit,
        floor(
          llap_mem_for_tezAm_and_daemons
          / (
            DEFAULT_EXECUTOR_TO_AM_RATIO * mem_per_thread_for_llap
            + normalized_tez_am_container_size
          )
        ),
      )
      self.logger.info(
        "DBG: Calculated 'llap_concurrency' : {0}, using following : llap_concurrency_limit : {1}, llap_mem_for_tezAm_and_daemons : "
        "{2}, DEFAULT_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap : {4}, normalized_tez_am_container_size : "
        "{5}".format(
          llap_concurrency,
          llap_concurrency_limit,
          llap_mem_for_tezAm_and_daemons,
          DEFAULT_EXECUTOR_TO_AM_RATIO,
          mem_per_thread_for_llap,
          normalized_tez_am_container_size,
        )
      )
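      # Hypothetical example: 12 executors/node * 5 nodes / 20 gives a limit of 3 by the executor
      # bound; with llap_mem_for_tezAm_and_daemons = 46080 MB, mem_per_thread_for_llap = 4096 MB
      # and normalized_tez_am_container_size = 4096 MB, the memory bound is
      # floor(46080 / (20 * 4096 + 4096)) = 0, which the guard below readjusts to 1.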
      if llap_concurrency == 0:
        llap_concurrency = 1
        self.logger.info(
          "DBG: Readjusted 'llap_concurrency' to : 1. Earlier calculated value : 0"
        )

      if (
        llap_concurrency * normalized_tez_am_container_size > hive_tez_am_cap_available
      ):
        llap_concurrency = int(
          math.floor(hive_tez_am_cap_available / normalized_tez_am_container_size)
        )
        self.logger.info(
          "DBG: Readjusted 'llap_concurrency' to : {0}, as llap_concurrency({1}) * normalized_tez_am_container_size({2}) > hive_tez_am_cap_available({3}))".format(
            llap_concurrency,
            llap_concurrency,
            normalized_tez_am_container_size,
            hive_tez_am_cap_available,
          )
        )

        if llap_concurrency <= 0:
          self.logger.warning(
            f"DBG: Calculated 'LLAP Concurrent Queries' = {llap_concurrency}. Expected value >= 1."
          )
          self.recommendDefaultLlapConfiguration(configurations, services, hosts)
          return
        self.logger.info(
          "DBG: Adjusted 'llap_concurrency' : {0}, using following: hive_tez_am_cap_available : {1}, normalized_tez_am_container_size: "
          "{2}".format(
            llap_concurrency,
            hive_tez_am_cap_available,
            normalized_tez_am_container_size,
          )
        )
    else:
      # Read current value
      if "hive.server2.tez.sessions.per.default.queue" in hsi_site:
        llap_concurrency = int(hsi_site["hive.server2.tez.sessions.per.default.queue"])
        if llap_concurrency <= 0:
          self.logger.warning(
            f"'hive.server2.tez.sessions.per.default.queue' current value : {llap_concurrency}. Expected value : >= 1"
          )
          self.recommendDefaultLlapConfiguration(configurations, services, hosts)
          return
        self.logger.info(f"DBG: Read 'llap_concurrency' : {llap_concurrency}")
      else:
        llap_concurrency = 1
        self.logger.warning(
          "Couldn't retrieve Hive Server interactive's 'hive.server2.tez.sessions.per.default.queue' config. Setting default value 1."
        )
        self.recommendDefaultLlapConfiguration(configurations, services, hosts)
        return

    # Calculate 'Max LLAP Concurrency', irrespective of whether 'llap_concurrency' was read or calculated.
    max_llap_concurrency_limit = min(
      floor(
        max_executors_per_node * num_llap_nodes_requested / MIN_EXECUTOR_TO_AM_RATIO
      ),
      MAX_CONCURRENT_QUERIES,
    )
    self.logger.info(
      "DBG: Calculated 'max_llap_concurrency_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested "
      ": {2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, MAX_CONCURRENT_QUERIES : {4}".format(
        max_llap_concurrency_limit,
        max_executors_per_node,
        num_llap_nodes_requested,
        MIN_EXECUTOR_TO_AM_RATIO,
        MAX_CONCURRENT_QUERIES,
      )
    )
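    # With MIN_EXECUTOR_TO_AM_RATIO = 10 this ceiling is twice as permissive as the default
    # calculation above (ratio 20); hypothetically, 12 executors/node * 5 nodes / 10 = 6, vs. 3.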

    # Calculate value for 'num_llap_nodes', an across cluster config.
    tez_am_memory_required = llap_concurrency * normalized_tez_am_container_size
    self.logger.info(
      "DBG: Calculated 'tez_am_memory_required' : {0}, using following : llap_concurrency : {1}, normalized_tez_am_container_size : "
      "{2}".format(
        tez_am_memory_required, llap_concurrency, normalized_tez_am_container_size
      )
    )
    llap_mem_daemon_size = llap_mem_for_tezAm_and_daemons - tez_am_memory_required

    if llap_mem_daemon_size < yarn_min_container_size:
      self.logger.warning(
        "Calculated 'LLAP Daemon Size = {0}'. Expected >= 'YARN Minimum Container Size' ({1})'".format(
          llap_mem_daemon_size, yarn_min_container_size
        )
      )
      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
      return

    if llap_mem_daemon_size < mem_per_thread_for_llap:
      self.logger.warning("Not enough memory available for executors.")
      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
      return
    self.logger.info(
      "DBG: Calculated 'llap_mem_daemon_size' : {0}, using following : llap_mem_for_tezAm_and_daemons : {1}, tez_am_memory_required : "
      "{2}".format(
        llap_mem_daemon_size, llap_mem_for_tezAm_and_daemons, tez_am_memory_required
      )
    )

    llap_daemon_mem_per_node = self._normalizeDown(
      llap_mem_daemon_size / num_llap_nodes_requested, yarn_min_container_size
    )
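    # E.g., hypothetically: 40960 MB of daemon memory across 5 requested nodes, normalized down
    # with a 1024 MB minimum container, gives 8192 MB per daemon.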
    # This value takes into account total cluster capacity, and may not have left enough capacity on each node to launch an AM.
    self.logger.info(
      "DBG: Calculated 'llap_daemon_mem_per_node' : {0}, using following : llap_mem_daemon_size : {1}, num_llap_nodes_requested : {2}, "
      "yarn_min_container_size: {3}".format(
        llap_daemon_mem_per_node,
        llap_mem_daemon_size,
        num_llap_nodes_requested,
        yarn_min_container_size,
      )
    )
    if llap_daemon_mem_per_node == 0:
      # Small cluster. No capacity left on a node after running AMs.
      llap_daemon_mem_per_node = self._normalizeUp(
        mem_per_thread_for_llap, yarn_min_container_size
      )
      num_llap_nodes = floor(llap_mem_daemon_size / llap_daemon_mem_per_node)
      self.logger.info(
        "DBG: 'llap_daemon_mem_per_node' : 0, adjusted 'llap_daemon_mem_per_node' : {0}, 'num_llap_nodes' : {1}, using following: llap_mem_daemon_size : {2}, "
        "mem_per_thread_for_llap : {3}".format(
          llap_daemon_mem_per_node,
          num_llap_nodes,
          llap_mem_daemon_size,
          mem_per_thread_for_llap,
        )
      )
    elif llap_daemon_mem_per_node < mem_per_thread_for_llap:
      # Previously computed value of memory per thread may be too high. Cut the number of nodes. (Alternately reduce memory per node)
      llap_daemon_mem_per_node = mem_per_thread_for_llap
      num_llap_nodes = floor(llap_mem_daemon_size / mem_per_thread_for_llap)
      self.logger.info(
        "DBG: 'llap_daemon_mem_per_node'({0}) < mem_per_thread_for_llap({1}), adjusted 'llap_daemon_mem_per_node' "
        ": {2}".format(
          llap_daemon_mem_per_node, mem_per_thread_for_llap, llap_daemon_mem_per_node
        )
      )
    else:
      # All good. We have a proper value for memoryPerNode.
      num_llap_nodes = num_llap_nodes_requested
      self.logger.info(f"DBG: num_llap_nodes : {num_llap_nodes}")

    # Make sure we have enough memory on each node to run the AMs.
    # If num_llap_nodes differs from num_llap_nodes_requested - AM memory is already factored in.
    # If llap_node_count < total_cluster_nodes - assume the AMs can run on a different node.
    # Else factor in min_concurrency_per_node * tez_am_size, plus yarn_service_am_container_size.
    # This also needs to factor in whether num_llap_nodes = cluster_node_count.
    min_mem_reserved_per_node = 0
    if (
      num_llap_nodes == num_llap_nodes_requested and num_llap_nodes == node_manager_cnt
    ):
      tez_AMs_per_node = llap_concurrency / num_llap_nodes
      tez_AMs_per_node_low = int(math.floor(tez_AMs_per_node))
      tez_AMs_per_node_high = int(math.ceil(tez_AMs_per_node))
      min_mem_reserved_per_node = int(
        max(
          tez_AMs_per_node_high * normalized_tez_am_container_size,
          tez_AMs_per_node_low * normalized_tez_am_container_size
          + yarn_service_am_container_size,
        )
      )
      self.logger.info(
        "DBG: Determined 'AM reservation per node': {0}, using following : concurrency: {1}, num_llap_nodes: {2}, AMsPerNode: {3}".format(
          min_mem_reserved_per_node, llap_concurrency, num_llap_nodes, tez_AMs_per_node
        )
      )
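      # E.g., hypothetically: llap_concurrency = 3 on 2 nodes gives 1.5 AMs/node; reserve
      # max(ceil(1.5) * 4096, floor(1.5) * 4096 + 1024) = max(8192, 5120) = 8192 MB per node.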

    max_single_node_mem_available_for_daemon = self._normalizeDown(
      yarn_nm_mem_in_mb_normalized - min_mem_reserved_per_node, yarn_min_container_size
    )
    if (
      max_single_node_mem_available_for_daemon <= 0
      or max_single_node_mem_available_for_daemon < mem_per_thread_for_llap
    ):
      self.logger.warning(
        "Not enough capacity available per node for daemons after factoring in AM memory requirements. NM Mem: {0}, "
        "minAMMemPerNode: {1}, available: {2}".format(
          yarn_nm_mem_in_mb_normalized,
          min_mem_reserved_per_node,
          max_single_node_mem_available_for_daemon,
        )
      )
      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
      return

    llap_daemon_mem_per_node = min(
      max_single_node_mem_available_for_daemon, llap_daemon_mem_per_node
    )
    self.logger.info(
      "DBG: Determined final memPerDaemon: {0}, using following: concurrency: {1}, numNMNodes: {2}, numLlapNodes: {3} ".format(
        llap_daemon_mem_per_node, llap_concurrency, node_manager_cnt, num_llap_nodes
      )
    )

    num_executors_per_node_max = self.get_max_executors_per_node(
      yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap
    )
    if num_executors_per_node_max < 1:
      self.logger.warning(
        f"Calculated 'Max. Executors per Node' = {num_executors_per_node_max}. Expected values >= 1."
      )
      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
      return
    self.logger.info(
      "DBG: Calculated 'num_executors_per_node_max' : {0}, using following : yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, "
      "mem_per_thread_for_llap: {3}".format(
        num_executors_per_node_max,
        yarn_nm_mem_in_mb_normalized,
        cpu_per_nm_host,
        mem_per_thread_for_llap,
      )
    )

    # NumExecutorsPerNode is not necessarily max - since some capacity would have been reserved for AMs, if this value were based on mem.
    num_executors_per_node = min(
      floor(llap_daemon_mem_per_node / mem_per_thread_for_llap),
      num_executors_per_node_max,
    )
    if num_executors_per_node <= 0:
      self.logger.warning(
        f"Calculated 'Number of Executors Per Node' = {num_executors_per_node}. Expected value >= 1"
      )
      self.recommendDefaultLlapConfiguration(configurations, services, hosts)
      return
    self.logger.info(
      "DBG: Calculated 'num_executors_per_node' : {0}, using following : llap_daemon_mem_per_node : {1}, num_executors_per_node_max : {2}, "
      "mem_per_thread_for_llap: {3}".format(
        num_executors_per_node,
        llap_daemon_mem_per_node,
        num_executors_per_node_max,
        mem_per_thread_for_llap,
      )
    )

    # Now figure out how much of the memory will be used by the executors, and how much will be used by the cache.
    total_mem_for_executors_per_node = num_executors_per_node * mem_per_thread_for_llap
    cache_mem_per_node = llap_daemon_mem_per_node - total_mem_for_executors_per_node
    self.logger.info(
      "DBG: Calculated 'Cache per node' : {0}, using following : llap_daemon_mem_per_node : {1}, total_mem_for_executors_per_node : {2}".format(
        cache_mem_per_node, llap_daemon_mem_per_node, total_mem_for_executors_per_node
      )
    )

    tez_runtime_io_sort_mb = int((0.8 * mem_per_thread_for_llap) / 3)
    tez_runtime_unordered_output_buffer_size = int(
      0.8 * 0.075 * mem_per_thread_for_llap
    )
    # 'hive_auto_convert_join_noconditionaltask_size' value is in bytes. Thus, multiply the MB value by 1048576 (MB_TO_BYTES).
    hive_auto_convert_join_noconditionaltask_size = (
      int((0.8 * mem_per_thread_for_llap) / 3)
    ) * MB_TO_BYTES
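    # Sizing rationale: roughly 80% of an executor thread's memory is treated as usable, a third
    # of which goes to the sort buffer and 7.5% to the unordered output buffer. Hypothetically,
    # with mem_per_thread_for_llap = 4096 MB: sort.mb = int(0.8 * 4096 / 3) = 1092, unordered
    # buffer = int(0.8 * 0.075 * 4096) = 245, and the join threshold = 1092 * 1048576 bytes.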

    # Calculate value for prop 'llap_heap_size'
    llap_xmx = max(
      total_mem_for_executors_per_node * 0.8,
      total_mem_for_executors_per_node
      - self.get_llap_headroom_space(services, configurations),
    )
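    # The daemon heap (-Xmx) is the larger of 80% of the executors' memory, or the executors'
    # memory minus the configured headroom; e.g., hypothetically, with 24576 MB for executors and
    # a 6144 MB headroom: max(19660.8, 18432) = 19660.8.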
    self.logger.info(
      f"DBG: Calculated llap_app_heap_size : {llap_xmx}, using following : total_mem_for_executors : {total_mem_for_executors_per_node}"
    )

    # Calculate 'hive_heapsize' for Hive2/HiveServer2 (HSI)
    hive_server_interactive_heapsize = None
    hive_server_interactive_hosts = self.getHostsWithComponent(
      "HIVE", "HIVE_SERVER_INTERACTIVE", services, hosts
    )
    if hive_server_interactive_hosts is None:
      # If it's None, read the base YARN service's NODEMANAGER host memory, as all hosts are considered homogeneous.
      hive_server_interactive_hosts = self.getHostsWithComponent(
        "YARN", "NODEMANAGER", services, hosts
      )
    if (
      hive_server_interactive_hosts is not None
      and len(hive_server_interactive_hosts) > 0
    ):
      host_mem = int(hive_server_interactive_hosts[0]["Hosts"]["total_mem"])
      hive_server_interactive_heapsize = min(
        max(2048.0, 400.0 * llap_concurrency), 3.0 / 8 * host_mem
      )
      self.logger.info(
        "DBG: Calculated 'hive_server_interactive_heapsize' : {0}, using following : llap_concurrency : {1}, host_mem : "
        "{2}".format(hive_server_interactive_heapsize, llap_concurrency, host_mem)
      )
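      # E.g., hypothetically, with llap_concurrency = 3: max(2048.0, 1200.0) = 2048 MB, unless
      # 3/8 of the host's reported memory is smaller, in which case that cap wins.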

    # Done with calculations, updating calculated configs.
    self.logger.info("DBG: Applying the calculated values....")

    if is_cluster_create_opr or changed_configs_has_enable_hive_int:
      normalized_tez_am_container_size = int(normalized_tez_am_container_size)
      putTezInteractiveSiteProperty(
        "tez.am.resource.memory.mb", normalized_tez_am_container_size
      )
      self.logger.info(
        f"DBG: Setting 'tez.am.resource.memory.mb' config value as : {normalized_tez_am_container_size}"
      )

    if not llap_concurrency_in_changed_configs:
      putHiveInteractiveSiteProperty(
        "hive.server2.tez.sessions.per.default.queue",
        max(int(num_executors_per_node / 16), 1),
      )
    putHiveInteractiveSitePropertyAttribute(
      "hive.server2.tez.sessions.per.default.queue",
      "maximum",
      max(int(num_executors_per_node / 4), 1),
    )
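    # The recommended session count is roughly one per 16 executors on a node, with the editable
    # maximum at one per 4; hypothetically, 32 executors/node recommends max(int(32 / 16), 1) = 2
    # sessions, with a maximum of 8.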

    num_llap_nodes = int(num_llap_nodes)
    putHiveInteractiveEnvPropertyAttribute(
      "num_llap_nodes", "minimum", min_nodes_required
    )
    putHiveInteractiveEnvPropertyAttribute(
      "num_llap_nodes", "maximum", node_manager_cnt
    )
    # TODO A single value is not being set for numNodes in case of a custom queue. Also the attribute is set to non-visible, so the UI likely ends up using an old cached value
    if num_llap_nodes != num_llap_nodes_requested:
      self.logger.info(
        f"DBG: User requested num_llap_nodes : {num_llap_nodes_requested}, but used/adjusted value for calculations is : {num_llap_nodes}"
      )
    else:
      self.logger.info(
        f"DBG: Used num_llap_nodes for calculations : {num_llap_nodes_requested}"
      )

    # Safeguard: don't add "num_llap_nodes_for_llap_daemons" if it doesn't exist in hive-interactive-env.
    # This can happen if we upgrade from Ambari 2.4 (with HDP 2.5) to Ambari 2.5, as this config exists only from the 2.6 stack onwards.
    if (
      "hive-interactive-env" in services["configurations"]
      and "num_llap_nodes_for_llap_daemons"
      in services["configurations"]["hive-interactive-env"]["properties"]
    ):
      putHiveInteractiveEnvProperty("num_llap_nodes_for_llap_daemons", num_llap_nodes)
      self.logger.info(
        f"DBG: Setting config 'num_llap_nodes_for_llap_daemons' as : {num_llap_nodes}"
      )

    llap_container_size = int(llap_daemon_mem_per_node)
    putHiveInteractiveSiteProperty(
      "hive.llap.daemon.yarn.container.mb", llap_container_size
    )
    putHiveInteractiveSitePropertyAttribute(
      "hive.llap.daemon.yarn.container.mb", "minimum", yarn_min_container_size
    )
    putHiveInteractiveSitePropertyAttribute(
      "hive.llap.daemon.yarn.container.mb",
      "maximum",
      self.__get_min_hsi_mem(services, hosts) * 0.8,
    )

    # Set 'hive.tez.container.size' only if it is read as "SET_ON_FIRST_INVOCATION", implying initialization.
    # Otherwise, we don't override (1) a previously calculated value or (2) a user-provided value.
    if is_cluster_create_opr or changed_configs_has_enable_hive_int:
      mem_per_thread_for_llap = int(mem_per_thread_for_llap)
      putHiveInteractiveSiteProperty("hive.tez.container.size", mem_per_thread_for_llap)
      self.logger.info(
        f"DBG: Setting 'hive.tez.container.size' config value as : {mem_per_thread_for_llap}"
      )

    putTezInteractiveSiteProperty("tez.runtime.io.sort.mb", tez_runtime_io_sort_mb)
    if (
      "tez-site" in services["configurations"]
      and "tez.runtime.sorter.class"
      in services["configurations"]["tez-site"]["properties"]
    ):
      if (
        services["configurations"]["tez-site"]["properties"]["tez.runtime.sorter.class"]
        == "LEGACY"
      ):
        putTezInteractiveSitePropertyAttribute(
          "tez.runtime.io.sort.mb", "maximum", 1800
        )

    putTezInteractiveSiteProperty(
      "tez.runtime.unordered.output.buffer.size-mb",
      tez_runtime_unordered_output_buffer_size,
    )
    putHiveInteractiveSiteProperty(
      "hive.auto.convert.join.noconditionaltask.size",
      hive_auto_convert_join_noconditionaltask_size,
    )

    num_executors_per_node = int(num_executors_per_node)
    self.logger.info(f"DBG: Putting num_executors_per_node as {num_executors_per_node}")
    putHiveInteractiveSiteProperty(
      "hive.llap.daemon.num.executors", num_executors_per_node
    )
    putHiveInteractiveSitePropertyAttribute(
      "hive.llap.daemon.num.executors", "minimum", 1
    )
    putHiveInteractiveSitePropertyAttribute(
      "hive.llap.daemon.num.executors", "maximum", int(num_executors_per_node_max)
    )

    # 'hive.llap.io.threadpool.size' must always be set to the same value as calculated for
    # 'hive.llap.daemon.num.executors'.
    cache_mem_per_node = int(cache_mem_per_node)

    putHiveInteractiveSiteProperty(
      "hive.llap.io.threadpool.size", num_executors_per_node
    )
    putHiveInteractiveSiteProperty("hive.llap.io.memory.size", cache_mem_per_node)
    putHiveInteractiveSitePropertyAttribute("hive.llap.io.memory.size", "minimum", 0)
    putHiveInteractiveSitePropertyAttribute(
      "hive.llap.io.memory.size",
      "maximum",
      self.__get_min_hsi_mem(services, hosts) * 0.8,
    )

    if hive_server_interactive_heapsize is not None:
      putHiveInteractiveEnvProperty(
        "hive_heapsize", int(hive_server_interactive_heapsize)
      )

    ssd_cache_on = (
      services["configurations"]["hive-interactive-site"]["properties"][
        "hive.llap.io.allocator.mmap"
      ]
      == "true"
    )
    llap_io_enabled = (
      "true" if int(cache_mem_per_node) >= 1024 or ssd_cache_on else "false"
    )
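    # LLAP IO (cache) is worth enabling only if there is at least ~1 GB of cache per node, or if
    # an SSD cache is mmap'ed via 'hive.llap.io.allocator.mmap'; e.g., hypothetically,
    # cache_mem_per_node = 2048 with mmap off still yields "true".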
    services["forced-configurations"].append(
      {"type": "hive-interactive-site", "name": "hive.llap.io.enabled"}
    )
    putHiveInteractiveSiteProperty("hive.llap.io.enabled", llap_io_enabled)

    putHiveInteractiveEnvProperty("llap_heap_size", int(llap_xmx))
    self.logger.info("DBG: Done putting all configs")