def _run_query()

in pipeline_utils.py


import os
import time

import boto3
import pandas as pd


def _run_query(query_string, tmp_uri, database, verbose=True):
    athena = boto3.client('athena')
    s3_client = boto3.client('s3')

    # submit the Athena query
    if verbose:
        print('Running query:\n ' + query_string)
    query_execution = athena.start_query_execution(
        QueryString=query_string,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': tmp_uri}
    )
    
    # wait for the Athena query to reach a terminal state
    query_execution_id = query_execution['QueryExecutionId']
    query_state = athena.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State']
    while query_state not in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        time.sleep(2)
        query_state = athena.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State']
    
    if query_state != 'SUCCEEDED':
        # log the failed/cancelled execution details and return an empty DataFrame
        failed_execution = athena.get_query_execution(QueryExecutionId=query_execution_id)
        print(failed_execution)
        failure_reason = failed_execution['QueryExecution']['Status'].get('StateChangeReason', query_state)
        print(failure_reason)
        return pd.DataFrame()
    else:
        ## TODO: fix this to avoid hardcoding prefix
        results_file_prefix = f'offline-store/query_results/{query_execution_id}.csv'

        # Prepare query results for training: download the CSV, load it into a
        # DataFrame, and remove the local copy.
        filename = 'query_results.csv'
        results_bucket = (tmp_uri.split('//')[1]).split('/')[0]
        s3_client.download_file(results_bucket, results_file_prefix, filename)
        df = pd.read_csv(filename)
        os.remove(filename)

        # clean up the result objects Athena wrote to S3
        s3_client.delete_object(Bucket=results_bucket, Key=results_file_prefix)
        s3_client.delete_object(Bucket=results_bucket, Key=results_file_prefix + '.metadata')
        return df
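

Typical usage, shown below as a minimal sketch: the bucket, database, and query are hypothetical, and because results_file_prefix is currently hardcoded (see the TODO above), tmp_uri must point at the offline-store/query_results/ prefix of the output bucket.

# illustrative only: bucket, database, and table names are assumptions, not values from pipeline_utils.py
tmp_uri = 's3://my-feature-store-bucket/offline-store/query_results/'
query = 'SELECT record_id, feature_1, feature_2 FROM my_feature_table LIMIT 1000'
df = _run_query(query, tmp_uri, database='my_feature_database')
print(df.head())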