def _build_cluster_data()

in providers/google/src/airflow/providers/google/cloud/operators/dataproc.py [0:0]


    def _build_cluster_data(self):
        """
        Assemble the cluster configuration dictionary from the attributes of this instance.

        The dictionary is seeded with the master and worker instance group settings and
        empty placeholders for the optional sections; each optional section is filled in
        only when the corresponding attribute is set. The GCE cluster section and the
        lifecycle section are delegated to ``_build_gce_cluster_config`` and
        ``_build_lifecycle_config``.

        Side effect: when ``self.single_node`` is truthy, the
        ``dataproc:dataproc.allow.zero.workers`` entry is written into ``self.properties``
        in place.

        :return: the assembled cluster configuration dictionary.
        """
        # With a zone, machine types are expanded to zone-qualified resource paths;
        # without one, the configured machine type values are passed through untouched.
        if self.zone:
            master_type_uri = (
                f"projects/{self.project_id}/zones/{self.zone}/machineTypes/{self.master_machine_type}"
            )
            worker_type_uri = (
                f"projects/{self.project_id}/zones/{self.zone}/machineTypes/{self.worker_machine_type}"
            )
        else:
            master_type_uri = self.master_machine_type
            worker_type_uri = self.worker_machine_type

        cluster_data = {
            "gce_cluster_config": {},
            "master_config": {
                "num_instances": self.num_masters,
                "machine_type_uri": master_type_uri,
                "disk_config": {
                    "boot_disk_type": self.master_disk_type,
                    "boot_disk_size_gb": self.master_disk_size,
                },
            },
            "worker_config": {
                "num_instances": self.num_workers,
                "machine_type_uri": worker_type_uri,
                "disk_config": {
                    "boot_disk_type": self.worker_disk_type,
                    "boot_disk_size_gb": self.worker_disk_size,
                },
            },
            "secondary_worker_config": {},
            "software_config": {},
            "lifecycle_config": {},
            "encryption_config": {},
            "autoscaling_config": {},
            "endpoint_config": {},
        }

        if self.min_num_workers:
            cluster_data["worker_config"]["min_num_instances"] = self.min_num_workers

        if self.master_accelerator_type:
            cluster_data["master_config"]["accelerators"] = {
                "accelerator_type_uri": self.master_accelerator_type,
                "accelerator_count": self.master_accelerator_count,
            }

        if self.worker_accelerator_type:
            cluster_data["worker_config"]["accelerators"] = {
                "accelerator_type_uri": self.worker_accelerator_type,
                "accelerator_count": self.worker_accelerator_count,
            }

        if self.num_preemptible_workers > 0:
            # Secondary workers reuse the primary workers' machine type and disk settings.
            secondary_config = {
                "num_instances": self.num_preemptible_workers,
                "machine_type_uri": worker_type_uri,
                "disk_config": {
                    "boot_disk_type": self.worker_disk_type,
                    "boot_disk_size_gb": self.worker_disk_size,
                },
                "is_preemptible": True,
                "preemptibility": self.preemptibility.value,
            }
            # Gate on the secondary accelerator type itself (not the primary worker one),
            # so the emitted accelerator entry always matches the attribute that was set.
            if self.secondary_worker_accelerator_type:
                secondary_config["accelerators"] = {
                    "accelerator_type_uri": self.secondary_worker_accelerator_type,
                    "accelerator_count": self.secondary_worker_accelerator_count,
                }
            if self.secondary_worker_instance_flexibility_policy:
                secondary_config["instance_flexibility_policy"] = {
                    "instance_selection_list": [
                        vars(s)
                        for s in self.secondary_worker_instance_flexibility_policy.instance_selection_list
                    ]
                }
            cluster_data["secondary_worker_config"] = secondary_config

        if self.storage_bucket:
            cluster_data["config_bucket"] = self.storage_bucket

        # Image selection precedence: image_version, then custom_image, then
        # custom_image_family. Only the first one that is set takes effect.
        if self.image_version:
            cluster_data["software_config"]["image_version"] = self.image_version

        elif self.custom_image:
            project_id = self.custom_image_project_id or self.project_id
            custom_image_url = (
                f"https://www.googleapis.com/compute/beta/projects/{project_id}"
                f"/global/images/{self.custom_image}"
            )
            cluster_data["master_config"]["image_uri"] = custom_image_url
            # In single-node mode the worker group is left without an image URI.
            if not self.single_node:
                cluster_data["worker_config"]["image_uri"] = custom_image_url

        elif self.custom_image_family:
            project_id = self.custom_image_project_id or self.project_id
            custom_image_url = (
                "https://www.googleapis.com/compute/beta/projects/"
                f"{project_id}/global/images/family/{self.custom_image_family}"
            )
            cluster_data["master_config"]["image_uri"] = custom_image_url
            if not self.single_node:
                cluster_data["worker_config"]["image_uri"] = custom_image_url

        if self.driver_pool_size > 0:
            cluster_data["auxiliary_node_groups"] = [self._build_driver_pool()]

        cluster_data = self._build_gce_cluster_config(cluster_data)

        # Must run before the properties are copied into software_config below.
        # NOTE(review): assumes self.properties is a dict at this point - confirm in __init__.
        if self.single_node:
            self.properties["dataproc:dataproc.allow.zero.workers"] = "true"

        if self.properties:
            cluster_data["software_config"]["properties"] = self.properties

        if self.optional_components:
            cluster_data["software_config"]["optional_components"] = self.optional_components

        cluster_data = self._build_lifecycle_config(cluster_data)

        if self.init_actions_uris:
            # Every initialization action gets the same execution timeout.
            init_actions_dict = [
                {"executable_file": uri, "execution_timeout": self._get_init_action_timeout()}
                for uri in self.init_actions_uris
            ]
            cluster_data["initialization_actions"] = init_actions_dict

        if self.customer_managed_key:
            cluster_data["encryption_config"] = {"gce_pd_kms_key_name": self.customer_managed_key}
        if self.autoscaling_policy:
            cluster_data["autoscaling_config"] = {"policy_uri": self.autoscaling_policy}
        if self.enable_component_gateway:
            cluster_data["endpoint_config"] = {"enable_http_port_access": self.enable_component_gateway}

        return cluster_data