rekognition-pipeline/lambda/jobresultprocessor/lambda_function.py [8:122]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def getJobResults(api, jobId):
    """Page through all results of a completed Rekognition video analysis job."""

    pages = []
    nextToken = None

    client = AwsHelper().getClient('rekognition')

    while True:
        # Brief pause between calls to stay within the Get* API rate limits.
        time.sleep(5)

        kwargs = {'JobId': jobId}
        if nextToken:
            kwargs['NextToken'] = nextToken

        # Call the Get* API that matches the Start* API used to create the job.
        if api == "StartLabelDetection":
            response = client.get_label_detection(**kwargs)
        elif api == "StartTextDetection":
            response = client.get_text_detection(**kwargs)
        elif api == "StartFaceDetection":
            response = client.get_face_detection(**kwargs)
        elif api == "StartContentModeration":
            response = client.get_content_moderation(**kwargs)
        elif api == "StartCelebrityRecognition":
            response = client.get_celebrity_recognition(**kwargs)
        else:
            raise ValueError("Unsupported API: {}".format(api))

        pages.append(response)
        print("Resultset received: {}".format(len(pages)))

        nextToken = response.get('NextToken')
        if not nextToken:
            break
        print("Next token: {}".format(nextToken))

    return pages

def processRequest(request):

    print(request)

    jobId = request['jobId']
    jobTag = request['jobTag']
    jobStatus = request['jobStatus']
    jobAPI = request['jobAPI']
    bucketName = request['bucketName']
    objectName = request['objectName']
    outputBucket = request["outputBucket"]
    itemsTable = request["itemsTable"]

    # Retrieve every result page for the completed job.
    pages = getJobResults(jobAPI, jobId)

    print("Result pages received: {}".format(len(pages)))
    print(pages)

    # Store the raw paged responses as a single JSON document in the output bucket.
    outputPath = "async/{}-analysis/{}/".format(objectName, jobTag)
    opath = "{}response.json".format(outputPath)
    S3Helper.writeToS3(json.dumps(pages), outputBucket, opath)

    #opg = OutputGenerator(jobTag, pages, bucketName, objectName, ddb)
    #opg.run()

    print("ItemId: {}".format(jobTag))

    # Mark the item's tracking record as complete.
    ds = datastore.ItemStore(itemsTable)
    ds.markItemComplete(jobTag)

    output = "Processed item: {}, object: {}/{}".format(jobTag, bucketName, objectName)

    print(output)

    return {
        'statusCode': 200,
        'body': output
    }

def lambda_handler(event, context):

    print("event: {}".format(event))

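    # The SQS record body wraps an SNS notification; its Message field carries
    # the Rekognition job-completion payload (JobId, Status, API, Video, ...).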
    body = json.loads(event['Records'][0]['body'])
    message = json.loads(body['Message'])

    print("Message: {}".format(message))

    request = {}

    request["jobId"] = message['JobId']
    request["jobTag"] = message['JobTag']
    request["jobStatus"] = message['Status']
    request["jobAPI"] = message['API']
    request["bucketName"] = message['Video']['S3Bucket']
    request["objectName"] = message['Video']['S3ObjectName']
    
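    # Output bucket and items table names come from the function's environment variables.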
    request["outputBucket"] = os.environ['OUTPUT_BUCKET']
    request["itemsTable"] = os.environ['ITEMS_TABLE']

    return processRequest(request)

def lambda_handler_local(event, context):
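    # Local test entry point: the event is already in the shape processRequest expects.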
    print("event: {}".format(event))
    return processRequest(event)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/jobresultsproc.py [8:122]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def getJobResults(api, jobId):
    """Page through all results of a completed Rekognition video analysis job."""

    pages = []
    nextToken = None

    client = AwsHelper().getClient('rekognition')

    while True:
        # Brief pause between calls to stay within the Get* API rate limits.
        time.sleep(5)

        kwargs = {'JobId': jobId}
        if nextToken:
            kwargs['NextToken'] = nextToken

        # Call the Get* API that matches the Start* API used to create the job.
        if api == "StartLabelDetection":
            response = client.get_label_detection(**kwargs)
        elif api == "StartTextDetection":
            response = client.get_text_detection(**kwargs)
        elif api == "StartFaceDetection":
            response = client.get_face_detection(**kwargs)
        elif api == "StartContentModeration":
            response = client.get_content_moderation(**kwargs)
        elif api == "StartCelebrityRecognition":
            response = client.get_celebrity_recognition(**kwargs)
        else:
            raise ValueError("Unsupported API: {}".format(api))

        pages.append(response)
        print("Resultset received: {}".format(len(pages)))

        nextToken = response.get('NextToken')
        if not nextToken:
            break
        print("Next token: {}".format(nextToken))

    return pages

def processRequest(request):

    print(request)

    jobId = request['jobId']
    jobTag = request['jobTag']
    jobStatus = request['jobStatus']
    jobAPI = request['jobAPI']
    bucketName = request['bucketName']
    objectName = request['objectName']
    outputBucket = request["outputBucket"]
    itemsTable = request["itemsTable"]

    # Retrieve every result page for the completed job.
    pages = getJobResults(jobAPI, jobId)

    print("Result pages received: {}".format(len(pages)))
    print(pages)

    # Store the raw paged responses as a single JSON document in the output bucket.
    outputPath = "async/{}-analysis/{}/".format(objectName, jobTag)
    opath = "{}response.json".format(outputPath)
    S3Helper.writeToS3(json.dumps(pages), outputBucket, opath)

    #opg = OutputGenerator(jobTag, pages, bucketName, objectName, ddb)
    #opg.run()

    print("ItemId: {}".format(jobTag))

    # Mark the item's tracking record as complete.
    ds = datastore.ItemStore(itemsTable)
    ds.markItemComplete(jobTag)

    output = "Processed item: {}, object: {}/{}".format(jobTag, bucketName, objectName)

    print(output)

    return {
        'statusCode': 200,
        'body': output
    }

def lambda_handler(event, context):

    print("event: {}".format(event))

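    # The SQS record body wraps an SNS notification; its Message field carries
    # the Rekognition job-completion payload (JobId, Status, API, Video, ...).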
    body = json.loads(event['Records'][0]['body'])
    message = json.loads(body['Message'])

    print("Message: {}".format(message))

    request = {}

    request["jobId"] = message['JobId']
    request["jobTag"] = message['JobTag']
    request["jobStatus"] = message['Status']
    request["jobAPI"] = message['API']
    request["bucketName"] = message['Video']['S3Bucket']
    request["objectName"] = message['Video']['S3ObjectName']
    
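    # Output bucket and items table names come from the function's environment variables.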
    request["outputBucket"] = os.environ['OUTPUT_BUCKET']
    request["itemsTable"] = os.environ['ITEMS_TABLE']

    return processRequest(request)

def lambda_handler_local(event, context):
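    # Local test entry point: the event is already in the shape processRequest expects.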
    print("event: {}".format(event))
    return processRequest(event)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



