def transform_object()

in sdlf-utils/pipeline-examples/manifests/transforms/heavy_transform_manifest.py
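
This excerpt omits the module-level setup the method relies on. A minimal sketch of what it assumes, based on the names used below (json, client, datetimeconverter); the actual file may differ:

    import json
    from datetime import datetime

    import boto3

    # Glue client used by start_job_run below
    client = boto3.client('glue')

    def datetimeconverter(o):
        # Render datetimes as strings so Glue API responses are JSON-serializable
        if isinstance(o, datetime):
            return o.__str__()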


    def transform_object(self, bucket, keys, team, dataset):
        #######################################################
        # We assume a Glue Job has already been created based on
        # customer needs. This function makes an API call to start it
        #######################################################
        job_name = 'sdlf-{}-{}-glue-job'.format(team, dataset)  # Name of the Glue Job

        ### Create the list of S3 keys to be processed by the Glue job.
        ### For manifest processing, keys contains a single file: the manifest

        items = get_manifest_data(bucket, team, dataset, keys[0])

        s3_keys = get_s3_keys(items)
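        # Assumed shapes (illustrative; the helpers are defined elsewhere in the file):
        # items   -> manifest entries, each referencing one data file to process
        # s3_keys -> e.g. ['pre-stage/engineering/legislators/file1.csv', ...]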

        files = []
        file_names = ""
        for key in s3_keys:
            files.append(key.split('/')[-1])
            # Accumulate the keys into a single "|"-delimited string for the
            # Glue job; an empty string is falsy, so the first key is assigned
            # without a leading "|"
            if file_names:
                file_names = file_names + "|" + key
            else:
                file_names = key

        ### Update Manifests Control Table
        ddb_keys = get_ddb_keys(items)

        dynamo_config = DynamoConfiguration()
        dynamo_interface = DynamoInterface(dynamo_config)

        for ddb_key in ddb_keys:
            dynamo_interface.update_manifests_control_table_stageb(ddb_key, "PROCESSING")

        # S3 Path where Glue Job outputs processed keys
        # IMPORTANT: Build the output s3_path without the s3://stage-bucket/
        processed_keys_path = 'post-stage/{}/{}'.format(team, dataset)
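        # e.g. 'post-stage/engineering/legislators' for team 'engineering'
        # and dataset 'legislators'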
        # Submitting a new Glue Job
        job_response = client.start_job_run(
            JobName=job_name,
            Arguments={
                # Specify any arguments needed based on bucket and keys (e.g. input/output S3 locations)
                '--JOB_NAME': 'sdlf-{}-{}-glue-job'.format(team, dataset),
                '--SOURCE_LOCATION': 's3://{}/'.format(bucket),
                '--OUTPUT_LOCATION': 's3://{}/{}'.format(bucket, processed_keys_path),
                '--FILE_NAMES': file_names,
                '--job-bookmark-option': 'job-bookmark-enable'
            },
            MaxCapacity=2.0
        )
        # Collecting details about Glue Job after submission (e.g. jobRunId for Glue)
        json_data = json.loads(json.dumps(
            job_response, default=datetimeconverter))
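        # json_data is now a plain dict containing e.g. 'JobRunId': 'jr_...';
        # datetimeconverter handled any datetime fields in the response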
        job_details = {
            "jobName": job_name,
            "jobRunId": json_data.get('JobRunId'),
            "jobStatus": 'STARTED',
            "files": list(set(files))
        }

        #######################################################
        # IMPORTANT
        # This function must return a dictionary with at least:
        # 1) processedKeysPath: the S3 path where the job outputs data,
        #    without the s3://stage-bucket/ prefix
        # 2) jobDetails: a dictionary holding information about the job,
        #    e.g. jobName and jobRunId for Glue, or clusterId and stepId
        #    for EMR. A jobStatus key MUST be present in jobDetails as it
        #    is used to determine the status of the job.
        # Example: {'processedKeysPath': 'post-stage/engineering/legislators',
        # 'jobDetails': {'jobName': 'sdlf-engineering-legislators-glue-job',
        # 'jobRunId': 'jr-2ds438nfinev34', 'jobStatus': 'STARTED'}}
        #######################################################
        response = {
            'processedKeysPath': processed_keys_path,
            'jobDetails': job_details
        }

        return response
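
The jobStatus value in jobDetails is what the pipeline later polls to decide whether the heavy transform has finished. A minimal sketch of such a companion check for Glue (hypothetical method name and signature; the real stage code may differ):

    def check_job_status(self, bucket, keys, processed_keys_path, job_details):
        # Ask Glue for the current state of the run started in transform_object
        job_response = client.get_job_run(
            JobName=job_details['jobName'],
            RunId=job_details['jobRunId']
        )
        json_data = json.loads(json.dumps(
            job_response, default=datetimeconverter))
        # JobRunState is e.g. RUNNING, SUCCEEDED or FAILED
        job_details['jobStatus'] = json_data.get('JobRun').get('JobRunState')

        return {
            'processedKeysPath': processed_keys_path,
            'jobDetails': job_details
        }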