in infrastructure-provisioning/src/general/lib/aws/actions_lib.py [0:0]
# Module-level imports used by this function (defined at the top of actions_lib.py):
# ast, os, sys, datalab.fab, and the exists() helper for checking remote paths.
def configure_local_spark(jars_dir, templates_dir, memory_type='driver'):
    try:
        # Checking if spark.jars parameter was generated previously
        spark_jars_paths = None
        if exists(datalab.fab.conn, '/opt/spark/conf/spark-defaults.conf'):
            try:
                spark_jars_paths = datalab.fab.conn.sudo(
                    'cat /opt/spark/conf/spark-defaults.conf | grep -e "^spark.jars " ').stdout
            except Exception:
                spark_jars_paths = None
        # Resolve the AWS region from instance metadata and pick the matching S3 endpoint
        region = datalab.fab.conn.sudo(
            'curl http://169.254.169.254/latest/meta-data/placement/availability-zone').stdout[:-1]
        if region == 'us-east-1':
            endpoint_url = 'https://s3.amazonaws.com'
        elif region == 'cn-north-1':
            endpoint_url = 'https://s3.{}.amazonaws.com.cn'.format(region)
        else:
            endpoint_url = 'https://s3-' + region + '.amazonaws.com'
        # Upload the spark-defaults template and append the S3A endpoint and encryption settings
        datalab.fab.conn.put(templates_dir + 'notebook_spark-defaults_local.conf',
                             '/tmp/notebook_spark-defaults_local.conf')
        datalab.fab.conn.sudo('echo "spark.hadoop.fs.s3a.endpoint {}" >> '
                              '/tmp/notebook_spark-defaults_local.conf'.format(endpoint_url))
        datalab.fab.conn.sudo('echo "spark.hadoop.fs.s3a.server-side-encryption-algorithm AES256" >> '
                              '/tmp/notebook_spark-defaults_local.conf')
        # Export JAVA_HOME (Java 8 JRE resolved via update-alternatives) for Spark
        java_home = datalab.fab.conn.run(
            "update-alternatives --query java | grep -o --color=never '/.*/java-8.*/jre'").stdout.splitlines()[0].replace('\n', '')
        datalab.fab.conn.sudo("echo 'export JAVA_HOME=\'{}\'' >> /opt/spark/conf/spark-env.sh".format(java_home))
        if os.environ['application'] == 'zeppelin':
            datalab.fab.conn.sudo('echo \"spark.jars $(ls -1 ' + jars_dir + '* | tr \'\\n\' \',\')\" >> '
                                  '/tmp/notebook_spark-defaults_local.conf')
        datalab.fab.conn.sudo('\cp -f /tmp/notebook_spark-defaults_local.conf /opt/spark/conf/spark-defaults.conf')
        if memory_type == 'driver':
            # Replace any existing memory setting with the calculated driver memory
            spark_memory = datalab.fab.get_spark_memory()
            datalab.fab.conn.sudo('sed -i "/spark.*.memory/d" /opt/spark/conf/spark-defaults.conf')
            datalab.fab.conn.sudo('''bash -c 'echo "spark.{0}.memory {1}m" >> /opt/spark/conf/spark-defaults.conf' '''
                                  .format(memory_type, spark_memory))
        if 'spark_configurations' in os.environ:
            # Overlay user-supplied spark-defaults properties on the generated config,
            # preserving the DataLab header comments
            datalab_header = datalab.fab.conn.sudo(
                'cat /tmp/notebook_spark-defaults_local.conf | grep "^#"').stdout
            spark_configurations = ast.literal_eval(os.environ['spark_configurations'])
            new_spark_defaults = list()
            spark_defaults = datalab.fab.conn.sudo('cat /opt/spark/conf/spark-defaults.conf').stdout
            current_spark_properties = spark_defaults.split('\n')
            for param in current_spark_properties:
                if param.split(' ')[0] != '#':
                    for config in spark_configurations:
                        if config['Classification'] == 'spark-defaults':
                            for property in config['Properties']:
                                if property == param.split(' ')[0]:
                                    # Existing property: take the user-supplied value
                                    param = property + ' ' + config['Properties'][property]
                                else:
                                    # Property not on this line: queue it for addition
                                    new_spark_defaults.append(property + ' ' + config['Properties'][property])
                    new_spark_defaults.append(param)
            # Deduplicate and rewrite spark-defaults.conf, header first
            new_spark_defaults = set(new_spark_defaults)
            datalab.fab.conn.sudo('''bash -c 'echo "{}" > /opt/spark/conf/spark-defaults.conf' '''.format(datalab_header))
            for prop in new_spark_defaults:
                prop = prop.rstrip()
                datalab.fab.conn.sudo('''bash -c 'echo "{}" >> /opt/spark/conf/spark-defaults.conf' '''.format(prop))
            datalab.fab.conn.sudo('sed -i "/^\s*$/d" /opt/spark/conf/spark-defaults.conf')
            # Restore the previously generated spark.jars line, if any
            if spark_jars_paths:
                datalab.fab.conn.sudo('''bash -c 'echo "{}" >> /opt/spark/conf/spark-defaults.conf' '''.format(spark_jars_paths))
    except Exception as err:
        print('Error:', str(err))
        sys.exit(1)
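
The spark_configurations merge loop is the least obvious part of the function. Below is a minimal standalone sketch of its intended effect: properties from the EMR-style "spark-defaults" classification override matching keys in the existing config, and any remaining properties are appended as new lines, while comment lines are left to the header handling. The helper name merge_spark_defaults is hypothetical and not part of actions_lib.py.

# Illustrative sketch only; merge_spark_defaults does not exist in actions_lib.py.
def merge_spark_defaults(current_lines, spark_configurations):
    # Collect user-supplied overrides from the 'spark-defaults' classification
    overrides = {}
    for config in spark_configurations:
        if config['Classification'] == 'spark-defaults':
            overrides.update(config['Properties'])
    merged = {}
    for line in current_lines:
        line = line.strip()
        if not line or line.startswith('#'):
            continue  # comment/header lines are rewritten separately
        key, _, value = line.partition(' ')
        # Existing key: prefer the user-supplied value if one was given
        merged[key] = overrides.pop(key, value)
    # Properties that were not present in the file yet
    merged.update(overrides)
    return ['{} {}'.format(k, v) for k, v in merged.items()]

# Example:
# merge_spark_defaults(['spark.driver.memory 2048m'],
#                      [{'Classification': 'spark-defaults',
#                        'Properties': {'spark.driver.memory': '4g',
#                                       'spark.sql.shuffle.partitions': '200'}}])
# -> ['spark.driver.memory 4g', 'spark.sql.shuffle.partitions 200']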