def run_bq_job()

in airflow/dags_template/generic-api-service.py [0:0]


def run_bq_job(**kwargs):
    task_instance = kwargs['ti']
    def _json_pairs(alias, dynamic_query_string, static_query_string):
        def _to_query_pairs(items, remove_quotes=False):
            q = '`' if remove_quotes else '"'
            return [f'\'"{key}":"\', {q}{value}{q},\'"\'' for key, value in items.items()]
            
        query_pairs =  _to_query_pairs(dynamic_query_string, True) 
        query_pairs.extend(_to_query_pairs(static_query_string))
        
        json_pairs =  ", ' , ',".join(query_pairs)
       
        return "CONCAT('{',"+json_pairs+",'}') AS "+alias

    secret_name = api_config.get('auth').get('secret_name','none')
    sql_query_secret = '"{}" as secret_name'.format(secret_name)
    sql_query = _json_pairs('query_string', dynamic_query_string, static_query_string)
    sql_body = _json_pairs('body',dynamic_body, static_body)
    sql_headers = _json_pairs('headers',dynamic_headers, static_headers)

    sql_query_source = ', '.join([sql_query_secret, sql_query, sql_body,  sql_headers])

    sql = """