in code/metadata/lineage.py [0:0]
def postLineage(lineagePayload, receipt):
    """Record a lineage entry and remove the originating SQS message.

    For payloads whose ``s3Event`` starts with ``"ObjectRemoved"``, the
    document id is first looked up from the target bucket, file name and
    optional version id, then written into ``lineagePayload['documentId']``
    (the caller's dict is mutated in place).

    Args:
        lineagePayload: Dict that must contain ``s3Event``; deletion events
            also need ``targetBucketName`` and ``targetFileName`` and may
            carry ``versionId``. The whole dict is expanded as keyword
            arguments to ``createLineage``.
        receipt: Value handed to ``SQSHelper().deleteMessage`` together with
            ``SQS_QUEUE_ARN`` (presumably the SQS receipt handle -- confirm
            against the caller).

    Returns:
        The ``deleteMessage`` result when a deletion event has no matching
        document (lookup status 404); otherwise the ``createLineage``
        response.

    Raises:
        Exception: If the document-id lookup returns a status other than
            200 or 404, or if ``createLineage`` returns a status other
            than 200. The message is left on the queue in both cases.
    """
    store = LineageStore(DOCUMENT_LINEAGE_TABLE, DOCUMENT_LINEAGE_INDEX)

    if lineagePayload['s3Event'].startswith("ObjectRemoved"):
        bucketName = lineagePayload['targetBucketName']
        fileName = lineagePayload['targetFileName']
        versionId = lineagePayload.get('versionId')

        lookup = store.queryDocumentId(bucketName, fileName, versionId)
        lookupStatus = lookup['Status']

        if lookupStatus == 404:
            # Nothing to record for an unknown document: drop the message
            # and hand back the delete result instead of a lineage response.
            print("Could not find corresponding documentId for this deletion event")
            return SQSHelper().deleteMessage(SQS_QUEUE_ARN, receipt)

        if lookupStatus != 200:
            raise Exception(
                "Unable to update deletion of document {}/{} Version {}: {}".format(
                    bucketName, fileName, versionId, lookup['Error']
                )
            )

        # Attach the resolved id so it is forwarded to createLineage below.
        lineagePayload['documentId'] = lookup['documentId']

    outcome = store.createLineage(**lineagePayload)
    if outcome['Status'] != 200:
        # Raising without deleting leaves the message on the queue.
        raise Exception(
            "Unable to update progress of document {}: {}".format(
                lineagePayload['documentId'], outcome['Error']
            )
        )

    SQSHelper().deleteMessage(SQS_QUEUE_ARN, receipt)
    return outcome