in ambari-server/src/main/resources/stacks/BIGTOP/3.2.0/services/YARN/service_advisor.py [0:0]
def updateLlapConfigs(self, configurations, services, hosts, llap_queue_name):
"""
Entry point for updating Hive's 'LLAP app' configs, namely:
(1). num_llap_nodes (2). hive.llap.daemon.yarn.container.mb
(3). hive.llap.daemon.num.executors (4). hive.llap.io.memory.size (5). llap_heap_size
(6). hive.server2.tez.sessions.per.default.queue, (7). tez.am.resource.memory.mb (8). hive.tez.container.size
(9). tez.runtime.io.sort.mb (10). tez.runtime.unordered.output.buffer.size-mb (11). hive.llap.io.threadpool.size, and
(12). hive.llap.io.enabled.
The trigger point for updating the LLAP configs (mentioned above) is a change in the value of any of the following:
(1). 'enable_hive_interactive' set to 'true' (2). 'num_llap_nodes' (3). 'hive.server2.tez.sessions.per.default.queue'
(4). Change in queue selection for config 'hive.llap.daemon.queue.name'.
If a change in value for 'num_llap_nodes' or 'hive.server2.tez.sessions.per.default.queue' is detected, that config
value is not calculated, but read and used in the calculation of dependent configs.
Note: All memory calculations are in MB, unless specified otherwise.
"""
self.logger.info("DBG: Entered updateLlapConfigs")
# Determine if we entered here during cluster creation.
operation = self.getUserOperationContext(services, "operation")
is_cluster_create_opr = False
if operation == self.CLUSTER_CREATE_OPERATION:
is_cluster_create_opr = True
self.logger.info(f"Is cluster create operation ? = {is_cluster_create_opr}")
putHiveInteractiveSiteProperty = self.putProperty(
configurations, YARNRecommender.HIVE_INTERACTIVE_SITE, services
)
putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(
configurations, YARNRecommender.HIVE_INTERACTIVE_SITE
)
putHiveInteractiveEnvProperty = self.putProperty(
configurations, "hive-interactive-env", services
)
putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(
configurations, "hive-interactive-env"
)
putTezInteractiveSiteProperty = self.putProperty(
configurations, "tez-interactive-site", services
)
putTezInteractiveSitePropertyAttribute = self.putPropertyAttribute(
configurations, "tez-interactive-site"
)
llap_daemon_selected_queue_name = None
selected_queue_is_ambari_managed_llap = (
None # Queue named 'llap' at root level is Ambari managed.
)
llap_selected_queue_am_percent = None
DEFAULT_EXECUTOR_TO_AM_RATIO = 20
MIN_EXECUTOR_TO_AM_RATIO = 10
MAX_CONCURRENT_QUERIES = 32
MAX_CONCURRENT_QUERIES_SMALL_CLUSTERS = (
4 # Concurrency for clusters with <10 executors
)
leafQueueNames = None
MB_TO_BYTES = 1048576
hsi_site = self.getServicesSiteProperties(
services, YARNRecommender.HIVE_INTERACTIVE_SITE
)
yarn_site = self.getServicesSiteProperties(services, "yarn-site")
min_memory_required = 0
# Update 'hive.llap.daemon.queue.name' prop combo entries
self.setLlapDaemonQueuePropAttributes(services, configurations)
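# On the very 1st (Blueprint) invocation there are no changed-configurations yet: seed
# 'hive.llap.daemon.yarn.container.mb' with the YARN minimum container size and bound its
# maximum at 80% of the value returned by __get_min_hsi_mem().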
if not services["changed-configurations"]:
read_llap_daemon_yarn_cont_mb = int(
self.get_yarn_min_container_size(services, configurations)
)
putHiveInteractiveSiteProperty(
"hive.llap.daemon.yarn.container.mb", read_llap_daemon_yarn_cont_mb
)
putHiveInteractiveSitePropertyAttribute(
"hive.llap.daemon.yarn.container.mb", "minimum", read_llap_daemon_yarn_cont_mb
)
putHiveInteractiveSitePropertyAttribute(
"hive.llap.daemon.yarn.container.mb",
"maximum",
self.__get_min_hsi_mem(services, hosts) * 0.8,
)
if hsi_site and "hive.llap.daemon.queue.name" in hsi_site:
llap_daemon_selected_queue_name = hsi_site["hive.llap.daemon.queue.name"]
# Update visibility of the 'num_llap_nodes' config. It is visible only if the selected queue is the Ambari-created 'llap' queue.
capacity_scheduler_properties, received_as_key_value_pair = (
self.getCapacitySchedulerProperties(services)
)
if capacity_scheduler_properties:
# Get all leaf queues.
leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
self.logger.info(f"YARN leaf Queues = {leafQueueNames}")
if len(leafQueueNames) == 0:
self.logger.error("Queue(s) couldn't be retrieved from capacity-scheduler.")
return
# Check if it's 1st invocation after enabling Hive Server Interactive (config: enable_hive_interactive).
changed_configs_has_enable_hive_int = self.isConfigPropertiesChanged(
services, "hive-interactive-env", ["enable_hive_interactive"], False
)
llap_named_queue_selected_in_curr_invocation = None
# Check if it's : 1. the 1st invocation from the UI ('enable_hive_interactive' in changed-configurations)
# OR 2. the 1st invocation from a Blueprint (services['changed-configurations'] should be empty in this case)
if (
changed_configs_has_enable_hive_int
or 0 == len(services["changed-configurations"])
) and services["configurations"]["hive-interactive-env"]["properties"][
"enable_hive_interactive"
]:
if len(leafQueueNames) == 1 or (
len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames
):
llap_named_queue_selected_in_curr_invocation = True
putHiveInteractiveSiteProperty("hive.llap.daemon.queue.name", llap_queue_name)
putHiveInteractiveSiteProperty(
"hive.server2.tez.default.queues", llap_queue_name
)
else:
first_leaf_queue = list(leafQueueNames)[
0
] # 1st invocation, pick the 1st leaf queue and set it as selected.
putHiveInteractiveSiteProperty(
"hive.llap.daemon.queue.name", first_leaf_queue
)
putHiveInteractiveSiteProperty(
"hive.server2.tez.default.queues", first_leaf_queue
)
llap_named_queue_selected_in_curr_invocation = False
self.logger.info(
f"DBG: llap_named_queue_selected_in_curr_invocation = {llap_named_queue_selected_in_curr_invocation}"
)
if (
len(leafQueueNames) == 2
and (
llap_daemon_selected_queue_name
and llap_daemon_selected_queue_name == llap_queue_name
)
or llap_named_queue_selected_in_curr_invocation
) or (
len(leafQueueNames) == 1
and llap_daemon_selected_queue_name == "default"
and llap_named_queue_selected_in_curr_invocation
):
self.logger.info(
"DBG: Setting 'num_llap_nodes' config's READ ONLY attribute as 'False'."
)
putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "read_only", "false")
selected_queue_is_ambari_managed_llap = True
self.logger.info(
"DBG: Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' "
"visibility to 'True'.".format(
llap_queue_name, list(leafQueueNames)
)
)
else:
self.logger.info(
"DBG: Setting 'num_llap_nodes' config's READ ONLY attribute as 'True'."
)
putHiveInteractiveEnvPropertyAttribute("num_llap_nodes", "read_only", "true")
self.logger.info(
"Selected YARN queue for LLAP is : '{0}'. Current YARN queues : {1}. Setting 'Number of LLAP nodes' "
"visibility to 'False'.".format(
llap_daemon_selected_queue_name, list(leafQueueNames)
)
)
selected_queue_is_ambari_managed_llap = False
if (
not llap_named_queue_selected_in_curr_invocation
): # We would be creating the 'llap' queue later. Thus, cap-sched doesn't have
# state information pertaining to 'llap' queue.
# Check: State of the selected queue should not be STOPPED.
if llap_daemon_selected_queue_name:
llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(
capacity_scheduler_properties, llap_daemon_selected_queue_name
)
if (
llap_selected_queue_state is None or llap_selected_queue_state == "STOPPED"
):
self.logger.error(
"Selected LLAP app queue '{0}' current state is : '{1}'. Setting LLAP configs to default "
"values.".format(
llap_daemon_selected_queue_name, llap_selected_queue_state
)
)
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
else:
self.logger.error(
"Retrieved LLAP app queue name is : '{0}'. Setting LLAP configs to default values.".format(
llap_daemon_selected_queue_name
)
)
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
else:
self.logger.error(
"Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive."
" Not calculating LLAP configs."
)
return
changed_configs_in_hive_int_env = None
llap_concurrency_in_changed_configs = None
llap_daemon_queue_in_changed_configs = None
# Calculations are triggered only if there is a change in any one of the following props:
# 'num_llap_nodes', 'enable_hive_interactive', 'hive.server2.tez.sessions.per.default.queue',
# or the queue selection for 'hive.llap.daemon.queue.name' has changed,
# OR
# services['changed-configurations'] is empty, implying that this is the Blueprint call (1st invocation).
if "changed-configurations" in services.keys():
config_names_to_be_checked = set(["num_llap_nodes", "enable_hive_interactive"])
changed_configs_in_hive_int_env = self.isConfigPropertiesChanged(
services, "hive-interactive-env", config_names_to_be_checked, False
)
# Determine if there is change detected in "hive-interactive-site's" configs based on which we calculate llap configs.
llap_concurrency_in_changed_configs = self.isConfigPropertiesChanged(
services,
YARNRecommender.HIVE_INTERACTIVE_SITE,
["hive.server2.tez.sessions.per.default.queue"],
False,
)
llap_daemon_queue_in_changed_configs = self.isConfigPropertiesChanged(
services,
YARNRecommender.HIVE_INTERACTIVE_SITE,
["hive.llap.daemon.queue.name"],
False,
)
if (
not changed_configs_in_hive_int_env
and not llap_concurrency_in_changed_configs
and not llap_daemon_queue_in_changed_configs
and services["changed-configurations"]
):
self.logger.info("DBG: LLAP parameters not modified. Not adjusting LLAP configs.")
self.logger.info(
f"DBG: Current 'changed-configuration' received is : {services['changed-configurations']}"
)
return
self.logger.info("\nDBG: Performing LLAP config calculations ......")
node_manager_host_list = self.getHostsForComponent(services, "YARN", "NODEMANAGER")
node_manager_cnt = len(node_manager_host_list)
yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb
self.logger.info(
"DBG: Calculated total_cluster_capacity : {0}, using following : node_manager_cnt : {1}, "
"yarn_nm_mem_in_mb : {2}".format(
total_cluster_capacity, node_manager_cnt, yarn_nm_mem_in_mb
)
)
yarn_min_container_size = float(
self.get_yarn_min_container_size(services, configurations)
)
tez_am_container_size = self.calculate_tez_am_container_size(
services,
int(total_cluster_capacity),
is_cluster_create_opr,
changed_configs_has_enable_hive_int,
)
normalized_tez_am_container_size = self._normalizeUp(
tez_am_container_size, yarn_min_container_size
)
if yarn_site and "yarn.nodemanager.resource.cpu-vcores" in yarn_site:
cpu_per_nm_host = float(yarn_site["yarn.nodemanager.resource.cpu-vcores"])
else:
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
self.logger.info(
"DBG Calculated normalized_tez_am_container_size : {0}, using following : tez_am_container_size : {1}, "
"total_cluster_capacity : {2}".format(
normalized_tez_am_container_size, tez_am_container_size, total_cluster_capacity
)
)
# Calculate the available memory for LLAP app
yarn_nm_mem_in_mb_normalized = self._normalizeDown(
yarn_nm_mem_in_mb, yarn_min_container_size
)
mem_per_thread_for_llap = float(
self.calculate_mem_per_thread_for_llap(
services,
yarn_nm_mem_in_mb_normalized,
cpu_per_nm_host,
is_cluster_create_opr,
changed_configs_has_enable_hive_int,
)
)
self.logger.info(
"DBG: Calculated mem_per_thread_for_llap : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, "
"cpu_per_nm_host : {2}".format(
mem_per_thread_for_llap, yarn_nm_mem_in_mb_normalized, cpu_per_nm_host
)
)
if mem_per_thread_for_llap is None:
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
# Get calculated value for YARN Service AM container Size
yarn_service_am_container_size = self._normalizeUp(
self.calculate_yarn_service_am_size(yarn_min_container_size),
yarn_min_container_size,
)
self.logger.info(
"DBG: Calculated 'yarn_service_am_container_size' : {0}, using following: yarn_min_container_size : "
"{1}".format(yarn_service_am_container_size, yarn_min_container_size)
)
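# Memory floor for a viable LLAP deployment: one normalized Tez AM container, one YARN Service
# AM container, and one LLAP daemon container large enough to hold a single executor thread.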
min_memory_required = (
normalized_tez_am_container_size
+ yarn_service_am_container_size
+ self._normalizeUp(mem_per_thread_for_llap, yarn_min_container_size)
)
self.logger.info(
"DBG: Calculated 'min_memory_required': {0} using following : yarn_service_am_container_size: {1}, "
"normalized_tez_am_container_size : {2}, mem_per_thread_for_llap : {3}, yarn_min_container_size : "
"{4}".format(
min_memory_required,
yarn_service_am_container_size,
normalized_tez_am_container_size,
mem_per_thread_for_llap,
yarn_min_container_size,
)
)
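# Convert the memory floor into a minimum node count, rounded up against the normalized per-NodeManager memory.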
min_nodes_required = int(ceil(min_memory_required / yarn_nm_mem_in_mb_normalized))
self.logger.info(
"DBG: Calculated 'min_node_required': {0}, using following : min_memory_required : {1}, yarn_nm_mem_in_mb_normalized "
": {2}".format(
min_nodes_required, min_memory_required, yarn_nm_mem_in_mb_normalized
)
)
if min_nodes_required > node_manager_cnt:
self.logger.warning("ERROR: Not enough memory/nodes to run LLAP")
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
mem_per_thread_for_llap = float(mem_per_thread_for_llap)
self.logger.info(
f"DBG: selected_queue_is_ambari_managed_llap = {selected_queue_is_ambari_managed_llap}"
)
if not selected_queue_is_ambari_managed_llap:
llap_daemon_selected_queue_cap = self.__getSelectedQueueTotalCap(
capacity_scheduler_properties,
llap_daemon_selected_queue_name,
total_cluster_capacity,
)
if llap_daemon_selected_queue_cap <= 0:
self.logger.warning(
"'{0}' queue capacity percentage retrieved = {1}. Expected > 0.".format(
llap_daemon_selected_queue_name, llap_daemon_selected_queue_cap
)
)
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
total_llap_mem_normalized = self._normalizeDown(
llap_daemon_selected_queue_cap, yarn_min_container_size
)
self.logger.info(
"DBG: Calculated '{0}' queue available capacity : {1}, using following: llap_daemon_selected_queue_cap : {2}, "
"yarn_min_container_size : {3}".format(
llap_daemon_selected_queue_name,
total_llap_mem_normalized,
llap_daemon_selected_queue_cap,
yarn_min_container_size,
)
)
"""Rounding up numNodes so that we run more daemons, and utilitze more CPUs. The rest of the calcaulations will take care of cutting this down if required"""
num_llap_nodes_requested = ceil(
total_llap_mem_normalized / yarn_nm_mem_in_mb_normalized
)
self.logger.info(
"DBG: Calculated 'num_llap_nodes_requested' : {0}, using following: total_llap_mem_normalized : {1}, "
"yarn_nm_mem_in_mb_normalized : {2}".format(
num_llap_nodes_requested,
total_llap_mem_normalized,
yarn_nm_mem_in_mb_normalized,
)
)
# Populate 'num_llap_nodes_requested' into config 'num_llap_nodes', a read-only config in the non-Ambari-managed queue case.
putHiveInteractiveEnvProperty("num_llap_nodes", num_llap_nodes_requested)
self.logger.info(
f"Setting config 'num_llap_nodes' as : {num_llap_nodes_requested}"
)
queue_am_fraction_perc = float(
self.__getQueueAmFractionFromCapacityScheduler(
capacity_scheduler_properties, llap_daemon_selected_queue_name
)
)
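# Capacity within the selected queue that can host Hive/Tez AM containers, derived from the
# queue's configured AM fraction in capacity-scheduler.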
hive_tez_am_cap_available = queue_am_fraction_perc * total_llap_mem_normalized
self.logger.info(
"DBG: Calculated 'hive_tez_am_cap_available' : {0}, using following: queue_am_fraction_perc : {1}, "
"total_llap_mem_normalized : {2}".format(
hive_tez_am_cap_available, queue_am_fraction_perc, total_llap_mem_normalized
)
)
else: # Ambari managed 'llap' named queue at root level.
# Seed 'num_llap_nodes_requested' on the 1st invocation (it otherwise gets passed as 1); on later invocations, read it from config.
# Check if it's : 1. the 1st invocation from the UI ('enable_hive_interactive' in changed-configurations)
# OR 2. the 1st invocation from a Blueprint (services['changed-configurations'] should be empty in this case)
if (
changed_configs_has_enable_hive_int
or 0 == len(services["changed-configurations"])
) and services["configurations"]["hive-interactive-env"]["properties"][
"enable_hive_interactive"
]:
num_llap_nodes_requested = min_nodes_required
else:
num_llap_nodes_requested = self.get_num_llap_nodes(
services, configurations
) # Input
total_llap_mem = num_llap_nodes_requested * yarn_nm_mem_in_mb_normalized
self.logger.info(
"DBG: Calculated 'total_llap_mem' : {0}, using following: num_llap_nodes_requested : {1}, "
"yarn_nm_mem_in_mb_normalized : {2}".format(
total_llap_mem, num_llap_nodes_requested, yarn_nm_mem_in_mb_normalized
)
)
total_llap_mem_normalized = float(
self._normalizeDown(total_llap_mem, yarn_min_container_size)
)
self.logger.info(
"DBG: Calculated 'total_llap_mem_normalized' : {0}, using following: total_llap_mem : {1}, "
"yarn_min_container_size : {2}".format(
total_llap_mem_normalized, total_llap_mem, yarn_min_container_size
)
)
# What percent is 'total_llap_mem' of 'total_cluster_capacity' ?
llap_named_queue_cap_fraction = ceil(
total_llap_mem_normalized / total_cluster_capacity * 100
)
self.logger.info(
f"DBG: Calculated '{llap_queue_name}' queue capacity percent = {llap_named_queue_cap_fraction}."
)
if llap_named_queue_cap_fraction > 100:
self.logger.warning(
f"Calculated '{llap_queue_name}' queue size = {llap_named_queue_cap_fraction}. Cannot be > 100."
)
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
# Adjust capacity scheduler for the 'llap' named queue.
self.checkAndManageLlapQueue(
services, configurations, hosts, llap_queue_name, llap_named_queue_cap_fraction
)
hive_tez_am_cap_available = total_llap_mem_normalized
self.logger.info(f"DBG: hive_tez_am_cap_available : {hive_tez_am_cap_available}")
# Common calculations now, irrespective of the queue selected.
llap_mem_for_tezAm_and_daemons = (
total_llap_mem_normalized - yarn_service_am_container_size
)
self.logger.info(
"DBG: Calculated 'llap_mem_for_tezAm_and_daemons' : {0}, using following : total_llap_mem_normalized : {1}, "
"yarn_service_am_container_size : {2}".format(
llap_mem_for_tezAm_and_daemons,
total_llap_mem_normalized,
yarn_service_am_container_size,
)
)
if llap_mem_for_tezAm_and_daemons < 2 * yarn_min_container_size:
self.logger.warning("Not enough capacity available on the cluster to run LLAP")
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
# Calculate llap concurrency (i.e. Number of Tez AM's)
max_executors_per_node = self.get_max_executors_per_node(
yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap
)
# Read 'hive.server2.tez.sessions.per.default.queue' prop if it's in changed-configs, else calculate it.
if not llap_concurrency_in_changed_configs:
if max_executors_per_node <= 0:
self.logger.warning(
f"Calculated 'max_executors_per_node' = {max_executors_per_node}. Expected value >= 1."
)
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
self.logger.info(
"DBG: Calculated 'max_executors_per_node' : {0}, using following: yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, "
"mem_per_thread_for_llap: {3}".format(
max_executors_per_node,
yarn_nm_mem_in_mb_normalized,
cpu_per_nm_host,
mem_per_thread_for_llap,
)
)
# Default 1 AM for every 20 executor threads.
# The second part of the min calculates based on mem required for DEFAULT_EXECUTOR_TO_AM_RATIO executors + 1 AM,
# making use of total memory. However, it's possible that total memory will not be used - and the numExecutors is
# instead limited by #CPUs. Use maxPerNode to factor this in.
llap_concurreny_limit = min(
floor(
max_executors_per_node
* num_llap_nodes_requested
/ DEFAULT_EXECUTOR_TO_AM_RATIO
),
MAX_CONCURRENT_QUERIES,
)
self.logger.info(
"DBG: Calculated 'llap_concurreny_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested : {2}, DEFAULT_EXECUTOR_TO_AM_RATIO "
": {3}, MAX_CONCURRENT_QUERIES : {4}".format(
llap_concurreny_limit,
max_executors_per_node,
num_llap_nodes_requested,
DEFAULT_EXECUTOR_TO_AM_RATIO,
MAX_CONCURRENT_QUERIES,
)
)
llap_concurrency = min(
llap_concurreny_limit,
floor(
llap_mem_for_tezAm_and_daemons
/ (
DEFAULT_EXECUTOR_TO_AM_RATIO * mem_per_thread_for_llap
+ normalized_tez_am_container_size
)
),
)
self.logger.info(
"DBG: Calculated 'llap_concurrency' : {0}, using following : llap_concurreny_limit : {1}, llap_mem_for_tezAm_and_daemons : "
"{2}, DEFAULT_EXECUTOR_TO_AM_RATIO : {3}, mem_per_thread_for_llap : {4}, normalized_tez_am_container_size : "
"{5}".format(
llap_concurrency,
llap_concurreny_limit,
llap_mem_for_tezAm_and_daemons,
DEFAULT_EXECUTOR_TO_AM_RATIO,
mem_per_thread_for_llap,
normalized_tez_am_container_size,
)
)
if llap_concurrency == 0:
llap_concurrency = 1
self.logger.info(
"DBG: Readjusted 'llap_concurrency' to : 1. Earlier calculated value : 0"
)
if (
llap_concurrency * normalized_tez_am_container_size > hive_tez_am_cap_available
):
llap_concurrency = int(
math.floor(hive_tez_am_cap_available / normalized_tez_am_container_size)
)
self.logger.info(
"DBG: Readjusted 'llap_concurrency' to : {0}, as llap_concurrency({1}) * normalized_tez_am_container_size({2}) > hive_tez_am_cap_available({3}))".format(
llap_concurrency,
llap_concurrency,
normalized_tez_am_container_size,
hive_tez_am_cap_available,
)
)
if llap_concurrency <= 0:
self.logger.warning(
f"DBG: Calculated 'LLAP Concurrent Queries' = {llap_concurrency}. Expected value >= 1."
)
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
self.logger.info(
"DBG: Adjusted 'llap_concurrency' : {0}, using following: hive_tez_am_cap_available : {1}, normalized_tez_am_container_size: "
"{2}".format(
llap_concurrency,
hive_tez_am_cap_available,
normalized_tez_am_container_size,
)
)
else:
# Read current value
if "hive.server2.tez.sessions.per.default.queue" in hsi_site:
llap_concurrency = int(hsi_site["hive.server2.tez.sessions.per.default.queue"])
if llap_concurrency <= 0:
self.logger.warning(
f"'hive.server2.tez.sessions.per.default.queue' current value : {llap_concurrency}. Expected value : >= 1"
)
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
self.logger.info(f"DBG: Read 'llap_concurrency' : {llap_concurrency}")
else:
llap_concurrency = 1
self.logger.warning(
"Couldn't retrieve Hive Server interactive's 'hive.server2.tez.sessions.per.default.queue' config. Setting default value 1."
)
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
# Calculate 'Max LLAP Concurrency', irrespective of whether 'llap_concurrency' was read or calculated.
max_llap_concurreny_limit = min(
floor(
max_executors_per_node * num_llap_nodes_requested / MIN_EXECUTOR_TO_AM_RATIO
),
MAX_CONCURRENT_QUERIES,
)
self.logger.info(
"DBG: Calculated 'max_llap_concurreny_limit' : {0}, using following : max_executors_per_node : {1}, num_llap_nodes_requested "
": {2}, MIN_EXECUTOR_TO_AM_RATIO : {3}, MAX_CONCURRENT_QUERIES : {4}".format(
max_llap_concurreny_limit,
max_executors_per_node,
num_llap_nodes_requested,
MIN_EXECUTOR_TO_AM_RATIO,
MAX_CONCURRENT_QUERIES,
)
)
# Calculate value for 'num_llap_nodes', an across cluster config.
tez_am_memory_required = llap_concurrency * normalized_tez_am_container_size
self.logger.info(
"DBG: Calculated 'tez_am_memory_required' : {0}, using following : llap_concurrency : {1}, normalized_tez_am_container_size : "
"{2}".format(
tez_am_memory_required, llap_concurrency, normalized_tez_am_container_size
)
)
llap_mem_daemon_size = llap_mem_for_tezAm_and_daemons - tez_am_memory_required
if llap_mem_daemon_size < yarn_min_container_size:
self.logger.warning(
"Calculated 'LLAP Daemon Size = {0}'. Expected >= 'YARN Minimum Container Size' ({1})'".format(
llap_mem_daemon_size, yarn_min_container_size
)
)
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
if (
llap_mem_daemon_size < mem_per_thread_for_llap
or llap_mem_daemon_size < yarn_min_container_size
):
self.logger.warning("Not enough memory available for executors.")
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
self.logger.info(
"DBG: Calculated 'llap_mem_daemon_size' : {0}, using following : llap_mem_for_tezAm_and_daemons : {1}, tez_am_memory_required : "
"{2}".format(
llap_mem_daemon_size, llap_mem_for_tezAm_and_daemons, tez_am_memory_required
)
)
llap_daemon_mem_per_node = self._normalizeDown(
llap_mem_daemon_size / num_llap_nodes_requested, yarn_min_container_size
)
# This value takes into account total cluster capacity, and may not have left enough capacity on each node to launch an AM.
self.logger.info(
"DBG: Calculated 'llap_daemon_mem_per_node' : {0}, using following : llap_mem_daemon_size : {1}, num_llap_nodes_requested : {2}, "
"yarn_min_container_size: {3}".format(
llap_daemon_mem_per_node,
llap_mem_daemon_size,
num_llap_nodes_requested,
yarn_min_container_size,
)
)
if llap_daemon_mem_per_node == 0:
# Small cluster. No capacity left on a node after running AMs.
llap_daemon_mem_per_node = self._normalizeUp(
mem_per_thread_for_llap, yarn_min_container_size
)
num_llap_nodes = floor(llap_mem_daemon_size / llap_daemon_mem_per_node)
self.logger.info(
"DBG: 'llap_daemon_mem_per_node' : 0, adjusted 'llap_daemon_mem_per_node' : {0}, 'num_llap_nodes' : {1}, using following: llap_mem_daemon_size : {2}, "
"mem_per_thread_for_llap : {3}".format(
llap_daemon_mem_per_node,
num_llap_nodes,
llap_mem_daemon_size,
mem_per_thread_for_llap,
)
)
elif llap_daemon_mem_per_node < mem_per_thread_for_llap:
# The previously computed value of memory per thread may be too high. Cut the number of nodes. (Alternatively, reduce memory per node.)
llap_daemon_mem_per_node = mem_per_thread_for_llap
num_llap_nodes = floor(llap_mem_daemon_size / mem_per_thread_for_llap)
self.logger.info(
"DBG: 'llap_daemon_mem_per_node'({0}) < mem_per_thread_for_llap({1}), adjusted 'llap_daemon_mem_per_node' "
": {2}".format(
llap_daemon_mem_per_node, mem_per_thread_for_llap, llap_daemon_mem_per_node
)
)
else:
# All good. We have a proper value for memoryPerNode.
num_llap_nodes = num_llap_nodes_requested
self.logger.info(f"DBG: num_llap_nodes : {num_llap_nodes}")
# Make sure we have enough memory on each node to run AMs.
# If nodes vs nodes_requested is different - AM memory is already factored in.
# If llap_node_count < total_cluster_nodes - assuming AMs can run on a different node.
# Else factor in min_concurrency_per_node * tez_am_size, and yarn_service_am_container_size
# Also needs to factor in whether num_llap_nodes = cluster_node_count
min_mem_reserved_per_node = 0
if (
num_llap_nodes == num_llap_nodes_requested and num_llap_nodes == node_manager_cnt
):
min_mem_reserved_per_node = max(
normalized_tez_am_container_size, yarn_service_am_container_size
)
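# LLAP daemons occupy every NodeManager here, so the Tez AMs (and the YARN Service AM) must
# share those nodes. Reserve the larger of: all Tez AMs rounded up per node, or Tez AMs rounded
# down per node plus the YARN Service AM.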
tez_AMs_per_node = llap_concurrency / num_llap_nodes
tez_AMs_per_node_low = int(math.floor(tez_AMs_per_node))
tez_AMs_per_node_high = int(math.ceil(tez_AMs_per_node))
min_mem_reserved_per_node = int(
max(
tez_AMs_per_node_high * normalized_tez_am_container_size,
tez_AMs_per_node_low * normalized_tez_am_container_size
+ yarn_service_am_container_size,
)
)
self.logger.info(
"DBG: Determined 'AM reservation per node': {0}, using following : concurrency: {1}, num_llap_nodes: {2}, AMsPerNode: {3}".format(
min_mem_reserved_per_node, llap_concurrency, num_llap_nodes, tez_AMs_per_node
)
)
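# Per-node ceiling for the LLAP daemon: NodeManager memory minus the AM reservation,
# normalized down to the YARN minimum container size.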
max_single_node_mem_available_for_daemon = self._normalizeDown(
yarn_nm_mem_in_mb_normalized - min_mem_reserved_per_node, yarn_min_container_size
)
if (
max_single_node_mem_available_for_daemon <= 0
or max_single_node_mem_available_for_daemon < mem_per_thread_for_llap
):
self.logger.warning(
"Not enough capacity available per node for daemons after factoring in AM memory requirements. NM Mem: {0}, "
"minAMMemPerNode: {1}, available: {2}".format(
yarn_nm_mem_in_mb_normalized,
min_mem_reserved_per_node,
max_single_node_mem_available_for_daemon,
)
)
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
llap_daemon_mem_per_node = min(
max_single_node_mem_available_for_daemon, llap_daemon_mem_per_node
)
self.logger.info(
"DBG: Determined final memPerDaemon: {0}, using following: concurrency: {1}, numNMNodes: {2}, numLlapNodes: {3} ".format(
llap_daemon_mem_per_node, llap_concurrency, node_manager_cnt, num_llap_nodes
)
)
num_executors_per_node_max = self.get_max_executors_per_node(
yarn_nm_mem_in_mb_normalized, cpu_per_nm_host, mem_per_thread_for_llap
)
if num_executors_per_node_max < 1:
self.logger.warning(
f"Calculated 'Max. Executors per Node' = {num_executors_per_node_max}. Expected values >= 1."
)
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
self.logger.info(
"DBG: Calculated 'num_executors_per_node_max' : {0}, using following : yarn_nm_mem_in_mb_normalized : {1}, cpu_per_nm_host : {2}, "
"mem_per_thread_for_llap: {3}".format(
num_executors_per_node_max,
yarn_nm_mem_in_mb_normalized,
cpu_per_nm_host,
mem_per_thread_for_llap,
)
)
# num_executors_per_node is not necessarily the max, since some capacity would have been reserved for AMs if this value is memory-bound.
num_executors_per_node = min(
floor(llap_daemon_mem_per_node / mem_per_thread_for_llap),
num_executors_per_node_max,
)
if num_executors_per_node <= 0:
self.logger.warning(
f"Calculated 'Number of Executors Per Node' = {num_executors_per_node}. Expected value >= 1"
)
self.recommendDefaultLlapConfiguration(configurations, services, hosts)
return
self.logger.info(
"DBG: Calculated 'num_executors_per_node' : {0}, using following : llap_daemon_mem_per_node : {1}, num_executors_per_node_max : {2}, "
"mem_per_thread_for_llap: {3}".format(
num_executors_per_node,
llap_daemon_mem_per_node,
num_executors_per_node_max,
mem_per_thread_for_llap,
)
)
# Now figure out how much of the memory will be used by the executors, and how much will be used by the cache.
total_mem_for_executors_per_node = num_executors_per_node * mem_per_thread_for_llap
cache_mem_per_node = llap_daemon_mem_per_node - total_mem_for_executors_per_node
self.logger.info(
"DBG: Calculated 'Cache per node' : {0}, using following : llap_daemon_mem_per_node : {1}, total_mem_for_executors_per_node : {2}".format(
cache_mem_per_node, llap_daemon_mem_per_node, total_mem_for_executors_per_node
)
)
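# Tez runtime buffer heuristics, all based on 80% of the per-executor memory:
#   tez.runtime.io.sort.mb                      ~ (0.8 * mem_per_thread_for_llap) / 3
#   tez.runtime.unordered.output.buffer.size-mb ~ 0.075 * (0.8 * mem_per_thread_for_llap)
# The same one-third figure, converted to bytes, seeds hive.auto.convert.join.noconditionaltask.size.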
tez_runtime_io_sort_mb = int((0.8 * mem_per_thread_for_llap) / 3)
tez_runtime_unordered_output_buffer_size = int(
0.8 * 0.075 * mem_per_thread_for_llap
)
# The 'hive.auto.convert.join.noconditionaltask.size' value is in bytes, so the MB figure is multiplied by MB_TO_BYTES (1048576).
hive_auto_convert_join_noconditionaltask_size = (
int((0.8 * mem_per_thread_for_llap) / 3)
) * MB_TO_BYTES
# Calculate value for prop 'llap_heap_size'
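# Xmx is the larger of 80% of the total executor memory per node, or that memory minus the
# configured LLAP headroom space; the rest of the daemon container is left as overhead.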
llap_xmx = max(
total_mem_for_executors_per_node * 0.8,
total_mem_for_executors_per_node
- self.get_llap_headroom_space(services, configurations),
)
self.logger.info(
f"DBG: Calculated llap_app_heap_size : {llap_xmx}, using following : total_mem_for_executors : {total_mem_for_executors_per_node}"
)
# Calculate 'hive_heapsize' for Hive2/HiveServer2 (HSI)
hive_server_interactive_heapsize = None
hive_server_interactive_hosts = self.getHostsWithComponent(
"HIVE", "HIVE_SERVER_INTERACTIVE", services, hosts
)
if hive_server_interactive_hosts is None:
# If it's None, read the base service YARN's NODEMANAGER host memory instead, as all hosts are considered homogeneous.
hive_server_interactive_hosts = self.getHostsWithComponent(
"YARN", "NODEMANAGER", services, hosts
)
if (
hive_server_interactive_hosts is not None
and len(hive_server_interactive_hosts) > 0
):
host_mem = int(hive_server_interactive_hosts[0]["Hosts"]["total_mem"])
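# HSI heap: roughly 400 per concurrent query with a floor of 2048, capped at 3/8 of the host's reported total_mem.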
hive_server_interactive_heapsize = min(
max(2048.0, 400.0 * llap_concurrency), 3.0 / 8 * host_mem
)
self.logger.info(
"DBG: Calculated 'hive_server_interactive_heapsize' : {0}, using following : llap_concurrency : {1}, host_mem : "
"{2}".format(hive_server_interactive_heapsize, llap_concurrency, host_mem)
)
# Done with calculations, updating calculated configs.
self.logger.info("DBG: Applying the calculated values....")
if is_cluster_create_opr or changed_configs_has_enable_hive_int:
normalized_tez_am_container_size = int(normalized_tez_am_container_size)
putTezInteractiveSiteProperty(
"tez.am.resource.memory.mb", normalized_tez_am_container_size
)
self.logger.info(
f"DBG: Setting 'tez.am.resource.memory.mb' config value as : {normalized_tez_am_container_size}"
)
if not llap_concurrency_in_changed_configs:
putHiveInteractiveSiteProperty(
"hive.server2.tez.sessions.per.default.queue",
max(int(num_executors_per_node / 16), 1),
)
putHiveInteractiveSitePropertyAttribute(
"hive.server2.tez.sessions.per.default.queue",
"maximum",
max(int(num_executors_per_node / 4), 1),
)
num_llap_nodes = int(num_llap_nodes)
putHiveInteractiveEnvPropertyAttribute(
"num_llap_nodes", "minimum", min_nodes_required
)
putHiveInteractiveEnvPropertyAttribute(
"num_llap_nodes", "maximum", node_manager_cnt
)
# TODO A single value is not being set for numNodes in case of a custom queue. Also the attribute is set to non-visible, so the UI likely ends up using an old cached value
if num_llap_nodes != num_llap_nodes_requested:
self.logger.info(
f"DBG: User requested num_llap_nodes : {num_llap_nodes_requested}, but used/adjusted value for calculations is : {num_llap_nodes}"
)
else:
self.logger.info(
f"DBG: Used num_llap_nodes for calculations : {num_llap_nodes_requested}"
)
# Safeguard for not adding "num_llap_nodes_for_llap_daemons" if it doesn't exist in hive-interactive-env.
# This can happen if we upgrade from Ambari 2.4 (with HDP 2.5) to Ambari 2.5, as this config exists only from the 2.6 stack onwards.
if (
"hive-interactive-env" in services["configurations"]
and "num_llap_nodes_for_llap_daemons"
in services["configurations"]["hive-interactive-env"]["properties"]
):
putHiveInteractiveEnvProperty("num_llap_nodes_for_llap_daemons", num_llap_nodes)
self.logger.info(
f"DBG: Setting config 'num_llap_nodes_for_llap_daemons' as : {num_llap_nodes}"
)
llap_container_size = int(llap_daemon_mem_per_node)
putHiveInteractiveSiteProperty(
"hive.llap.daemon.yarn.container.mb", llap_container_size
)
putHiveInteractiveSitePropertyAttribute(
"hive.llap.daemon.yarn.container.mb", "minimum", yarn_min_container_size
)
putHiveInteractiveSitePropertyAttribute(
"hive.llap.daemon.yarn.container.mb",
"maximum",
self.__get_min_hsi_mem(services, hosts) * 0.8,
)
# Set 'hive.tez.container.size' only if it is read as "SET_ON_FIRST_INVOCATION", implying initialization.
# Otherwise, we don't override (1) the previously calculated value or (2) a user-provided value.
if is_cluster_create_opr or changed_configs_has_enable_hive_int:
mem_per_thread_for_llap = int(mem_per_thread_for_llap)
putHiveInteractiveSiteProperty("hive.tez.container.size", mem_per_thread_for_llap)
self.logger.info(
f"DBG: Setting 'hive.tez.container.size' config value as : {mem_per_thread_for_llap}"
)
putTezInteractiveSiteProperty("tez.runtime.io.sort.mb", tez_runtime_io_sort_mb)
if (
"tez-site" in services["configurations"]
and "tez.runtime.sorter.class"
in services["configurations"]["tez-site"]["properties"]
):
if (
services["configurations"]["tez-site"]["properties"]["tez.runtime.sorter.class"]
== "LEGACY"
):
putTezInteractiveSitePropertyAttribute(
"tez.runtime.io.sort.mb", "maximum", 1800
)
putTezInteractiveSiteProperty(
"tez.runtime.unordered.output.buffer.size-mb",
tez_runtime_unordered_output_buffer_size,
)
putHiveInteractiveSiteProperty(
"hive.auto.convert.join.noconditionaltask.size",
hive_auto_convert_join_noconditionaltask_size,
)
num_executors_per_node = int(num_executors_per_node)
self.logger.info(f"DBG: Putting num_executors_per_node as {num_executors_per_node}")
putHiveInteractiveSiteProperty(
"hive.llap.daemon.num.executors", num_executors_per_node
)
putHiveInteractiveSitePropertyAttribute(
"hive.llap.daemon.num.executors", "minimum", 1
)
putHiveInteractiveSitePropertyAttribute(
"hive.llap.daemon.num.executors", "maximum", int(num_executors_per_node_max)
)
# The 'hive.llap.io.threadpool.size' config value is always set to the same value as calculated for
# 'hive.llap.daemon.num.executors'.
cache_mem_per_node = int(cache_mem_per_node)
putHiveInteractiveSiteProperty(
"hive.llap.io.threadpool.size", num_executors_per_node
)
putHiveInteractiveSiteProperty("hive.llap.io.memory.size", cache_mem_per_node)
putHiveInteractiveSitePropertyAttribute("hive.llap.io.memory.size", "minimum", 0)
putHiveInteractiveSitePropertyAttribute(
"hive.llap.io.memory.size",
"maximum",
self.__get_min_hsi_mem(services, hosts) * 0.8,
)
if hive_server_interactive_heapsize is not None:
putHiveInteractiveEnvProperty(
"hive_heapsize", int(hive_server_interactive_heapsize)
)
ssd_cache_on = (
services["configurations"]["hive-interactive-site"]["properties"][
"hive.llap.io.allocator.mmap"
]
== "true"
)
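# Enable LLAP IO only when at least 1024 MB of cache is available per node, or when the SSD
# (mmap allocator) cache is configured.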
llap_io_enabled = (
"true" if int(cache_mem_per_node) >= 1024 or ssd_cache_on else "false"
)
services["forced-configurations"].append(
{"type": "hive-interactive-site", "name": "hive.llap.io.enabled"}
)
putHiveInteractiveSiteProperty("hive.llap.io.enabled", llap_io_enabled)
putHiveInteractiveEnvProperty("llap_heap_size", int(llap_xmx))
self.logger.info("DBG: Done putting all configs")