in sdlf-utils/pipeline-examples/manifests/stageB/lambda/stage-b-processmanifest/src/lambda_function.py [0:0]
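# NOTE: this handler assumes the usual module-level setup found elsewhere in
# this file: datetime as dt, time, octagon and octagon.peh, a configured
# logger, DynamoConfiguration/DynamoInterface from the SDLF datalake_library,
# and the local helpers get_ddb_keys() and remove_content_tmp().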
def lambda_handler(event, context):
"""Checks if the file to be processed is manifest driven
Arguments:
event {dict} -- Dictionary with details on previous processing step
context {dict} -- Dictionary with details on Lambda context
Returns:
{dict} -- Dictionary with Processed Bucket and Key(s)
"""
try:
logger.info('Fetching event data from previous step')
bucket = event['body']['bucket']
keys_to_process = event['body']['keysToProcess']
team = event['body']['team']
pipeline = event['body']['pipeline']
stage = event['body']['pipeline_stage']
dataset = event['body']['dataset']
peh_id = event['body']['peh_id']
manifest_data_timeout = int(
event['body']['manifest_details']['manifest_data_timeout'])
current_time = dt.datetime.utcnow()
current_timestamp = current_time.timestamp()
if 'manifest_interval' in event['body']:
manifest_interval = event['body']['manifest_interval']
else:
manifest_interval = current_timestamp
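        # manifest_interval anchors the timeout window: on the first invocation
        # it is set to the current timestamp and echoed back into the event, so
        # later passes can measure how long the manifest has been waiting.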
logger.info('Initializing Octagon client')
component = context.function_name.split('-')[-2].title()
octagon_client = (
octagon.OctagonClient()
.with_run_lambda(True)
.with_configuration_instance(event['body']['env'])
.build()
)
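        # Re-attach to the pipeline execution history entry created earlier in
        # the pipeline so the status updates below land on the same execution.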
        peh.PipelineExecutionHistoryAPI(octagon_client).retrieve_pipeline_execution(peh_id)
        ### Build the manifests control table keys for each data file tracked by this manifest
ddb_keys = get_ddb_keys(keys_to_process, bucket, team, dataset)
dynamo_config = DynamoConfiguration()
dynamo_interface = DynamoInterface(dynamo_config)
### Query Manifest Control Table to get the status
items = []
logger.info("Querying DynamoDB to check data in manifests control table for Stage A status")
for ddb_key in ddb_keys:
try:
                items.append(dynamo_interface.get_item_from_manifests_control_table(
                    ddb_key["dataset_name"], ddb_key["manifest_file_name"], ddb_key["datafile_name"]))
except KeyError:
logger.error("The manifest file has not been processed in Stage A")
raise Exception("Manifest File has not been processed in Stage A")
        ### Check Stage A status for the data files
logger.info("Checking to see if all the files have been processed in Stage A")
        status_message_list = []
        failed_status_message_list = []
        wait_message_counter = 0
        failed_message_counter = 0
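        # Classify every data file by its Stage A status: anything that is
        # neither COMPLETED nor FAILED is still in flight, so Stage B waits.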
for item in items:
if "stage_a_status" in item:
stage_a_status = item["stage_a_status"]
else:
stage_a_status = "NOT STARTED"
if stage_a_status != "COMPLETED" and stage_a_status != "FAILED":
status_message_list.append(
"Waiting for Data File {}".format(item["datafile_name"].split("-")[-1]))
wait_message_counter +=1
elif stage_a_status == "FAILED":
failed_status_message_list.append(
"Data Files Failed in Stage A {}".format(item["datafile_name"].split("-")[-1]))
failed_message_counter +=1
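        # Fail fast: if any data file failed Stage A, mark every file for this
        # manifest as FAILED in Stage B and stop processing.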
        if failed_message_counter > 0:
logger.error("Data File Failure in Stage A, Processing will stop")
logger.error("The following files have failed in Stage A")
for message in failed_status_message_list:
logger.error(message)
### Update manifest control table, mark all files as failed in Stage B
for ddb_key in ddb_keys:
                update_key = dynamo_interface.manifest_keys(
                    ddb_key["dataset_name"], ddb_key["manifest_file_name"], ddb_key["datafile_name"])
dynamo_interface.update_manifests_control_table_stageb(
update_key, "FAILED", None, "Datafile Failed in Stage A")
raise Exception("Data File Failure in Stage A")
if wait_message_counter > 0:
logger.info("Waiting for Data Files to be processed in Stage A")
for message in status_message_list:
logger.info(message)
logger.info ("Will sleep for 5 mins")
time.sleep(300)
data_file_wait="True"
if manifest_interval == current_timestamp:
current_timestamp = dt.datetime.utcnow().timestamp()
            if int((current_timestamp - manifest_interval) / 60) >= manifest_data_timeout:
                logger.error("Data File Threshold Breached")
                logger.error("Stage B Processing Will Stop Now")
                data_file_wait = "False"
for message in status_message_list:
logger.error(message)
### Update manifest control table, mark all files as failed in Stage B
for ddb_key in ddb_keys:
update_key = dynamo_interface.manifest_keys(
ddb_key["dataset_name"], ddb_key["manifest_file_name"], ddb_key["datafile_name"])
dynamo_interface.update_manifests_control_table_stageb(
update_key, "FAILED", None, "Datafile threshold Breached")
raise Exception("Data File Threshold Breached")
else:
logger.info("All files processed in Stage A")
data_file_wait = "False"
for ddb_key in ddb_keys:
update_key = dynamo_interface.manifest_keys(
ddb_key["dataset_name"], ddb_key["manifest_file_name"], ddb_key["datafile_name"])
dynamo_interface.update_manifests_control_table_stageb(
update_key, "STARTED")
event["body"]["manifest_interval"] = manifest_interval
event["body"]["data_file_wait"] = data_file_wait
remove_content_tmp()
octagon_client.update_pipeline_execution(
status="{} {} Processing".format(stage, component), component=component)
    except Exception as e:
        logger.error("Fatal error", exc_info=True)
        # Guard against failures that occur before the Octagon client is
        # built (e.g. a malformed event); stage and component are always
        # assigned before the client, so they are safe to reference here.
        if "octagon_client" in locals():
            octagon_client.end_pipeline_execution_failed(component=component,
                issue_comment="{} {} Error: {}".format(stage, component, repr(e)))
        remove_content_tmp()
        raise e
return event