output/bigquery.py (101 lines of code) (raw):

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from google.cloud import bigquery

from .base import Output, NotConfiguredException


class InvalidJobOptionException(Exception):
    """Raised when the 'job' configuration contains an unsupported option."""
    pass


class BigqueryOutput(Output):
    """BigQuery output processors can write data into BigQuery tables.

    Args:
        datasetWithTable (str): BigQuery table in "dataset.table" notation.
        source (str): Source file on Cloud Storage to load the file from (in
          "gs://bucket/file" format).
        job (dict): Job configuration, eg. skipLeadingRows, see:
          https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
        location (str): Dataset location (eg. "europe-west4")
        project (str, optional): Google Cloud project to issue BigQuery API
          calls against.
    """

    # Supported JobConfigurationLoad options, keyed by their REST API
    # (camelCase) name, mapped to the kind of Jinja expansion the configured
    # value needs. Any option typed 'str' (or with an unrecognized type) is
    # expanded as a plain string.
    # NOTE: 'ignoreUnknownValues' was previously typo'd as 'bool,' (comma
    # inside the string), which made the option match no expansion branch and
    # get silently dropped from the job configuration.
    JOB_FIELD_TYPE = {
        'projectionFields': 'list',
        'schema': 'dict',
        'schemaUpdateOptions': 'list',
        'timePartitioning': 'dict',
        'rangePartitioning': 'dict',
        'clustering': 'dict',
        'destinationEncryptionConfiguration': 'dict',
        'hivePartitioningOptions': 'dict',
        'useAvroLogicalTypes': 'bool',
        'allowQuotedNewlines': 'bool',
        'allowJaggedRows': 'bool',
        'ignoreUnknownValues': 'bool',
        'autodetect': 'bool',
        'decimalTargetTypes': 'list',
        'parquetOptions': 'dict',
        'destinationTableDescription': 'str',
        'destinationTableFriendlyName': 'str',
        'nullMarker': 'str',
        'quoteCharacter': 'str',
        'labels': 'dict',
        'sourceFormat': 'str',
        'encoding': 'str',
        'writeDisposition': 'str',
        'createDisposition': 'str',
        'maxBadRecords': 'int',
        'skipLeadingRows': 'int',
    }

    def output(self):
        """Runs a BigQuery load job from the configured GCS source.

        Blocks until the load job finishes.

        Raises:
            NotConfiguredException: if a required configuration key
              (datasetWithTable, source, location, job) is missing.
            InvalidJobOptionException: if 'job' contains an unknown option.
        """
        if 'datasetWithTable' not in self.output_config:
            raise NotConfiguredException(
                'No destination dataset specified in BigQuery output.')
        if 'source' not in self.output_config:
            raise NotConfiguredException(
                'No GCS source specified in BigQuery output.')
        if 'location' not in self.output_config:
            raise NotConfiguredException(
                'No dataset location specified in BigQuery output.')
        if 'job' not in self.output_config:
            # Fixed copy-pasted message: this guard checks for the load job
            # configuration, not for a location.
            raise NotConfiguredException(
                'No load job configuration specified in BigQuery output.')

        project = self.output_config.get('project')
        bigquery_client = bigquery.Client(
            client_info=self._get_grpc_client_info(), project=project)

        # Expand each configured job option according to its declared type.
        # Keys are kept in camelCase: LoadJobConfig.from_api_repr() expects
        # the REST API resource representation, so the previous snake_case
        # conversion produced keys the API silently ignored.
        job_config = {}
        for option, value in self.output_config['job'].items():
            if option not in self.JOB_FIELD_TYPE:
                raise InvalidJobOptionException('Unknown job option "%s"' %
                                                option)
            option_type = self.JOB_FIELD_TYPE[option]
            if option_type == 'list':
                job_config[option] = self._jinja_var_to_list(value)
            elif option_type == 'dict':
                job_config[option] = self._jinja_expand_dict(value)
            elif option_type == 'bool':
                job_config[option] = self._jinja_expand_bool(value)
            elif option_type == 'int':
                job_config[option] = self._jinja_expand_int(value)
            else:
                # 'str' and any unspecified type: plain string expansion.
                job_config[option] = self._jinja_expand_string(value)

        bq_job_config = bigquery.job.LoadJobConfig.from_api_repr(
            {'load': job_config})

        table = self._jinja_expand_string(
            self.output_config['datasetWithTable'])
        location = self._jinja_expand_string(self.output_config['location'])
        source = self._jinja_expand_string(self.output_config['source'])

        self.logger.info('BigQuery load job starting...',
                         extra={
                             'source_url': source,
                             'dataset': table,
                             'location': location,
                             'job_config': job_config,
                         })
        load_job = bigquery_client.load_table_from_uri(
            source,
            table,
            location=location,
            job_config=bq_job_config,
        )
        # result() blocks until the job completes and raises on job errors.
        load_job.result()
        self.logger.info('BigQuery load job finished.',
                         extra={
                             'source_url': source,
                             'dataset': table,
                             'location': location,
                             'output_rows': load_job.output_rows,
                             'output_bytes': load_job.output_bytes,
                             'errors': load_job.errors,
                         })