def process_image()

in lambda/imageprocessor/imageprocessor.py [0:0]


def process_image(event, context):
    """Label the frames in a Kinesis batch and persist the results.

    For every record in ``event['Records']`` the frame package is decoded,
    the image is sent to Rekognition ``detect_labels``, SNS notifications are
    published for labels found on the configured watch list (when a phone
    number and/or topic ARN is configured), the image bytes are written to S3
    and a metadata item is written to DynamoDB.

    A frame whose Rekognition call fails is logged and skipped; the remaining
    records of the batch are still processed.

    Args:
        event: Invocation event. Must contain a ``Records`` list whose
            entries carry a base64-encoded payload at ``['kinesis']['data']``.
        context: Invocation context (unused).

    Returns:
        None.
    """

    #Initialize clients
    rekog_client = boto3.client('rekognition')
    sns_client = boto3.client('sns')
    s3_client = boto3.client('s3')
    dynamodb = boto3.resource('dynamodb')

    #Load config
    config = load_config()

    s3_bucket = config["s3_bucket"]
    s3_key_frames_root = config["s3_key_frames_root"]

    ddb_table = dynamodb.Table(config["ddb_table"])

    rekog_max_labels = config["rekog_max_labels"]
    rekog_min_conf = float(config["rekog_min_conf"])

    label_watch_list = config["label_watch_list"]
    label_watch_min_conf = float(config["label_watch_min_conf"])
    label_watch_phone_num = config.get("label_watch_phone_num", "")
    label_watch_sns_topic_arn = config.get("label_watch_sns_topic_arn", "")

    #Upper-case the watch list once so the per-label check is a set lookup
    watch_list_upper = {name.upper() for name in label_watch_list}

    processed_count = 0

    #Iterate on frames fetched from Kinesis
    for record in event['Records']:

        frame_package_b64 = record['kinesis']['data']
        #SECURITY: pickle.loads can execute arbitrary code from a crafted
        #payload. Only acceptable while every producer on the stream is trusted.
        frame_package = pickle.loads(base64.b64decode(frame_package_b64))

        img_bytes = frame_package["ImageBytes"]
        approx_capture_ts = frame_package["ApproximateCaptureTime"]

        now_ts = time.time()

        frame_id = str(uuid.uuid4())
        processed_timestamp = Decimal(now_ts)
        approx_capture_timestamp = Decimal(approx_capture_ts)

        now = convert_ts(now_ts, config)
        year = now.strftime("%Y")
        mon = now.strftime("%m")
        day = now.strftime("%d")
        hour = now.strftime("%H")

        try:
            rekog_response = rekog_client.detect_labels(
                Image={
                    'Bytes': img_bytes
                },
                MaxLabels=rekog_max_labels,
                MinConfidence=rekog_min_conf
            )
        except Exception as e:
            #Log error and ignore this frame only; keep processing the rest of
            #the batch. You might want to add that frame to a dead-letter queue.
            print(e)
            continue

        #Iterate on rekognition labels. Enrich and prep them for storage in DynamoDB
        labels_on_watch_list = []
        for label in rekog_response['Labels']:

            lbl = label['Name']
            conf = label['Confidence']

            on_watch_list = (lbl.upper() in watch_list_upper
                             and conf >= label_watch_min_conf)
            label['OnWatchList'] = on_watch_list

            #Print labels and confidence to lambda console
            print('{} .. conf %{:.2f}'.format(lbl, conf))

            #Check label watch list and trigger action
            if on_watch_list:
                #Copy before the Decimal conversion below: the copy is sent
                #through json.dumps, which cannot serialize Decimal.
                labels_on_watch_list.append(deepcopy(label))

            #Convert from float to decimal for DynamoDB
            label['Confidence'] = Decimal(conf)

            for instance in label.get('Instances', []):
                bounding_box = instance['BoundingBox']
                for dim in ('Width', 'Height', 'Left', 'Top'):
                    bounding_box[dim] = Decimal(bounding_box[dim])
                instance['Confidence'] = Decimal(instance['Confidence'])

        #Send out notification(s), if needed
        if labels_on_watch_list \
                and (label_watch_phone_num or label_watch_sns_topic_arn):

            notification_txt = 'On {}...\n'.format(now.strftime('%x, %-I:%M %p %Z'))
            notification_txt += ''.join(
                '- "{}" was detected with {}% confidence.\n'.format(
                    watched['Name'],
                    round(watched['Confidence'], 2))
                for watched in labels_on_watch_list
            )

            print(notification_txt)

            if label_watch_phone_num:
                sns_client.publish(PhoneNumber=label_watch_phone_num, Message=notification_txt)

            if label_watch_sns_topic_arn:
                resp = sns_client.publish(
                    TopicArn=label_watch_sns_topic_arn,
                    Message=json.dumps(
                        {
                            "message": notification_txt,
                            "labels": labels_on_watch_list
                        }
                    )
                )

                if resp.get("MessageId", ""):
                    print("Successfully published alert message to SNS.")

        #Store frame image in S3
        s3_key = (s3_key_frames_root + '{}/{}/{}/{}/{}.jpg').format(year, mon, day, hour, frame_id)

        s3_client.put_object(
            Bucket=s3_bucket,
            Key=s3_key,
            Body=img_bytes
        )

        #Persist frame data in dynamodb
        item = {
            'frame_id': frame_id,
            'processed_timestamp' : processed_timestamp,
            'approx_capture_timestamp' : approx_capture_timestamp,
            'rekog_labels' : rekog_response['Labels'],
            'rekog_orientation_correction' :
                rekog_response.get('OrientationCorrection', 'ROTATE_0'),
            'processed_year_month' : year + mon, #To be used as a Hash Key for DynamoDB GSI
            's3_bucket' : s3_bucket,
            's3_key' : s3_key
        }

        ddb_table.put_item(Item=item)

        processed_count += 1

    #Report frames actually stored; skipped frames are not counted
    print('Successfully processed {} records.'.format(processed_count))