in infrastructure-provisioning/src/general/lib/gcp/actions_lib.py [0:0]
def configure_zeppelin_dataproc_interpreter(self, dataproc_version, cluster_name, spark_dir,
os_user, yarn_dir, bucket, user_name, multiple_clusters):
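"""Configure Zeppelin interpreters (Livy or Spark) for a Dataproc dataengine-service cluster."""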
try:
port_number_found = False
zeppelin_restarted = False
default_port = 8998
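# Read the Python version installed on the Dataproc cluster; get_cluster_app_version() stores it in /tmp/python_version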
GCPActions().get_cluster_app_version(bucket, user_name, cluster_name, 'python')
with open('/tmp/python_version') as f:
python_version = f.read()
python_version = python_version[0:5]
livy_port = ''
livy_path = '/opt/{0}/{1}/livy/'.format(dataproc_version, cluster_name)
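# Point Zeppelin and the cluster-local Spark copy at the Dataproc Spark/Hadoop configuration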
subprocess.run('echo \"Configuring dataproc path for Zeppelin\"', shell=True, check=True)
subprocess.run('sed -i \"s/^export SPARK_HOME.*/export SPARK_HOME=\/opt\/{0}\/{1}\/spark/\" /opt/zeppelin/conf/zeppelin-env.sh'
.format(dataproc_version, cluster_name), shell=True, check=True)
subprocess.run('sed -i \"s/^export HADOOP_CONF_DIR.*/export HADOOP_CONF_DIR=\/opt\/{0}\/{1}\/conf/\" /opt/{0}/{1}/spark/conf/spark-env.sh'
.format(dataproc_version, cluster_name), shell=True, check=True)
subprocess.run('sed -i "/spark.executorEnv.PYTHONPATH/d" /opt/{0}/{1}/spark/conf/spark-defaults.conf'.format(dataproc_version, cluster_name), shell=True, check=True)
subprocess.run('sed -i "/spark.yarn.dist.files/d" /opt/{0}/{1}/spark/conf/spark-defaults.conf'.format(dataproc_version, cluster_name), shell=True, check=True)
subprocess.run('sudo chown {0}:{0} -R /opt/zeppelin/'.format(os_user), shell=True, check=True)
subprocess.run('sudo systemctl restart zeppelin-notebook.service', shell=True, check=True)
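# Poll port 8080 until nmap stops reporting it as closed, i.e. Zeppelin has come back up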
while not zeppelin_restarted:
subprocess.run('sleep 5', shell=True, check=True)
result = subprocess.run('sudo bash -c "nmap -p 8080 localhost | grep closed > /dev/null" ; echo $?', capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip("\n\r")
result = result[:1]
if result == '1':
zeppelin_restarted = True
subprocess.run('sleep 5', shell=True, check=True)
subprocess.run('echo \"Configuring dataproc spark interpreter for Zeppelin\"', shell=True, check=True)
if multiple_clusters == 'true':
while not port_number_found:
port_free = subprocess.run('sudo bash -c "nmap -p ' + str(default_port) +
' localhost | grep closed > /dev/null" ; echo $?', capture_output=True, shell=True, check=True).stdout.decode('UTF-8').rstrip("\n\r")
port_free = port_free[:1]
if port_free == '0':
livy_port = default_port
port_number_found = True
else:
default_port += 1
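# Write the per-cluster Livy configuration: port, YARN master, SPARK_HOME, HADOOP_CONF_DIR and PySpark Python version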
subprocess.run('sudo echo "livy.server.port = {0}" >> {1}conf/livy.conf'.format(str(livy_port), livy_path), shell=True, check=True)
subprocess.run('sudo echo "livy.spark.master = yarn" >> {}conf/livy.conf'.format(livy_path), shell=True, check=True)
if os.path.exists('{}conf/spark-blacklist.conf'.format(livy_path)):
subprocess.run('sudo sed -i "s/^/#/g" {}conf/spark-blacklist.conf'.format(livy_path), shell=True, check=True)
subprocess.run('sudo echo "export SPARK_HOME={0}" >> {1}conf/livy-env.sh'.format(spark_dir, livy_path), shell=True, check=True)
subprocess.run('sudo echo "export HADOOP_CONF_DIR={0}" >> {1}conf/livy-env.sh'.format(yarn_dir, livy_path), shell=True, check=True)
subprocess.run('sudo echo "export PYSPARK3_PYTHON=python{0}" >> {1}conf/livy-env.sh'.format(python_version[0:3], livy_path), shell=True, check=True)
template_file = "/tmp/dataengine-service_interpreter.json"
with open(template_file, 'r') as fr:
    text = fr.read()
text = text.replace('CLUSTER_NAME', cluster_name)
text = text.replace('SPARK_HOME', spark_dir)
text = text.replace('LIVY_PORT', str(livy_port))
with open(template_file, 'w') as fw:
    fw.write(text)
for _ in range(5):
try:
subprocess.run("curl --noproxy localhost -H 'Content-Type: application/json' -X POST -d " +
"@/tmp/dataengine-service_interpreter.json http://localhost:8080/api/interpreter/setting", shell=True, check=True)
break
except subprocess.CalledProcessError:
subprocess.run('sleep 5', shell=True, check=True)
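# Install, enable and start a systemd unit for this cluster's Livy server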
subprocess.run('sudo cp /opt/livy-server-cluster.service /etc/systemd/system/livy-server-{}.service'.format(str(livy_port)), shell=True, check=True)
subprocess.run("sudo sed -i 's|OS_USER|{0}|' /etc/systemd/system/livy-server-{1}.service".format(os_user, str(livy_port)), shell=True, check=True)
subprocess.run("sudo sed -i 's|LIVY_PATH|{0}|' /etc/systemd/system/livy-server-{1}.service".format(livy_path, str(livy_port)), shell=True, check=True)
subprocess.run('sudo chmod 644 /etc/systemd/system/livy-server-{}.service'.format(str(livy_port)), shell=True, check=True)
subprocess.run('sudo systemctl daemon-reload', shell=True, check=True)
subprocess.run('sudo systemctl enable livy-server-{}'.format(str(livy_port)), shell=True, check=True)
subprocess.run('sudo systemctl start livy-server-{}'.format(str(livy_port)), shell=True, check=True)
else:
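# Single-cluster mode: register a Spark interpreter per Python version directly with Zeppelin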
template_file = "/tmp/dataengine-service_interpreter.json"
p_versions = ["2", "{}-dp".format(python_version[:3])]
for p_version in p_versions:
with open(template_file, 'r') as fr:
    text = fr.read()
text = text.replace('CLUSTERNAME', cluster_name)
text = text.replace('PYTHONVERSION', p_version)
text = text.replace('SPARK_HOME', spark_dir)
text = text.replace('PYTHONVER_SHORT', p_version[:1])
text = text.replace('DATAENGINE-SERVICE_VERSION', dataproc_version)
tmp_file = '/tmp/dataproc_spark_py{}_interpreter.json'.format(p_version)
with open(tmp_file, 'w') as fw:
    fw.write(text)
for _ in range(5):
try:
subprocess.run("curl --noproxy localhost -H 'Content-Type: application/json' -X POST -d " +
"@/tmp/dataproc_spark_py{}_interpreter.json http://localhost:8080/api/interpreter/setting".format(p_version), shell=True, check=True)
break
except subprocess.CalledProcessError:
subprocess.run('sleep 5', shell=True, check=True)
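# Leave a marker file indicating the interpreter has been configured for this cluster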
subprocess.run('touch /home/{0}/.ensure_dir/dataengine-service_{1}_interpreter_ensured'.format(os_user, cluster_name), shell=True, check=True)
except Exception as err:
    print('Failed to configure Zeppelin Dataproc interpreter: {}'.format(str(err)))
    sys.exit(1)