def execute()

in providers/google/src/airflow/providers/google/cloud/operators/dataproc.py [0:0]


    def execute(self, context: Context):
        if self.asynchronous and self.deferrable:
            raise AirflowException(
                "Both asynchronous and deferrable parameters were passed. Please, provide only one."
            )

        batch_id: str = ""
        if self.batch_id:
            batch_id = self.batch_id
            self.log.info("Starting batch %s", batch_id)
            # Persist the link earlier so users can observe the progress
            DataprocBatchLink.persist(
                context=context,
                operator=self,
                project_id=self.project_id,
                region=self.region,
                batch_id=self.batch_id,
            )
        else:
            self.log.info("Starting batch. The batch ID will be generated since it was not provided.")

        if self.openlineage_inject_parent_job_info or self.openlineage_inject_transport_info:
            self.log.info("Automatic injection of OpenLineage information into Spark properties is enabled.")
            self._inject_openlineage_properties_into_dataproc_batch(context)

        self.__update_batch_labels()

        try:
            self.operation = self.hook.create_batch(
                region=self.region,
                project_id=self.project_id,
                batch=self.batch,
                batch_id=self.batch_id,
                request_id=self.request_id,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
        except AlreadyExists:
            self.log.info("Batch with given id already exists.")
            self.log.info("Attaching to the job %s if it is still running.", batch_id)
        else:
            if self.operation and self.operation.metadata:
                batch_id = self.operation.metadata.batch.split("/")[-1]
            else:
                raise AirflowException("Operation metadata is not available.")
            self.log.info("The batch %s was created.", batch_id)

        DataprocBatchLink.persist(
            context=context,
            operator=self,
            project_id=self.project_id,
            region=self.region,
            batch_id=batch_id,
        )

        if self.asynchronous:
            batch = self.hook.get_batch(
                batch_id=batch_id,
                region=self.region,
                project_id=self.project_id,
                retry=self.retry,
                timeout=self.timeout,
                metadata=self.metadata,
            )
            self.log.info("The batch %s was created asynchronously. Exiting.", batch_id)
            return Batch.to_dict(batch)

        if self.deferrable:
            self.defer(
                trigger=DataprocBatchTrigger(
                    batch_id=batch_id,
                    project_id=self.project_id,
                    region=self.region,
                    gcp_conn_id=self.gcp_conn_id,
                    impersonation_chain=self.impersonation_chain,
                    polling_interval_seconds=self.polling_interval_seconds,
                ),
                method_name="execute_complete",
            )

        self.log.info("Waiting for the completion of batch job %s", batch_id)
        batch = self.hook.wait_for_batch(
            batch_id=batch_id,
            region=self.region,
            project_id=self.project_id,
            retry=self.retry,
            timeout=self.timeout,
            metadata=self.metadata,
        )
        if self.num_retries_if_resource_is_not_ready and self.hook.check_error_for_resource_is_not_ready_msg(
            batch.state_message
        ):
            attempt = self.num_retries_if_resource_is_not_ready
            while attempt > 0:
                attempt -= 1
                batch, batch_id = self.retry_batch_creation(batch_id)
                if not self.hook.check_error_for_resource_is_not_ready_msg(batch.state_message):
                    break

        self.handle_batch_status(context, batch.state, batch_id, batch.state_message)
        return Batch.to_dict(batch)