in providers/google/src/airflow/providers/google/cloud/operators/bigquery.py [0:0]
def execute(self, context: Context):
    """
    Fetch rows from a BigQuery table or from the results of a BigQuery job.

    Exactly one of ``job_id`` and ``table_id`` must be set on the operator.

    When ``deferrable`` is falsy the rows are fetched here and returned.
    Otherwise a job is obtained (submitted via ``_submit_job`` when no
    ``job_id`` was given, looked up with ``hook.get_job`` when one was), its
    id is pushed to XCom under the key ``job_id``, and the task is handed to
    a ``BigQueryGetDataTrigger`` that resumes in ``execute_complete``.

    :param context: Task context; ``context["ti"]`` is used for the XCom
        push in deferrable mode.
    :return: In non-deferrable mode, a list with one entry per row: a
        ``dict`` when ``as_dict`` is truthy, otherwise the row's values.
        In deferrable mode nothing is returned from this method.
    :raises AirflowException: If ``job_id`` and ``table_id`` are both set or
        both unset.
    :raises TypeError: If the hook hands back a ``RowIterator`` instead of a
        materialised collection of rows.
    """
    # Backwards compatibility: the deprecated ``project_id`` is only used as
    # a fallback when ``table_project_id`` was not supplied.
    if self.project_id:
        self.log.warning(
            "The project_id parameter is deprecated, and will be removed in a future release."
            " Please use table_project_id instead.",
        )
        if self.table_project_id:
            self.log.info("Ignoring 'project_id' parameter, as 'table_project_id' is found.")
        else:
            self.table_project_id = self.project_id

    if not exactly_one(self.job_id, self.table_id):
        raise AirflowException(
            "'job_id' and 'table_id' parameters are mutually exclusive, "
            "ensure that exactly one of them is specified"
        )

    hook = BigQueryHook(
        gcp_conn_id=self.gcp_conn_id,
        impersonation_chain=self.impersonation_chain,
        use_legacy_sql=self.use_legacy_sql,
    )

    if self.deferrable:
        # Reuse the caller-supplied job when there is one; otherwise submit
        # a new job for the table.
        job: BigQueryJob | UnknownJob = (
            hook.get_job(
                job_id=self.job_id,
                project_id=self.job_project_id or hook.project_id,
                location=self.location,
            )
            if self.job_id
            else self._submit_job(hook, job_id="")
        )

        context["ti"].xcom_push(key="job_id", value=job.job_id)
        self.defer(
            timeout=self.execution_timeout,
            trigger=BigQueryGetDataTrigger(
                conn_id=self.gcp_conn_id,
                job_id=job.job_id,
                dataset_id=self.dataset_id,
                table_id=self.table_id,
                project_id=self.job_project_id or hook.project_id,
                location=self.location or hook.location,
                poll_interval=self.poll_interval,
                as_dict=self.as_dict,
                impersonation_chain=self.impersonation_chain,
                selected_fields=self.selected_fields,
            ),
            method_name="execute_complete",
        )
        # The synchronous fetch below is never part of a deferrable run.
        return None

    rows: list[Row] | RowIterator | list[dict[str, Any]]
    if self.job_id:
        # Read the result set of an already existing job.
        self.log.info(
            "Fetching data from job '%s:%s.%s' max results: %s",
            self.job_project_id or hook.project_id,
            self.location,
            self.job_id,
            self.max_results,
        )
        rows = hook.get_query_results(
            job_id=self.job_id,
            location=self.location,
            selected_fields=self.selected_fields,
            max_results=self.max_results,
            project_id=self.job_project_id or hook.project_id,
        )
    else:
        # Read rows straight from the table.
        self.log.info(
            "Fetching Data from %s.%s.%s max results: %s",
            self.table_project_id or hook.project_id,
            self.dataset_id,
            self.table_id,
            self.max_results,
        )
        if not self.selected_fields:
            # No explicit column list: default to every field name that the
            # table schema reports.
            table_schema: dict[str, list] = hook.get_schema(
                dataset_id=self.dataset_id,
                table_id=self.table_id,
                project_id=self.table_project_id or hook.project_id,
            )
            if "fields" in table_schema:
                self.selected_fields = ",".join(column["name"] for column in table_schema["fields"])
        rows = hook.list_rows(
            dataset_id=self.dataset_id,
            table_id=self.table_id,
            max_results=self.max_results,
            selected_fields=self.selected_fields,
            location=self.location,
            project_id=self.table_project_id or hook.project_id,
        )

    # len() and the conversions below need a materialised collection.
    if isinstance(rows, RowIterator):
        raise TypeError(
            "BigQueryHook.list_rows() returns iterator when return_iterator is False (default)"
        )
    self.log.info("Total extracted rows: %s", len(rows))

    if self.as_dict:
        return [dict(record) for record in rows]
    # Row objects contribute the result of values() as-is; any other row has
    # its values() copied into a list.
    return [
        record.values() if isinstance(record, Row) else list(record.values())
        for record in rows
    ]