sdlf-utils/pipeline-examples/event-dataset-dependencies/sdlf-engineering-datalakeLibrary/python/datalake_library/transforms/stage_a_transforms/light_transform_athena_ctas_sqoop.py
def check_job_status(self, bucket, body, processed_keys_path, job_details):
    # Assumed module-level dependencies, defined earlier in this file:
    # athena_client, logger, s3_interface and stage_bucket, plus the json,
    # sys, time and traceback imports used below.

    def run_athena_query(query_string, db_string, athena_workgroup):
        # Runs an Athena query on the specified database and returns the
        # start_query_execution response, a dict containing 'QueryExecutionId'
        query_execution_id = athena_client.start_query_execution(
            QueryString=query_string,
            QueryExecutionContext={
                'Database': db_string
            },
            WorkGroup=athena_workgroup
        )
        return query_execution_id
    def athena_status(query_execution_id):
        # Polls the query until it leaves the QUEUED state, then returns its
        # current state together with the failure reason when it FAILED
        state = 'QUEUED'
        while state == 'QUEUED':
            query_response = athena_client.get_query_execution(
                QueryExecutionId=query_execution_id['QueryExecutionId'])
            logger.info(f'Executing - query id: {query_execution_id}')
            if 'QueryExecution' in query_response and \
                    'Status' in query_response['QueryExecution'] and \
                    'State' in query_response['QueryExecution']['Status']:
                state = query_response['QueryExecution']['Status']['State']
                error = ''
                if state == 'FAILED':
                    error = query_response['QueryExecution']['Status']['StateChangeReason']
                    return state, error
                elif state != 'QUEUED':
                    return state, error
            time.sleep(5)
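    # For reference, get_query_execution responses have this (abridged) boto3
    # shape, which is what the defensive nested-key check above guards against:
    # {'QueryExecution': {'QueryExecutionId': '...',
    #                     'Status': {'State': 'QUEUED'|'RUNNING'|'SUCCEEDED'|'FAILED'|'CANCELLED',
    #                                'StateChangeReason': '...'}}}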
    def get_athena_results(query_execution_id):
        # Fetches the first page (up to 100 rows) of results for a completed query
        query_results = athena_client.get_query_results(
            QueryExecutionId=query_execution_id['QueryExecutionId'],
            MaxResults=100
        )
        logger.info(query_results)
        return query_results
    try:
        num_of_steps = job_details['num_of_steps']
        current_step = job_details['current_step']
        partitions = job_details['partitions']
        workgroup = job_details['workgroup']
        steps = job_details['steps']
        status = job_details.get('jobStatus', 'STARTING_NEXT_QUERY')
        target_table_full_path = job_details['target_table_full_path']
        source_db = job_details['source_db']
        source_table = job_details['source_table']
        target_db = job_details['target_db']
        target_table = job_details['target_table']
        step = steps[current_step]
        sql = step['sql']
        database = step['db']
        info = step['info']
        current_step += 1  # 1-based step number, used for logging and the completion check
        if status == 'STARTING_NEXT_QUERY':
            query = sql
            logger.info(f'Athena Light Transform step {current_step}/{num_of_steps} [{info}] STARTED')
            logger.info(f'Executing query: {query}')
            query_id = run_athena_query(query, database, workgroup)
            job_details['query_id'] = query_id
            status, error_log = athena_status(query_id)
        elif status in ['RUNNING', 'QUEUED']:
            query_id = job_details['query_id']
            status, error_log = athena_status(query_id)
        dictionary = dict()
        if status == 'FAILED':
            logger.error(f'Athena Light Transform step {current_step}/{num_of_steps} [{info}] FAILED')
            logger.error(f'Athena error: {error_log}')
        elif status == 'SUCCEEDED':
            query_result = get_athena_results(query_id)
            logger.info(f'Athena Light Transform step {current_step}/{num_of_steps} [{info}] SUCCEEDED')
            logger.info(f'Query result: {query_result}')
            job_details['current_step'] = current_step
            if current_step == num_of_steps:
                status = 'SUCCEEDED'
                logger.info('Listing the S3 files created, to send to Stage B')
                processed_keys = s3_interface.list_objects(stage_bucket, target_table_full_path)
                dictionary['processed_keys'] = processed_keys
                dictionary['raw_db'] = source_db
                dictionary['raw_table'] = source_table
                dictionary['prestage_db'] = target_db
                dictionary['prestage_table'] = target_table
                dictionary['partitions'] = partitions
                logger.info(f'Process finished, returning dict: {dictionary}')
            else:
                status = 'STARTING_NEXT_QUERY'
        job_details['jobStatus'] = status
        response = {
            'processedKeysPath': processed_keys_path,
            'jobDetails': job_details,
            'processOutput': dictionary
        }
        #######################################################
        # IMPORTANT
        # This function must return a dictionary object with at least a reference to:
        # 1) processedKeysPath (i.e. the S3 path where the job outputs data, without the s3://stage-bucket/ prefix)
        # 2) jobDetails (i.e. a dictionary holding information about the job,
        #    e.g. jobName and jobId for Glue, or clusterId and stepId for EMR.
        #    A jobStatus key MUST be present in jobDetails as it's used to determine the status of the job)
        # Example: {'processedKeysPath': 'post-stage/legislators',
        #          'jobDetails':
        #              {'jobName': 'sdlf-engineering-e_perm-glue-job', 'jobId': 'jr-2ds438nfinev34', 'jobStatus': 'RUNNING'}}
        #######################################################
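        # For this Athena-based transform the returned structure looks roughly
        # like the (hypothetical) example below: query_id holds the raw
        # start_query_execution response and processOutput stays empty until
        # the final step succeeds.
        # Example: {'processedKeysPath': 'pre-stage/engineering/legislators',
        #           'jobDetails': {'jobStatus': 'RUNNING',
        #                          'query_id': {'QueryExecutionId': '1a2b3c4d-...'},
        #                          'current_step': 1, 'num_of_steps': 2, ...},
        #           'processOutput': {}}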
        return response
    except Exception:
        exception_type, exception_value, exception_traceback = sys.exc_info()
        traceback_string = traceback.format_exception(exception_type, exception_value, exception_traceback)
        err_msg = json.dumps({
            "errorType": exception_type.__name__,
            "errorMessage": str(exception_value),
            "stackTrace": traceback_string
        })
        logger.error(err_msg)
        try:
            if not partitions:
                # The last step appears to hold the SQL that reverts the DDL changes
                revert_step = steps[-1]
                logger.info('An error occurred, trying to roll back DDL changes')
                run_athena_query(revert_step['sql'], revert_step['db'], workgroup)
        except Exception:
            exception_type, exception_value, exception_traceback = sys.exc_info()
            traceback_string = traceback.format_exception(exception_type, exception_value, exception_traceback)
            err_msg = json.dumps({
                "errorType": exception_type.__name__,
                "errorMessage": str(exception_value),
                "stackTrace": traceback_string
            })
            logger.error(err_msg)
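
# --- Usage sketch (illustrative only; not part of the original file) ---
# A minimal sketch of how a polling caller (e.g. the pipeline's check-job
# Lambda) might drive check_job_status until the job reaches a terminal state.
# All literals below (bucket, paths, database and table names, SQL) are
# hypothetical; in SDLF the method belongs to the transform class and the
# wait/retry loop is typically handled by the stage's state machine.
if __name__ == '__main__':
    job_details = {
        'num_of_steps': 1,
        'current_step': 0,
        'partitions': [],
        'workgroup': 'primary',
        'steps': [{'sql': 'SELECT 1', 'db': 'example_db', 'info': 'smoke test'}],
        'target_table_full_path': 'pre-stage/engineering/example_table/',
        'source_db': 'example_raw_db',
        'source_table': 'example_table',
        'target_db': 'example_prestage_db',
        'target_table': 'example_table',
    }
    response = {'jobDetails': job_details}
    while True:
        response = check_job_status(
            None,  # self: stands in for the transform instance in this sketch
            'example-stage-bucket', {}, 'pre-stage/engineering/example_table',
            response['jobDetails'])
        if response is None:  # an exception was logged inside check_job_status
            break
        if response['jobDetails']['jobStatus'] in ('SUCCEEDED', 'FAILED'):
            break
        time.sleep(30)  # pause between polls; the state machine does this in SDLF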