in o2a/o2a_libs/src/o2a_lib/el_fs_functions.py [0:0]
def _pig_job_executor(cmd: str) -> str:
    """
    Run a shell command on a Dataproc cluster via a Pig job and return its output.

    The command is wrapped between two echoed delimiter markers so that the
    command's own output can be extracted from the surrounding gcloud/job noise.

    Cluster name and region are read from the ``DATAPROC_CLUSTER`` and
    ``DATAPROC_REGION`` environment variables.

    :param cmd: shell command to execute on the cluster (run via Pig's ``sh``).
    :return: the text emitted between the two delimiter markers.
    :raises AirflowException: if the gcloud invocation exits with a non-zero code.
    """
    cluster_name = os.environ.get("DATAPROC_CLUSTER")
    region = os.environ.get("DATAPROC_REGION")
    # Unlikely-to-collide marker used to bracket the command's output
    # (md5 hash of the string "Google").
    delimiter = "c822c1b63853ed273b89687ac505f9fa"
    dataproc_cmd = [
        "gcloud",
        "dataproc",
        "jobs",
        "submit",
        "pig",
        f"--execute=sh echo {delimiter};{cmd};sh echo {delimiter}",
        f"--cluster={cluster_name}",
        f"--region={region}",
    ]
    print("Executing for output: '{}'".format(" ".join(dataproc_cmd)))
    # Fix: run the constructed gcloud command, not the raw `cmd` string
    # (the original passed `cmd` here, bypassing Dataproc entirely).
    process = subprocess.Popen(args=dataproc_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, err = process.communicate()
    retcode = process.poll()
    if retcode:
        print("Error when executing '{}'".format(" ".join(dataproc_cmd)))
        print("Stdout: {}".format(output.decode("utf-8")))
        print("Stderr: {}".format(err.decode("utf-8")))
        raise AirflowException(
            "Retcode {} on {} with stdout: {}, stderr: {}".format(
                retcode, " ".join(dataproc_cmd), output.decode("utf-8"), err.decode("utf-8")
            )
        )
    # gcloud streams the Pig job driver output on stderr; the command's output
    # is the text between the two echoed delimiters. maxsplit=2 guarantees a
    # clean 3-way unpack even if the delimiter string recurs later in the stream.
    _, out, _ = err.decode("utf-8").split(delimiter, 2)
    return out