in airflow/dags_template/generic-api-service.py [0:0]
def run_bq_job(**kwargs):
task_instance = kwargs['ti']
def _json_pairs(alias, dynamic_query_string, static_query_string):
    """Build a SQL ``CONCAT(...) AS <alias>`` expression that renders a JSON object.

    Despite the parameter names, the two mappings are generic key/value
    collections; the helper is not specific to query strings.

    Args:
        alias: Column alias appended after ``AS``.
        dynamic_query_string: Mapping of JSON key -> value that is emitted
            wrapped in backticks (presumably a column reference resolved by
            the SQL engine at query time -- confirm against the caller).
            ``None`` is treated as an empty mapping.
        static_query_string: Mapping of JSON key -> value that is emitted
            wrapped in double quotes, i.e. as a literal in the SQL text.
            ``None`` is treated as an empty mapping.

    Returns:
        A string of the form ``CONCAT('{', <pairs>, '}') AS <alias>``. When
        both mappings are empty the result is ``CONCAT('{','}') AS <alias>``,
        so the generated SQL stays syntactically valid.
    """
    def _to_query_pairs(items, remove_quotes=False):
        # A missing (None) or empty mapping contributes no pairs.
        if not items:
            return []
        # Backticks for dynamic values, double quotes for static literals.
        q = '`' if remove_quotes else '"'
        # NOTE(review): keys and values are interpolated into the SQL text
        # without escaping; a quote or backtick inside them would break the
        # generated statement. Confirm these only come from trusted config.
        return [f'\'"{key}":"\', {q}{value}{q},\'"\'' for key, value in items.items()]

    query_pairs = _to_query_pairs(dynamic_query_string, True)
    query_pairs.extend(_to_query_pairs(static_query_string))

    # Without this guard an empty pair list would render "CONCAT('{',,'}')",
    # which is not valid SQL.
    if not query_pairs:
        return "CONCAT('{','}') AS " + alias

    # Consecutive pairs are separated by a literal ' , ' CONCAT argument.
    json_pairs = ", ' , ',".join(query_pairs)
    return "CONCAT('{'," + json_pairs + ",'}') AS " + alias
secret_name = api_config.get('auth').get('secret_name','none')
sql_query_secret = '"{}" as secret_name'.format(secret_name)
sql_query = _json_pairs('query_string', dynamic_query_string, static_query_string)
sql_body = _json_pairs('body',dynamic_body, static_body)
sql_headers = _json_pairs('headers',dynamic_headers, static_headers)
sql_query_source = ', '.join([sql_query_secret, sql_query, sql_body, sql_headers])
sql = """