# providers/google/src/airflow/providers/google/cloud/operators/bigquery.py
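# NOTE: excerpt from BigQueryInsertJobOperator; the imports used below (BigQueryHook,
# Conflict, AirflowException, the google.cloud.bigquery job classes, the link and
# trigger classes) live at the top of the full module.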
def execute(self, context: Any):
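    """Submit a BigQuery job, reattaching to an existing job on conflict; persist
    console links; then wait for completion or defer to the triggerer."""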
    hook = BigQueryHook(
        gcp_conn_id=self.gcp_conn_id,
        impersonation_chain=self.impersonation_chain,
    )
    self.hook = hook
    if self.project_id is None:
        self.project_id = hook.project_id
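
    # Derive a deterministic job_id from the DAG/task/logical date and configuration,
    # unless force_rerun is set, in which case a unique suffix guarantees a fresh job.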
    self.job_id = hook.generate_job_id(
        job_id=self.job_id,
        dag_id=self.dag_id,
        task_id=self.task_id,
        logical_date=context["logical_date"],
        configuration=self.configuration,
        force_rerun=self.force_rerun,
    )

    try:
        self.log.info("Executing: %s", self.configuration)
        # Create a job
        if self.job_id is None:
            raise ValueError("job_id cannot be None")
        job: BigQueryJob | UnknownJob = self._submit_job(hook, self.job_id)
    except Conflict:
        # If the job already exists, retrieve it
        job = hook.get_job(
            project_id=self.project_id,
            location=self.location,
            job_id=self.job_id,
        )
        if job.state not in self.reattach_states:
            # Same job configuration, so a rerun must be forced
            raise AirflowException(
                f"Job with id: {self.job_id} already exists and is in {job.state} state. If you "
                f"want to force rerun it, consider setting `force_rerun=True`. "
                f"Or, if you want to reattach in this scenario, add {job.state} to `reattach_states`."
            )
        # Job already reached state DONE
        if job.state == "DONE":
            raise AirflowException("Job is already in state DONE. Cannot reattach to this job.")
        # We are reattaching to a job
        self.log.info("Reattaching to existing job in state %s", job.state)
        self._handle_job_error(job)
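
    # Map each job type to the configuration keys that may reference tables, so
    # table-level console links can be persisted for the tables the job touched.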
    job_types = {
        LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
        CopyJob._JOB_TYPE: ["sourceTable", "destinationTable"],
        ExtractJob._JOB_TYPE: ["sourceTable"],
        QueryJob._JOB_TYPE: ["destinationTable"],
    }
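
    # Each referenced table may arrive as a plain string or as an API dict with
    # projectId/datasetId/tableId keys; persist a BigQueryTableLink for each one.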
    if self.project_id:
        job_configuration = job.to_api_repr()["configuration"]
        for job_type, tables_prop in job_types.items():
            if job_type in job_configuration:
                for table_prop in tables_prop:
                    if table_prop in job_configuration[job_type]:
                        table = job_configuration[job_type][table_prop]
                        persist_kwargs = {
                            "context": context,
                            "task_instance": self,
                            "project_id": self.project_id,
                            "table_id": table,
                        }
                        if not isinstance(table, str):
                            persist_kwargs["table_id"] = table["tableId"]
                            persist_kwargs["dataset_id"] = table["datasetId"]
                            persist_kwargs["project_id"] = table["projectId"]
                        BigQueryTableLink.persist(**persist_kwargs)
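
    # Record the job id actually used (it may differ after a reattach) and expose a
    # fully qualified job id path via XCom for downstream tasks.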
    self.job_id = job.job_id
    if self.project_id:
        job_id_path = convert_job_id(
            job_id=self.job_id,  # type: ignore[arg-type]
            project_id=self.project_id,
            location=self.location,
        )
        context["ti"].xcom_push(key="job_id_path", value=job_id_path)
    persist_kwargs = {
        "context": context,
        "task_instance": self,
        "project_id": self.project_id,
        "location": self.location,
        "job_id": self.job_id,
    }
    BigQueryJobDetailLink.persist(**persist_kwargs)

    # Synchronous path: block until the job completes, then surface any error.
    if not self.deferrable:
        job.result(timeout=self.result_timeout, retry=self.result_retry)
        self._handle_job_error(job)
        return self.job_id
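
    # Deferrable path: if the job is still running, hand polling off to the triggerer
    # and resume in execute_complete once it finishes.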
    if job.running():
        self.defer(
            timeout=self.execution_timeout,
            trigger=BigQueryInsertJobTrigger(
                conn_id=self.gcp_conn_id,
                job_id=self.job_id,
                project_id=self.project_id,
                location=self.location or hook.location,
                poll_interval=self.poll_interval,
                impersonation_chain=self.impersonation_chain,
                cancel_on_kill=self.cancel_on_kill,
            ),
            method_name="execute_complete",
        )
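
    # The job finished before a deferral was needed; surface any error and return.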
self.log.info("Current state of job %s is %s", job.job_id, job.state)
self._handle_job_error(job)
return self.job_id
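
# Usage sketch (assumption: typical DAG wiring, not part of this module). The
# configuration dict follows the BigQuery Jobs API:
#
#     insert_job = BigQueryInsertJobOperator(
#         task_id="insert_query_job",
#         configuration={"query": {"query": "SELECT 1", "useLegacySql": False}},
#         location="US",
#         deferrable=True,
#     )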