def create_cluster()

in utils/dataproc.py
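
Relies on module-level imports defined elsewhere in utils/dataproc.py; with
Airflow 2 provider packages these would likely be:

    import os

    from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
    from airflow.providers.google.cloud.operators.dataproc import (
        ClusterGenerator,
        DataprocCreateClusterOperator,
    )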


    def create_cluster(self):
        """Return a DataprocCreateClusterOperator."""
        properties = {}

        # Disable implicit directory repair: Google Cloud Storage would
        # otherwise require object.create permission even when only reading
        # from PySpark. (The "core:" prefix targets Hadoop's core-site.xml.)
        properties["core:fs.gs.implicit.dir.repair.enable"] = "false"

        # Set Hadoop properties so Dataproc can read from S3
        if self.aws_conn_id:
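            # get_credentials() returns a botocore ReadOnlyCredentials
            # namedtuple (access_key, secret_key, token); zip pairs each
            # credential with the matching fs.s3a.* property suffix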
            for key, value in zip(
                ("access.key", "secret.key", "session.token"),
                AwsBaseHook(
                    aws_conn_id=self.aws_conn_id, client_type="s3"
                ).get_credentials(),
            ):
                if value is not None:
                    properties["core:fs.s3a." + key] = value
                    # Older Spark versions read credentials from the legacy
                    # s3:// scheme properties rather than s3a://
                    if key == "access.key":
                        properties["core:fs.s3.awsAccessKeyId"] = value
                    elif key == "secret.key":
                        properties["core:fs.s3.awsSecretAccessKey"] = value

        properties.update(self.additional_properties)

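        # Connector versions are surfaced as cluster metadata, typically
        # consumed by the connectors init action (see init_actions_uris)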
        metadata = {
            "gcs-connector-version": "1.9.16",
            "bigquery-connector-version": "0.13.6",
        }
        metadata.update(self.additional_metadata)

        cluster_generator = ClusterGenerator(
            project_id=self.project_id,
            num_workers=self.num_workers,
            subnetwork_uri=self.subnetwork_uri,
            internal_ip_only=self.internal_ip_only,
            storage_bucket=self.storage_bucket,
            init_actions_uris=self.init_actions_uris,
            metadata=metadata,
            image_version=self.image_version,
            properties=properties,
            optional_components=self.optional_components,
            master_machine_type=self.master_machine_type,
            master_disk_type=self.master_disk_type,
            master_disk_size=self.master_disk_size,
            worker_machine_type=self.worker_machine_type,
            worker_disk_type=self.worker_disk_type,
            worker_disk_size=self.worker_disk_size,
            num_preemptible_workers=self.num_preemptible_workers,
            service_account=self.service_account,
            idle_delete_ttl=self.idle_delete_ttl,
            auto_delete_ttl=self.auto_delete_ttl,
        )

        cluster_config = cluster_generator.make()

        # The DataprocCreateClusterOperator and ClusterGenerator don't support
        # Component Gateway or local SSDs, so we patch the generated config.
        # ClusterConfig format:
        # https://cloud.google.com/dataproc/docs/reference/rpc/google.cloud.dataproc.v1#google.cloud.dataproc.v1.ClusterConfig
        if self.install_component_gateway:
            cluster_config.update(
                {"endpoint_config": {"enable_http_port_access": True}}
            )

        if self.master_num_local_ssds > 0:
            cluster_config["master_config"]["disk_config"][
                "num_local_ssds"
            ] = self.master_num_local_ssds

        if self.worker_num_local_ssds > 0:
            cluster_config["worker_config"]["disk_config"][
                "num_local_ssds"
            ] = self.worker_num_local_ssds

        return DataprocCreateClusterOperator(
            task_id="create_dataproc_cluster",
            cluster_name=self.cluster_name,
            project_id=self.project_id,
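            # Reuse an existing cluster with the same name, and delete the
            # cluster if it reaches an error state during creation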
            use_if_exists=True,
            delete_on_error=True,
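            # Dataproc label values must be lowercase, hence the normalized job name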
            labels={
                "env": os.getenv("DEPLOY_ENVIRONMENT", "env_not_set"),
                "owner": os.getenv("AIRFLOW_CTX_DAG_OWNER", "owner_not_set"),
                "jobname": self.job_name.lower().replace("_", "-"),
            },
            gcp_conn_id=self.gcp_conn_id,
            region=self.region,
            cluster_config=cluster_config,
        )
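
Usage sketch — how the returned operator might be wired into a DAG. This is
a minimal illustration: the DataprocHelper class name and its constructor
arguments are assumptions, since only create_cluster() itself appears in
this file.

    from datetime import datetime

    from airflow import DAG

    from utils.dataproc import DataprocHelper  # hypothetical wrapper class

    with DAG(
        dag_id="example_dataproc",
        start_date=datetime(2024, 1, 1),
        schedule_interval=None,
    ) as dag:
        # hypothetical constructor: pass the fields create_cluster() reads
        helper = DataprocHelper(
            cluster_name="example-cluster",
            project_id="my-gcp-project",
            job_name="example_job",
            num_workers=2,
            region="us-west1",
            gcp_conn_id="google_cloud_default",
        )
        # create_cluster() returns a DataprocCreateClusterOperator bound to the DAG
        create_dataproc_cluster = helper.create_cluster()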