in pipeline_utils.py
import os
import time

import boto3
import pandas as pd

# Shared S3 client used to download and clean up Athena query results.
s3_client = boto3.client('s3')


def _run_query(query_string, tmp_uri, database, verbose=True):
    athena = boto3.client('athena')
    # Submit the Athena query.
    if verbose:
        print('Running query:\n ' + query_string)
    query_execution = athena.start_query_execution(
        QueryString=query_string,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': tmp_uri}
    )

    # Poll until the Athena query reaches a terminal state.
    query_execution_id = query_execution['QueryExecutionId']
    query_state = athena.get_query_execution(
        QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State']
    while query_state not in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        time.sleep(2)
        query_state = athena.get_query_execution(
            QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State']

    if query_state != 'SUCCEEDED':
        # Surface the failure details and return an empty DataFrame.
        execution = athena.get_query_execution(QueryExecutionId=query_execution_id)
        print(execution)
        print(execution['QueryExecution']['Status'].get('StateChangeReason', ''))
        return pd.DataFrame()

    ## TODO: fix this to avoid hardcoding the results prefix
    results_file_prefix = f'offline-store/query_results/{query_execution_id}.csv'

    # Download the results CSV from S3, load it into a DataFrame for training,
    # then clean up the local copy and the temporary S3 objects.
    filename = 'query_results.csv'
    results_bucket = (tmp_uri.split('//')[1]).split('/')[0]
    s3_client.download_file(results_bucket, results_file_prefix, filename)
    df = pd.read_csv(filename)
    os.remove(filename)
    s3_client.delete_object(Bucket=results_bucket, Key=results_file_prefix)
    s3_client.delete_object(Bucket=results_bucket, Key=results_file_prefix + '.metadata')
    return df
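

# Example usage (a minimal sketch, not part of the original pipeline): the
# bucket name, Glue database, and table below are hypothetical placeholders.
# Note that tmp_uri must point at the same 'offline-store/query_results/'
# prefix that _run_query hard-codes when it downloads the results CSV.
if __name__ == '__main__':
    df = _run_query(
        query_string='SELECT * FROM "my_feature_table" LIMIT 10',
        tmp_uri='s3://my-example-bucket/offline-store/query_results/',
        database='sagemaker_featurestore',
        verbose=True,
    )
    print(df.head())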