def configure_local_spark()

in infrastructure-provisioning/src/general/lib/azure/actions_lib.py
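
Renders the local Spark configuration on an Azure notebook node: it resolves the project-specific and shared storage accounts by their Name tag, fills the core-site.xml template (Blob storage or Data Lake variant) with the account names and keys, rebuilds /opt/spark/conf/spark-defaults.conf with a spark.jars list collected from jars_dir, sets the Spark driver memory and JAVA_HOME, and merges any custom spark_configurations supplied through the environment. The excerpt relies on module-level imports in actions_lib.py (os, sys, ast, datalab.fab, datalab.meta_lib, and the exists() remote-file check).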


def configure_local_spark(jars_dir, templates_dir, memory_type='driver'):
    try:
        # Checking if spark.jars parameter was generated previously
        spark_jars_paths = None
        if exists(datalab.fab.conn, '/opt/spark/conf/spark-defaults.conf'):
            try:
                spark_jars_paths = datalab.fab.conn.sudo(
                    'cat /opt/spark/conf/spark-defaults.conf | grep -e "^spark.jars " ').stdout
            except Exception:
                spark_jars_paths = None
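        # Build the "Name" tags used to look up the project-specific and shared storage accounts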
        user_storage_account_tag = "{}-{}-{}-bucket".format(os.environ['conf_service_base_name'],
                                                            os.environ['project_name'].lower(),
                                                            os.environ['endpoint_name'].lower())
        shared_storage_account_tag = '{0}-{1}-shared-bucket'.format(os.environ['conf_service_base_name'],
                                                                    os.environ['endpoint_name'].lower())
        for storage_account in datalab.meta_lib.AzureMeta().list_storage_accounts(
                os.environ['azure_resource_group_name']):
            if user_storage_account_tag == storage_account.tags["Name"]:
                user_storage_account_name = storage_account.name
                user_storage_account_key = datalab.meta_lib.AzureMeta().list_storage_keys(
                    os.environ['azure_resource_group_name'], user_storage_account_name)[0]
            if shared_storage_account_tag == storage_account.tags["Name"]:
                shared_storage_account_name = storage_account.name
                shared_storage_account_key = datalab.meta_lib.AzureMeta().list_storage_keys(
                    os.environ['azure_resource_group_name'], shared_storage_account_name)[0]
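        # Upload the core-site.xml template matching the storage backend (Blob storage vs. Data Lake)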
        if os.environ['azure_datalake_enable'] == 'false':
            datalab.fab.conn.put(templates_dir + 'core-site-storage.xml', '/tmp/core-site.xml')
        else:
            datalab.fab.conn.put(templates_dir + 'core-site-datalake.xml', '/tmp/core-site.xml')
        datalab.fab.conn.sudo(
            'sed -i "s|USER_STORAGE_ACCOUNT|{}|g" /tmp/core-site.xml'.format(user_storage_account_name))
        datalab.fab.conn.sudo(
            'sed -i "s|SHARED_STORAGE_ACCOUNT|{}|g" /tmp/core-site.xml'.format(shared_storage_account_name))
        datalab.fab.conn.sudo('sed -i "s|USER_ACCOUNT_KEY|{}|g" /tmp/core-site.xml'.format(user_storage_account_key))
        datalab.fab.conn.sudo(
            'sed -i "s|SHARED_ACCOUNT_KEY|{}|g" /tmp/core-site.xml'.format(shared_storage_account_key))
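        # Data Lake also needs the application (client) id and the user refresh token substituted in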
        if os.environ['azure_datalake_enable'] == 'true':
            client_id = os.environ['azure_application_id']
            refresh_token = os.environ['azure_user_refresh_token']
            datalab.fab.conn.sudo('sed -i "s|CLIENT_ID|{}|g" /tmp/core-site.xml'.format(client_id))
            datalab.fab.conn.sudo('sed -i "s|REFRESH_TOKEN|{}|g" /tmp/core-site.xml'.format(refresh_token))
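        # Install the rendered core-site.xml: Spark conf dir for Blob storage, Hadoop conf dir for Data Lake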
        if os.environ['azure_datalake_enable'] == 'false':
            datalab.fab.conn.sudo('rm -f /opt/spark/conf/core-site.xml')
            datalab.fab.conn.sudo('mv /tmp/core-site.xml /opt/spark/conf/core-site.xml')
        else:
            datalab.fab.conn.sudo('rm -f /opt/hadoop/etc/hadoop/core-site.xml')
            datalab.fab.conn.sudo('mv /tmp/core-site.xml /opt/hadoop/etc/hadoop/core-site.xml')
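        # Build spark-defaults.conf from the notebook template and append a spark.jars line listing every jar in jars_dir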
        datalab.fab.conn.put(templates_dir + 'notebook_spark-defaults_local.conf',
                             '/tmp/notebook_spark-defaults_local.conf')
        datalab.fab.conn.sudo("jar_list=`find {} -name '*.jar' | tr '\\n' ','` ; echo \"spark.jars   $jar_list\" >> \
              /tmp/notebook_spark-defaults_local.conf".format(jars_dir))
        datalab.fab.conn.sudo('cp -f /tmp/notebook_spark-defaults_local.conf /opt/spark/conf/spark-defaults.conf')
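        # For the driver, replace any existing spark.*.memory setting with the value from datalab.fab.get_spark_memory()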
        if memory_type == 'driver':
            spark_memory = datalab.fab.get_spark_memory()
            datalab.fab.conn.sudo('sed -i "/spark.*.memory/d" /opt/spark/conf/spark-defaults.conf')
            datalab.fab.conn.sudo(
                '''bash -c 'echo "spark.{0}.memory {1}m" >> /opt/spark/conf/spark-defaults.conf' '''.format(memory_type,
                                                                                                            spark_memory))
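        # Make sure spark-env.sh exists and export JAVA_HOME pointing at the Java 8 JRE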
        if not exists(datalab.fab.conn, '/opt/spark/conf/spark-env.sh'):
            datalab.fab.conn.sudo('mv /opt/spark/conf/spark-env.sh.template /opt/spark/conf/spark-env.sh')
        java_home = datalab.fab.conn.run(
            "update-alternatives --query java | grep -o --color=never \'/.*/java-8.*/jre\'").stdout.splitlines()[
            0].replace('\n', '')
        datalab.fab.conn.sudo("echo 'export JAVA_HOME=\'{}\'' >> /opt/spark/conf/spark-env.sh".format(java_home))
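        # Merge user-supplied spark_configurations ("spark-defaults" classification) into spark-defaults.conf,
        # keeping the template header and the previously captured spark.jars line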
        if 'spark_configurations' in os.environ:
            datalab_header = datalab.fab.conn.sudo('cat /tmp/notebook_spark-defaults_local.conf | grep "^#"').stdout
            spark_configurations = ast.literal_eval(os.environ['spark_configurations'])
            new_spark_defaults = list()
            spark_defaults = datalab.fab.conn.sudo('cat /opt/spark/conf/spark-defaults.conf').stdout
            current_spark_properties = spark_defaults.split('\n')
            for param in current_spark_properties:
                if param.split(' ')[0] != '#':
                    for config in spark_configurations:
                        if config['Classification'] == 'spark-defaults':
                            for prop_name in config['Properties']:
                                if prop_name == param.split(' ')[0]:
                                    param = prop_name + ' ' + config['Properties'][prop_name]
                                else:
                                    new_spark_defaults.append(prop_name + ' ' + config['Properties'][prop_name])
                    new_spark_defaults.append(param)
            new_spark_defaults = set(new_spark_defaults)
            datalab.fab.conn.sudo(
                '''bash -c 'echo "{}" > /opt/spark/conf/spark-defaults.conf' '''.format(datalab_header))
            for prop in new_spark_defaults:
                prop = prop.rstrip()
                datalab.fab.conn.sudo('''bash -c 'echo "{}" >> /opt/spark/conf/spark-defaults.conf' '''.format(prop))
            datalab.fab.conn.sudo(r'sed -i "/^\s*$/d" /opt/spark/conf/spark-defaults.conf')
            if spark_jars_paths:
                datalab.fab.conn.sudo(
                    '''bash -c 'echo "{}" >> /opt/spark/conf/spark-defaults.conf' '''.format(spark_jars_paths))
    except Exception as err:
        print('Error:', str(err))
        sys.exit(1)
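
A minimal call sketch; the directory values are illustrative placeholders, not paths taken from the repository:

# Hypothetical invocation from a notebook configuration script; jars_dir and
# templates_dir are placeholder paths, not values from the repository.
configure_local_spark(jars_dir='/opt/jars/',
                      templates_dir='/root/templates/',
                      memory_type='driver')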