in tools/dataproc-event-driven-spark-recommendations/src/main.py [0:0]
def evaluate_properties(cluster):
"""
Evaluate each cluster's spark properties based on standardized
best practices.
Return report of necessary changes.
"""
w_map = cluster.config.worker_config
p_map = cluster.config.software_config.properties
    m_type_str = w_map.machine_type_uri.split('/')[-1]
m_type_info = load_machine_type_info(m_type_str)[0]
m_type = m_type_info['name']
nodes = int(w_map.num_instances)
ram_per_node = m_type_info['memoryMb']
vcores = m_type_info['guestCpus']
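    # Current Spark property values. Dataproc exposes these as strings (a
    # missing property reads back as ''), and memory sizes may carry a 'g'
    # or 'm' suffix.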
executor_cores = str(p_map['spark:spark.executor.cores'])
driver_cores = str(p_map['spark:spark.driver.cores'])
executor_instances = str(p_map['spark:spark.executor.instances'])
executor_memory = str(p_map['spark:spark.executor.memory'])
driver_memory = str(p_map['spark:spark.driver.memory'])
executor_memory_overhead = str(p_map['spark:spark.executor.memoryOverhead'])
default_parallelism = str(p_map['spark:spark.default.parallelism'])
sql_shuffle_partitions = str(p_map['spark:spark.sql.shuffle.partitions'])
shuffle_spill_compress = str(p_map['spark:spark.shuffle.spill.compress'])
checkpoint_compress = str(p_map['spark:spark.checkpoint.compress'])
io_compression_codec = str(p_map['spark:spark.io.compression.codec'])
dynamic_allocation = str(p_map['spark:spark.dynamicAllocation.enabled'])
shuffle_service = str(p_map['spark:spark.shuffle.service.enabled'])
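    # Normalize any memory values expressed in gigabytes to megabytes so they
    # can be compared against the recommendations computed below (in 'm').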
if 'g' in executor_memory:
executor_memory = gb_to_mb_property(executor_memory)
if 'g' in driver_memory:
driver_memory = gb_to_mb_property(driver_memory)
if 'g' in executor_memory_overhead:
executor_memory_overhead = gb_to_mb_property(executor_memory_overhead)
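    # Sizing heuristic: target ~5 vcores per executor, reserving one vcore
    # per node for the OS and Hadoop/YARN daemons; always keep at least one
    # executor per node.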
    executor_per_node = max(round((vcores - 1) / 5), 1)
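    # Leave one executor's worth of resources unallocated across the cluster
    # (this slot is commonly left for the YARN ApplicationMaster / driver).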
    rec_executor_instances = max((executor_per_node * nodes) - 1, 1)
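    # Reserve ~1 GiB of RAM per node for the OS and daemons, then split each
    # executor's share roughly 90% heap / 10% memoryOverhead.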
rec_executor_memory = str(
math.floor((ram_per_node - 1024) / executor_per_node * 0.9)) + 'm'
rec_driver_memory = rec_executor_memory
rec_executor_memory_overhead = str(
math.ceil((ram_per_node - 1024) / executor_per_node * 0.1)) + 'm'
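    # Aim for roughly 10 partitions per recommended executor for default
    # parallelism, and mirror that value for SQL shuffle partitions.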
rec_default_parallelism = ((executor_per_node * nodes) - 1) * 10
rec_sql_shuffle_partitions = rec_default_parallelism
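    # Collect a recommendation only where the current value differs from the
    # computed best-practice value (or is unset, for the flag-style options).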
recs = {}
    if executor_cores != '5':
        recs['spark:spark.executor.cores'] = 5
    if driver_cores != '5':
        recs['spark:spark.driver.cores'] = 5
if shuffle_spill_compress != 'true':
recs['spark:spark.shuffle.spill.compress'] = 'true'
if checkpoint_compress != 'true':
recs['spark:spark.checkpoint.compress'] = 'true'
if io_compression_codec == '':
codec_msg = 'snappy (if splittable files), lz4 (otherwise)'
recs['spark:spark.io.compression.codec'] = codec_msg
if shuffle_service == '':
ss_msg = 'true (if multiple spark apps on cluster), false (otherwise)'
recs['spark:spark.shuffle.service.enabled'] = ss_msg
if dynamic_allocation == '':
da_msg = 'true (if multiple spark apps on cluster), false (otherwise)'
recs['spark:spark.dynamicAllocation.enabled'] = da_msg
    if executor_instances != str(rec_executor_instances):
        recs['spark:spark.executor.instances'] = rec_executor_instances
if executor_memory != rec_executor_memory:
recs['spark:spark.executor.memory'] = rec_executor_memory
if driver_memory != rec_driver_memory:
recs['spark:spark.driver.memory'] = rec_driver_memory
    if executor_memory_overhead != rec_executor_memory_overhead:
        recs['spark:spark.executor.memoryOverhead'] = (
            rec_executor_memory_overhead)
    if default_parallelism != str(rec_default_parallelism):
        recs['spark:spark.default.parallelism'] = rec_default_parallelism
    if sql_shuffle_partitions != str(rec_sql_shuffle_partitions):
        recs['spark:spark.sql.shuffle.partitions'] = rec_sql_shuffle_partitions
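    # Snapshot the cluster's current configuration for the report.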
curr_conf = {}
curr_conf['cluster_name'] = cluster.cluster_name
curr_conf['m_type'] = m_type
curr_conf['nodes'] = nodes
curr_conf['vcores'] = vcores
curr_conf['ram_per_node'] = ram_per_node
curr_conf['spark.executor.cores'] = executor_cores
curr_conf['spark.driver.cores'] = driver_cores
curr_conf['spark.executor.instances'] = executor_instances
curr_conf['spark.executor.memory'] = executor_memory
curr_conf['spark.driver.memory'] = driver_memory
curr_conf['spark.executor.memoryOverhead'] = executor_memory_overhead
curr_conf['spark.default.parallelism'] = default_parallelism
curr_conf['spark.sql.shuffle.partitions'] = sql_shuffle_partitions
curr_conf['spark.shuffle.spill.compress'] = shuffle_spill_compress
curr_conf['spark.io.compression.codec'] = io_compression_codec
curr_conf['spark.dynamicAllocation.enabled'] = dynamic_allocation
curr_conf['spark.shuffle.service.enabled'] = shuffle_service
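    # Assemble the report (current configuration plus recommendations) and
    # return it as pretty-printed JSON.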
report = {}
    report['curr_configuration'] = curr_conf
report['recs'] = recs
json_report = json.dumps(report, indent=4)
return json_report