def configure_dataengine_spark()

in infrastructure-provisioning/src/general/lib/azure/actions_lib.py [0:0]


import ast
import os
import subprocess


def configure_dataengine_spark(cluster_name, jars_dir, cluster_dir, datalake_enabled, spark_configs=''):
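    """Build spark-defaults.conf for a dataengine cluster: register the cluster's
    jars via spark.jars, merge properties from any existing cluster config,
    install the matching core-site.xml and apply user-supplied overrides."""
    # Collect every jar found under jars_dir into a spark.jars entry appended to the notebook template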
    subprocess.run("jar_list=`find {0} -name '*.jar' | tr '\\n' ',' | sed 's/,$//'` ; echo \"spark.jars $jar_list\" >> \
          /tmp/{1}/notebook_spark-defaults_local.conf".format(jars_dir, cluster_name), shell=True, check=True)
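    # Append non-comment lines that differ in the existing cluster spark-defaults.conf to the generated notebook config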
    if os.path.exists('{0}spark/conf/spark-defaults.conf'.format(cluster_dir)):
        additional_spark_properties = subprocess.run('diff --changed-group-format="%>" --unchanged-group-format="" '
                                                     '/tmp/{0}/notebook_spark-defaults_local.conf '
                                                     '{1}spark/conf/spark-defaults.conf | grep -v "^#"'.format(
            cluster_name, cluster_dir), capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip(
            "\n\r")
        for property in additional_spark_properties.split('\n'):
            subprocess.run('echo "{0}" >> /tmp/{1}/notebook_spark-defaults_local.conf'.format(property, cluster_name),
                           shell=True, check=True)
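    # Install the merged config into the cluster directory and copy the matching core-site.xml (local Spark vs. Data Lake/Hadoop)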
    if os.path.exists('{0}'.format(cluster_dir)):
        subprocess.run(
            'cp -f /tmp/{0}/notebook_spark-defaults_local.conf  {1}spark/conf/spark-defaults.conf'.format(cluster_name,
                                                                                                          cluster_dir),
            shell=True, check=True)
        if datalake_enabled == 'false':
            subprocess.run('cp -f /opt/spark/conf/core-site.xml {}spark/conf/'.format(cluster_dir), shell=True,
                           check=True)
        else:
            subprocess.run(
                'cp -f /opt/hadoop/etc/hadoop/core-site.xml {}hadoop/etc/hadoop/core-site.xml'.format(cluster_dir),
                shell=True, check=True)
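    # Apply user-supplied overrides: spark_configs is a Python-literal list of dicts carrying 'Classification' and 'Properties' keys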
    if spark_configs and os.path.exists('{0}'.format(cluster_dir)):
        datalab_header = subprocess.run(
            'cat /tmp/{0}/notebook_spark-defaults_local.conf | grep "^#"'.format(cluster_name),
            capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip("\n\r")
        spark_configurations = ast.literal_eval(spark_configs)
        new_spark_defaults = list()
        spark_defaults = subprocess.run('cat {0}spark/conf/spark-defaults.conf'.format(cluster_dir),
                                        capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip(
            "\n\r")
        current_spark_properties = spark_defaults.split('\n')
        for param in current_spark_properties:
            if param.split(' ')[0] != '#':
                for config in spark_configurations:
                    if config['Classification'] == 'spark-defaults':
                        for property in config['Properties']:
                            if property == param.split(' ')[0]:
                                param = property + ' ' + config['Properties'][property]
                            else:
                                new_spark_defaults.append(property + ' ' + config['Properties'][property])
                new_spark_defaults.append(param)
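        # De-duplicate the collected "key value" lines, then rewrite spark-defaults.conf under the preserved DataLab header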
        new_spark_defaults = set(new_spark_defaults)
        subprocess.run("echo '{0}' > {1}/spark/conf/spark-defaults.conf".format(datalab_header, cluster_dir),
                       shell=True, check=True)
        for prop in new_spark_defaults:
            prop = prop.rstrip()
            subprocess.run('echo "{0}" >> {1}/spark/conf/spark-defaults.conf'.format(prop, cluster_dir), shell=True,
                           check=True)
        subprocess.run('sed -i "/^\s*$/d" {0}/spark/conf/spark-defaults.conf'.format(cluster_dir), shell=True,
                       check=True)
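
For context, a call to this helper might look like the sketch below; the cluster name, directories and property values are assumptions for illustration, not values from the source. What the function itself expects is that cluster_dir ends with '/', that the template /tmp/<cluster_name>/notebook_spark-defaults_local.conf is already in place (the function only appends to it and reads its '#' header lines), that datalake_enabled is a string compared against 'false', and that spark_configs, when given, is a Python-literal list of dicts whose 'spark-defaults' classification holds the properties to override.

# Illustrative invocation only -- names, paths and property values are assumptions.
example_spark_configs = str([
    {
        'Classification': 'spark-defaults',
        'Properties': {
            'spark.executor.memory': '4g',
            'spark.sql.shuffle.partitions': '200'
        }
    }
])

configure_dataengine_spark(cluster_name='des-example',
                           jars_dir='/opt/des-example/jars/',
                           cluster_dir='/opt/des-example/',
                           datalake_enabled='false',
                           spark_configs=example_spark_configs)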