# 06-module-automated-pipeline/pipeline_utils.py
import os
import time

import boto3
import pandas as pd

# Module-level S3 client used by _run_query below.
s3_client = boto3.client('s3')

def _run_query(query_string, tmp_uri, database, verbose=True):
    athena = boto3.client('athena')
    # Submit the Athena query.
    if verbose:
        print('Running query:\n ' + query_string)
    query_execution = athena.start_query_execution(
        QueryString=query_string,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': tmp_uri}
    )
    # Poll until the query reaches a terminal state. Athena executions can
    # also end up CANCELLED, so check for that as well or the loop never exits.
    query_execution_id = query_execution['QueryExecutionId']
    query_state = athena.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State']
    while query_state not in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        time.sleep(2)
        query_state = athena.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State']
    if query_state != 'SUCCEEDED':
        # Surface the failure details and return an empty DataFrame so
        # callers can detect the failure via df.empty.
        execution = athena.get_query_execution(QueryExecutionId=query_execution_id)
        print(execution)
        print(execution['QueryExecution']['Status'].get('StateChangeReason', query_state))
        df = pd.DataFrame()
        return df
    else:
        # Athena writes results to <OutputLocation>/<query-execution-id>.csv;
        # split the output URI into its bucket and key prefix.
        results_bucket = tmp_uri.split('//')[1].split('/')[0]
        paths = tmp_uri.split('//')[1].split('/')
        results_prefix = '/'.join(paths[1:])
        query_output_file = f'{query_execution_id}.csv'
        results_filename = os.path.join(results_prefix, query_output_file)
        print(f'query results filename: {results_filename}')
        # Prepare query results for training.
        s3_client.download_file(results_bucket, results_filename, query_output_file)
        df = pd.read_csv(query_output_file)
        ## TODO: Put back delete of the local results file
        ## os.remove(query_output_file)
        # Delete the S3 objects holding the query results.
        s3_client.delete_object(Bucket=results_bucket, Key=results_filename)
        s3_client.delete_object(Bucket=results_bucket, Key=results_filename + '.metadata')
        return df
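

# A minimal usage sketch, not part of the original module: the database,
# table, and scratch bucket names below are hypothetical placeholders.
# _run_query returns the result set as a DataFrame on success and an empty
# DataFrame on failure, so callers can branch on df.empty.
if __name__ == '__main__':
    df = _run_query(
        query_string='SELECT * FROM orders LIMIT 10',  # hypothetical table
        tmp_uri='s3://example-athena-results/tmp/',    # hypothetical scratch location
        database='demo_db',                            # hypothetical database
    )
    if not df.empty:
        print(df.head())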