def output()

in output/bigquery.py [0:0]
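This method implements the BigQuery output: it validates the configuration,
expands each setting with Jinja, builds a load job configuration from the
configured job options, and loads data from a Cloud Storage URI into the
destination table, waiting for the job to finish. It relies on
google.cloud.bigquery and the plugin's NotConfiguredException and
InvalidJobOptionException helpers.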


    def output(self):
        """Loads data from Cloud Storage into a BigQuery table via a load job."""
        # Validate that all required configuration keys are present.
        if 'datasetWithTable' not in self.output_config:
            raise NotConfiguredException(
                'No destination dataset specified in BigQuery output.')
        if 'source' not in self.output_config:
            raise NotConfiguredException(
                'No GCS source specified in BigQuery output.')
        if 'location' not in self.output_config:
            raise NotConfiguredException(
                'No dataset location specified in BigQuery output.')
        if 'job' not in self.output_config:
            raise NotConfiguredException(
                'No load job configuration specified in BigQuery output.')

        # 'project' is optional; when unset, the client falls back to the
        # default project inferred from the environment/credentials.
        project = self.output_config.get('project')
        bigquery_client = bigquery.Client(
            client_info=self._get_grpc_client_info(), project=project)

        # Supported load job options, mapped to the type each value is
        # expanded to by the plugin's Jinja helpers.
        job_config = {}
        job_field_type = {
            'projectionFields': 'list',
            'schema': 'dict',
            'schemaUpdateOptions': 'list',
            'timePartitioning': 'dict',
            'rangePartitioning': 'dict',
            'clustering': 'dict',
            'destinationEncryptionConfiguration': 'dict',
            'hivePartitioningOptions': 'dict',
            'useAvroLogicalTypes': 'bool',
            'allowQuotedNewlines': 'bool',
            'allowJaggedRows': 'bool',
            'ignoreUnknownValues': 'bool',
            'autodetect': 'bool',
            'decimalTargetTypes': 'list',
            'parquetOptions': 'dict',
            'destinationTableDescription': 'str',
            'destinationTableFriendlyName': 'str',
            'nullMarker': 'str',
            'quoteCharacter': 'str',
            'labels': 'dict',
            'sourceFormat': 'str',
            'encoding': 'str',
            'writeDisposition': 'str',
            'createDisposition': 'str',
            'maxBadRecords': 'int',
            'skipLeadingRows': 'int'
        }
        # Expand each configured job option with Jinja according to its
        # declared type. The keys are kept in camelCase, because
        # LoadJobConfig.from_api_repr() takes the REST API representation
        # of a job configuration, which uses camelCase field names.
        # ('job' is guaranteed present by the checks above.)
        for k, v in self.output_config['job'].items():
            if k not in job_field_type:
                raise InvalidJobOptionException('Unknown job option "%s"' % k)
            if job_field_type[k] == 'str':
                job_config[k] = self._jinja_expand_string(v)
            elif job_field_type[k] == 'list':
                job_config[k] = self._jinja_var_to_list(v)
            elif job_field_type[k] == 'dict':
                job_config[k] = self._jinja_expand_dict(v)
            elif job_field_type[k] == 'bool':
                job_config[k] = self._jinja_expand_bool(v)
            elif job_field_type[k] == 'int':
                job_config[k] = self._jinja_expand_int(v)

        bq_job_config = bigquery.job.LoadJobConfig.from_api_repr(
            {'load': job_config})

        # Expand the destination table, location and source URI with Jinja.
        table = self._jinja_expand_string(
            self.output_config['datasetWithTable'])
        location = self._jinja_expand_string(self.output_config['location'])
        source = self._jinja_expand_string(self.output_config['source'])

        self.logger.info('BigQuery load job starting...',
                         extra={
                             'source_url': source,
                             'dataset': table,
                             'location': location,
                             'job_config': job_config,
                         })
        load_job = bigquery_client.load_table_from_uri(
            source,
            table,
            location=location,
            job_config=bq_job_config,
        )
        load_job.result()  # Blocks until the job completes; raises on failure.
        self.logger.info('BigQuery load job finished.',
                         extra={
                             'source_url': source,
                             'dataset': table,
                             'location': location,
                             'output_rows': load_job.output_rows,
                             'output_bytes': load_job.output_bytes,
                             'errors': load_job.errors,
                         })
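
For illustration, an output_config that satisfies the checks above could look
like the sketch below. The key names are taken from the code; the concrete
values, the Jinja variables and the surrounding configuration format are
assumptions.

    # Hypothetical configuration for this output; all values are
    # illustrative. String values may contain Jinja templates that the
    # plugin expands at run time.
    output_config = {
        'datasetWithTable': 'my_dataset.my_table',   # destination table
        'location': 'EU',                            # dataset/job location
        'source': 'gs://{{ bucket }}/{{ object }}',  # GCS URI to load from
        'project': 'my-project',                     # optional override
        'job': {
            'sourceFormat': 'CSV',
            'skipLeadingRows': '1',
            'autodetect': 'true',
            'writeDisposition': 'WRITE_APPEND',
        },
    }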