in utils/dataproc.py
def create_cluster(self):
    """Return a DataprocCreateClusterOperator for this cluster configuration."""
    properties = {}
    # Disable implicit directory repair in the GCS connector: repairing
    # directories requires object create permission on the bucket, which
    # read-only PySpark jobs may not have.
    properties["core:fs.gs.implicit.dir.repair.enable"] = "false"
    # Set Hadoop properties so Dataproc jobs can access S3
    if self.aws_conn_id:
        for key, value in zip(
            ("access.key", "secret.key", "session.token"),
            AwsBaseHook(
                aws_conn_id=self.aws_conn_id, client_type="s3"
            ).get_credentials(),
        ):
            if value is not None:
                properties["core:fs.s3a." + key] = value
                # Older Spark versions read the legacy fs.s3 property names
                if key == "access.key":
                    properties["core:fs.s3.awsAccessKeyId"] = value
                elif key == "secret.key":
                    properties["core:fs.s3.awsSecretAccessKey"] = value
    properties.update(self.additional_properties)
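    # Default connector versions for the cluster metadata; these keys are
    # typically consumed by the connector-install init action supplied via
    # init_actions_uris, and additional_metadata can override or extend them.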
    metadata = {
        "gcs-connector-version": "1.9.16",
        "bigquery-connector-version": "0.13.6",
    }
    metadata.update(self.additional_metadata)
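    # ClusterGenerator translates these flat keyword arguments into the nested
    # ClusterConfig dict that DataprocCreateClusterOperator expects.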
    cluster_generator = ClusterGenerator(
        project_id=self.project_id,
        num_workers=self.num_workers,
        subnetwork_uri=self.subnetwork_uri,
        internal_ip_only=self.internal_ip_only,
        storage_bucket=self.storage_bucket,
        init_actions_uris=self.init_actions_uris,
        metadata=metadata,
        image_version=self.image_version,
        properties=properties,
        optional_components=self.optional_components,
        master_machine_type=self.master_machine_type,
        master_disk_type=self.master_disk_type,
        master_disk_size=self.master_disk_size,
        worker_machine_type=self.worker_machine_type,
        worker_disk_type=self.worker_disk_type,
        worker_disk_size=self.worker_disk_size,
        num_preemptible_workers=self.num_preemptible_workers,
        service_account=self.service_account,
        idle_delete_ttl=self.idle_delete_ttl,
        auto_delete_ttl=self.auto_delete_ttl,
    )
    cluster_config = cluster_generator.make()
    # DataprocCreateClusterOperator and ClusterGenerator don't support the
    # component gateway or local SSDs, so patch the generated config directly.
    # The ClusterConfig format is documented at
    # https://cloud.google.com/dataproc/docs/reference/rpc/google.cloud.dataproc.v1#google.cloud.dataproc.v1.ClusterConfig
    if self.install_component_gateway:
        cluster_config.update(
            {"endpoint_config": {"enable_http_port_access": True}}
        )
    if self.master_num_local_ssds > 0:
        master_instance_group_config = cluster_config["master_config"]
        master_instance_group_config["disk_config"]["num_local_ssds"] = (
            self.master_num_local_ssds
        )
        cluster_config.update({"master_config": master_instance_group_config})
    if self.worker_num_local_ssds > 0:
        worker_instance_group_config = cluster_config["worker_config"]
        worker_instance_group_config["disk_config"]["num_local_ssds"] = (
            self.worker_num_local_ssds
        )
        cluster_config.update({"worker_config": worker_instance_group_config})
    return DataprocCreateClusterOperator(
        task_id="create_dataproc_cluster",
        cluster_name=self.cluster_name,
        project_id=self.project_id,
        use_if_exists=True,
        delete_on_error=True,
        labels={
            "env": os.getenv("DEPLOY_ENVIRONMENT", "env_not_set"),
            "owner": os.getenv("AIRFLOW_CTX_DAG_OWNER", "owner_not_set"),
            "jobname": self.job_name.lower().replace("_", "-"),
        },
        gcp_conn_id=self.gcp_conn_id,
        region=self.region,
        cluster_config=cluster_config,
    )
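
# ---------------------------------------------------------------------------
# Hypothetical usage sketch (not part of utils/dataproc.py): the operator
# returned above is typically paired with a DataprocDeleteClusterOperator so
# the cluster is torn down once the job finishes. The DAG id, project, region,
# cluster name, and the `my_cluster` instance below are illustrative
# placeholders, not values from this module.
#
# import pendulum
# from airflow import DAG
# from airflow.providers.google.cloud.operators.dataproc import (
#     DataprocDeleteClusterOperator,
# )
#
# with DAG(
#     "example_dataproc_dag",
#     schedule=None,
#     start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
# ) as dag:
#     create_cluster = my_cluster.create_cluster()
#     delete_cluster = DataprocDeleteClusterOperator(
#         task_id="delete_dataproc_cluster",
#         project_id="my-project",
#         cluster_name="my-cluster-name",
#         region="us-west1",
#         trigger_rule="all_done",  # delete the cluster even if the job fails
#     )
#     create_cluster >> delete_cluster
# ---------------------------------------------------------------------------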