in python/pipelines/pipeline_ops.py [0:0]
def _extract_schema_from_bigquery(
    project: str,
    location: str,
    table_name: str,
    table_schema: str,
) -> list:
    """Extract the column names of a BigQuery table or view.

    If the table or view cannot be found, falls back to reading the column
    names from a local JSON schema file instead of raising.

    Args:
        project: The ID of the project that contains the table or view.
        location: The location of the table or view. Currently unused —
            the client is created without an explicit location (see the
            commented-out argument below); kept for interface compatibility.
        table_name: The (fully-qualified) name of the table or view.
        table_schema: Path to a JSON schema file used as a fallback. The
            file must contain a list of objects each having a 'name' key.

    Returns:
        A list of the column names in the table or view, or the names read
        from the fallback schema file when the table does not exist.
    """
    import json
    from google.cloud import bigquery
    from google.api_core import exceptions

    try:
        client = bigquery.Client(
            project=project,
            # location=location,  # NOTE(review): deliberately disabled? confirm
        )
        table = client.get_table(table_name)
        # `field` instead of `schema` — the original shadowed the result name.
        schema = [field.name for field in table.schema]
    except exceptions.NotFound:
        # `logging.warning` replaces the deprecated `logging.warn` alias.
        # Implicit string concatenation avoids the literal whitespace runs
        # that backslash-continuation embedded in the original message.
        logging.warning(
            'Pipeline compiled without columns transformation. '
            'Make sure the `data_source_bigquery_table_path` table or view exists! '
            f'Loading default values from schema file {table_schema}.')
        with open(table_schema) as f:
            fallback_fields = json.load(f)
        schema = [feature['name'] for feature in fallback_fields]
    return schema