def execute()

in providers/google/src/airflow/providers/google/cloud/operators/dataproc.py


    def execute(self, context: Context) -> dict:
        self.log.info("Creating cluster: %s", self.cluster_name)
        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)

        # Persist the data required to display the extra link, regardless of the cluster's eventual status
        project_id = self.project_id or hook.project_id
        if project_id:
            DataprocClusterLink.persist(
                context=context,
                operator=self,
                cluster_id=self.cluster_name,
                project_id=project_id,
                region=self.region,
            )

        try:
            # First try to create a new cluster
            operation = self._create_cluster(hook)
            if not self.deferrable and type(operation) is not str:
                cluster = hook.wait_for_operation(
                    timeout=self.timeout, result_retry=self.retry, operation=operation
                )
                self.log.info("Cluster created.")
                return Cluster.to_dict(cluster)
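            # Reached in deferrable mode (or when no long-running operation object was returned):
            # inspect the cluster's current state before handing off to the trigger.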
            cluster = hook.get_cluster(
                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
            )
            if cluster.status.state == cluster.status.State.RUNNING:
                self.log.info("Cluster created.")
                return Cluster.to_dict(cluster)
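            # Cluster is still provisioning; defer to DataprocClusterTrigger to poll asynchronously.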
            self.defer(
                trigger=DataprocClusterTrigger(
                    cluster_name=self.cluster_name,
                    project_id=self.project_id,
                    region=self.region,
                    gcp_conn_id=self.gcp_conn_id,
                    impersonation_chain=self.impersonation_chain,
                    polling_interval_seconds=self.polling_interval_seconds,
                    delete_on_error=self.delete_on_error,
                ),
                method_name="execute_complete",
            )
        except AlreadyExists:
            if not self.use_if_exists:
                raise
            self.log.info("Cluster already exists.")
            cluster = self._get_cluster(hook)
        except DataprocResourceIsNotReadyError as resource_not_ready_error:
            if self.num_retries_if_resource_is_not_ready:
                attempt = self.num_retries_if_resource_is_not_ready
                while attempt > 0:
                    attempt -= 1
                    try:
                        cluster = self._retry_cluster_creation(hook)
                    except DataprocResourceIsNotReadyError:
                        continue
                    else:
                        return cluster
                self.log.info(
                    "Retrying cluster %s creation after resource-not-ready errors was unsuccessful.",
                    self.cluster_name,
                )
            if self.delete_on_error:
                self._delete_cluster(hook)
                self._wait_for_cluster_in_deleting_state(hook)
            raise resource_not_ready_error
        except AirflowException as ae:
            # There could still be a cluster created here in an ERROR state, which should be
            # deleted immediately rather than consuming another retry attempt (assuming
            # delete_on_error is True, the default).
            # This reduces the overall number of task attempts from 3 to 2 for a successful
            # cluster creation, assuming the underlying GCE issues resolve within that window.
            # Users can configure a higher number of retry attempts, in powers of two, with a
            # 30s-60s wait interval.
            try:
                cluster = self._get_cluster(hook)
                self._handle_error_state(hook, cluster)
            except AirflowException as ae_inner:
                # Any number of failures could happen here, including cluster-not-found; ignore
                # them so that the original cluster-creation failure is surfaced.
                self.log.exception(ae_inner)
            finally:
                raise ae

        # Ensure the cluster is not in an ERROR state
        self._handle_error_state(hook, cluster)
        if cluster.status.state == cluster.status.State.CREATING:
            # Wait for cluster to be created
            cluster = self._wait_for_cluster_in_creating_state(hook)
            self._handle_error_state(hook, cluster)
        elif cluster.status.state == cluster.status.State.DELETING:
            # Wait for cluster to be deleted
            self._wait_for_cluster_in_deleting_state(hook)
            # Create new cluster
            cluster = self._create_cluster(hook)
            self._handle_error_state(hook, cluster)
        elif cluster.status.state == cluster.status.State.STOPPED:
            # If the cluster exists and is already stopped, start it
            self._start_cluster(hook)

        return Cluster.to_dict(cluster)
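
For reference, a minimal sketch of how this operator is typically wired into a DAG, assuming the Google provider is installed; the project, region, cluster name, and CLUSTER_CONFIG values below are illustrative placeholders, not values taken from this file:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

# Illustrative config; any valid Dataproc ClusterConfig dict can be used here.
CLUSTER_CONFIG = {
    "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
    "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
}

with DAG(
    dag_id="example_dataproc_create_cluster",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id="my-gcp-project",      # placeholder project
        region="europe-west1",            # placeholder region
        cluster_name="example-cluster",   # placeholder cluster name
        cluster_config=CLUSTER_CONFIG,
        use_if_exists=True,    # reuse an existing cluster instead of failing on AlreadyExists
        delete_on_error=True,  # clean up a cluster left in ERROR state (see execute() above)
        deferrable=False,      # set True to poll via DataprocClusterTrigger instead of blocking
    )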