in CommonLayerCode/datalake-library/python/datalake_library/transforms/transform_handler.py [0:0]
def stage_b_job_status(self, bucket, keys, team, dataset, processed_keys_path, job_details):
"""Checks completion of Stage B Job
Arguments:
bucket {string} -- Origin S3 bucket
keys {string} -- Keys to transform
processed_keys_path {string} -- Job output S3 path
job_details {string} -- Details about job to monitor
team {string} -- Team owning the transformation
dataset {string} -- Dataset targeted by transformation
Returns:
{dict} -- Dictionary of Bucket and Keys transformed
"""
transform_info = self.get_transform_info('{}-{}'.format(team, dataset))
module = import_module('datalake_library.transforms.stage_b_transforms.{}'.format(transform_info['stage_b_transform']))
Transform = getattr(module, 'CustomTransform')
try:
response = Transform().check_job_status(bucket, keys, processed_keys_path, job_details)
except Exception as e:
raise e
if ((len(response) == 0) or (not isinstance(response, dict)) or ('processedKeysPath' not in response)
or ('jobDetails' not in response) or ('jobStatus' not in response['jobDetails'])):
raise ValueError("Invalid dictionary - Aborting")
if response['jobDetails']['jobStatus'] == 'FAILED':
raise ValueError('Job Failed')
elif response['jobDetails']['jobStatus'] == 'SUCCEEDED':
logger.info("Objects successfully transformed")
return response