in processors/bigquery.py [0:0]
def process(self, output_var='records'):
    """Render the configured query template and execute it on BigQuery.

    Configuration keys read from ``self.config``:

    * ``query`` (required): Jinja template source for the SQL statement.
    * ``dialect`` (optional): ``legacy`` (case-insensitive) selects legacy
      SQL; anything else, or no value, selects standard SQL.
    * ``project`` (optional): project passed to the BigQuery client.
    * ``labels`` (optional): labels attached to the query job.

    Args:
        output_var: Key under which the fetched rows are returned.

    Returns:
        A single-entry dict mapping ``output_var`` to a list holding one
        ``{column: value}`` dict per result row.

    Raises:
        NotConfiguredException: If ``query`` is absent from the
            configuration.
    """
    cfg = self.config
    if 'query' not in cfg:
        raise NotConfiguredException(
            'No BigQuery query specified in configuration!')

    # Expose the top-level keys of the JSON payload to the template.
    # NOTE: this rebinds the environment's globals, so the payload keys
    # stay on ``self.jinja_environment`` after this call returns.
    payload = json.loads(self.data)
    self.jinja_environment.globals = {
        **self.jinja_environment.globals,
        **payload,
    }

    template = self.jinja_environment.from_string(cfg['query'])
    template.name = 'query'
    sql = template.render()

    # Standard SQL unless the configuration explicitly asks for legacy.
    use_legacy_sql = False
    if 'dialect' in cfg and cfg['dialect'].lower() == 'legacy':
        use_legacy_sql = True

    self.logger.debug('Running BigQuery query.', extra={'query': sql})

    project = None
    if 'project' in cfg:
        project = cfg['project']
    client = bigquery.Client(client_info=self._get_grpc_client_info(),
                             project=project)

    job_labels = cfg['labels'] if 'labels' in cfg else {}
    job_config = bigquery.job.QueryJobConfig(use_legacy_sql=use_legacy_sql,
                                             labels=job_labels)

    # result() is called before iterating, as in the original flow.
    rows = client.query(sql, job_config=job_config).result()

    # Copy every row into a plain dict keyed by column name.
    records = [{column: row.get(column)
                for column in row.keys()}
               for row in rows]

    self.logger.debug('BigQuery execution finished.',
                      extra={'count': len(records)})
    return {output_var: records}