in code/metadata/lineage.py [0:0]
def lambda_handler(event, context):
for record in event['Records']:
print(event)
assert record['eventSourceARN'] == SQS_QUEUE_ARN, "Unexpected Lambda event source ARN. Expected {}, got {}".format(SQS_QUEUE_ARN, record['eventSourceARN'])
payload = json.loads(record["body"])
message = json.loads(payload['Message'])
print(message)
receipt = record['receiptHandle']
lineagePayload = {}
try:
lineagePayload = {
"documentId": message['documentId'],
"callerId": message['callerId'],
"targetFileName": message['targetFileName'],
"targetBucketName": message['targetBucketName'],
"timestamp": message['timestamp'],
"s3Event": message['s3Event']
}
if 'versionId' in message:
lineagePayload['versionId'] = message['versionId']
except Exception as e:
print(e)
raise ValueError("Missing parameters in payload to lineage lambda")
try:
if message['s3Event'] == 'ObjectCreated:Copy':
lineagePayload['sourceBucketName'] = message['sourceBucketName']
lineagePayload['sourceFileName'] = message['sourceFileName']
except:
print(e)
raise ValueError("Missing parameters from Copy Object S3 notification")
postLineage(lineagePayload, receipt)