def execute()

in providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py [0:0]


    def execute(self, context: Context):
        """
        Load the configured GCS objects into BigQuery, or expose them through an external table.

        The schema is resolved first: explicit ``schema_fields`` are used as-is; otherwise the
        JSON ``schema_object`` is downloaded from GCS (skipped for ``DATASTORE_BACKUP`` sources).
        After that, one of two paths is taken:

        * ``external_table`` set: an external table is created via ``_create_external_table``
          and a table link is persisted.
        * otherwise: the configuration built by ``_use_existing_table`` is submitted as a job.
          If a job with the generated id already exists, the operator tries to reattach to it.
          The job is then either handed to a trigger through ``self.defer`` (``deferrable``)
          or waited on in-process.

        :param context: Airflow task context; ``logical_date`` and ``ti`` are read from it.
        :return: The value of ``_find_max_value_in_column()`` when ``max_id_key`` is set and the
            work is completed in this call; otherwise ``None``.
        :raises AirflowException: If neither schema fields nor a schema object are given while
            ``autodetect`` is explicitly ``False``, or if a job with the generated id already
            exists and cannot be reattached to.
        """
        self.hook = BigQueryHook(
            gcp_conn_id=self.gcp_conn_id,
            location=self.location,
            impersonation_chain=self.impersonation_chain,
        )
        self.source_format = self.source_format.upper()

        # NOTE: the id is derived from ``self.configuration`` as it is *before*
        # ``_use_existing_table`` rebuilds it further down.
        job_id = self.hook.generate_job_id(
            job_id=self.job_id,
            dag_id=self.dag_id,
            task_id=self.task_id,
            logical_date=context["logical_date"],
            configuration=self.configuration,
            force_rerun=self.force_rerun,
        )

        # Accept a single object name as well as a list of them.
        if not isinstance(self.source_objects, list):
            self.source_objects = [self.source_objects]
        self.source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]

        if not self.schema_fields:
            # Only an explicit ``autodetect=False`` is an error here. ``autodetect=None`` means
            # schema detection from the files is not wanted and the schema of an already
            # existing table is relied upon instead.
            if not self.schema_object and self.autodetect is False:
                raise AirflowException(
                    "Table schema was not found. Neither schema object nor schema fields were specified"
                )
            if self.schema_object and self.source_format != "DATASTORE_BACKUP":
                gcs_hook = GCSHook(
                    gcp_conn_id=self.gcp_conn_id,
                    impersonation_chain=self.impersonation_chain,
                )
                raw_schema = gcs_hook.download(self.schema_object_bucket, self.schema_object)
                self.schema_fields = json.loads(raw_schema.decode("utf-8"))
                self.log.info("Loaded fields from schema object: %s", self.schema_fields)
            else:
                # Normalise any falsy value (e.g. an empty list) to ``None``.
                self.schema_fields = None

        if self.external_table:
            self.log.info("Creating a new BigQuery table for storing data...")
            table_obj_api_repr = self._create_external_table()
            table_reference = table_obj_api_repr["tableReference"]

            BigQueryTableLink.persist(
                context=context,
                task_instance=self,
                dataset_id=table_reference["datasetId"],
                project_id=table_reference["projectId"],
                table_id=table_reference["tableId"],
            )
            if self.max_id_key:
                return self._find_max_value_in_column()
            return None

        if self.force_delete:
            self.log.info("Deleting table %s", self.destination_project_dataset_table)
            self.hook.delete_table(table_id=self.destination_project_dataset_table)
        else:
            self.log.info("Using existing BigQuery table for storing data...")
        self.configuration = self._use_existing_table()

        self.log.info("Executing: %s", self.configuration)
        try:
            job: BigQueryJob | UnknownJob = self._submit_job(self.hook, job_id)
        except Conflict:
            # A job with this id already exists: fetch it and decide whether to reattach.
            job = self.hook.get_job(
                project_id=self.project_id or self.hook.project_id,
                location=self.location,
                job_id=job_id,
            )
            if job.state not in self.reattach_states:
                # Same job configuration, so a rerun has to be forced explicitly.
                raise AirflowException(
                    f"Job with id: {job_id} already exists and is in {job.state} state. If you "
                    f"want to force rerun it consider setting `force_rerun=True`. "
                    f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
                )
            # A finished job cannot be reattached to, even if DONE is listed in reattach_states.
            if job.state == "DONE":
                raise AirflowException("Job is already in state DONE. Can not reattach to this job.")

            self.log.info("Reattaching to existing Job in state %s", job.state)
            self._handle_job_error(job)

        # For every job type, the configuration keys that may reference a table.
        job_types = {
            LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
            CopyJob._JOB_TYPE: ["sourceTable", "destinationTable"],
            ExtractJob._JOB_TYPE: ["sourceTable"],
            QueryJob._JOB_TYPE: ["destinationTable"],
        }

        if self.hook.project_id:
            # Serialise the job once; the representation does not change between job types.
            job_configuration = job.to_api_repr()["configuration"]
            for job_type, tables_prop in job_types.items():
                if job_type not in job_configuration:
                    continue
                type_configuration = job_configuration[job_type]
                for table_prop in tables_prop:
                    if table_prop not in type_configuration:
                        continue
                    table = type_configuration[table_prop]
                    persist_kwargs = {
                        "context": context,
                        "task_instance": self,
                        "table_id": table,
                    }
                    # A non-string entry is treated as a table reference mapping.
                    if not isinstance(table, str):
                        persist_kwargs["table_id"] = table["tableId"]
                        persist_kwargs["dataset_id"] = table["datasetId"]
                        persist_kwargs["project_id"] = table["projectId"]
                    BigQueryTableLink.persist(**persist_kwargs)

        self.job_id = job.job_id
        context["ti"].xcom_push(key="job_id", value=self.job_id)

        if self.deferrable:
            # Hand the wait over to the trigger; ``execute_complete`` is the resume method.
            self.defer(
                timeout=self.execution_timeout,
                trigger=BigQueryInsertJobTrigger(
                    conn_id=self.gcp_conn_id,
                    job_id=self.job_id,
                    project_id=self.project_id or self.hook.project_id,
                    location=self.location or self.hook.location,
                    impersonation_chain=self.impersonation_chain,
                ),
                method_name="execute_complete",
            )
        else:
            job.result(timeout=self.result_timeout, retry=self.result_retry)
            self._handle_job_error(job)
            if self.max_id_key:
                return self._find_max_value_in_column()