in functions/source/loop/loop_lambda.py [0:0]
def lambda_handler(event, context):
"""
This function is called every time an output.manifest is generated.
It reads in the output manifest file and depending the review selection,
either feed the data back for labelling or push the information for
reporting.
"""
print(context)
s3_client = boto3.client('s3')
# Get the object from the event and show its content type
bucket = event['Records'][0]['s3']['bucket']['name']
event_time = event['Records'][0]['eventTime']
year = event_time.split("-")[0]
month = event_time.split("-")[1]
# Load the output manifest
key = urllib.parse.unquote_plus(
event['Records'][0]['s3']['object']['key'], encoding='utf-8')
# filename = os.path.basename(key)
# Read ['category-metadata']['job-name']in the output.manifest data
output_manifest_object = s3_client.get_object(Bucket=bucket, Key=key)
output_manifest_body = output_manifest_object['Body']
manifest_data = json.loads(
output_manifest_body.read().decode('utf-8'))
# Output manifest contains information about the input csv
if 'csv_bucket' in manifest_data and 'csv_path' in manifest_data:
print(
'Loading input csv datafile',
manifest_data['csv_bucket'],
manifest_data['csv_path'])
# read csv file
csv_obj = s3_client.get_object(
Bucket=manifest_data['csv_bucket'],
Key=manifest_data['csv_path'],
)
body = csv_obj['Body']
dataframe = pd.read_csv(StringIO(body.read().decode('utf-8')))
# finding patient MRN
patient = manifest_data['mrn']
# figure out the PR
previously_reviewed = manifest_data['pr']
dataframe['PR'] = previously_reviewed
# extracing decision
if 'category' in manifest_data:
decision_dict = manifest_data['category']['caseInfo']['decision']
# dummy variable
name = 'none'
for key in decision_dict:
if decision_dict[key]:
decision = key
# Data to put for another round of review
feedback_path = '{}/{}.csv'.format(
os.environ['FEEDBACK_FOLDER'],
patient,
)
# Outgoing data for reporting
reporting_path = '{}/{}.csv'.format(
os.environ['REPORTING_FOLDER'],
patient,
)
# finding the name of ICP or physician to send the job next
if "send_to_physician" in manifest_data['category']['caseInfo']:
name = manifest_data[
'category']['caseInfo'][
'send_to_physician']
if decision.lower().startswith('case'):
# at least 1 review by ICP and physician has been done
if int(previously_reviewed) >= 1:
if name in ["none", "", "Do not send notification"]:
# Push data for reporting
dataframe = write_json_on_s3(
os.environ['REPORTING_BUCKET'],
reporting_path,
manifest_data,
dataframe,
)
status = 'completed'
write_csv_aggregate(bucket, dataframe, month,
year, patient)
update_timeline(patient, status,
previously_reviewed)
else:
dataframe = write_json_on_s3(
os.environ['FEEDBACK_BUCKET'],
feedback_path,
manifest_data,
dataframe,
)
status = 'incomplete'
update_timeline(patient, status,
previously_reviewed)
else:
dataframe = write_json_on_s3(
os.environ['FEEDBACK_BUCKET'],
feedback_path,
manifest_data,
dataframe,
)
status = 'completed'
update_timeline(patient, status,
previously_reviewed)
# if decision is not sure, we remake the job,
# but keep the progress by rewriting source-csv
elif decision.lower().startswith('notsure'):
# Circlate data for another round of review
dataframe = write_json_on_s3(
os.environ['FEEDBACK_BUCKET'],
feedback_path,
manifest_data,
dataframe,
)
status = 'incomplete'
update_timeline(patient, status,
previously_reviewed)
else:
# Decision "no case"
# at least 1 review by ICP and physician has been done
if int(previously_reviewed) >= 1:
if name in ["none", "", "Do not send notification"]:
# Push data for reporting
dataframe = write_json_on_s3(
os.environ['REPORTING_BUCKET'],
reporting_path,
manifest_data,
dataframe,
)
# writing data to aggregate folder
write_csv_aggregate(bucket, dataframe, month,
year, patient)
status = 'completed'
update_timeline(patient, status,
previously_reviewed)
else:
write_json_on_s3(
os.environ['FEEDBACK_BUCKET'],
feedback_path,
manifest_data,
dataframe,
)
status = 'incomplete'
update_timeline(patient, status,
previously_reviewed)
else:
# sending the job back to source-csv
# to be reviewed by a physician
write_json_on_s3(
os.environ['FEEDBACK_BUCKET'],
feedback_path,
manifest_data,
dataframe,
)
status = 'incomplete'
update_timeline(patient, status,
previously_reviewed)
print("Decision was:", decision)