utils/dataproc.py (517 lines of code) (raw):

"""Helpers for building Airflow (sub)DAGs that run jobs on ephemeral Dataproc clusters."""

import os
from collections import namedtuple

from airflow import models
from airflow.exceptions import AirflowException
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.google.cloud.hooks.dataproc import DataProcJobBuilder

# When google deprecates dataproc_v1beta2 in DataprocHook/Operator classes
# We can import these from our patched code, rather than upgrading/deploying
# apache-airflow-providers-google > 6.0.0, and google-cloud-dataproc > 2.5.0
# from utils.patched.dataproc_operator import (
from airflow.providers.google.cloud.operators.dataproc import (
    ClusterGenerator,
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)


class DataProcHelper:
    """Helper class for creating/deleting dataproc clusters.

    The constructor only records the cluster settings; the operators are
    built lazily by :meth:`create_cluster` and :meth:`delete_cluster`.
    See ``moz_dataproc_pyspark_runner`` for a description of each argument.
    """

    def __init__(
        self,
        cluster_name=None,
        job_name=None,
        num_workers=2,
        image_version="1.4-debian10",
        region="us-west1",
        subnetwork_uri=None,
        internal_ip_only=None,
        idle_delete_ttl=14400,
        auto_delete_ttl=28800,
        master_machine_type="n1-standard-8",
        worker_machine_type="n1-standard-4",
        num_preemptible_workers=0,
        service_account="dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com",
        init_actions_uris=None,
        additional_metadata=None,
        additional_properties=None,
        optional_components=None,
        install_component_gateway=True,
        aws_conn_id=None,
        gcp_conn_id="google_cloud_airflow_dataproc",
        project_id="airflow-dataproc",
        artifact_bucket="moz-fx-data-prod-airflow-dataproc-artifacts",
        storage_bucket="moz-fx-data-prod-dataproc-scratch",
        master_disk_type="pd-standard",
        master_disk_size=1024,
        master_num_local_ssds=0,
        worker_disk_type="pd-standard",
        worker_disk_size=1024,
        worker_num_local_ssds=0,
    ):
        self.cluster_name = cluster_name
        self.job_name = job_name
        self.num_workers = num_workers
        self.image_version = image_version
        self.region = region
        self.subnetwork_uri = subnetwork_uri
        self.internal_ip_only = internal_ip_only
        self.idle_delete_ttl = idle_delete_ttl
        self.auto_delete_ttl = auto_delete_ttl
        self.master_machine_type = master_machine_type
        self.worker_machine_type = worker_machine_type
        self.num_preemptible_workers = num_preemptible_workers
        self.service_account = service_account

        # The bucket with a default dataproc init script
        self.artifact_bucket = artifact_bucket
        self.storage_bucket = storage_bucket

        self.master_disk_type = master_disk_type
        self.master_disk_size = master_disk_size
        self.master_num_local_ssds = master_num_local_ssds

        self.worker_disk_type = worker_disk_type
        self.worker_disk_size = worker_disk_size
        self.worker_num_local_ssds = worker_num_local_ssds

        # None means "use the default"; fresh containers are created per
        # instance so nothing mutable is shared between helpers.
        if init_actions_uris is None:
            init_actions_uris = [
                f"gs://{self.artifact_bucket}/bootstrap/dataproc_init.sh"
            ]
        self.init_actions_uris = init_actions_uris
        self.additional_metadata = (
            {} if additional_metadata is None else additional_metadata
        )
        self.additional_properties = (
            {} if additional_properties is None else additional_properties
        )
        self.optional_components = (
            ["ANACONDA"] if optional_components is None else optional_components
        )

        self.install_component_gateway = install_component_gateway
        self.aws_conn_id = aws_conn_id
        self.gcp_conn_id = gcp_conn_id
        self.project_id = project_id

    def _s3_properties(self):
        """Return hadoop properties granting s3 access, or {} without an AWS connection."""
        if not self.aws_conn_id:
            return {}

        credentials = AwsBaseHook(
            aws_conn_id=self.aws_conn_id, client_type="s3"
        ).get_credentials()

        s3_properties = {}
        for key, value in zip(
            ("access.key", "secret.key", "session.token"), credentials
        ):
            if value is None:
                continue
            s3_properties["core:fs.s3a." + key] = value
            # For older spark versions we need to set the properties differently
            if key == "access.key":
                s3_properties["core:fs.s3.awsAccessKeyId"] = value
            elif key == "secret.key":
                s3_properties["core:fs.s3.awsSecretAccessKey"] = value
        return s3_properties

    def _cluster_properties(self):
        """Return the cluster properties; user supplied properties win on conflict."""
        properties = {
            # Google cloud storage requires object.create permission when reading from pyspark
            "core:fs.gs.implicit.dir.repair.enable": "false",
        }
        # Set hadoop properties to access s3 from dataproc
        properties.update(self._s3_properties())
        properties.update(self.additional_properties)
        return properties

    def _job_label(self):
        """Return the value of the ``jobname`` cluster label.

        Underscores are replaced by dashes and the name is lower-cased. A
        placeholder is used when no job name was given, instead of failing
        with an AttributeError on ``None``.
        """
        if self.job_name is None:
            return "jobname-not-set"
        return self.job_name.lower().replace("_", "-")

    def create_cluster(self):
        """Return a DataprocCreateClusterOperator."""
        metadata = {
            "gcs-connector-version": "1.9.16",
            "bigquery-connector-version": "0.13.6",
        }
        metadata.update(self.additional_metadata)

        cluster_generator = ClusterGenerator(
            project_id=self.project_id,
            num_workers=self.num_workers,
            subnetwork_uri=self.subnetwork_uri,
            internal_ip_only=self.internal_ip_only,
            storage_bucket=self.storage_bucket,
            init_actions_uris=self.init_actions_uris,
            metadata=metadata,
            image_version=self.image_version,
            properties=self._cluster_properties(),
            optional_components=self.optional_components,
            master_machine_type=self.master_machine_type,
            master_disk_type=self.master_disk_type,
            master_disk_size=self.master_disk_size,
            worker_machine_type=self.worker_machine_type,
            worker_disk_type=self.worker_disk_type,
            worker_disk_size=self.worker_disk_size,
            num_preemptible_workers=self.num_preemptible_workers,
            service_account=self.service_account,
            idle_delete_ttl=self.idle_delete_ttl,
            auto_delete_ttl=self.auto_delete_ttl,
        )

        cluster_config = cluster_generator.make()

        # The DataprocCreateClusterOperator and ClusterGenerator dont support component gateway or local ssds
        # ClusterConfig format is
        # https://cloud.google.com/dataproc/docs/reference/rpc/google.cloud.dataproc.v1#google.cloud.dataproc.v1.ClusterConfig
        if self.install_component_gateway:
            cluster_config["endpoint_config"] = {"enable_http_port_access": True}

        # The nested dicts are mutated in place, so no re-assignment is needed.
        if self.master_num_local_ssds > 0:
            cluster_config["master_config"]["disk_config"]["num_local_ssds"] = (
                self.master_num_local_ssds
            )

        if self.worker_num_local_ssds > 0:
            cluster_config["worker_config"]["disk_config"]["num_local_ssds"] = (
                self.worker_num_local_ssds
            )

        return DataprocCreateClusterOperator(
            task_id="create_dataproc_cluster",
            cluster_name=self.cluster_name,
            project_id=self.project_id,
            use_if_exists=True,
            delete_on_error=True,
            labels={
                "env": os.getenv("DEPLOY_ENVIRONMENT", "env_not_set"),
                "owner": os.getenv("AIRFLOW_CTX_DAG_OWNER", "owner_not_set"),
                "jobname": self._job_label(),
            },
            gcp_conn_id=self.gcp_conn_id,
            region=self.region,
            cluster_config=cluster_config,
        )

    def delete_cluster(self):
        """Return a DataprocDeleteClusterOperator."""
        return DataprocDeleteClusterOperator(
            task_id="delete_dataproc_cluster",
            cluster_name=self.cluster_name,
            region=self.region,
            gcp_conn_id=self.gcp_conn_id,
            project_id=self.project_id,
        )


# End DataProcHelper


def moz_dataproc_pyspark_runner(
    parent_dag_name=None,
    dag_name="run_pyspark_on_dataproc",
    default_args=None,
    cluster_name=None,
    num_workers=2,
    image_version="1.4-debian10",
    region="us-west1",
    subnetwork_uri=None,
    internal_ip_only=None,
    idle_delete_ttl=10800,
    auto_delete_ttl=21600,
    master_machine_type="n1-standard-8",
    worker_machine_type="n1-standard-4",
    num_preemptible_workers=0,
    service_account="dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com",
    init_actions_uris=None,
    additional_metadata=None,
    additional_properties=None,
    optional_components=None,
    install_component_gateway=True,
    python_driver_code=None,
    py_args=None,
    job_name=None,
    aws_conn_id=None,
    gcp_conn_id="google_cloud_airflow_dataproc",
    project_id="airflow-dataproc",
    artifact_bucket="moz-fx-data-prod-airflow-dataproc-artifacts",
    storage_bucket="moz-fx-data-prod-dataproc-scratch",
    master_disk_type="pd-standard",
    worker_disk_type="pd-standard",
    master_disk_size=1024,
    worker_disk_size=1024,
    master_num_local_ssds=0,
    worker_num_local_ssds=0,
):
    """
    Create a GCP Dataproc cluster with Anaconda/Jupyter/Component gateway.

    Then we call DataprocSubmitJobOperator to execute the pyspark script defined by the argument
    python_driver_code. Once that succeeds, we teardown the cluster.

    **Example**: ::

        # Unsalted cluster name so subsequent runs fail if the cluster name exists
        cluster_name = 'test-dataproc-cluster-hwoo'

        # Defined in Airflow's UI -> Admin -> Connections
        gcp_conn_id = 'google_cloud_airflow_dataproc'

        run_dataproc_pyspark = SubDagOperator(
            task_id='run_dataproc_pyspark',
            dag=dag,
            subdag = moz_dataproc_pyspark_runner(
                parent_dag_name=dag.dag_id,
                dag_name='run_dataproc_pyspark',
                job_name='Do_something_on_pyspark',
                default_args=default_args,
                cluster_name=cluster_name,
                python_driver_code='gs://some_bucket/some_py_script.py',
                py_args=["-d", "{{ ds_nodash }}"],
                gcp_conn_id=gcp_conn_id)
        )

    Airflow related args:
    ---
    :param str parent_dag_name:           Parent dag name.
    :param str dag_name:                  Dag name.
    :param dict default_args:             Dag configuration.

    Dataproc Cluster related args:
    ---
    :param str cluster_name:              The name of the dataproc cluster.
    :param int num_workers:               The number of spark workers.
    :param str image_version:             The image version of software to use for dataproc
                                          cluster.
    :param str region:                    Region where the dataproc cluster will be located.
                                          Zone will be chosen automatically
    :param str subnetwork_uri:            The subnetwork uri to be used for machine communication,
                                          cannot be specified with network_uri. Only need this if
                                          setting internal_ip_only = True. (See next parameter)
    :param bool internal_ip_only:         If True, cluster nodes will only have internal IP addresses.
                                          Can only be enabled with subnetwork_uri enabled networks.
                                          We use this for NAT'd dataproc clusters whose outbound traffic
                                          needs to be whitelisted. To use a NAT'd cluster, set
                                          subnetwork_uri='default', internal_ip_only=True, and
                                          region=us-west2-a|b|c
    :param int idle_delete_ttl:           The duration in seconds to keep idle cluster alive.
    :param int auto_delete_ttl:           The duration in seconds that the cluster will live.
    :param str master_machine_type:       Compute engine machine type to use for master.
    :param str worker_machine_type:       Compute engine machine type to use for the workers.
    :param int num_preemptible_workers:   Number of preemptible worker nodes to spin up.
    :param str service_account:           The service account for spark VMs to use. For example
                                          if cross project access is needed. Note that this svc
                                          account needs the following permissions:
                                          roles/logging.logWriter and roles/storage.objectAdmin.
    :param list init_actions_uris:        List of GCS uri's containing dataproc init scripts.
    :param dict additional_metadata:      Custom metadata keys and values, might be used to
                                          configure initialization actions.
    :param dict additional_properties:    Custom cluster properties, can be used to configure
                                          cluster components, add Spark packages, etc.
    :param str job_name:                  Name of the spark job to run. Required.

    :param str aws_conn_id:               Airflow connection id for S3 access (if needed).
    :param str gcp_conn_id:               The connection ID to use connecting to GCP.
    :param str project_id:                The project ID corresponding to the gcp_conn_id. We
                                          add this because the dev environment doesn't parse it out
                                          correctly from the dummy connections.
    :param str artifact_bucket:           Path to resources for bootstrapping the dataproc cluster
    :param str storage_bucket:            Path to scratch bucket for intermediate cluster results
    :param list optional_components:      List of optional components to install on cluster
                                          Defaults to ['ANACONDA'] for now since JUPYTER is broken.
    :param bool install_component_gateway: Enable alpha feature component gateway.
    :param master_disk_type:              Type of the boot disk for the master node
                                            (default is ``pd-standard``).
                                            Valid values: ``pd-ssd`` (Persistent Disk Solid State Drive) or
                                            ``pd-standard`` (Persistent Disk Hard Disk Drive).
    :type master_disk_type: str
    :param master_disk_size:              Disk size for the master node
    :type master_disk_size: int
    :param master_num_local_ssds:         Number of local SSDs to mount
                                            (default is 0)
    :type master_num_local_ssds: int
    :param worker_disk_type:              Type of the boot disk for the worker node
                                            (default is ``pd-standard``).
                                            Valid values: ``pd-ssd`` (Persistent Disk Solid State Drive) or
                                            ``pd-standard`` (Persistent Disk Hard Disk Drive).
    :type worker_disk_type: str
    :param worker_disk_size:              Disk size for the worker node
    :type worker_disk_size: int
    :param worker_num_local_ssds:         Number of local SSDs to mount
                                            (default is 0)
    :type worker_num_local_ssds: int

    Pyspark related args:
    ---
    :param str python_driver_code:        The Hadoop Compatible Filesystem (HCFS) URI of the main
                                          Python file to use as the driver. Must be a .py file.
    :param list py_args:                  Arguments for the pyspark job.

    :raises AirflowException: If cluster_name, job_name or python_driver_code is missing.
    :return: The DAG containing the create-cluster, run-job and delete-cluster tasks.
    """
    # job_name is needed both for the cluster label and the dataproc job id,
    # so reject a missing value up front with a clear message.
    if cluster_name is None or job_name is None or python_driver_code is None:
        raise AirflowException(
            "Please specify cluster_name, job_name, and python_driver_code."
        )

    dataproc_helper = DataProcHelper(
        cluster_name=cluster_name,
        job_name=job_name,
        num_workers=num_workers,
        image_version=image_version,
        region=region,
        subnetwork_uri=subnetwork_uri,
        internal_ip_only=internal_ip_only,
        idle_delete_ttl=idle_delete_ttl,
        auto_delete_ttl=auto_delete_ttl,
        master_machine_type=master_machine_type,
        worker_machine_type=worker_machine_type,
        num_preemptible_workers=num_preemptible_workers,
        service_account=service_account,
        init_actions_uris=init_actions_uris,
        optional_components=optional_components,
        additional_metadata=additional_metadata,
        additional_properties=additional_properties,
        install_component_gateway=install_component_gateway,
        aws_conn_id=aws_conn_id,
        gcp_conn_id=gcp_conn_id,
        project_id=project_id,
        artifact_bucket=artifact_bucket,
        storage_bucket=storage_bucket,
        master_disk_type=master_disk_type,
        master_disk_size=master_disk_size,
        worker_disk_type=worker_disk_type,
        worker_disk_size=worker_disk_size,
        master_num_local_ssds=master_num_local_ssds,
        worker_num_local_ssds=worker_num_local_ssds,
    )

    _dag_name = f"{parent_dag_name}.{dag_name}"

    with models.DAG(_dag_name, default_args=default_args) as dag:
        create_dataproc_cluster = dataproc_helper.create_cluster()

        dataproc_job_builder = DataProcJobBuilder(
            job_type="pyspark_job",
            task_id="run_dataproc_pyspark",
            cluster_name=cluster_name,
            project_id=project_id,
        )
        dataproc_job_builder.set_job_name(job_name)
        dataproc_job_builder.set_python_main(python_driver_code)
        dataproc_job_builder.add_args(py_args)
        dataproc_job = dataproc_job_builder.build()

        run_pyspark_on_dataproc = DataprocSubmitJobOperator(
            task_id="run_dataproc_pyspark",
            job=dataproc_job["job"],
            region=region,
            gcp_conn_id=gcp_conn_id,
            project_id=project_id,
        )

        delete_dataproc_cluster = dataproc_helper.delete_cluster()

        create_dataproc_cluster >> run_pyspark_on_dataproc >> delete_dataproc_cluster
        return dag


# End moz_dataproc_pyspark_runner


def moz_dataproc_jar_runner(
    parent_dag_name=None,
    dag_name="run_script_on_dataproc",
    default_args=None,
    cluster_name=None,
    num_workers=2,
    image_version="1.4-debian10",
    region="us-west1",
    subnetwork_uri=None,
    internal_ip_only=None,
    idle_delete_ttl=14400,
    auto_delete_ttl=28800,
    master_machine_type="n1-standard-8",
    worker_machine_type="n1-standard-4",
    num_preemptible_workers=0,
    service_account="dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com",
    init_actions_uris=None,
    optional_components=None,
    install_component_gateway=True,
    jar_urls=None,
    main_class=None,
    jar_args=None,
    job_name=None,
    aws_conn_id=None,
    gcp_conn_id="google_cloud_airflow_dataproc",
    project_id="airflow-dataproc",
    master_disk_type="pd-standard",
    worker_disk_type="pd-standard",
    master_disk_size=1024,
    worker_disk_size=1024,
    master_num_local_ssds=0,
    worker_num_local_ssds=0,
):
    """
    Create a GCP Dataproc cluster with Anaconda/Jupyter/Component gateway.

    Then we call DataprocSubmitJobOperator to execute the jar defined by the arguments
    jar_urls and main_class. Once that succeeds, we teardown the cluster.

    **Example**: ::

        # Unsalted cluster name so subsequent runs fail if the cluster name exists
        cluster_name = 'test-dataproc-cluster-hwoo'

        # Defined in Airflow's UI -> Admin -> Connections
        gcp_conn_id = 'google_cloud_airflow_dataproc'

        run_dataproc_jar = SubDagOperator(
            task_id='run_dataproc_jar',
            dag=dag,
            subdag = moz_dataproc_jar_runner(
                parent_dag_name=dag.dag_id,
                dag_name='run_dataproc_jar',
                job_name='Run_some_spark_jar_on_dataproc',
                default_args=default_args,
                cluster_name=cluster_name,
                jar_urls=['gs://some_bucket/some_jar.jar'],
                main_class='com.mozilla.path.to.ClassName',
                jar_args=["-d", "{{ ds_nodash }}"],
                gcp_conn_id=gcp_conn_id)
        )

    Airflow related args:
    ---
    See moz_dataproc_pyspark_runner

    Dataproc Cluster related args:
    ---
    See moz_dataproc_pyspark_runner

    Jar runner related args:
    ---
    :param list jar_urls:               URIs to jars provisioned in Cloud Storage (example:
                                        for UDFs and libs) and are ideal to put in default arguments.
    :param str main_class:              Name of the job class entrypoint to execute.
    :param list jar_args:               Arguments for the job.

    :raises AirflowException: If cluster_name, job_name, jar_urls or main_class is missing.
    :return: The DAG containing the create-cluster, run-job and delete-cluster tasks.
    """
    # job_name is needed both for the cluster label and the dataproc job id,
    # so reject a missing value up front with a clear message.
    required = (cluster_name, job_name, jar_urls, main_class)
    if any(value is None for value in required):
        raise AirflowException(
            "Please specify cluster_name, job_name, jar_urls, and main_class."
        )

    dataproc_helper = DataProcHelper(
        cluster_name=cluster_name,
        job_name=job_name,
        num_workers=num_workers,
        image_version=image_version,
        region=region,
        subnetwork_uri=subnetwork_uri,
        internal_ip_only=internal_ip_only,
        idle_delete_ttl=idle_delete_ttl,
        auto_delete_ttl=auto_delete_ttl,
        master_machine_type=master_machine_type,
        worker_machine_type=worker_machine_type,
        num_preemptible_workers=num_preemptible_workers,
        service_account=service_account,
        init_actions_uris=init_actions_uris,
        optional_components=optional_components,
        install_component_gateway=install_component_gateway,
        aws_conn_id=aws_conn_id,
        gcp_conn_id=gcp_conn_id,
        project_id=project_id,
        master_disk_type=master_disk_type,
        master_disk_size=master_disk_size,
        worker_disk_type=worker_disk_type,
        worker_disk_size=worker_disk_size,
        master_num_local_ssds=master_num_local_ssds,
        worker_num_local_ssds=worker_num_local_ssds,
    )

    _dag_name = f"{parent_dag_name}.{dag_name}"

    with models.DAG(_dag_name, default_args=default_args) as dag:
        create_dataproc_cluster = dataproc_helper.create_cluster()

        dataproc_job_builder = DataProcJobBuilder(
            job_type="spark_job",
            task_id="run_jar_on_dataproc",
            cluster_name=cluster_name,
            project_id=project_id,
        )
        dataproc_job_builder.set_job_name(job_name)
        dataproc_job_builder.add_jar_file_uris(jar_urls)
        dataproc_job_builder.set_main(main_class=main_class)
        dataproc_job_builder.add_args(jar_args)
        dataproc_job = dataproc_job_builder.build()

        run_jar_on_dataproc = DataprocSubmitJobOperator(
            region=region,
            task_id="run_jar_on_dataproc",
            job=dataproc_job["job"],
            gcp_conn_id=gcp_conn_id,
            project_id=project_id,
        )

        delete_dataproc_cluster = dataproc_helper.delete_cluster()

        create_dataproc_cluster >> run_jar_on_dataproc >> delete_dataproc_cluster
        return dag


# End moz_dataproc_jar_runner


def _format_envvar(env=None):
    """Render a mapping as space separated ``KEY=value`` pairs ("" when env is empty or None)."""
    # Use a default value if an environment dictionary isn't supplied
    return " ".join(f"{k}={v}" for k, v in (env or {}).items())


def moz_dataproc_scriptrunner(
    parent_dag_name=None,
    dag_name="run_script_on_dataproc",
    default_args=None,
    cluster_name=None,
    num_workers=2,
    image_version="1.4-debian10",
    region="us-west1",
    subnetwork_uri=None,
    internal_ip_only=None,
    idle_delete_ttl=14400,
    auto_delete_ttl=28800,
    master_machine_type="n1-standard-8",
    worker_machine_type="n1-standard-4",
    num_preemptible_workers=0,
    service_account="dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com",
    init_actions_uris=None,
    optional_components=None,
    install_component_gateway=True,
    uri=None,
    env=None,
    arguments=None,
    job_name=None,
    aws_conn_id=None,
    gcp_conn_id="google_cloud_airflow_dataproc",
    project_id="airflow-dataproc",
    master_disk_type="pd-standard",
    worker_disk_type="pd-standard",
    master_disk_size=1024,
    worker_disk_size=1024,
    master_num_local_ssds=0,
    worker_num_local_ssds=0,
):
    """
    Create a GCP Dataproc cluster with Anaconda/Jupyter/Component gateway.

    Then we execute a script uri (either https or gcs) similar to how we use our custom AWS
    EmrSparkOperator. This will call DataprocSubmitJobOperator using EMR's script-runner.jar, which
    then executes the airflow_gcp.sh entrypoint script. The entrypoint script expects another
    script uri, along with its arguments, as parameters. Once that succeeds, we teardown the
    cluster.

    **Example**: ::

        # Unsalted cluster name so subsequent runs fail if the cluster name exists
        cluster_name = 'test-dataproc-cluster-hwoo'

        # Defined in Airflow's UI -> Admin -> Connections
        gcp_conn_id = 'google_cloud_airflow_dataproc'

        run_dataproc_script = SubDagOperator(
            task_id='run_dataproc_script',
            dag=dag,
            subdag = moz_dataproc_scriptrunner(
                parent_dag_name=dag.dag_id,
                dag_name='run_dataproc_script',
                default_args=default_args,
                cluster_name=cluster_name,
                job_name='Run_a_script_on_dataproc',
                uri='https://raw.githubusercontent.com/mozilla/telemetry-airflow/main/jobs/some_bash_or_py_script.py',
                env={"date": "{{ ds_nodash }}"},
                arguments="-d {{ ds_nodash }}",
                gcp_conn_id=gcp_conn_id)
        )

    Airflow related args:
    ---
    See moz_dataproc_pyspark_runner

    Dataproc Cluster related args:
    ---
    See moz_dataproc_pyspark_runner

    Scriptrunner specific args:
    ---
    :param str uri:                     The HTTP or GCS URI of the script to run. Can be
                                        .py, .jar, or other type of script (e.g. bash). Is run
                                        via the airflow_gcp.sh entrypoint. Ipynb is no longer
                                        supported.
    :param dict env:                    If env is not None, it must be a mapping that defines
                                        the environment variables for the new process
                                        (templated).
    :param str arguments:               Passed to `airflow_gcp.sh`, passed as one long string
                                        of space separated args.

    :raises AirflowException: If job_name, uri or cluster_name is missing.
    :return: The DAG containing the create-cluster, run-job and delete-cluster tasks.
    """
    if job_name is None or uri is None or cluster_name is None:
        raise AirflowException("Please specify job_name, uri, and cluster_name.")

    dataproc_helper = DataProcHelper(
        cluster_name=cluster_name,
        job_name=job_name,
        num_workers=num_workers,
        image_version=image_version,
        region=region,
        subnetwork_uri=subnetwork_uri,
        internal_ip_only=internal_ip_only,
        idle_delete_ttl=idle_delete_ttl,
        auto_delete_ttl=auto_delete_ttl,
        master_machine_type=master_machine_type,
        worker_machine_type=worker_machine_type,
        num_preemptible_workers=num_preemptible_workers,
        service_account=service_account,
        init_actions_uris=init_actions_uris,
        optional_components=optional_components,
        install_component_gateway=install_component_gateway,
        aws_conn_id=aws_conn_id,
        gcp_conn_id=gcp_conn_id,
        project_id=project_id,
        master_disk_type=master_disk_type,
        master_disk_size=master_disk_size,
        worker_disk_type=worker_disk_type,
        worker_disk_size=worker_disk_size,
        master_num_local_ssds=master_num_local_ssds,
        worker_num_local_ssds=worker_num_local_ssds,
    )

    _dag_name = f"{parent_dag_name}.{dag_name}"
    environment = _format_envvar(env)

    script_bucket = "moz-fx-data-prod-airflow-dataproc-artifacts"
    jar_url = f"gs://{script_bucket}/bin/script-runner.jar"

    args = [
        f"gs://{script_bucket}/bootstrap/airflow_gcp.sh",
        "--job-name",
        job_name,
        "--uri",
        uri,
        "--environment",
        environment,
    ]

    if arguments:
        args += ["--arguments", arguments]

    with models.DAG(_dag_name, default_args=default_args) as dag:
        create_dataproc_cluster = dataproc_helper.create_cluster()

        # Run DataprocSubmitJobOperator with script-runner.jar pointing to airflow_gcp.sh.
        dataproc_job_builder = DataProcJobBuilder(
            job_type="spark_job",
            task_id="run_script_on_dataproc",
            cluster_name=cluster_name,
            project_id=project_id,
        )
        dataproc_job_builder.set_job_name(job_name)
        dataproc_job_builder.add_jar_file_uris([jar_url])
        dataproc_job_builder.set_main(
            main_class="com.amazon.elasticmapreduce.scriptrunner.ScriptRunner"
        )
        dataproc_job_builder.add_args(args)
        dataproc_job = dataproc_job_builder.build()

        run_script_on_dataproc = DataprocSubmitJobOperator(
            region=region,
            task_id="run_script_on_dataproc",
            job=dataproc_job["job"],
            gcp_conn_id=gcp_conn_id,
            project_id=project_id,
        )

        delete_dataproc_cluster = dataproc_helper.delete_cluster()

        create_dataproc_cluster >> run_script_on_dataproc >> delete_dataproc_cluster
        return dag


# End moz_dataproc_scriptrunner


def copy_artifacts_dev(dag, project_id, artifact_bucket, storage_bucket):
    """
    Bootstrap a dataproc job for local testing.

    This job requires setting GOOGLE_APPLICATION_CREDENTIALS before starting the
    airflow container. It will copy the contents of the local jobs and
    dataproc_bootstrap folders to the artifacts bucket, and create a scratch
    storage bucket for dataproc.

    :dag DAG: The dag to register the job
    :project_id str: The project id, necessary for setting the default project
    :artifact_bucket str: The bucket for storing bootstrap artifacts
    :storage_bucket str: The scratch bucket for dataproc
    """
    # NOTE(review): when service-account activation fails, the fallback
    # `cat ~/.credentials` prints the key file to the task log. Left as is
    # because this helper is for local dev only; confirm that is acceptable.
    return BashOperator(
        task_id="copy_to_dev_artifacts",
        bash_command="""
        gcloud auth activate-service-account --key-file ~/.credentials || cat ~/.credentials
        gcloud config set project ${PROJECT_ID}

        gsutil mb gs://${ARTIFACT_BUCKET}
        gsutil mb gs://${STORAGE_BUCKET}

        gsutil -m cp -r ~/dataproc_bootstrap gs://${ARTIFACT_BUCKET}
        gsutil -m cp -r ~/jobs gs://${ARTIFACT_BUCKET}

        echo "listing artifacts..."
        gsutil ls -r gs://${ARTIFACT_BUCKET}
        """,
        env={
            # https://github.com/GoogleCloudPlatform/gsutil/issues/236
            "CLOUDSDK_PYTHON": "python",
            "PROJECT_ID": project_id,
            "ARTIFACT_BUCKET": artifact_bucket,
            "STORAGE_BUCKET": storage_bucket,
        },
        dag=dag,
    )


# parameters that can be used to reconfigure a dataproc job for dev testing
DataprocParameters = namedtuple(
    "DataprocParameters",
    [
        "conn_id",
        "project_id",
        "is_dev",
        "client_email",
        "artifact_bucket",
        "storage_bucket",
        "output_bucket",
    ],
)


def get_dataproc_parameters(conn_id="google_cloud_airflow_dataproc"):
    """
    Can be used to gather parameters that correspond to development parameters.

    The provided connection string should be a Google Cloud connection and should
    either be the production default ("dataproc-runner-prod"), or a service key
    associated with a sandbox account.

    The environment is "dev" when the DEPLOY_ENVIRONMENT environment variable
    equals "dev"; in that case the ``replace_me`` placeholders below are used
    for the project and service account, and bucket names are derived from the
    dev project id. Otherwise the production values are returned.

    :param str conn_id: Airflow connection id, passed through unchanged.
    :return: A DataprocParameters namedtuple.
    """
    dev_project_id = "replace_me"
    dev_client_email = "replace_me"

    is_dev = os.environ.get("DEPLOY_ENVIRONMENT") == "dev"

    # The dev placeholders apply only in dev. The original conditional was
    # inverted, handing the placeholder project id to production.
    project_id = dev_project_id if is_dev else "airflow-dataproc"
    client_email = (
        dev_client_email
        if is_dev
        else "dataproc-runner-prod@airflow-dataproc.iam.gserviceaccount.com"
    )
    artifact_bucket = (
        f"{project_id}-dataproc-artifacts"
        if is_dev
        else "moz-fx-data-prod-airflow-dataproc-artifacts"
    )
    storage_bucket = (
        f"{project_id}-dataproc-scratch"
        if is_dev
        else "moz-fx-data-prod-dataproc-scratch"
    )
    output_bucket = artifact_bucket if is_dev else "airflow-dataproc-bq-parquet-exports"

    return DataprocParameters(
        conn_id,
        project_id,
        is_dev,
        client_email,
        artifact_bucket,
        storage_bucket,
        output_bucket,
    )