def execute()

in providers/google/src/airflow/providers/google/cloud/operators/bigquery.py [0:0]


    def execute(self, context: Any):
        hook = BigQueryHook(
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
        )
        self.hook = hook
        if self.project_id is None:
            self.project_id = hook.project_id

        self.job_id = hook.generate_job_id(
            job_id=self.job_id,
            dag_id=self.dag_id,
            task_id=self.task_id,
            logical_date=context["logical_date"],
            configuration=self.configuration,
            force_rerun=self.force_rerun,
        )

        try:
            self.log.info("Executing: %s'", self.configuration)
            # Create a job
            if self.job_id is None:
                raise ValueError("job_id cannot be None")
            job: BigQueryJob | UnknownJob = self._submit_job(hook, self.job_id)
        except Conflict:
            # If the job already exists retrieve it
            job = hook.get_job(
                project_id=self.project_id,
                location=self.location,
                job_id=self.job_id,
            )

            if job.state not in self.reattach_states:
                # Same job configuration, so we need force_rerun
                raise AirflowException(
                    f"Job with id: {self.job_id} already exists and is in {job.state} state. If you "
                    f"want to force rerun it consider setting `force_rerun=True`."
                    f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
                )

            # Job already reached state DONE
            if job.state == "DONE":
                raise AirflowException("Job is already in state DONE. Can not reattach to this job.")

            # We are reattaching to a job
            self.log.info("Reattaching to existing Job in state %s", job.state)
            self._handle_job_error(job)

        job_types = {
            LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
            CopyJob._JOB_TYPE: ["sourceTable", "destinationTable"],
            ExtractJob._JOB_TYPE: ["sourceTable"],
            QueryJob._JOB_TYPE: ["destinationTable"],
        }

        if self.project_id:
            for job_type, tables_prop in job_types.items():
                job_configuration = job.to_api_repr()["configuration"]
                if job_type in job_configuration:
                    for table_prop in tables_prop:
                        if table_prop in job_configuration[job_type]:
                            table = job_configuration[job_type][table_prop]
                            persist_kwargs = {
                                "context": context,
                                "task_instance": self,
                                "project_id": self.project_id,
                                "table_id": table,
                            }
                            if not isinstance(table, str):
                                persist_kwargs["table_id"] = table["tableId"]
                                persist_kwargs["dataset_id"] = table["datasetId"]
                                persist_kwargs["project_id"] = table["projectId"]
                            BigQueryTableLink.persist(**persist_kwargs)

        self.job_id = job.job_id

        if self.project_id:
            job_id_path = convert_job_id(
                job_id=self.job_id,  # type: ignore[arg-type]
                project_id=self.project_id,
                location=self.location,
            )
            context["ti"].xcom_push(key="job_id_path", value=job_id_path)

        persist_kwargs = {
            "context": context,
            "task_instance": self,
            "project_id": self.project_id,
            "location": self.location,
            "job_id": self.job_id,
        }
        BigQueryJobDetailLink.persist(**persist_kwargs)

        # Wait for the job to complete
        if not self.deferrable:
            job.result(timeout=self.result_timeout, retry=self.result_retry)
            self._handle_job_error(job)

            return self.job_id
        if job.running():
            self.defer(
                timeout=self.execution_timeout,
                trigger=BigQueryInsertJobTrigger(
                    conn_id=self.gcp_conn_id,
                    job_id=self.job_id,
                    project_id=self.project_id,
                    location=self.location or hook.location,
                    poll_interval=self.poll_interval,
                    impersonation_chain=self.impersonation_chain,
                    cancel_on_kill=self.cancel_on_kill,
                ),
                method_name="execute_complete",
            )
        self.log.info("Current state of job %s is %s", job.job_id, job.state)
        self._handle_job_error(job)
        return self.job_id