in repos/serving/lambdas/functions/processing-job-execution/lambda_function.py
import json
import logging
from datetime import datetime

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

client = boto3.client("glue")
sagemaker = boto3.client("sagemaker")


def datetimeconverter(o):
    # Helper referenced by the handler below; a standard json.dumps
    # default-serializer implementation is assumed here.
    if isinstance(o, datetime):
        return str(o)


def lambda_handler(event, context):
"""Calls custom job waiter developed by user
Arguments:
event {dict} -- Dictionary with details on previous processing step
context {dict} -- Dictionary with details on Lambda context
Returns:
{dict} -- Dictionary with Processed Bucket, Key(s) and Job Details
"""
    # Extract the callback token first so the except branch can always report failure
    token = event.get("body", {}).get("token")
    try:
        logger.info("Lambda event is [{}]".format(event))
        source_bucket = event["body"]["bucket"]
        job_name = event["body"]["targetJob"]
        ddb_table = event["body"]["targetDDBTable"]
        s3_prefix_key_proc = event["body"]["keysRawProc"]
        logger.info(
            "bucket=[{}] keysRawProc=[{}] targetJob=[{}] targetDDBTable=[{}]".format(
                source_bucket,
                s3_prefix_key_proc,
                job_name,
                ddb_table,
            )
        )
        # Submit a new Glue job run
        job_response = client.start_job_run(
            JobName=job_name,
            Arguments={
                # Specify any arguments needed based on bucket and keys (e.g. input/output S3 locations)
                "--job-bookmark-option": "job-bookmark-enable",
                "--additional-python-modules": "pyarrow==2,awswrangler==2.9.0",
                # Custom arguments below; only the first processed key prefix is forwarded
                "--TARGET_DDB_TABLE": ddb_table,
                "--S3_BUCKET": source_bucket,
                "--S3_PREFIX_PROCESSED": s3_prefix_key_proc[0],
            },
            MaxCapacity=2.0,
        )
logger.info("Response is [{}]".format(job_response))
# Collecting details about Glue Job after submission (e.g. jobRunId for Glue)
json_data = json.loads(json.dumps(job_response, default=datetimeconverter))
job_details = {
"jobName": job_name,
"jobRunId": json_data.get("JobRunId"),
"jobStatus": "STARTED",
"token": token,
}
response = {"jobDetails": job_details}
    except Exception as e:
        logger.error("Fatal error", exc_info=True)
        # Report the failure back to the SageMaker pipeline so the callback step
        # fails instead of hanging; skip if the token could not be extracted
        if token:
            sagemaker.send_pipeline_execution_step_failure(
                CallbackToken=token, FailureReason=str(e)
            )
        raise
    return response
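

# A minimal local-invocation sketch, not part of the deployed handler. The event
# shape mirrors the fields read above; all values below are hypothetical
# placeholders, not names from this repository.
if __name__ == "__main__":
    sample_event = {
        "body": {
            "bucket": "example-processed-bucket",
            "targetJob": "example-processing-job",
            "targetDDBTable": "example-ddb-table",
            "token": "example-callback-token",
            "keysRawProc": ["processed/example/prefix/"],
        }
    }
    # Running this requires AWS credentials plus a real Glue job and callback token
    print(lambda_handler(sample_event, None))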