in utilities/feature_store_helper.py [0:0]
import os
import time

import boto3
import pandas as pd
def _run_query(self, query_string: str, tmp_uri: str, database: str, verbose: bool = True) -> pd.DataFrame:
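    """Run an Athena query and return the results as a pandas DataFrame.

    Submits `query_string` against `database`, polls until the query reaches a
    terminal state, downloads the CSV results Athena writes under `tmp_uri`,
    and cleans up both the local copy and the temporary S3 objects. Returns an
    empty DataFrame if the query does not succeed.
    """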
    athena = boto3.client('athena')

    # submit the Athena query
    if verbose:
        print('Running query:\n ' + query_string)
    query_execution = athena.start_query_execution(
        QueryString=query_string,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': tmp_uri}
    )
    # wait for the Athena query to reach a terminal state
    # (checking only SUCCEEDED/FAILED would loop forever on CANCELLED)
    query_execution_id = query_execution['QueryExecutionId']
    query_state = athena.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State']
    while query_state not in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        time.sleep(2)
        query_state = athena.get_query_execution(QueryExecutionId=query_execution_id)['QueryExecution']['Status']['State']
    if query_state != 'SUCCEEDED':
        # report why the query did not succeed and return an empty frame
        execution = athena.get_query_execution(QueryExecutionId=query_execution_id)
        print(execution)
        print(execution['QueryExecution']['Status'].get('StateChangeReason', 'No StateChangeReason reported'))
        return pd.DataFrame()
    else:
        # Athena writes its output to <OutputLocation>/<QueryExecutionId>.csv,
        # so derive the results key from the user-supplied tmp_uri rather than
        # hard-coding the prefix
        results_bucket, _, key_prefix = tmp_uri.split('//')[1].partition('/')
        key_prefix = key_prefix.strip('/')
        results_file_prefix = f'{key_prefix}/{query_execution_id}.csv' if key_prefix else f'{query_execution_id}.csv'
        # Prepare query results for training.
        filename = f'query_results_{query_execution_id}.csv'
        try:
            if verbose:
                print(f'results_bucket: {results_bucket}')
                print(f'results_file_prefix: {results_file_prefix}')
                print(f'filename: {filename}')
            s3_client = boto3.client('s3', region_name=self._region)
            s3_client.download_file(results_bucket, results_file_prefix, filename)
            df = pd.read_csv(filename)
            if verbose:
                print(f'Query results shape: {df.shape}')
            # clean up the local copy and the temporary query results in S3
            os.remove(filename)
            s3_client.delete_object(Bucket=results_bucket, Key=results_file_prefix)
            s3_client.delete_object(Bucket=results_bucket, Key=results_file_prefix + '.metadata')
            return df
        except Exception as inst:
            if verbose:
                print('Failed download')
                print(f'Exception: {inst}')
            # keep the return type consistent: hand back an empty DataFrame
            # instead of falling through and implicitly returning None
            return pd.DataFrame()
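
# Minimal usage sketch (hypothetical names): assumes this method belongs to a
# helper class in this module that sets a `_region` attribute, and that
# `tmp_uri` points at an S3 prefix the caller can write query results to. The
# class, table, and database names below are illustrative, not taken from
# this file.
#
#   helper = FeatureStoreHelper()   # hypothetical class name
#   df = helper._run_query(
#       query_string='SELECT * FROM "my_fg_table" LIMIT 10',
#       tmp_uri='s3://my-bucket/offline-store/query_results',
#       database='sagemaker_featurestore')
#   print(df.head())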