in ingest_er7_service/ingest_er7_lambda.py [0:0]
def lambda_handler(event, context):
    """Authorise, de-duplicate, parse, store and publish one incoming message.

    Steps, in order:
      1. Read the ``custom:write`` JWT claim as the message source; reject
         with 403 when it is absent or empty.
      2. Read ``msg`` from the JSON request body; reject with 400 when the
         body is not JSON or does not carry a string ``msg``.
      3. Derive the message id from the first 12 hex characters of the
         SHA-256 of ``msg`` and reject with 400 if ``table`` already holds
         that (source, message_id) pair.
      4. Run the parser state machine synchronously.
      5. Store the raw message, tagged with the parse state, and publish
         either the parsed JSON or the raw message to the topic.

    Args:
        event: Request event; reads ``headers.authorization``,
            ``requestContext.authorizer.jwt.claims`` and ``body``.
        context: Lambda context object (unused).

    Returns:
        Whatever ``__get_response`` builds: status 201 when the message was
        parsed, 400 when it was stored but not parsed, was a duplicate or was
        malformed, and 403 when the caller lacks the write claim.
    """
    id_token = event['headers']['authorization']
    claims = event['requestContext']['authorizer']['jwt']['claims']
    source = claims.get('custom:write', '')
    # Truthiness also covers a claim that is present but null.
    if not source:
        logger.warning("Unauthorized write attempt rejected")
        return __get_response("", 403, "Insufficient privileges to write")
    logger.debug("Source: %s", source)

    # Body is a JSON payload passed in; treat anything else as a client error
    # rather than letting the handler crash.
    try:
        msg = json.loads(event["body"])['msg']
    except (KeyError, TypeError, ValueError):
        msg = None
    if not isinstance(msg, str):
        logger.warning("Malformed request body rejected")
        return __get_response(
            "", 400, "Request body must be a JSON object with a string 'msg' field"
        )

    # Verify the payload is unique through the SHA256[:12] of the message (mId)
    logger.debug("Checking that message is unique")
    msg_hash = hashlib.sha256(msg.encode()).hexdigest()[:12]
    logger.debug("Message hash: %s", msg_hash)
    count = table.query(
        KeyConditionExpression=Key('source').eq(source) & Key('message_id').eq(msg_hash)
    )['Count']
    if count > 0:
        logger.warning("Duplicate payload rejected")
        return __get_response(msg_hash, 400, "Rejected due to being a duplicate")
    logger.debug("Message %s is unique", msg_hash)

    # Invoke our parser
    response = sf.start_sync_execution(
        stateMachineArn=os.environ['state_machine'],
        input=json.dumps({'Message': msg}),
    )
    status = response['status']
    logger.info(response)
    parsed_ok = status == 'SUCCEEDED'
    if parsed_ok:
        state = "parsed"
        json_msg = json.loads(response['output'])['json']
        logger.info("JSON: %s", json_msg)
    else:
        state = "error"
        # 'cause' may be missing or not the expected JSON document; a logging
        # detail must not stop the message from being stored below.
        cause = response.get('cause', '')
        try:
            detail = json.loads(cause)['errorMessage']
        except (KeyError, TypeError, ValueError):
            detail = cause or response.get('error', 'unknown error')
        logger.warning(detail)

    # Store the message (after parsing attempt since we want that status on the tags)
    key = "source={}/protocol=hl7v2/format=er7/zone=ingest/{}.txt".format(source, msg_hash)
    tags = 'source={}&state={}'.format(source, state)
    __store_message(id_token, msg, key, tags, msg_hash, source)
    logger.info(
        "Message written to bucket '%s' with key '%s'", os.environ['bucket_name'], key
    )

    if parsed_ok:
        __publish_to_topic(json.dumps(json_msg), 'json', state, key)
        status_code, summary = 201, 'Message added and parsed'
    else:
        __publish_to_topic(msg, 'unknown', state, key)
        status_code, summary = 400, 'Message added, but could not be parsed'
    # Log before returning; placed after the returns this line never ran.
    logger.info("Published to SNS topic")
    return __get_response(msg_hash, status_code, summary)