in providers/google/src/airflow/providers/google/cloud/operators/dataproc.py [0:0]
def execute(self, context: Context) -> dict:
    """
    Create the Dataproc cluster (or reuse / repair an existing one) and return it as a dict.

    Flow:

    * Persist the cluster extra link up front, whenever a project id can be resolved,
      so it is available whatever happens afterwards.
    * Try to create the cluster.
        * Non-deferrable mode: wait for the create operation and return the cluster.
        * Otherwise: fetch the cluster. If it is already RUNNING, return it.
          If not, defer to ``DataprocClusterTrigger``, which resumes in ``execute_complete``.
    * ``AlreadyExists``: re-raise unless ``use_if_exists`` is set. Otherwise fetch the
      existing cluster and handle its state:
        * CREATING: wait for creation.
        * DELETING: wait for deletion, then create a new cluster.
        * STOPPED: start it.
    * ``DataprocResourceIsNotReadyError``: retry creation up to
      ``num_retries_if_resource_is_not_ready`` times. If every retry fails, optionally
      delete the cluster (``delete_on_error``) and re-raise.
    * ``AirflowException``: make a best-effort attempt to inspect the cluster and handle
      an ERROR state, then re-raise the original error.

    :param context: Airflow task context, used to persist the extra link.
    :return: The cluster serialized with ``Cluster.to_dict``. On the resource-not-ready
        retry path, this is whatever ``_retry_cluster_creation`` returns (presumably
        also a dict; verify).
    :raises AlreadyExists: If the cluster exists and ``use_if_exists`` is false.
    :raises DataprocResourceIsNotReadyError: If creation keeps failing for that reason.
    :raises AirflowException: The original creation failure, after best-effort cleanup.
    """
    self.log.info("Creating cluster: %s", self.cluster_name)
    hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
    # Save data required to display extra link no matter what the cluster status will be.
    # The resolved project id is reused below so the link, the lookup and the trigger
    # all target the same project.
    project_id = self.project_id or hook.project_id
    if project_id:
        DataprocClusterLink.persist(
            context=context,
            operator=self,
            cluster_id=self.cluster_name,
            project_id=project_id,
            region=self.region,
        )
    try:
        # First try to create a new cluster
        operation = self._create_cluster(hook)
        if not self.deferrable and not isinstance(operation, str):
            cluster = hook.wait_for_operation(
                timeout=self.timeout, result_retry=self.retry, operation=operation
            )
            self.log.info("Cluster created.")
            return Cluster.to_dict(cluster)
        cluster = hook.get_cluster(
            project_id=project_id, region=self.region, cluster_name=self.cluster_name
        )
        if cluster.status.state == cluster.status.State.RUNNING:
            self.log.info("Cluster created.")
            return Cluster.to_dict(cluster)
        # self.defer() raises to suspend the task; execution resumes in execute_complete.
        self.defer(
            trigger=DataprocClusterTrigger(
                cluster_name=self.cluster_name,
                project_id=project_id,
                region=self.region,
                gcp_conn_id=self.gcp_conn_id,
                impersonation_chain=self.impersonation_chain,
                polling_interval_seconds=self.polling_interval_seconds,
                delete_on_error=self.delete_on_error,
            ),
            method_name="execute_complete",
        )
    except AlreadyExists:
        if not self.use_if_exists:
            raise
        self.log.info("Cluster already exists.")
        cluster = self._get_cluster(hook)
    except DataprocResourceIsNotReadyError as resource_not_ready_error:
        if self.num_retries_if_resource_is_not_ready:
            attempt = self.num_retries_if_resource_is_not_ready
            while attempt > 0:
                attempt -= 1
                try:
                    cluster = self._retry_cluster_creation(hook)
                except DataprocResourceIsNotReadyError:
                    continue
                else:
                    return cluster
            self.log.info(
                "Retrying Cluster %s creation because of resource not ready was unsuccessful.",
                self.cluster_name,
            )
        if self.delete_on_error:
            self._delete_cluster(hook)
            self._wait_for_cluster_in_deleting_state(hook)
        raise resource_not_ready_error
    except AirflowException:
        # There still could be a cluster created here in an ERROR state which
        # should be deleted immediately rather than consuming another retry attempt
        # (assuming delete_on_error is true (default)).
        # This reduces overall the number of task attempts from 3 to 2 to successful cluster creation
        # assuming the underlying GCE issues have resolved within that window. Users can configure
        # a higher number of retry attempts in powers of two with 30s-60s wait interval.
        try:
            cluster = self._get_cluster(hook)
            self._handle_error_state(hook, cluster)
        except Exception as cleanup_error:
            # Best effort only: any failure here, including the cluster not being found,
            # is logged and ignored so that the original creation failure surfaces.
            # Non-Exception signals (KeyboardInterrupt, SystemExit) are not masked.
            self.log.exception(cleanup_error)
        # Re-raise the original AirflowException currently being handled.
        raise
    # Only reached on the AlreadyExists + use_if_exists path: reconcile the existing cluster.
    # NOTE(review): _handle_error_state presumably raises and/or deletes the cluster when it
    # is in ERROR state (see the AirflowException handler comment); confirm in its definition.
    self._handle_error_state(hook, cluster)
    if cluster.status.state == cluster.status.State.CREATING:
        # Wait for cluster to be created
        cluster = self._wait_for_cluster_in_creating_state(hook)
        self._handle_error_state(hook, cluster)
    elif cluster.status.state == cluster.status.State.DELETING:
        # Wait for the old cluster to be deleted, then create a new one.
        self._wait_for_cluster_in_deleting_state(hook)
        # _create_cluster() returns a long-running operation (see the happy path above),
        # not a Cluster: resolve it before inspecting its state or serializing it.
        operation = self._create_cluster(hook)
        cluster = hook.wait_for_operation(
            timeout=self.timeout, result_retry=self.retry, operation=operation
        )
        self._handle_error_state(hook, cluster)
    elif cluster.status.state == cluster.status.State.STOPPED:
        # If the cluster exists and is already stopped, start it.
        # NOTE(review): the returned dict below still reflects the pre-start (STOPPED)
        # snapshot; consider re-fetching if callers need the post-start state.
        self._start_cluster(hook)
    return Cluster.to_dict(cluster)