def check_job_status()

in sdlf-utils/pipeline-examples/event-dataset-dependencies/sdlf-engineering-datalakeLibrary/python/datalake_library/transforms/stage_a_transforms/light_transform_athena_ctas.py


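    # Context assumed by this excerpt (defined elsewhere in the module): json,
    # sys, time, traceback, a boto3 Athena client (athena_client), an S3 helper
    # (s3_interface), the stage bucket name (stage_bucket), and a logger.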
    def check_job_status(self, bucket, body, processed_keys_path, job_details):
        # Runs an Athena query on the specified database and workgroup.
        # Returns the full start_query_execution response, a dict containing
        # the 'QueryExecutionId' key that the helpers below index into.
        def run_athena_query(query_string, db_string, athena_workgroup):
            query_execution_id = athena_client.start_query_execution(
                QueryString=query_string,
                QueryExecutionContext={
                    'Database': db_string
                },
                WorkGroup=athena_workgroup
            )
            return query_execution_id

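        # Polls the query while it stays QUEUED and returns a (state, error)
        # tuple. RUNNING is returned immediately rather than awaited, so the
        # caller can re-enter this function on the state machine's next tick.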
        def athena_status(query_execution_id):
            state = 'QUEUED'
            while state == 'QUEUED':
                query_response = athena_client.get_query_execution(
                    QueryExecutionId=query_execution_id['QueryExecutionId'])
                logger.info(f'Executing - query id: {query_execution_id}')
                if 'QueryExecution' in query_response and \
                        'Status' in query_response['QueryExecution'] and \
                        'State' in query_response['QueryExecution']['Status']:
                    state = query_response['QueryExecution']['Status']['State']
                    error = ''
                    if state == 'FAILED':
                        error = query_response['QueryExecution']['Status']['StateChangeReason']
                        return state, error
                    elif state != 'QUEUED':
                        return state, error
                time.sleep(5)

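        # Fetches the first page (up to 100 rows) of the query results.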
        def get_athena_results(query_execution_id):
            query_results = athena_client.get_query_results(
                QueryExecutionId=query_execution_id['QueryExecutionId'],
                MaxResults=100
            )
            logger.info(query_results)
            return query_results

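        # job_details round-trips through the state machine between invocations:
        # it carries the multi-step query plan plus a cursor (current_step) and
        # the saved Athena execution id for the step in flight.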
        try:
            num_of_steps = job_details['num_of_steps']
            current_step = job_details['current_step']
            partitions = job_details['partitions']
            workgroup = job_details['workgroup']
            steps = job_details['steps']
            status = job_details.get('jobStatus', 'STARTING_NEXT_QUERY')
            target_table_full_path = job_details['target_table_full_path']
            source_db = job_details['source_db']
            source_table = job_details['source_table']
            target_db = job_details['target_db']
            target_table = job_details['target_table']
            step = steps[current_step]
            sql = step['sql']
            database = step['db']
            info = step['info']
            current_step += 1

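            # First invocation for a step kicks off its query; re-entries with a
            # RUNNING/QUEUED status re-poll the execution id saved in job_details.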
            if status == "STARTING_NEXT_QUERY":
                query = sql
                logger.info(f'Athena Light Transform step {current_step}/{num_of_steps} [{info}] STARTED')
                logger.info(f'Executing query: {query}')
                query_id = run_athena_query(query, database, workgroup)
                job_details['query_id'] = query_id
                status, error_log = athena_status(query_id)
            elif status in ['RUNNING', 'QUEUED']:
                query_id = job_details['query_id']
                status, error_log = athena_status(query_id)
            # Output payload for stageB; stays empty unless the final step succeeds
            dictionary = {}
            if status == 'FAILED':
                logger.error(f'Athena Light Transform step {current_step}/{num_of_steps} [{info}] FAILED')
                logger.error(f'Athena error: {error_log}')
            elif status == 'SUCCEEDED':
                query_result = get_athena_results(query_id)
                logger.info(f'Athena Light Transform step {current_step}/{num_of_steps} [{info}] SUCCEEDED')
                logger.info(f'Query result: {query_result}')
                job_details['current_step'] = current_step
                if current_step == num_of_steps:
                    status = 'SUCCEEDED'
                    logger.info('Listing created S3 files to send to stageB')
                    processed_keys = s3_interface.list_objects(stage_bucket, target_table_full_path)
                    dictionary['processed_keys'] = processed_keys
                    dictionary['raw_db'] = source_db
                    dictionary['raw_table'] = source_table
                    dictionary['prestage_db'] = target_db
                    dictionary['prestage_table'] = target_table
                    dictionary['partitions'] = partitions
                    logger.info(f'Process finished, returning dict: {dictionary}')
                else:
                    status = 'STARTING_NEXT_QUERY'

            job_details['jobStatus'] = status
            response = {
                'processedKeysPath': processed_keys_path,
                'jobDetails': job_details,
                'processOutput': dictionary
            }

            #######################################################
            # IMPORTANT
            # This function must return a dictionary object with at least a reference to:
            # 1) processedKeysPath (i.e. the S3 path where the job outputs data, without the s3://stage-bucket/ prefix)
            # 2) jobDetails (i.e. a dictionary holding information about the job,
            # e.g. jobName and jobId for Glue or clusterId and stepId for EMR.
            # A jobStatus key MUST be present in jobDetails as it's used to determine the status of the job)
            # Example: {'processedKeysPath': 'post-stage/legislators',
            # 'jobDetails':
            # {'jobName': 'sdlf-engineering-e_perm-glue-job', 'jobId': 'jr-2ds438nfinev34', 'jobStatus': 'RUNNING'}}
            #######################################################

            return response

        except Exception:
            exception_type, exception_value, exception_traceback = sys.exc_info()
            traceback_string = traceback.format_exception(exception_type, exception_value, exception_traceback)
            err_msg = json.dumps({
                "errorType": exception_type.__name__,
                "errorMessage": str(exception_value),
                "stackTrace": traceback_string
            })
            logger.error(err_msg)
            try:
                if not partitions:
                    revert_step = steps[-1]
                    logger.info('An error occurred, trying to roll back DDL changes')
                    run_athena_query(revert_step['sql'], revert_step['db'], workgroup)
            except Exception:
                exception_type, exception_value, exception_traceback = sys.exc_info()
                traceback_string = traceback.format_exception(exception_type, exception_value, exception_traceback)
                err_msg = json.dumps({
                    "errorType": exception_type.__name__,
                    "errorMessage": str(exception_value),
                    "stackTrace": traceback_string
                })
                logger.error(err_msg)
            # Re-raise the original error so the caller sees the failure instead
            # of an implicit None return, which would break the contract above.
            raise
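
A minimal sketch of how a caller might drive this method, assuming the enclosing class is instantiated elsewhere (SDLF transform modules conventionally name it CustomTransform) and that job_details was built beforehand, presumably by the module's transform_object step. The drive_light_transform helper and its sleep-based loop are illustrative assumptions, not the actual SDLF wiring: in SDLF the re-invocation is handled by the stage's Step Functions states, and the body argument is passed as an empty dict because this excerpt never reads it.

import time

def drive_light_transform(transform, bucket, processed_keys_path, job_details):
    # Hypothetical driver: poll check_job_status until the multi-step CTAS
    # job settles, relying on the response contract documented above.
    response = transform.check_job_status(bucket, {}, processed_keys_path, job_details)
    while response['jobDetails']['jobStatus'] not in ('SUCCEEDED', 'FAILED'):
        time.sleep(30)  # stands in for the state machine's Wait state
        response = transform.check_job_status(
            bucket, {}, processed_keys_path, response['jobDetails'])
    if response['jobDetails']['jobStatus'] == 'FAILED':
        raise RuntimeError('Athena light transform failed')
    return response['processOutput']  # populated only once the final step succeeds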