in sdlf-utils/pipeline-examples/event-dataset-dependencies/sdlf-engineering-datalakeLibrary/python/datalake_library/transforms/stage_b_transforms/heavy_transform_athena.py [0:0]
def check_job_status(self, bucket, body, processed_keys_path, job_details):
    # Runs an Athena query on the specified database.
    # Returns the start_query_execution response, which carries the QueryExecutionId.
    def run_athena_query(query_string, db_string, athena_workgroup):
        query_execution_id = athena_client.start_query_execution(
            QueryString=query_string,
            QueryExecutionContext={
                'Database': db_string
            },
            WorkGroup=athena_workgroup
            # NOTE: create this workgroup beforehand and set its OutputLocation
            # to the Athena results bucket, e.g.
            # 's3://client-datalake-<env>-us-east-1-123456789000-athena-results/'
        )
        return query_execution_id
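
    # Helper: poll the query execution until it is no longer QUEUED and
    # return the latest (state, error_reason) pair.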
    def athena_status(query_execution_id):
        state = 'QUEUED'
        while state == 'QUEUED':
            query_response = athena_client.get_query_execution(
                QueryExecutionId=query_execution_id['QueryExecutionId'])
            logger.info(f'Executing - query id: {query_execution_id}')
            if 'QueryExecution' in query_response and \
                    'Status' in query_response['QueryExecution'] and \
                    'State' in query_response['QueryExecution']['Status']:
                state = query_response['QueryExecution']['Status']['State']
                error = ''
                if state == 'FAILED':
                    error = query_response['QueryExecution']['Status']['StateChangeReason']
                    return state, error
                elif state != 'QUEUED':
                    return state, error
            time.sleep(5)
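
    # Helper: fetch up to the first 100 result rows of the query and log them.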
    def get_athena_results(query_execution_id):
        query_results = athena_client.get_query_results(
            QueryExecutionId=query_execution_id['QueryExecutionId'],
            MaxResults=100
        )
        logger.info(query_results)
        return query_results
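
    # Unpack job progress and step configuration from the event body and job details.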
    num_of_steps = job_details['num_of_steps']
    current_step = job_details['current_step']
    team = body['team']
    date_substitutions = body.get('date_substitutions', [])
    pipeline = body['pipeline']
    table_name = body['dest_table']['name']
    dest_part_value = body['dest_table'].get('part_value', None)
    db = body['dest_db']
    steps = job_details['steps']
    status = job_details.get('jobStatus', 'STARTING_NEXT_QUERY')
    step = steps[current_step]
    sql = step.get('sql', '')
    sql_file = step.get('sql_file', '')
    database = step['db']
    info = step['info']
    current_step += 1
    query = ''
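
    # Either start the next query (resolving environment and workgroup from SSM and
    # loading the SQL inline or from the artifacts bucket), or poll the one in flight.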
if status == "STARTING_NEXT_QUERY":
ssmcli = boto3.client('ssm')
ssmresponse = ssmcli.get_parameter(
Name='/SDLF/Misc/pEnv'
)
db_env = ssmresponse['Parameter']['Value']
ssmresponse = ssmcli.get_parameter(
Name=f'/SDLF/ATHENA/{team}/{pipeline}/WorkgroupName'
)
workgroup = ssmresponse['Parameter']['Value']
if sql != '':
query = sql
elif sql_file != '':
sql_file = f'artifacts/athena_queries/{team}/{sql_file}'
query = s3_interface.read_object(artifacts_bucket, sql_file).getvalue()
query = query.replace('$ENV', db_env)
query = query.replace('$dt', dest_part_value)
else:
logger.error('No sql or file provided')
if dest_part_value and date_substitutions:
for substitution in date_substitutions:
query = query.replace(substitution['token'],(datetime.strptime(dest_part_value,'%Y%m%d')
- relativedelta(**substitution['relativedelta_attributes'])).strftime(substitution['format']))
logger.info(f'Athena Light Transform step {current_step}/{num_of_steps} [{info}] STARTED')
logger.info(f'Executing query: {query}')
query_id = run_athena_query(query, database, workgroup)
job_details['query_id'] = query_id
status, error_log = athena_status(query_id)
    elif status in ['RUNNING', 'QUEUED']:
        query_id = job_details['query_id']
        status, error_log = athena_status(query_id)
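
    # Report the step outcome and decide whether to run the next step or finish.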
    if status == 'FAILED':
        logger.error(f'Athena heavy Transform step {current_step}/{num_of_steps} [{info}] FAILED')
        logger.error(f'Athena error: {error_log}')
    elif status == 'SUCCEEDED':
        # processed_keys = s3_interface.list_objects(stage_bucket, target_table_full_path)
        query_result = get_athena_results(query_id)
        logger.info(f'Athena heavy Transform step {current_step}/{num_of_steps} [{info}] SUCCEEDED')
        logger.info(f'Query result: {query_result}')
        job_details['current_step'] = current_step
        if current_step == num_of_steps:
            status = 'SUCCEEDED'
        else:
            status = 'STARTING_NEXT_QUERY'

    job_details['jobStatus'] = status
    response = {
        'processedKeysPath': processed_keys_path,
        'jobDetails': job_details
    }
    #######################################################
    # IMPORTANT
    # This function must return a dictionary object with at least a reference to:
    # 1) processedKeysPath (i.e. the S3 path where the job outputs data, without the s3://stage-bucket/ prefix)
    # 2) jobDetails (i.e. a dictionary holding information about the job,
    #    e.g. jobName and jobId for Glue or clusterId and stepId for EMR.
    #    A jobStatus key MUST be present in jobDetails as it is used to determine the status of the job)
    # Example: {'processedKeysPath': 'post-stage/legislators',
    #           'jobDetails':
    #               {'jobName': 'sdlf-engineering-e_perm-glue-job', 'jobId': 'jr-2ds438nfinev34', 'jobStatus': 'RUNNING'}}
    #######################################################
    return response