in cid/helpers/athena.py [0:0]
def execute_query(self, sql_query, sleep_duration=1) -> str:
    """Execute an AWS Athena query and block until it finishes.

    The query is started in the ``primary`` workgroup against the database
    named by ``self.DatabaseName``; its execution state is then polled until
    it is no longer ``SUBMITTED``, ``RUNNING`` or ``QUEUED``.

    Args:
        sql_query: SQL text passed to ``start_query_execution``.
        sleep_duration: Seconds to wait between two status polls.

    Returns:
        The ``QueryExecutionId`` of the query when its final state is
        ``SUCCEEDED``.

    Raises:
        SystemExit: With exit code 1 when the query cannot be started or
            ends in any state other than ``SUCCEEDED``. The error is logged
            before exiting.
    """
    execution_context = {'Database': self.DatabaseName}
    try:
        # Start the query and fetch its initial status right away.
        response = self.client.start_query_execution(
            QueryString=sql_query,
            QueryExecutionContext=execution_context,
            WorkGroup='primary',
        )
        query_id = response.get('QueryExecutionId')
        execution = self.client.get_query_execution(QueryExecutionId=query_id)
    except self.client.exceptions.InvalidRequestException as e:
        logger.error(f'InvalidRequestException: {e}')
        # SystemExit is raised directly: the ``exit`` helper is only
        # installed by the ``site`` module and may be missing.
        raise SystemExit(1)
    except Exception as e:
        logger.error('Athena query failed: {}'.format(e))
        logger.error('Full query: {}'.format(sql_query))
        raise SystemExit(1)

    # Always read the status from the latest get_query_execution() result so
    # the failure branch below never looks at the start_query_execution()
    # response, which has no 'QueryExecution' key.
    status = execution['QueryExecution']['Status']
    while status['State'] in ('SUBMITTED', 'RUNNING', 'QUEUED'):
        # Sleep first: the status was fetched just before, and sleeping after
        # the poll would delay the return once the query is already done.
        time.sleep(sleep_duration)
        execution = self.client.get_query_execution(QueryExecutionId=query_id)
        status = execution['QueryExecution']['Status']

    current_status = status['State']
    if current_status == 'SUCCEEDED':
        return query_id

    # The reason is looked up defensively because the status may not carry one.
    failure_reason = status.get(
        'StateChangeReason',
        f'no reason reported (state: {current_status})',
    )
    logger.error('Athena query failed: {}'.format(failure_reason))
    logger.error(f'Failure reason: {failure_reason}')
    logger.info('Full query: {}'.format(sql_query))
    raise SystemExit(1)