in old_reference/er7_to_json.py [0:0]
def lambda_handler(event, context):
logger.info("Start")
s3 = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
bucket = os.environ['data_lake_bucket']
logger.info(event)
# ----------------------------------- Parse to ER7 object
try:
body = json.loads(event["body"])
message = decode_from_base64(body['msg'], body['encoding'])
# Replace segment terminators if provided
try: message = fix_segment_terminators(message, body['segTerm'])
except: pass
logger.info("Message: " + message)
er7_msg = parser.parse_message(message)
except Exception as e:
errMsg = str(e)
logger.error(e)
return {
'statusCode': 400,
'body': json.dumps("Could not convert to ER7 object: "+errMsg)
}
msg_id = er7_msg.children[0].MESSAGE_CONTROL_ID.value
event = er7_msg.children[0].MSH_9.value
version = er7_msg.children[0].MSH_12.value
logger.info("Parsed message {} to ER7".format(msg_id))
# ----------------------------------- Convert to JSON
json_msg = {}
try:
for c in er7_msg.children:
add_child_element(json_msg, c)
except KeyError as e:
errMsg = "Our library does not support {} events for HL7 version {}, message {} not written".format(
event, version, msg_id)
logger.error(errMsg)
error_key = "error/hl7v2/{}_{}_{}.txt".format(version,event,msg_id)
s3.put_object(
Bucket=bucket,
Key=error_key,
Body=message
)
table = dynamodb.Table(os.environ['error_table'])
table.put_item(
Item={
'type': "HL7_"+event+"_"+version,
's3_key': error_key,
'provided_id': msg_id
}
)
return {
'statusCode': 400,
'body': json.dumps(errMsg)
}
logger.info("ER7 converted to JSON")
# ------------------------------------------------- Get our catalog data
logger.debug("Getting PID codes")
pids = get_pid_codes(json_msg)
for pid in pids:
logger.info("Found local PID: "+pid)
logger.debug("Get our patient global ID")
global_id = get_global_id(pids)
logger.info("Global ID: " + global_id)
logger.info("Setting the keys")
timestamp = str(json_msg["MSH"]["MSH_7"]["TS_1"])
sort_key = "HL7_"+er7_msg.name +"_"+timestamp #msg_id
bucket_key_suffix = er7_msg.name +"_"+timestamp + "_" +msg_id
raw_key = "raw/hl7v2/{}.txt".format(bucket_key_suffix)
staging_key = "staging/hl7v2/{}.json".format(bucket_key_suffix)
#------------------------------------------------- Write to S3 and DynamoDB
logger.info("Writing original message to raw")
s3.put_object(
Bucket=bucket,
Key=raw_key,
Body=message
)
logger.info("Writing JSON message to staging")
s3.put_object(
Bucket=bucket,
Key=staging_key,
Body=json.dumps(json_msg)
)
logger.info("Tagging the data lineage")
s3.put_object_tagging(
Bucket=bucket,
Key=staging_key,
Tagging={
'TagSet': [{'Key': 'data_src','Value': raw_key}]
}
)
logger.info("Updating the data catalog")
table = dynamodb.Table(os.environ['message_table'])
table.put_item(
Item={
'patient_uuid': global_id,
"code": sort_key,
's3_key': staging_key,
'provided_id': msg_id
}
)
return {
'statusCode': 200,
'body': json.dumps("Message {} successfully written".format(msg_id))
}