in code/extension_detector/extension_detector.py [0:0]
def processRequest(documentId, bucketName, objectName, callerId):
    """Copy an uploaded document into the bucket matching its file extension.

    Marks the pipeline stage in progress, then picks a target bucket from the
    object's (lower-cased) extension: jpg/jpeg/png go to ``syncBucketName``
    and pdf goes to ``asyncBucketName``. The object is copied to
    ``<documentId>/<objectName>`` in that bucket. On success, the copy's
    lineage is recorded and the stage is marked succeeded.

    If no target bucket name is set, or the copy raises, the stage is marked
    failed and the function returns without recording lineage or marking
    success.

    Args:
        documentId: Document identifier; used as the key prefix in the
            target bucket and passed to the pipeline/lineage records.
        bucketName: Source bucket name.
        objectName: Source object key.
        callerId: Caller identifier, passed through to the lineage record.

    Raises:
        Exception: If the extension is not one of jpg, jpeg, png or pdf
            (the stage is marked failed first).
    """
    pipeline_client.body = {
        "documentId": documentId,
        "bucketName": bucketName,
        "objectName": objectName,
        "stage": PIPELINE_STAGE
    }
    pipeline_client.stageInProgress()
    print("Input Object: {}/{}".format(bucketName, objectName))

    ext = FileHelper.getFileExtension(objectName.lower())
    print("Extension: {}".format(ext))
    # Images are routed to the sync bucket, PDFs to the async bucket.
    if ext in ("jpg", "jpeg", "png"):
        targetBucketName = syncBucketName
    elif ext == "pdf":
        targetBucketName = asyncBucketName
    else:
        # The stage was marked in progress above; close it out before raising
        # so it is not left dangling.
        pipeline_client.stageFailed()
        raise Exception("Incorrect file extension")

    targetFileName = "{}/{}".format(documentId, objectName)

    # syncBucketName / asyncBucketName are defined outside this function
    # (presumably configuration); an empty value means there is no target.
    if not targetBucketName:
        print("No target bucket configured for extension: {}".format(ext))
        pipeline_client.stageFailed()
        return

    print("Doing S3 Object Copy for documentId: {}, object: {}/{}".format(documentId, targetBucketName, targetFileName))
    try:
        S3Helper().copyToS3(bucketName, objectName, targetBucketName, targetFileName)
    except Exception as e:
        print(e)
        pipeline_client.stageFailed()
        # Stop here: recording lineage or reporting success for a copy that
        # did not happen would contradict the failure just reported.
        return

    output = "Completed S3 Object Copy for documentId: {}, object: {}/{}".format(documentId, targetBucketName, targetFileName)
    lineage_client.recordLineageOfCopy({
        "documentId": documentId,
        "callerId": callerId,
        "sourceBucketName": bucketName,
        "targetBucketName": targetBucketName,
        "sourceFileName": objectName,
        "targetFileName": targetFileName,
    })
    pipeline_client.stageSucceeded()
    print(output)