# providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
def execute(self, context: Context):
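    # Create the BigQuery hook used to submit the load job and keep a reference on the operator.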
    hook = BigQueryHook(
        gcp_conn_id=self.gcp_conn_id,
        location=self.location,
        impersonation_chain=self.impersonation_chain,
    )
    self.hook = hook
    self.source_format = self.source_format.upper()
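    # Generate the BigQuery job id from the DAG, task, logical date, and job configuration;
    # force_rerun requests a fresh job id instead of reusing a previous one.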
    job_id = self.hook.generate_job_id(
        job_id=self.job_id,
        dag_id=self.dag_id,
        task_id=self.task_id,
        logical_date=context["logical_date"],
        configuration=self.configuration,
        force_rerun=self.force_rerun,
    )
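    # Normalize source_objects to a list and expand each object into a full gs:// URI.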
    self.source_objects = (
        self.source_objects if isinstance(self.source_objects, list) else [self.source_objects]
    )
    self.source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects]
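    # Resolve the schema: explicit schema_fields take precedence, then a JSON schema object
    # in GCS, otherwise fall back to autodetection or the existing table's schema.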
    if not self.schema_fields:
        # Only fail when self.autodetect is explicitly False: with no schema fields, no
        # schema object, and autodetect disabled there is no way to determine the table
        # schema. self.autodetect equal to None means we do not detect the schema from
        # the files; instead we rely on the schema of an already existing table.
        if not self.schema_object and self.autodetect is False:
            raise AirflowException(
                "Table schema was not found. Neither schema object nor schema fields were specified"
            )
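        # A schema object stored in GCS is used unless the source is a Datastore backup,
        # which carries its own schema.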
        if self.schema_object and self.source_format != "DATASTORE_BACKUP":
            gcs_hook = GCSHook(
                gcp_conn_id=self.gcp_conn_id,
                impersonation_chain=self.impersonation_chain,
            )
            self.schema_fields = json.loads(
                gcs_hook.download(self.schema_object_bucket, self.schema_object).decode("utf-8")
            )
            self.log.info("Loaded fields from schema object: %s", self.schema_fields)
        else:
            self.schema_fields = None
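    # External tables are defined directly over the GCS files; otherwise the data is loaded
    # into the destination table via a BigQuery load job.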
    if self.external_table:
        self.log.info("Creating a new BigQuery table for storing data...")
        table_obj_api_repr = self._create_external_table()
        BigQueryTableLink.persist(
            context=context,
            task_instance=self,
            dataset_id=table_obj_api_repr["tableReference"]["datasetId"],
            project_id=table_obj_api_repr["tableReference"]["projectId"],
            table_id=table_obj_api_repr["tableReference"]["tableId"],
        )
        if self.max_id_key:
            max_id = self._find_max_value_in_column()
            return max_id
    else:
        if self.force_delete:
            self.log.info("Deleting table %s", self.destination_project_dataset_table)
            hook.delete_table(table_id=self.destination_project_dataset_table)
        else:
            self.log.info("Using existing BigQuery table for storing data...")
        self.configuration = self._use_existing_table()
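        # Submit the job; a Conflict means a job with this id already exists, in which case
        # we either reattach to it or fail, depending on its state.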
        try:
            self.log.info("Executing: %s", self.configuration)
            job: BigQueryJob | UnknownJob = self._submit_job(self.hook, job_id)
        except Conflict:
            # If the job already exists, retrieve it
            job = self.hook.get_job(
                project_id=self.project_id or self.hook.project_id,
                location=self.location,
                job_id=job_id,
            )
            if job.state not in self.reattach_states:
                # The job has the same configuration but is not in a reattachable state,
                # so rerunning it requires force_rerun.
                raise AirflowException(
                    f"Job with id: {job_id} already exists and is in {job.state} state. If you "
                    f"want to force rerun it, consider setting `force_rerun=True`. "
                    f"Or, if you want to reattach in this scenario, add {job.state} to `reattach_states`."
                )
            # Job already reached state DONE
            if job.state == "DONE":
                raise AirflowException("Job is already in state DONE. Cannot reattach to this job.")
            # We are reattaching to a job
            self.log.info("Reattaching to existing Job in state %s", job.state)
            self._handle_job_error(job)
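        # Map each BigQuery job type to the table properties its configuration may contain,
        # so that an operator link can be persisted for every table the job touches.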
        job_types = {
            LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
            CopyJob._JOB_TYPE: ["sourceTable", "destinationTable"],
            ExtractJob._JOB_TYPE: ["sourceTable"],
            QueryJob._JOB_TYPE: ["destinationTable"],
        }
        if self.hook.project_id:
            for job_type, tables_prop in job_types.items():
                job_configuration = job.to_api_repr()["configuration"]
                if job_type in job_configuration:
                    for table_prop in tables_prop:
                        if table_prop in job_configuration[job_type]:
                            table = job_configuration[job_type][table_prop]
                            persist_kwargs = {
                                "context": context,
                                "task_instance": self,
                                "table_id": table,
                            }
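                            # The table reference may be a plain string or a dict with
                            # projectId/datasetId/tableId keys; unpack the dict form.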
                            if not isinstance(table, str):
                                persist_kwargs["table_id"] = table["tableId"]
                                persist_kwargs["dataset_id"] = table["datasetId"]
                                persist_kwargs["project_id"] = table["projectId"]
                            BigQueryTableLink.persist(**persist_kwargs)
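        # Record the job id and push it to XCom so downstream tasks can look up the job.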
        self.job_id = job.job_id
        context["ti"].xcom_push(key="job_id", value=self.job_id)
        if self.deferrable:
            self.defer(
                timeout=self.execution_timeout,
                trigger=BigQueryInsertJobTrigger(
                    conn_id=self.gcp_conn_id,
                    job_id=self.job_id,
                    project_id=self.project_id or self.hook.project_id,
                    location=self.location or self.hook.location,
                    impersonation_chain=self.impersonation_chain,
                ),
                method_name="execute_complete",
            )
        else:
            job.result(timeout=self.result_timeout, retry=self.result_retry)
            self._handle_job_error(job)
            if self.max_id_key:
                return self._find_max_value_in_column()