in redash/query_runner/big_query.py [0:0]
def get_table_sample(self, table_name):
    """Return a one-row sample of *table_name* as a dict keyed by column name.

    ``table_name`` is expected as ``"<dataset>.<table>"`` (split on the first
    dot). Returns ``{}`` immediately when the ``loadSchema`` configuration
    flag is not enabled. On any ``HttpError`` from the BigQuery API the
    method logs the failure and falls back to the superclass implementation,
    which samples via a ``SELECT * ... LIMIT`` query instead.
    """
    if not self.configuration.get("loadSchema", False):
        return {}
    service = self._get_bigquery_service()
    project_id = self._get_project_id()
    dataset_id, table_id = table_name.split(".", 1)
    try:
        # NOTE: the `sample_response` is limited by `maxResults` here.
        # Without this limit, the response would be very large and require
        # pagination using `nextPageToken`.
        sample_response = (
            service.tabledata()
            .list(
                projectId=project_id,
                datasetId=dataset_id,
                tableId=table_id,
                fields="rows",
                maxResults=1,
            )
            .execute()
        )
        schema_response = (
            service.tables()
            .get(
                projectId=project_id,
                datasetId=dataset_id,
                tableId=table_id,
                fields="schema,id",
            )
            .execute()
        )
        table_rows = sample_response.get("rows", [])
        # Each row's cells live under the "f" key of the tabledata response.
        samples = table_rows[0].get("f", []) if table_rows else []
        schema = schema_response.get("schema", {}).get("fields", [])
        # Removed an unused `_get_columns_schema(...)` call whose result was
        # never read — it only added latency and a spurious failure path.
        flattened_samples = self._flatten_samples(samples)
        return self._columns_and_samples_to_dict(schema, flattened_samples)
    except HttpError as http_error:
        logger.exception(
            "Error communicating with server for sample for table %s: %s",
            table_name,
            http_error,
        )
        # If there is an error getting the sample using the API,
        # try to do it by running a `select *` with a limit.
        return super().get_table_sample(table_name)