# output/bigquery.py
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.cloud import bigquery

from .base import Output, NotConfiguredException


class InvalidJobOptionException(Exception):
    """Raised when an unknown option is passed in the load job configuration."""


class BigqueryOutput(Output):
    """
    BigQuery output processor that loads a file from Cloud Storage into a
    BigQuery table. The string parameters below (except "project") are
    expanded as Jinja templates.

    Args:
        datasetWithTable (str): Destination BigQuery table in "dataset.table" notation.
        source (str): Source file on Cloud Storage to load the data from (in "gs://bucket/file" format).
        location (str): Dataset location (e.g. "europe-west4").
        job (dict): Load job configuration, e.g. skipLeadingRows; for the available fields, see: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
        project (str, optional): Google Cloud project to issue the BigQuery API calls against.
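
    Example configuration (an illustrative sketch only; how this block is
    embedded depends on the surrounding pipeline configuration, and the
    "filename" Jinja variable is hypothetical):

        datasetWithTable: mydataset.mytable
        source: gs://mybucket/{{ filename }}
        location: europe-west4
        job:
          sourceFormat: CSV
          skipLeadingRows: 1
          writeDisposition: WRITE_APPEND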
"""

    def output(self):
        # Validate the required configuration keys before doing any work.
        if 'datasetWithTable' not in self.output_config:
            raise NotConfiguredException(
                'No destination dataset specified in BigQuery output.')
        if 'source' not in self.output_config:
            raise NotConfiguredException(
                'No GCS source specified in BigQuery output.')
        if 'location' not in self.output_config:
            raise NotConfiguredException(
                'No dataset location specified in BigQuery output.')
        if 'job' not in self.output_config:
            raise NotConfiguredException(
                'No load job configuration specified in BigQuery output.')

        project = self.output_config.get('project')
        bigquery_client = bigquery.Client(
            client_info=self._get_grpc_client_info(), project=project)
job_config = {}
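        # Supported load job options, mapped to the type of Jinja expansion
        # that is applied to each user-supplied value.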
job_field_type = {
'projectionFields': 'list',
'schema': 'dict',
'schemaUpdateOptions': 'list',
'timePartitioning': 'dict',
'rangePartitioning': 'dict',
'clustering': 'dict',
'destinationEncryptionConfiguration': 'dict',
'hivePartitioningOptions': 'dict',
'useAvroLogicalTypes': 'bool',
'allowQuotedNewlines': 'bool',
'allowJaggedRows': 'bool',
            'ignoreUnknownValues': 'bool',
'autodetect': 'bool',
'decimalTargetTypes': 'list',
'parquetOptions': 'dict',
'destinationTableDescription': 'str',
'destinationTableFriendlyName': 'str',
'nullMarker': 'str',
'quoteCharacter': 'str',
'labels': 'dict',
'sourceFormat': 'str',
'encoding': 'str',
'writeDisposition': 'str',
'createDisposition': 'str',
'maxBadRecords': 'int',
'skipLeadingRows': 'int'
}
        for k, v in self.output_config['job'].items():
            if k not in job_field_type:
                raise InvalidJobOptionException('Unknown job option "%s"' %
                                                k)
            if job_field_type[k] == 'str':
                job_config[k] = self._jinja_expand_string(v)
            elif job_field_type[k] == 'list':
                job_config[k] = self._jinja_var_to_list(v)
            elif job_field_type[k] == 'dict':
                job_config[k] = self._jinja_expand_dict(v)
            elif job_field_type[k] == 'bool':
                job_config[k] = self._jinja_expand_bool(v)
            elif job_field_type[k] == 'int':
                job_config[k] = self._jinja_expand_int(v)
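        # LoadJobConfig.from_api_repr() expects the REST API representation of
        # the job configuration, i.e. a {'load': {...}} dict keyed by the
        # camelCase field names collected above.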
bq_job_config = bigquery.job.LoadJobConfig.from_api_repr(
{'load': job_config})
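        # The destination table, location and source URL may all contain
        # Jinja expressions that are expanded at output time.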
table = self._jinja_expand_string(
self.output_config['datasetWithTable'])
location = self._jinja_expand_string(self.output_config['location'])
source = self._jinja_expand_string(self.output_config['source'])
self.logger.info('BigQuery load job starting...',
extra={
'source_url': source,
'dataset': table,
'location': location,
'job_config': job_config,
})
load_job = bigquery_client.load_table_from_uri(
source,
table,
location=location,
job_config=bq_job_config,
)
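        # Block until the load job has finished; result() raises an exception
        # if the job failed.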
load_job.result()
self.logger.info('BigQuery load job finished.',
extra={
'source_url': source,
'dataset': table,
'location': location,
'output_rows': load_job.output_rows,
'output_bytes': load_job.output_bytes,
'errors': load_job.errors,
})