# _pig_job_executor()
#
# in o2a/o2a_libs/src/o2a_lib/el_fs_functions.py [0:0]


def _pig_job_executor(cmd: str):
    """
    Run a shell command on a Dataproc cluster via a Pig job and return its output.

    The command is sandwiched between two ``sh echo <delimiter>`` statements so
    that its own output can be extracted from the much noisier gcloud job log.

    :param cmd: shell command to execute on the cluster.
    :return: the command's output, i.e. the text between the two delimiters.
    :raises AirflowException: if the gcloud invocation exits with a non-zero code.
    """
    # Cluster coordinates come from the environment; None here will surface as
    # "--cluster=None" in the gcloud call — presumably set by the deployment.
    cluster_name = os.environ.get("DATAPROC_CLUSTER")
    region = os.environ.get("DATAPROC_REGION")

    # The delimiter is used to split the output
    delimiter = "c822c1b63853ed273b89687ac505f9fa"  # md5 hash of Google

    dataproc_cmd = [
        "gcloud",
        "dataproc",
        "jobs",
        "submit",
        "pig",
        f"--execute=sh echo {delimiter};{cmd};sh echo {delimiter}",
        f"--cluster={cluster_name}",
        f"--region={region}",
    ]

    print("Executing for output: '{}'".format(" ".join(dataproc_cmd)))
    # BUG FIX: the original passed the raw `cmd` string to Popen, so the
    # constructed gcloud command was never actually executed.
    process = subprocess.Popen(args=dataproc_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output, err = process.communicate()
    retcode = process.poll()

    if retcode:
        # BUG FIX: `" ".join(cmd)` on a str puts a space between every
        # character; report the actual argument list that was run instead.
        print("Error when executing '{}'".format(" ".join(dataproc_cmd)))
        print("Stdout: {}".format(output.decode("utf-8")))
        print("Stderr: {}".format(err.decode("utf-8")))
        raise AirflowException(
            "Retcode {} on {} with stdout: {}, stderr: {}".format(
                retcode, " ".join(dataproc_cmd), output.decode("utf-8"), err.decode("utf-8")
            )
        )

    # The Pig driver log (including our delimiter echoes) arrives on stderr;
    # the command's output is the middle of the three delimiter-split pieces.
    # BUG FIX: maxsplit must be 2 to guarantee exactly three parts — the
    # original maxsplit=3 could yield four parts (ValueError on unpack) if the
    # command output itself happened to contain the delimiter.
    _, out, _ = err.decode("utf-8").split(delimiter, 2)
    return out