def lambda_handler()

in old_reference/er7_to_json.py [0:0]


def lambda_handler(event, context):
  """Convert an encoded HL7 v2 (ER7) message to JSON and catalog it.

  The request body (``event["body"]``) is a JSON document with:
    * ``msg``      - the encoded message, passed to ``decode_from_base64``
    * ``encoding`` - passed alongside ``msg`` to ``decode_from_base64``
    * ``segTerm``  - optional; when present the message is passed through
                     ``fix_segment_terminators`` before parsing

  Environment variables read: ``data_lake_bucket``, ``message_table`` and,
  on the unsupported-event path, ``error_table``.

  Args:
    event: Lambda invocation event; only ``event["body"]`` is read.
    context: Lambda context object (unused).

  Returns:
    A dict with ``statusCode`` and a JSON-encoded ``body``:
      * 400 if the body cannot be decoded/parsed or the MSH header fields
        cannot be read.
      * 400 if converting to JSON raises ``KeyError``; the original message
        is then stored under ``error/hl7v2/`` and recorded in the error table.
      * 200 once the raw message, the JSON document, its lineage tag and the
        catalog item have all been written.

  Raises:
    KeyError: if a required environment variable is not set.
  """
  logger.info("Start")
  s3 = boto3.client('s3')
  dynamodb = boto3.resource('dynamodb')
  bucket = os.environ['data_lake_bucket']
  logger.info(event)

  # ----------------------------------- Parse to ER7 object
  try:
    body = json.loads(event["body"])
    message = decode_from_base64(body['msg'], body['encoding'])

    # Replace segment terminators if provided. This stays best-effort (a
    # failure falls back to the message as received) but is no longer silent.
    if 'segTerm' in body:
      try:
        message = fix_segment_terminators(message, body['segTerm'])
      except Exception as e:
        logger.warning(
          "Could not replace segment terminators, using message as received: {}".format(e))
    logger.info("Message: " + message)

    er7_msg = parser.parse_message(message)

    # Read the header fields inside the try so that a message without a
    # usable first segment is rejected with a 400 instead of crashing.
    msh = er7_msg.children[0]
    msg_id = msh.MESSAGE_CONTROL_ID.value
    msg_type = msh.MSH_9.value  # kept separate from the `event` parameter
    version = msh.MSH_12.value
  except Exception as e:
    errMsg = str(e)
    logger.error(e)
    return {
      'statusCode': 400,
      'body': json.dumps("Could not convert to ER7 object: "+errMsg)
    }

  logger.info("Parsed message {} to ER7".format(msg_id))

  # ----------------------------------- Convert to JSON
  json_msg = {}
  try:
    for c in er7_msg.children:
      add_child_element(json_msg, c)
  except KeyError:
    # A KeyError during conversion is reported as an unsupported
    # event/version combination; keep the original message for follow-up.
    errMsg = "Our library does not support {} events for HL7 version {}, message {} not written".format(
      msg_type, version, msg_id)
    logger.error(errMsg)

    error_key = "error/hl7v2/{}_{}_{}.txt".format(version, msg_type, msg_id)

    s3.put_object(
      Bucket=bucket,
      Key=error_key,
      Body=message
    )

    table = dynamodb.Table(os.environ['error_table'])
    table.put_item(
      Item={
        'type': "HL7_"+msg_type+"_"+version,
        's3_key': error_key,
        'provided_id': msg_id
      }
    )

    return {
      'statusCode': 400,
      'body': json.dumps(errMsg)
    }

  logger.info("ER7 converted to JSON")

  # ------------------------------------------------- Get our catalog data
  logger.debug("Getting PID codes")
  pids = get_pid_codes(json_msg)
  for pid in pids:
    logger.info("Found local PID: "+pid)

  logger.debug("Get our patient global ID")
  global_id = get_global_id(pids)
  logger.info("Global ID: " + global_id)

  logger.info("Setting the keys")
  timestamp = str(json_msg["MSH"]["MSH_7"]["TS_1"])
  sort_key = "HL7_"+er7_msg.name +"_"+timestamp
  bucket_key_suffix = er7_msg.name +"_"+timestamp + "_" +msg_id
  raw_key = "raw/hl7v2/{}.txt".format(bucket_key_suffix)
  staging_key = "staging/hl7v2/{}.json".format(bucket_key_suffix)

  #------------------------------------------------- Write to S3 and DynamoDB
  logger.info("Writing original message to raw")
  s3.put_object(
    Bucket=bucket,
    Key=raw_key,
    Body=message
  )

  logger.info("Writing JSON message to staging")
  s3.put_object(
    Bucket=bucket,
    Key=staging_key,
    Body=json.dumps(json_msg)
  )

  # Tag the staged JSON with the key of the raw object it was derived from.
  logger.info("Tagging the data lineage")
  s3.put_object_tagging(
    Bucket=bucket,
    Key=staging_key,
    Tagging={
      'TagSet': [{'Key': 'data_src','Value': raw_key}]
    }
  )

  logger.info("Updating the data catalog")
  table = dynamodb.Table(os.environ['message_table'])
  table.put_item(
    Item={
      'patient_uuid': global_id,
      "code": sort_key,
      's3_key': staging_key,
      'provided_id': msg_id
    }
  )
  return {
    'statusCode': 200,
    'body': json.dumps("Message {} successfully written".format(msg_id))
  }