# infrastructure-provisioning/src/general/lib/gcp/actions_lib.py
import ast
import os
import subprocess

def configure_dataengine_spark(cluster_name, jars_dir, cluster_dir, datalake_enabled, spark_configs=''):
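    # Assemble spark-defaults.conf for a notebook attached to a standalone
    # Spark dataengine cluster: register the cluster's jars, merge in any
    # pre-existing defaults, install the result into cluster_dir, and apply
    # user-supplied overrides. cluster_dir is expected to end with '/';
    # datalake_enabled is not used in this GCP variant.

    # Record a comma-separated list of every jar under jars_dir as
    # spark.jars in the notebook's staging config under /tmp.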
subprocess.run("jar_list=`find {0} -name '*.jar' | tr '\\n' ',' | sed 's/,$//'` ; echo \"spark.jars $jar_list\" >> \
/tmp/{1}/notebook_spark-defaults_local.conf".format(jars_dir, cluster_name), shell=True, check=True)
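    # Merge in properties that exist in the cluster's current
    # spark-defaults.conf but not in the staging config: with these group
    # formats, diff emits only the changed lines from the second file, and
    # comment lines are filtered out.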
    if os.path.exists('{0}spark/conf/spark-defaults.conf'.format(cluster_dir)):
        additional_spark_properties = subprocess.run(
            'diff --changed-group-format="%>" --unchanged-group-format="" '
            '/tmp/{0}/notebook_spark-defaults_local.conf '
            '{1}spark/conf/spark-defaults.conf | grep -v "^#"'.format(cluster_name, cluster_dir),
            capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip("\n\r")
        for prop in additional_spark_properties.split('\n'):
            subprocess.run('echo "{0}" >> /tmp/{1}/notebook_spark-defaults_local.conf'.format(prop, cluster_name),
                           shell=True, check=True)
    if os.path.exists(cluster_dir):
        subprocess.run('cp -f /tmp/{0}/notebook_spark-defaults_local.conf {1}spark/conf/spark-defaults.conf'.format(
            cluster_name, cluster_dir), shell=True, check=True)
        subprocess.run('cp -f /opt/spark/conf/core-site.xml {}spark/conf/'.format(cluster_dir), shell=True, check=True)
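    # Apply user-supplied spark-defaults overrides, keeping the DataLab
    # comment header from the staging config at the top of the file.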
    if spark_configs and os.path.exists(cluster_dir):
        datalab_header = subprocess.run('cat /tmp/{0}/notebook_spark-defaults_local.conf | grep "^#"'.format(cluster_name),
                                        capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip("\n\r")
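        # spark_configs arrives as a Python-literal string holding a list of
        # dicts with 'Classification' and 'Properties' keys, e.g.
        # "[{'Classification': 'spark-defaults', 'Properties': {...}}]".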
        spark_configurations = ast.literal_eval(spark_configs)
        new_spark_defaults = list()
        spark_defaults = subprocess.run('cat {0}spark/conf/spark-defaults.conf'.format(cluster_dir),
                                        capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip("\n\r")
        current_spark_properties = spark_defaults.split('\n')
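        # For every existing (non-comment) property, take the overridden
        # value when one is supplied; properties that only exist in the
        # overrides are collected as new entries.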
        for param in current_spark_properties:
            if param.split(' ')[0] != '#':
                for config in spark_configurations:
                    if config['Classification'] == 'spark-defaults':
                        for prop_key in config['Properties']:
                            if prop_key == param.split(' ')[0]:
                                param = prop_key + ' ' + config['Properties'][prop_key]
                            else:
                                new_spark_defaults.append(prop_key + ' ' + config['Properties'][prop_key])
                new_spark_defaults.append(param)
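        # The else-branch above appends each override once per existing
        # param, so dedupe before writing the file back out.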
        new_spark_defaults = set(new_spark_defaults)
        subprocess.run("echo '{0}' > {1}spark/conf/spark-defaults.conf".format(datalab_header, cluster_dir),
                       shell=True, check=True)
        for prop in new_spark_defaults:
            prop = prop.rstrip()
            subprocess.run('echo "{0}" >> {1}spark/conf/spark-defaults.conf'.format(prop, cluster_dir),
                           shell=True, check=True)
        # Raw string so \s reaches sed as-is; strips empty lines from the file.
        subprocess.run(r'sed -i "/^\s*$/d" {0}spark/conf/spark-defaults.conf'.format(cluster_dir), shell=True, check=True)
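
# A hypothetical invocation, sketched from how the paths are concatenated
# above (all values below are illustrative, not taken from real callers;
# note that cluster_dir must end with '/'):
#
# configure_dataengine_spark(
#     cluster_name='de-project1-cluster1',
#     jars_dir='/opt/de-project1-cluster1/jars/',
#     cluster_dir='/opt/de-project1-cluster1/',
#     datalake_enabled='false',
#     spark_configs="[{'Classification': 'spark-defaults',"
#                   " 'Properties': {'spark.executor.memory': '4g'}}]")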