def configure_zeppelin_emr_interpreter()

in infrastructure-provisioning/src/general/lib/aws/actions_lib.py [0:0]


def configure_zeppelin_emr_interpreter(emr_version, cluster_name, region, spark_dir, os_user, yarn_dir, bucket,
                                       user_name, endpoint_url, multiple_emrs):
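    """Configure the local Zeppelin installation for a remote EMR cluster.

    Points SPARK_HOME and HADOOP_CONF_DIR at the per-cluster distribution under
    /opt/<emr_version>/<cluster_name>/, restarts Zeppelin, and registers the
    interpreter(s) through Zeppelin's REST API on localhost:8080. When
    multiple_emrs == 'true', a dedicated Livy server is also installed and
    started on the first free port at or above 8998.
    """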
    try:
        port_number_found = False
        zeppelin_restarted = False
        default_port = 8998
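        # Fetch the Python version installed on the EMR cluster; the helper writes
        # it to /tmp/python_version, of which only the first five characters
        # ('X.Y.Z') are kept.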
        get_cluster_python_version(region, bucket, user_name, cluster_name)
        with open('/tmp/python_version') as f:
            python_version = f.read()
        python_version = python_version[0:5]
        livy_port = ''
        livy_path = '/opt/{0}/{1}/livy/'.format(emr_version, cluster_name)
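        # AWS SDK, hadoop-aws and hadoop-lzo jars shipped with the EMR distribution;
        # they are appended to spark.jars below so the notebook-side driver can use them.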
        spark_libs = "/opt/{0}/jars/usr/share/aws/aws-java-sdk/aws-java-sdk-core*.jar " \
                     "/opt/{0}/jars/usr/lib/hadoop/hadoop-aws*.jar " \
                     "/opt/{0}/jars/usr/share/aws/aws-java-sdk/aws-java-sdk-s3-*.jar " \
                     "/opt/{0}/jars/usr/lib/hadoop-lzo/lib/hadoop-lzo-*.jar".format(emr_version)
        # fix due to: Multiple py4j files found under ..../spark/python/lib
        # py4j-0.10.7-src.zip may still be in the folder; versions may vary.
        subprocess.run('rm /opt/{0}/{1}/spark/python/lib/py4j-src.zip'.format(emr_version, cluster_name), shell=True, check=True)

        subprocess.run('echo \"Configuring emr path for Zeppelin\"', shell=True, check=True)
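        # Point the Zeppelin and Spark configs at this cluster's EMR distribution,
        # register the EMR jars, and drop the spark.executorEnv.PYTHONPATH /
        # spark.yarn.dist.files entries copied from the cluster.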
        subprocess.run('sed -i \"s/^export SPARK_HOME.*/export SPARK_HOME=\/opt\/{0}\/{1}\/spark/\" '
              '/opt/zeppelin/conf/zeppelin-env.sh'.format(emr_version, cluster_name), shell=True, check=True)
        subprocess.run('sed -i "s/^export HADOOP_CONF_DIR.*/export HADOOP_CONF_DIR=' + \
              '\/opt\/{0}\/{1}\/conf/" /opt/{0}/{1}/spark/conf/spark-env.sh'.format(emr_version, cluster_name), shell=True, check=True)
        subprocess.run('echo \"spark.jars $(ls {0} | tr \'\\n\' \',\')\" >> /opt/{1}/{2}/spark/conf/spark-defaults.conf'
              .format(spark_libs, emr_version, cluster_name), shell=True, check=True)
        subprocess.run('sed -i "/spark.executorEnv.PYTHONPATH/d" /opt/{0}/{1}/spark/conf/spark-defaults.conf'
              .format(emr_version, cluster_name), shell=True, check=True)
        subprocess.run('sed -i "/spark.yarn.dist.files/d" /opt/{0}/{1}/spark/conf/spark-defaults.conf'
              .format(emr_version, cluster_name), shell=True, check=True)
        subprocess.run('sudo chown {0}:{0} -R /opt/zeppelin/'.format(os_user), shell=True, check=True)
        subprocess.run('sudo systemctl daemon-reload', shell=True, check=True)
        subprocess.run('sudo service zeppelin-notebook stop', shell=True, check=True)
        subprocess.run('sudo service zeppelin-notebook start', shell=True, check=True)
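        # Wait for Zeppelin to come back up: nmap reports port 8080 as 'closed'
        # while the service is down, so grep exits 1 once Zeppelin is listening again.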
        while not zeppelin_restarted:
            subprocess.run('sleep 5', shell=True, check=True)
            result = subprocess.run('sudo bash -c "nmap -p 8080 localhost | grep closed > /dev/null" ; echo $?',
                                    capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip("\n\r")
            result = result[:1]
            if result == '1':
                zeppelin_restarted = True
        subprocess.run('sleep 5', shell=True, check=True)
        subprocess.run('echo \"Configuring emr spark interpreter for Zeppelin\"', shell=True, check=True)
        if multiple_emrs == 'true':
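            # Several EMR clusters can be attached to one notebook, so each cluster
            # gets its own Livy server on the first free port at or above 8998
            # ('closed' in the nmap output means nothing is listening on that port).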
            while not port_number_found:
                port_free = subprocess.run('sudo bash -c "nmap -p ' + str(default_port) +
                                  ' localhost | grep closed > /dev/null" ; echo $?', capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip("\n\r")
                port_free = port_free[:1]
                if port_free == '0':
                    livy_port = default_port
                    port_number_found = True
                else:
                    default_port += 1
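            # Write the chosen port, the YARN master and the Spark/Hadoop/Python
            # environment into this cluster's Livy configuration.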
            subprocess.run('sudo echo "livy.server.port = {0}" >> {1}conf/livy.conf'.format(str(livy_port), livy_path), shell=True, check=True)
            subprocess.run('sudo echo "livy.spark.master = yarn" >> {}conf/livy.conf'.format(livy_path), shell=True, check=True)
            if os.path.exists('{}conf/spark-blacklist.conf'.format(livy_path)):
                subprocess.run('sudo sed -i "s/^/#/g" {}conf/spark-blacklist.conf'.format(livy_path), shell=True, check=True)
            subprocess.run(''' sudo echo "export SPARK_HOME={0}" >> {1}conf/livy-env.sh'''.format(spark_dir, livy_path), shell=True, check=True)
            subprocess.run(''' sudo echo "export HADOOP_CONF_DIR={0}" >> {1}conf/livy-env.sh'''.format(yarn_dir, livy_path), shell=True, check=True)
            subprocess.run(''' sudo echo "export PYSPARK3_PYTHON=python{0}" >> {1}conf/livy-env.sh'''.format(python_version[0:3],
                                                                                                    livy_path), shell=True, check=True)
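            # Fill in the Livy interpreter template for this cluster and register it
            # with Zeppelin through the REST API on localhost:8080.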
            template_file = "/tmp/dataengine-service_interpreter.json"
            with open(template_file, 'r') as fr:
                text = fr.read()
            text = text.replace('CLUSTER_NAME', cluster_name)
            text = text.replace('SPARK_HOME', spark_dir)
            text = text.replace('ENDPOINTURL', endpoint_url)
            text = text.replace('LIVY_PORT', str(livy_port))
            with open(template_file, 'w') as fw:
                fw.write(text)
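            # Zeppelin's REST API may not be ready immediately after the restart,
            # so retry the registration a few times.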
            for _ in range(5):
                try:
                    subprocess.run("curl --noproxy localhost -H 'Content-Type: application/json' -X POST -d " +
                          "@/tmp/dataengine-service_interpreter.json http://localhost:8080/api/interpreter/setting", shell=True, check=True)
                    break
                except Exception:
                    subprocess.run('sleep 5', shell=True, check=True)
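            # Install a per-port systemd unit so this cluster's Livy server is
            # managed and enabled at boot independently of other clusters.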
            subprocess.run('sudo cp /opt/livy-server-cluster.service /etc/systemd/system/livy-server-{}.service'
                  .format(str(livy_port)), shell=True, check=True)
            subprocess.run("sudo sed -i 's|OS_USER|{0}|' /etc/systemd/system/livy-server-{1}.service"
                  .format(os_user, str(livy_port)), shell=True, check=True)
            subprocess.run("sudo sed -i 's|LIVY_PATH|{0}|' /etc/systemd/system/livy-server-{1}.service"
                  .format(livy_path, str(livy_port)), shell=True, check=True)
            subprocess.run('sudo chmod 644 /etc/systemd/system/livy-server-{}.service'.format(str(livy_port)), shell=True, check=True)
            subprocess.run("sudo systemctl daemon-reload", shell=True, check=True)
            subprocess.run("sudo systemctl enable livy-server-{}".format(str(livy_port)), shell=True, check=True)
            subprocess.run('sudo systemctl start livy-server-{}'.format(str(livy_port)), shell=True, check=True)
        else:
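            # Single-cluster mode: no Livy; register plain Spark interpreters for
            # Python 2 and the cluster's Python 3 version instead.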
            template_file = "/tmp/dataengine-service_interpreter.json"
            p_versions = ["2", "{}-dp".format(python_version[:3])]
            for p_version in p_versions:
                with open(template_file, 'r') as fr:
                    text = fr.read()
                text = text.replace('CLUSTERNAME', cluster_name)
                text = text.replace('PYTHONVERSION', p_version)
                text = text.replace('SPARK_HOME', spark_dir)
                text = text.replace('PYTHONVER_SHORT', p_version[:1])
                text = text.replace('ENDPOINTURL', endpoint_url)
                text = text.replace('DATAENGINE-SERVICE_VERSION', emr_version)
                tmp_file = "/tmp/emr_spark_py{}_interpreter.json".format(p_version)
                with open(tmp_file, 'w') as fw:
                    fw.write(text)
                for _ in range(5):
                    try:
                        subprocess.run("curl --noproxy localhost -H 'Content-Type: application/json' -X POST -d " +
                              "@/tmp/emr_spark_py" + p_version +
                              "_interpreter.json http://localhost:8080/api/interpreter/setting", shell=True, check=True)
                        break
                    except Exception:
                        subprocess.run('sleep 5', shell=True, check=True)
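        # Leave a marker file so re-runs can see the interpreter is already configured.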
        subprocess.run('touch /home/' + os_user + '/.ensure_dir/dataengine-service_' + cluster_name + '_interpreter_ensured', shell=True, check=True)
    except Exception as err:
        print('Failed to configure Zeppelin EMR interpreter for cluster {}: {}'.format(cluster_name, err))
        sys.exit(1)
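
A minimal, hypothetical invocation from a provisioning script (all argument values below are illustrative assumptions, not taken from the repository):

    configure_zeppelin_emr_interpreter(emr_version='emr-6.2.0', cluster_name='test-cluster',
                                       region='us-west-2',
                                       spark_dir='/opt/emr-6.2.0/test-cluster/spark/',
                                       os_user='datalab-user',
                                       yarn_dir='/opt/emr-6.2.0/test-cluster/conf/',
                                       bucket='project-bucket', user_name='test-user',
                                       endpoint_url='https://localhost:8084',
                                       multiple_emrs='false')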