def _run_query()

in 06-module-automated-pipeline/pipeline_utils.py [0:0]
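
Submits a SQL query to Amazon Athena, polls until the query reaches a terminal state, loads the CSV results from the S3 staging location into a pandas DataFrame, and deletes the result objects from S3. On failure it prints the failure reason and returns an empty DataFrame.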


import os
import time

import boto3
import pandas as pd

# module-level S3 client, used below to fetch and clean up query results
s3_client = boto3.client('s3')


def _run_query(query_string, tmp_uri, database, verbose=True):
    athena = boto3.client('athena')

    # submit the Athena query
    if verbose:
        print('Running query:\n ' + query_string)

    query_execution = athena.start_query_execution(
        QueryString=query_string,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': tmp_uri}
    )
    
    # poll until the Athena query reaches a terminal state
    # (CANCELLED must be checked too, or a cancelled query loops forever)
    query_execution_id = query_execution['QueryExecutionId']
    query_state = athena.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State']
    while query_state not in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        time.sleep(2)
        query_state = athena.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State']
    
    if query_state != 'SUCCEEDED':
        # report why the query failed (or was cancelled) and return an empty frame
        execution = athena.get_query_execution(QueryExecutionId=query_execution_id)
        print(execution)
        print(execution['QueryExecution']['Status'].get('StateChangeReason', 'no state change reason reported'))
        return pd.DataFrame()

    # Athena writes results to <OutputLocation>/<query-execution-id>.csv;
    # split the staging URI into its bucket and key prefix
    results_bucket = tmp_uri.split('//')[1].split('/')[0]
    results_prefix = '/'.join(tmp_uri.split('//')[1].split('/')[1:])
    query_output_file = f'{query_execution_id}.csv'
    results_filename = os.path.join(results_prefix, query_output_file)
    print(f'query results filename: {results_filename}')

    # Prepare query results for training.
    s3_client.download_file(results_bucket, results_filename, query_output_file)
    df = pd.read_csv(query_output_file)
    ## TODO: Put back delete of the local results file
    ## os.remove(query_output_file)

    ## Delete the S3 objects holding the query results
    s3_client.delete_object(Bucket=results_bucket, Key=results_filename)
    s3_client.delete_object(Bucket=results_bucket, Key=results_filename + '.metadata')
    return df
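
A minimal usage sketch, assuming a placeholder Glue/Athena database demo_db, table demo_table, and staging bucket my-athena-staging (all hypothetical names), with AWS credentials that allow Athena queries and S3 reads/deletes in that bucket:

# hypothetical database, table, and bucket names; substitute your own
df = _run_query(
    query_string='SELECT * FROM demo_table LIMIT 10',
    tmp_uri='s3://my-athena-staging/tmp/',
    database='demo_db',
)
print(df.shape)

Note that the helper downloads the CSV Athena writes to the staging location instead of paging through athena.get_query_results, so the whole result set arrives in a single pd.read_csv call.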