in CommonLayerCode/datalake-library/python/datalake_library/transforms/stage_b_transforms/heavy_transform_blueprint.py [0:0]
def transform_object(self, bucket, keys, team, dataset):
    """Start a run of the dataset's pre-existing Glue job on the given objects.

    This blueprint assumes a Glue job named ``sdlf-<team>-<dataset>-glue-job``
    has already been created based on customer needs; this method only
    submits a new run of it through ``client.start_job_run``.

    Args:
        bucket: Name of the stage bucket holding the input objects.
        keys: S3 object keys to process. Only the first key is inspected:
            everything before its last ``/`` becomes ``--SOURCE_LOCATION``.
            NOTE(review): presumably all keys share that prefix -- confirm
            with the caller.
        team: Team name, used in the job name and the output path.
        dataset: Dataset name, used in the job name and the output path.

    Returns:
        A dict that must contain at least:

        * ``processedKeysPath`` -- S3 path where the job writes its output,
          WITHOUT the ``s3://<stage-bucket>/`` prefix.
        * ``jobDetails`` -- information about the submitted job (here
          ``jobName`` and ``jobRunId``; other blueprints may store e.g.
          ``clusterId``/``stepId`` for EMR). A ``jobStatus`` key MUST be
          present because it is used to determine the status of the job;
          this method always reports ``'STARTED'``.

        Example::

            {'processedKeysPath': 'post-stage/engineering/meteorites',
             'jobDetails': {'jobName': 'sdlf-engineering-meteorites-glue-job',
                            'jobRunId': '<run id returned by Glue>',
                            'jobStatus': 'STARTED'}}

    Raises:
        ValueError: If ``keys`` is empty, since there is no object from which
            to derive the source location.
    """
    if not keys:
        raise ValueError('transform_object requires at least one S3 key')

    # Name of the Glue Job
    job_name = 'sdlf-{}-{}-glue-job'.format(team, dataset)
    # IMPORTANT: built without the s3://stage-bucket/ prefix (see Returns)
    processed_keys_path = 'post-stage/{}/{}'.format(team, dataset)
    # "Folder" of the first key: 'a/b/file.csv' -> 'a/b'.
    # A key containing no '/' is passed through unchanged.
    source_prefix = keys[0].rsplit('/', 1)[0]

    # Submitting a new Glue Job run
    job_response = client.start_job_run(
        JobName=job_name,
        Arguments={
            # Specify any arguments needed based on bucket and keys
            # (e.g. input/output S3 locations)
            '--JOB_NAME': job_name,
            '--SOURCE_LOCATION': 's3://{}/{}'.format(bucket, source_prefix),
            '--OUTPUT_LOCATION': 's3://{}/{}'.format(bucket, processed_keys_path),
            '--job-bookmark-option': 'job-bookmark-enable'
        },
        # Capacity is hard-coded for this blueprint; adjust per job.
        MaxCapacity=2.0
    )

    # Only the run id is needed from the response. It is a plain string, so
    # it is read directly instead of JSON-serialising the whole response.
    job_details = {
        "jobName": job_name,
        "jobRunId": job_response.get('JobRunId'),
        "jobStatus": 'STARTED'
    }
    return {
        'processedKeysPath': processed_keys_path,
        'jobDetails': job_details
    }