in output/bigquery.py [0:0]
def output(self):
    """Run a BigQuery load job from a source URI into a destination table.

    Reads the following keys from ``self.output_config``:

    * ``datasetWithTable`` (required): destination table, Jinja-expanded.
    * ``source`` (required): source URI to load from, Jinja-expanded.
    * ``location`` (required): location passed to the load job,
      Jinja-expanded.
    * ``job`` (required): mapping of load job options; only the option
      names listed in ``job_field_type`` below are accepted.
    * ``project`` (optional): project passed to the BigQuery client.

    The method blocks until the load job has finished and logs the
    job's output rows, output bytes and errors.

    Raises:
        NotConfiguredException: If a required configuration key is
            missing.
        InvalidJobOptionException: If ``job`` contains an option name
            that is not listed in ``job_field_type``.
    """
    if 'datasetWithTable' not in self.output_config:
        raise NotConfiguredException(
            'No destination dataset specified in BigQuery output.')
    if 'source' not in self.output_config:
        raise NotConfiguredException(
            'No GCS source specified in BigQuery output.')
    if 'location' not in self.output_config:
        raise NotConfiguredException(
            'No dataset location specified in BigQuery output.')
    if 'job' not in self.output_config:
        raise NotConfiguredException(
            'No load job location specified in BigQuery output.')

    # A missing 'project' yields None, same as the explicit fallback.
    project = self.output_config.get('project')
    bigquery_client = bigquery.Client(
        client_info=self._get_grpc_client_info(), project=project)

    # Accepted load job options and the kind of Jinja expansion each
    # value receives. Every value here must match one of the branches
    # in the loop below, otherwise the option would be silently dropped.
    job_field_type = {
        'projectionFields': 'list',
        'schema': 'dict',
        'schemaUpdateOptions': 'list',
        'timePartitioning': 'dict',
        'rangePartitioning': 'dict',
        'clustering': 'dict',
        'destinationEncryptionConfiguration': 'dict',
        'hivePartitioningOptions': 'dict',
        'useAvroLogicalTypes': 'bool',
        'allowQuotedNewlines': 'bool',
        'allowJaggedRows': 'bool',
        'ignoreUnknownValues': 'bool',
        'autodetect': 'bool',
        'decimalTargetTypes': 'list',
        'parquetOptions': 'dict',
        'destinationTableDescription': 'str',
        'destinationTableFriendlyName': 'str',
        'nullMarker': 'str',
        'quoteCharacter': 'str',
        'labels': 'dict',
        'sourceFormat': 'str',
        'encoding': 'str',
        'writeDisposition': 'str',
        'createDisposition': 'str',
        'maxBadRecords': 'int',
        'skipLeadingRows': 'int',
    }

    # Map each camelCase option name to its snake_case equivalent by
    # inserting '_' before every capital letter except a leading one.
    # NOTE(review): the resulting snake_case keys are handed to
    # LoadJobConfig.from_api_repr() below; confirm whether that method
    # expects the camelCase API field names instead.
    job_field_map = {
        camel_name: re.sub(r'(?<!^)(?=[A-Z])', '_', camel_name).lower()
        for camel_name in job_field_type
    }

    job_config = {}
    # 'job' is guaranteed to be present by the validation above.
    for k, v in self.output_config['job'].items():
        if k not in job_field_map:
            raise InvalidJobOptionException('Unknown job option "%s"' % k)
        field = job_field_map[k]
        kind = job_field_type[k]
        if kind == 'str':
            job_config[field] = self._jinja_expand_string(v)
        elif kind == 'list':
            job_config[field] = self._jinja_var_to_list(v)
        elif kind == 'dict':
            job_config[field] = self._jinja_expand_dict(v)
        elif kind == 'bool':
            job_config[field] = self._jinja_expand_bool(v)
        elif kind == 'int':
            job_config[field] = self._jinja_expand_int(v)

    bq_job_config = bigquery.job.LoadJobConfig.from_api_repr(
        {'load': job_config})

    table = self._jinja_expand_string(
        self.output_config['datasetWithTable'])
    location = self._jinja_expand_string(self.output_config['location'])
    source = self._jinja_expand_string(self.output_config['source'])

    self.logger.info('BigQuery load job starting...',
                     extra={
                         'source_url': source,
                         'dataset': table,
                         'location': location,
                         'job_config': job_config,
                     })
    load_job = bigquery_client.load_table_from_uri(
        source,
        table,
        location=location,
        job_config=bq_job_config,
    )
    # Block until the load job completes.
    load_job.result()
    self.logger.info('BigQuery load job finished.',
                     extra={
                         'source_url': source,
                         'dataset': table,
                         'location': location,
                         'output_rows': load_job.output_rows,
                         'output_bytes': load_job.output_bytes,
                         'errors': load_job.errors,
                     })