in providers/google/src/airflow/providers/google/cloud/operators/dataproc.py [0:0]
def _build_cluster_data(self):
    """Assemble the Dataproc cluster configuration dictionary.

    The result is built from the attributes set on this object: machine
    types, disk settings, accelerators, secondary (preemptible) workers,
    image selection, software properties, initialization actions,
    encryption, autoscaling and endpoint settings.

    Side effect: when ``self.single_node`` is truthy, the key
    ``dataproc:dataproc.allow.zero.workers`` is written into
    ``self.properties`` in place.

    :return: the cluster configuration as a nested ``dict``.
    """
    # With a zone, machine types are expanded to a project/zone-qualified
    # path; without one the configured value is passed through unchanged.
    if self.zone:
        machine_type_prefix = f"projects/{self.project_id}/zones/{self.zone}/machineTypes/"
        master_type_uri = f"{machine_type_prefix}{self.master_machine_type}"
        worker_type_uri = f"{machine_type_prefix}{self.worker_machine_type}"
    else:
        master_type_uri = self.master_machine_type
        worker_type_uri = self.worker_machine_type

    cluster_data = {
        "gce_cluster_config": {},
        "master_config": {
            "num_instances": self.num_masters,
            "machine_type_uri": master_type_uri,
            "disk_config": {
                "boot_disk_type": self.master_disk_type,
                "boot_disk_size_gb": self.master_disk_size,
            },
        },
        "worker_config": {
            "num_instances": self.num_workers,
            "machine_type_uri": worker_type_uri,
            "disk_config": {
                "boot_disk_type": self.worker_disk_type,
                "boot_disk_size_gb": self.worker_disk_size,
            },
        },
        "secondary_worker_config": {},
        "software_config": {},
        "lifecycle_config": {},
        "encryption_config": {},
        "autoscaling_config": {},
        "endpoint_config": {},
    }
    master_config = cluster_data["master_config"]
    worker_config = cluster_data["worker_config"]
    software_config = cluster_data["software_config"]

    if self.min_num_workers:
        worker_config["min_num_instances"] = self.min_num_workers

    if self.master_accelerator_type:
        master_config["accelerators"] = {
            "accelerator_type_uri": self.master_accelerator_type,
            "accelerator_count": self.master_accelerator_count,
        }

    if self.worker_accelerator_type:
        worker_config["accelerators"] = {
            "accelerator_type_uri": self.worker_accelerator_type,
            "accelerator_count": self.worker_accelerator_count,
        }

    if self.num_preemptible_workers > 0:
        # Secondary workers reuse the primary workers' machine type and disk
        # settings; a fresh disk_config dict is built so the two are not aliased.
        secondary_config = {
            "num_instances": self.num_preemptible_workers,
            "machine_type_uri": worker_type_uri,
            "disk_config": {
                "boot_disk_type": self.worker_disk_type,
                "boot_disk_size_gb": self.worker_disk_size,
            },
            "is_preemptible": True,
            "preemptibility": self.preemptibility.value,
        }
        # Guard on the *secondary* accelerator type: the values written below
        # are the secondary ones, so gating on the primary worker accelerator
        # would either drop a configured secondary accelerator or emit an
        # entry whose accelerator_type_uri is None.
        if self.secondary_worker_accelerator_type:
            secondary_config["accelerators"] = {
                "accelerator_type_uri": self.secondary_worker_accelerator_type,
                "accelerator_count": self.secondary_worker_accelerator_count,
            }
        if self.secondary_worker_instance_flexibility_policy:
            flexibility_policy = self.secondary_worker_instance_flexibility_policy
            secondary_config["instance_flexibility_policy"] = {
                "instance_selection_list": [
                    vars(selection) for selection in flexibility_policy.instance_selection_list
                ]
            }
        cluster_data["secondary_worker_config"] = secondary_config

    if self.storage_bucket:
        cluster_data["config_bucket"] = self.storage_bucket

    # Image selection priority: image_version, then custom_image, then
    # custom_image_family. Only one of them is applied.
    if self.image_version:
        software_config["image_version"] = self.image_version
    elif self.custom_image or self.custom_image_family:
        image_project_id = self.custom_image_project_id or self.project_id
        if self.custom_image:
            image_path = f"/global/images/{self.custom_image}"
        else:
            image_path = f"/global/images/family/{self.custom_image_family}"
        custom_image_url = (
            f"https://www.googleapis.com/compute/beta/projects/{image_project_id}{image_path}"
        )
        master_config["image_uri"] = custom_image_url
        # On a single-node cluster the worker config is not given the image.
        if not self.single_node:
            worker_config["image_uri"] = custom_image_url

    if self.driver_pool_size > 0:
        cluster_data["auxiliary_node_groups"] = [self._build_driver_pool()]

    cluster_data = self._build_gce_cluster_config(cluster_data)

    if self.single_node:
        # Written into self.properties itself (not a copy), so the flag is
        # visible on the instance after this call.
        self.properties["dataproc:dataproc.allow.zero.workers"] = "true"

    # Index cluster_data afresh from here on: the helper above returned the
    # mapping and is not guaranteed (from this block) to return the same object.
    if self.properties:
        cluster_data["software_config"]["properties"] = self.properties

    if self.optional_components:
        cluster_data["software_config"]["optional_components"] = self.optional_components

    cluster_data = self._build_lifecycle_config(cluster_data)

    if self.init_actions_uris:
        cluster_data["initialization_actions"] = [
            {"executable_file": uri, "execution_timeout": self._get_init_action_timeout()}
            for uri in self.init_actions_uris
        ]

    if self.customer_managed_key:
        cluster_data["encryption_config"] = {"gce_pd_kms_key_name": self.customer_managed_key}

    if self.autoscaling_policy:
        cluster_data["autoscaling_config"] = {"policy_uri": self.autoscaling_policy}

    if self.enable_component_gateway:
        cluster_data["endpoint_config"] = {"enable_http_port_access": self.enable_component_gateway}

    return cluster_data