def stage_b_job_status()

in CommonLayerCode/datalake-library/python/datalake_library/transforms/transform_handler.py [0:0]


    def stage_b_job_status(self, bucket, keys, team, dataset, processed_keys_path, job_details):
        """Check completion of a Stage B job.

        Looks up the transform registered for ``<team>-<dataset>`` through
        ``self.get_transform_info``, imports the module named by its
        ``stage_b_transform`` entry from
        ``datalake_library.transforms.stage_b_transforms`` and delegates to
        that module's ``CustomTransform().check_job_status``.

        Arguments:
            bucket {string} -- Origin S3 bucket
            keys {string} -- Keys to transform
            team {string} -- Team owning the transformation
            dataset {string} -- Dataset targeted by transformation
            processed_keys_path {string} -- Job output S3 path
            job_details {string} -- Details about job to monitor
        Returns:
            {dict} -- The response returned by the transform's
            ``check_job_status``; it is guaranteed to contain
            'processedKeysPath' and a 'jobDetails' dict with a 'jobStatus'
        Raises:
            ValueError -- If the response is not a dictionary of the shape
            described above, or if the reported job status is 'FAILED'.
            Exceptions raised by the transform itself propagate unchanged.
        """
        transform_info = self.get_transform_info('{}-{}'.format(team, dataset))
        module = import_module(
            'datalake_library.transforms.stage_b_transforms.{}'.format(
                transform_info['stage_b_transform']))
        Transform = getattr(module, 'CustomTransform')
        response = Transform().check_job_status(
            bucket, keys, processed_keys_path, job_details)

        # Check the type before touching the contents, so that a None or
        # otherwise malformed response is reported as an invalid dictionary
        # rather than surfacing as an unrelated TypeError.
        response_is_valid = (
            isinstance(response, dict)
            and 'processedKeysPath' in response
            and isinstance(response.get('jobDetails'), dict)
            and 'jobStatus' in response['jobDetails']
        )
        if not response_is_valid:
            raise ValueError("Invalid dictionary - Aborting")

        job_status = response['jobDetails']['jobStatus']
        if job_status == 'FAILED':
            raise ValueError('Job Failed')
        if job_status == 'SUCCEEDED':
            logger.info("Objects successfully transformed")

        # Any other status is handed back untouched for the caller to act on.
        return response