rekognition-pipeline/lambda/itemprocessor/lambda_function.py [5:103]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def postMessage(client, qUrl, jsonMessage):
    """Serialize a message to JSON and send it to a queue.

    Args:
        client: Object exposing ``send_message(QueueUrl=..., MessageBody=...)``.
        qUrl: URL of the destination queue.
        jsonMessage: JSON-serializable payload to send.

    Returns:
        Whatever ``client.send_message`` returns.
    """
    body = json.dumps(jsonMessage)
    result = client.send_message(QueueUrl=qUrl, MessageBody=body)

    # Log only after the send call returned without raising.
    print("Submitted message to queue: {}".format(body))
    return result

def processRequest(request):
    """Route an item to the sync or async queue based on its file extension.

    Objects ending in jpg/jpeg/png go to ``request['syncQueueUrl']``; objects
    ending in mov/mp4 go to ``request['asyncQueueUrl']``. Any other extension
    is skipped.

    Args:
        request: dict with the keys "itemId", "bucketName" and "objectName",
            plus "syncQueueUrl" / "asyncQueueUrl" for the matching file type.

    Returns:
        The value returned by ``postMessage`` when a message was sent, or
        ``None`` when the extension is unsupported or the selected queue URL
        is empty.

    Raises:
        KeyError: if a required key is missing from ``request``.
    """
    print("request: {}".format(request))

    itemId = request["itemId"]
    bucketName = request["bucketName"]
    objectName = request["objectName"]

    print("Input Object: {}/{}".format(bucketName, objectName))

    ext = FileHelper.getFileExtenstion(objectName.lower())
    print("Extension: {}".format(ext))

    # Both names must be bound up front: previously an unsupported extension
    # left ``qUrl`` (and ``response``) undefined and raised UnboundLocalError.
    qUrl = None
    response = None

    if ext in ("jpg", "jpeg", "png"):
        qUrl = request['syncQueueUrl']
    elif ext in ("mov", "mp4"):
        qUrl = request['asyncQueueUrl']

    if qUrl:
        jsonMessage = {
            'itemId': itemId,
            'bucketName': bucketName,
            'objectName': objectName,
        }

        client = AwsHelper().getClient('sqs')
        response = postMessage(client, qUrl, jsonMessage)

        output = "Completed routing for itemId: {}, object: {}/{}".format(itemId, bucketName, objectName)
    else:
        # Do not claim success when nothing was sent.
        output = "Skipped routing for itemId: {}, object: {}/{} (no queue for extension: {})".format(
            itemId, bucketName, objectName, ext)

    print(output)
    return response

def processRecord(record, syncQueueUrl, asyncQueueUrl):
    """Extract item fields from a stream record and route the item.

    Reads the "S" value of "itemId", "bucketName", "objectName" and
    "itemStatus" from ``record["dynamodb"]["NewImage"]``. The item is passed
    to ``processRequest`` only when all four values are present and truthy.

    Args:
        record: Record dict containing ``["dynamodb"]["NewImage"]``.
        syncQueueUrl: Queue URL forwarded in the request as "syncQueueUrl".
        asyncQueueUrl: Queue URL forwarded in the request as "asyncQueueUrl".

    Returns:
        The result of ``processRequest``, or ``None`` when any field is missing.
    """
    image = record["dynamodb"]["NewImage"]

    def _string_value(field):
        # Return the "S" entry of the attribute, or None when it is absent.
        if field in image and "S" in image[field]:
            return image[field]["S"]
        return None

    itemId = _string_value("itemId")
    bucketName = _string_value("bucketName")
    objectName = _string_value("objectName")
    itemStatus = _string_value("itemStatus")

    print("ItemId: {}, BucketName: {}, ObjectName: {}, ItemStatus: {}".format(itemId, bucketName, objectName, itemStatus))

    # itemStatus is required to be present but is not forwarded downstream.
    if not (itemId and bucketName and objectName and itemStatus):
        return None

    request = {
        "itemId": itemId,
        "bucketName": bucketName,
        "objectName": objectName,
        'syncQueueUrl': syncQueueUrl,
        'asyncQueueUrl': asyncQueueUrl,
    }
    return processRequest(request)

def lambda_handler(event, context):
    """Entry point: route every INSERT record in the event.

    Reads the queue URLs from the ``SYNC_QUEUE_URL`` and ``ASYNC_QUEUE_URL``
    environment variables, then calls ``processRecord`` for each record whose
    "eventName" is "INSERT" and whose "dynamodb" entry contains "NewImage".
    Exceptions are printed and swallowed, per record and for the whole event.

    Args:
        event: Event dict, expected to carry a "Records" list.
        context: Unused.

    Returns:
        None.
    """
    try:
        print("event: {}".format(event))

        syncQueueUrl = os.environ['SYNC_QUEUE_URL']
        asyncQueueUrl = os.environ['ASYNC_QUEUE_URL']

        records = event["Records"] if "Records" in event else None

        for record in records or []:
            try:
                print("Processing record: {}".format(record))

                # Only newly inserted items are routed.
                if "eventName" not in record or record["eventName"] != "INSERT":
                    continue

                # The new item image is required to build the request.
                if "dynamodb" not in record or not record["dynamodb"] or "NewImage" not in record["dynamodb"]:
                    continue

                processRecord(record, syncQueueUrl, asyncQueueUrl)

            except Exception as e:
                # One bad record must not stop the remaining records.
                print("Failed to process record. Exception: {}".format(e))

    except Exception as e:
        print("Failed to process records. Exception: {}".format(e))
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/itemproc.py [5:103]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def postMessage(client, qUrl, jsonMessage):
    """Send ``jsonMessage`` to the queue at ``qUrl`` as a JSON string.

    Args:
        client: Object exposing ``send_message(QueueUrl=..., MessageBody=...)``.
        qUrl: URL of the destination queue.
        jsonMessage: JSON-serializable payload.

    Returns:
        The return value of ``client.send_message``.
    """
    payload = json.dumps(jsonMessage)
    send_kwargs = {"QueueUrl": qUrl, "MessageBody": payload}
    outcome = client.send_message(**send_kwargs)

    # Reached only when send_message did not raise.
    print("Submitted message to queue: {}".format(payload))
    return outcome

def processRequest(request):
    """Route an item to the sync or async queue based on its file extension.

    Objects ending in jpg/jpeg/png go to ``request['syncQueueUrl']``; objects
    ending in mov/mp4 go to ``request['asyncQueueUrl']``. Any other extension
    is skipped.

    Args:
        request: dict with the keys "itemId", "bucketName" and "objectName",
            plus "syncQueueUrl" / "asyncQueueUrl" for the matching file type.

    Returns:
        The value returned by ``postMessage`` when a message was sent, or
        ``None`` when the extension is unsupported or the selected queue URL
        is empty.

    Raises:
        KeyError: if a required key is missing from ``request``.
    """
    print("request: {}".format(request))

    itemId = request["itemId"]
    bucketName = request["bucketName"]
    objectName = request["objectName"]

    print("Input Object: {}/{}".format(bucketName, objectName))

    ext = FileHelper.getFileExtenstion(objectName.lower())
    print("Extension: {}".format(ext))

    # Both names must be bound up front: previously an unsupported extension
    # left ``qUrl`` (and ``response``) undefined and raised UnboundLocalError.
    qUrl = None
    response = None

    if ext in ("jpg", "jpeg", "png"):
        qUrl = request['syncQueueUrl']
    elif ext in ("mov", "mp4"):
        qUrl = request['asyncQueueUrl']

    if qUrl:
        jsonMessage = {
            'itemId': itemId,
            'bucketName': bucketName,
            'objectName': objectName,
        }

        client = AwsHelper().getClient('sqs')
        response = postMessage(client, qUrl, jsonMessage)

        output = "Completed routing for itemId: {}, object: {}/{}".format(itemId, bucketName, objectName)
    else:
        # Do not claim success when nothing was sent.
        output = "Skipped routing for itemId: {}, object: {}/{} (no queue for extension: {})".format(
            itemId, bucketName, objectName, ext)

    print(output)
    return response

def processRecord(record, syncQueueUrl, asyncQueueUrl):
    """Build a routing request from a stream record and hand it off.

    The "S" values of "itemId", "bucketName", "objectName" and "itemStatus"
    are read from ``record["dynamodb"]["NewImage"]``. ``processRequest`` is
    called only when all four are present and truthy.

    Args:
        record: Record dict containing ``["dynamodb"]["NewImage"]``.
        syncQueueUrl: Queue URL placed in the request under "syncQueueUrl".
        asyncQueueUrl: Queue URL placed in the request under "asyncQueueUrl".

    Returns:
        The result of ``processRequest``, or ``None`` when a field is missing.
    """
    image = record["dynamodb"]["NewImage"]

    # Collect the string ("S") value of each attribute; None marks a missing one.
    fields = {}
    for key in ("itemId", "bucketName", "objectName", "itemStatus"):
        has_string = key in image and "S" in image[key]
        fields[key] = image[key]["S"] if has_string else None

    print("ItemId: {}, BucketName: {}, ObjectName: {}, ItemStatus: {}".format(
        fields["itemId"], fields["bucketName"], fields["objectName"], fields["itemStatus"]))

    if (fields["itemId"] and fields["bucketName"]
            and fields["objectName"] and fields["itemStatus"]):
        # itemStatus gates routing but is not part of the outgoing request.
        request = {
            "itemId": fields["itemId"],
            "bucketName": fields["bucketName"],
            "objectName": fields["objectName"],
        }
        request['syncQueueUrl'] = syncQueueUrl
        request['asyncQueueUrl'] = asyncQueueUrl
        return processRequest(request)

    return None

def lambda_handler(event, context):
    """Entry point: route every INSERT record in the event.

    Reads the queue URLs from the ``SYNC_QUEUE_URL`` and ``ASYNC_QUEUE_URL``
    environment variables, then calls ``processRecord`` for each record whose
    "eventName" is "INSERT" and whose "dynamodb" entry contains "NewImage".
    Exceptions are printed and swallowed, per record and for the whole event.

    Args:
        event: Event dict, expected to carry a "Records" list.
        context: Unused.

    Returns:
        None.
    """
    try:
        print("event: {}".format(event))

        syncQueueUrl = os.environ['SYNC_QUEUE_URL']
        asyncQueueUrl = os.environ['ASYNC_QUEUE_URL']

        has_records = "Records" in event and event["Records"]
        if not has_records:
            return

        for entry in event["Records"]:
            try:
                print("Processing record: {}".format(entry))

                is_insert = "eventName" in entry and entry["eventName"] == "INSERT"
                # Only inserts that carry a new item image are routed.
                if is_insert and "dynamodb" in entry and entry["dynamodb"] and "NewImage" in entry["dynamodb"]:
                    processRecord(entry, syncQueueUrl, asyncQueueUrl)

            except Exception as e:
                # Keep going so one bad record does not block the rest.
                print("Failed to process record. Exception: {}".format(e))

    except Exception as e:
        print("Failed to process records. Exception: {}".format(e))
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



