def process()

in processors/bigquery.py [0:0]


    def process(self, output_var='records'):
        """Render the configured query template, run it on BigQuery and
        return the result rows.

        The JSON document in ``self.data`` is decoded and its top-level
        keys are merged into the Jinja environment globals before the query
        template is rendered, so the template can refer to them by name.

        Configuration keys read from ``self.config``:
            query: Jinja template source of the query to run (required).
            dialect: ``'legacy'`` (case-insensitive) selects legacy SQL;
                any other value, a null value, or a missing key selects
                standard SQL.
            project: Project passed to the BigQuery client (optional).
            labels: Labels attached to the query job (optional; a null
                value is treated the same as a missing key).

        Args:
            output_var: Key under which the records are returned.

        Returns:
            A dict with a single entry, ``output_var``, whose value is a
            list containing one plain dict per result row.

        Raises:
            NotConfiguredException: If ``query`` is missing from the
                configuration.
        """
        bigquery_config = self.config
        if 'query' not in bigquery_config:
            raise NotConfiguredException(
                'No BigQuery query specified in configuration!')

        data = json.loads(self.data)
        # NOTE: the environment's globals are replaced and not restored, so
        # the merged data keys stay on the environment after this call.
        self.jinja_environment.globals = {
            **self.jinja_environment.globals,
            **data
        }

        query_template = self.jinja_environment.from_string(
            bigquery_config['query'])
        query_template.name = 'query'
        query = query_template.render()

        # Tolerate a `dialect:` key that is present but null or non-string
        # instead of failing on `.lower()`; anything but 'legacy' means
        # standard SQL.
        use_legacy_sql = str(
            bigquery_config.get('dialect') or '').lower() == 'legacy'
        self.logger.debug('Running BigQuery query.', extra={'query': query})

        client = bigquery.Client(client_info=self._get_grpc_client_info(),
                                 project=bigquery_config.get('project'))
        # A `labels:` key that is present but null falls back to an empty
        # dict, since the job configuration is given a dict of labels.
        labels = bigquery_config.get('labels') or {}
        job_options = bigquery.job.QueryJobConfig(
            use_legacy_sql=use_legacy_sql,
            labels=labels)

        query_job = client.query(query, job_config=job_options)
        results = query_job.result()
        # Copy each row into a plain dict; keys() is called explicitly
        # because the row objects are not plain dicts.
        records = [{key: row.get(key) for key in row.keys()}
                   for row in results]
        self.logger.debug('BigQuery execution finished.',
                          extra={'count': len(records)})
        return {
            output_var: records,
        }